Every now and then, a developer reaches a point where they need a queue. In my case, it has mostly been a situation where I have to execute a call to some remote system in a reliable manner, meaning with retries if necessary. Since the retries have to happen even if the caller process restarts, persistence of pending call requests is a must.
Now, there is a multitude of messaging options available: message brokers such as ActiveMQ, RabbitMQ or even Kafka (though it’s not primarily developed for that use case), but also more lightweight solutions using libraries such as Apache Camel or Spring Integration. There are surely many other options, but these are the ones I have worked with.
Of course, I’m guessing that many of you have also implemented a persistent queue as a plain SQL table combined with a periodic polling job. Still, if you look around the web, this is not considered a good solution for “serious” queueing, and one can find many posts titled something like “SQL table as message queue is anti-pattern”.
The issues are mostly about how poorly an SQL database performs under high throughput, especially when it must guarantee that multiple consumers will not pick the same message from the queue. SQL databases generally don’t shine when records are very short-lived, as is the case with queue message records, and the record locking needed to avoid race conditions can also degrade performance significantly.
But, in my experience, many applications don’t need high throughput, and when we say “queueing”, a typical application requires much more from the queueing mechanism than just a “FIFO data structure”. I usually require the following characteristics from it:
- basic FIFO (first in — first out) processing
- persistence — to survive process restarts
- transactional message production and consumption — we want guarantees that message is placed and consumed from queue together with other database actions in all-or-nothing manner (no “dual-write” problem)
- retries with pluggable policy
- storing the reason of failed processing
- searching of queue messages by their status (ERROR, PENDING) or by various domain-specific filters (eg. search all queue messages that have
- removing messages by various domain-specific filters — when invalid messages ended up in the queue
- changing a specific message’s priority, e.g. when a particular message scheduled for future processing needs to be processed as soon as possible
- sometimes delayed initial consumption
When looking at all possible messaging solutions, you will frequently find that practically all of them have some of the following problems:
- Significantly degrade performance when persistence is enabled
- Sometimes don’t offer transactional message production and consumption, or they do, but the performance is also degraded significantly
- Queue content is mostly opaque — generic messaging solutions usually treat message content as binary array (serialized in any application-specific way), but that usually means there is no way to query these messages by any domain-specific fields
- Postponed retry requires some additional mechanism, raising the complexity of the implementation
- Changing priority or removing specific messages from the queue using various filters is not possible at all (or is at least hard)
- Postponing initial consumption is not possible because queue usually works in “process as soon as possible” manner
When taking into account all the requirements stated above, and given that high throughput is usually not a real concern, I usually end up with an SQL table as the best overall solution. And because almost every application already uses an SQL database, you won’t introduce another piece of technology into your system, which is always a good thing.
Maybe you’re wondering what “not so big throughput” means? Well, I almost always had sufficient throughput, so there was no real need for a performance test. There was one situation, though, where we had to satisfy some fair throughput requirements, so we picked a setup with one 8-core virtual machine hosting one such queueing service, and on another machine we used a “not-tuned-in-any-way” PostgreSQL DB.
Within a few days of testing we reached around 700 msg/sec with one consuming transaction per message (“exactly once” processing), and around 1400 msg/sec with one transaction per batch of messages (“at least once” processing, since in case of a restart some messages can be re-processed because the previous transaction was not committed). BTW, consumption was not just incrementing some in-memory counter, but a real-world example of executing a remote call to an external system.
I know this description is far too vague to be considered accurate in any way, but still, these numbers should give you at least some idea of what throughput we are talking about here.
General implementation sketch
The proposed mechanism requires 2 components:
- A few additional SQL columns in the message table to keep queueing state
- A consumer job that periodically polls pending messages and processes them
Queueing state consists of the following SQL columns:
- status (not null): enumeration of NOT_ATTEMPTED, SUCCESS, ERROR values; I’m not too happy with the NOT_ATTEMPTED name, so if you find a better one, just use it (initially I considered something like PENDING, but that doesn’t reflect that messages in ERROR status are usually pending too, waiting for a retry)
- next_attempt_time (nullable): timestamp of the next processing attempt; should be indexed for performance reasons; when populated, this column basically marks the message as being in a “pending” state, regardless of its status
- attempt_count (not null): number of processing attempts; useful as information in itself, but also necessary for some types of retry policies
- last_attempt_time (nullable): time of the last attempt, regardless of whether it was successful; also necessary for some types of retry policies
- last_attempt_error_message (nullable): text information about the last failed processing attempt; any arbitrary piece of text, such as an exception message or maybe even a whole stack trace dump
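As a rough sketch, the columns above could be declared as follows. Python with the standard-library sqlite3 module is used here only so that the example is runnable; the table name `queued_message`, the `payload` column, the `enqueue` helper and the choice of storing timestamps as epoch seconds are assumptions made for illustration, not part of the mechanism itself.

```python
import sqlite3

# Assumptions: table name, `payload` column and epoch-second timestamps are
# illustrative; only the five queueing-state columns come from the text.
SCHEMA = """
CREATE TABLE IF NOT EXISTS queued_message (
    id                         INTEGER PRIMARY KEY,
    payload                    TEXT    NOT NULL,
    status                     TEXT    NOT NULL DEFAULT 'NOT_ATTEMPTED'
        CHECK (status IN ('NOT_ATTEMPTED', 'SUCCESS', 'ERROR')),
    next_attempt_time          INTEGER,
    attempt_count              INTEGER NOT NULL DEFAULT 0,
    last_attempt_time          INTEGER,
    last_attempt_error_message TEXT
);
CREATE INDEX IF NOT EXISTS idx_queued_message_next_attempt
    ON queued_message (next_attempt_time);
"""


def create_schema(conn):
    """Create the table and the index on next_attempt_time."""
    conn.executescript(SCHEMA)


def enqueue(conn, payload, first_attempt_at):
    """Insert a message that becomes pending at `first_attempt_at`."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "INSERT INTO queued_message (payload, next_attempt_time)"
            " VALUES (?, ?)",
            (payload, first_attempt_at),
        )
    return cur.lastrowid
```

Passing a future value as `first_attempt_at` gives the delayed initial consumption mentioned earlier, while passing the current time makes the message pending immediately.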
The next_attempt_time column is arguably the most important one, serving many purposes:
- when populated with current/past timestamps, marks the message as in “pending” status to be consumed as soon as possible
- when populated with future timestamps, delays message consumption to desired time (“delayed” queue)
- when empty, the message is not supposed to be processed anymore (eg. when it’s in SUCCESS status, or when it’s in ERROR status, but retry limit is reached)
- acts as a “priority” field, since the polling job usually fetches messages ordered by this value, so messages with older timestamps get processed first
- updating this column can provide:
– retry in case of processing failure (just set to desired future time)
– re-sending of successful message (null -> some time)
– stopping the processing of pending message (some time -> null)
– reordering of processing (some time -> some other time)
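The operations listed above all boil down to a single UPDATE of next_attempt_time. Here is a minimal sketch, again using Python’s sqlite3 purely for runnability; the table name `queued_message`, the helper names and the epoch-second timestamps are assumptions, not something prescribed by the mechanism.

```python
import sqlite3


def open_demo_db():
    """In-memory table holding just the columns this sketch needs."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE queued_message ("
        " id INTEGER PRIMARY KEY,"
        " status TEXT NOT NULL DEFAULT 'NOT_ATTEMPTED',"
        " next_attempt_time INTEGER)"
    )
    return conn


def set_next_attempt_time(conn, message_id, when):
    """Move a message in time; `when=None` takes it out of processing."""
    with conn:
        cur = conn.execute(
            "UPDATE queued_message SET next_attempt_time = ? WHERE id = ?",
            (when, message_id),
        )
    return cur.rowcount == 1


def pending_ids(conn, now):
    """Ids the polling job would pick up at `now`, in processing order."""
    rows = conn.execute(
        "SELECT id FROM queued_message WHERE next_attempt_time <= ?"
        " ORDER BY next_attempt_time, id",
        (now,),
    )
    return [row[0] for row in rows]
```

With these helpers, re-sending a successful message is `set_next_attempt_time(conn, message_id, now)`, stopping a pending one is `set_next_attempt_time(conn, message_id, None)`, and moving a message to the front is a matter of giving it a timestamp older than all the others.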
All these columns can be placed within any existing table that represents the entity to be queued. In some cases, the entity represents the processing itself and is not needed once the queued processing is successful.
In such cases the consumer job can delete the table records after processing, and the SUCCESS status is not needed at all. Sometimes successful messages are kept temporarily for auditing reasons, and are deleted afterwards via some cleanup mechanism. So feel free to tailor the mechanism to your exact needs.
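One possible shape for such a cleanup mechanism is sketched below. It assumes a `queued_message` table with the columns described above, epoch-second timestamps, and a one-week retention window; all three are illustrative choices rather than recommendations.

```python
import sqlite3

RETENTION_SECONDS = 7 * 24 * 3600  # assumed audit window of one week


def purge_successful(conn, now, retention_seconds=RETENTION_SECONDS):
    """Delete SUCCESS messages whose last attempt is older than the window.

    Expects a `queued_message` table with status, next_attempt_time and
    last_attempt_time columns. Returns the number of deleted rows.
    """
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "DELETE FROM queued_message"
            " WHERE status = 'SUCCESS'"
            "   AND next_attempt_time IS NULL"
            "   AND last_attempt_time < ?",
            (now - retention_seconds,),
        )
    return cur.rowcount
```

The `next_attempt_time IS NULL` condition keeps the purge away from successful messages that someone has deliberately scheduled for re-sending.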
The consumer job should periodically execute the following steps:
- poll a limited number of “pending” messages by querying all records with next_attempt_time before the current time, ordered by next_attempt_time; the SQL query would look something like this (ignore DB specifics):
SELECT * FROM some_table WHERE next_attempt_time < CURRENT_TIMESTAMP() ORDER BY next_attempt_time ASC LIMIT 200
- for each polled message…
- open DB transaction
- execute processing logic
- set last_attempt_time to current time and increment attempt_count
- handle processing result:
– in case of success — set status to SUCCESS and clear next_attempt_time
– in case of failure — set status to ERROR, last_attempt_error_message to some error description, and next_attempt_time to some future retry time if retry limit has not been reached
- close DB transaction
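The steps above can be sketched roughly as follows, using Python’s sqlite3 only to keep the example runnable. The table name `queued_message`, the `payload` column, the `handler(conn, payload)` callback, the exponential backoff policy and its constants are all assumptions. The sketch also deviates from the list in one respect: a failure is recorded in a second transaction, so that rolling back whatever the handler wrote does not also discard the error information.

```python
import sqlite3

MAX_ATTEMPTS = 5          # assumed retry limit
BASE_DELAY_SECONDS = 30   # assumed delay before the first retry


def next_retry_time(attempt_count, now):
    """Exponential backoff; returns None once the retry limit is reached."""
    if attempt_count >= MAX_ATTEMPTS:
        return None
    return now + BASE_DELAY_SECONDS * 2 ** (attempt_count - 1)


def run_consumer_once(conn, handler, now, batch_size=200):
    """Poll pending messages and process each in its own transaction."""
    rows = conn.execute(
        "SELECT id, payload, attempt_count FROM queued_message"
        " WHERE next_attempt_time <= ?"
        " ORDER BY next_attempt_time, id LIMIT ?",
        (now, batch_size),
    ).fetchall()
    for message_id, payload, attempt_count in rows:
        attempts = attempt_count + 1
        try:
            with conn:  # commits on success, rolls back if handler raises
                handler(conn, payload)
                conn.execute(
                    "UPDATE queued_message SET status = 'SUCCESS',"
                    " next_attempt_time = NULL, attempt_count = ?,"
                    " last_attempt_time = ? WHERE id = ?",
                    (attempts, now, message_id),
                )
        except Exception as exc:
            with conn:  # separate transaction so the failure is recorded
                conn.execute(
                    "UPDATE queued_message SET status = 'ERROR',"
                    " next_attempt_time = ?, attempt_count = ?,"
                    " last_attempt_time = ?, last_attempt_error_message = ?"
                    " WHERE id = ?",
                    (next_retry_time(attempts, now), attempts, now,
                     str(exc), message_id),
                )
    return len(rows)
```

Because next_retry_time returns None once the limit is reached, a message that keeps failing ends up in ERROR status with an empty next_attempt_time and is no longer polled.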
The scheduling mechanism used to execute this consumer job should ensure that the job is never executed concurrently anywhere in the whole system.
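One way to get that guarantee, offered here only as an illustration, is a lease row in the same database: whoever manages to update the row owns the job until the lease expires. The `job_lock` table, the function names and the lease length are hypothetical, and `INSERT OR IGNORE` is SQLite-specific syntax that would need a different form on other databases.

```python
import sqlite3

LOCK_DDL = (
    "CREATE TABLE IF NOT EXISTS job_lock ("
    " job_name TEXT PRIMARY KEY,"
    " owner TEXT,"
    " locked_until INTEGER NOT NULL)"
)


def try_acquire(conn, job_name, owner, now, lease_seconds):
    """Return True if `owner` obtained the lease for `job_name`."""
    with conn:
        # Make sure the lock row exists (INSERT OR IGNORE is SQLite syntax).
        conn.execute(
            "INSERT OR IGNORE INTO job_lock (job_name, owner, locked_until)"
            " VALUES (?, NULL, 0)",
            (job_name,),
        )
        # The conditional UPDATE matches only while no unexpired lease exists.
        cur = conn.execute(
            "UPDATE job_lock SET owner = ?, locked_until = ?"
            " WHERE job_name = ? AND locked_until <= ?",
            (owner, now + lease_seconds, job_name, now),
        )
    return cur.rowcount == 1


def release(conn, job_name, owner):
    """Give the lease back early; only the current owner can do so."""
    with conn:
        conn.execute(
            "UPDATE job_lock SET locked_until = 0"
            " WHERE job_name = ? AND owner = ?",
            (job_name, owner),
        )
```

Each scheduler instance would call `try_acquire` before polling and skip the run when it returns False. The lease should be longer than the longest expected run, otherwise a second instance can start while the first is still working.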
Part 2: Do it simply using Spring Boot and JPA
Part 3: Clustering and parallelism