Practical Queueing Using SQL (Part 2): Do It Simply Using Spring Boot and JPA

Software development
Practical Queueing Using SQL (Part 2): Do It Simply Using Spring Boot and JPA

We are going to implement the queueing mechanism in Java. For this little demo, we are picking a combination of Spring Boot and JPA, but also Gradle, since it’s a fairly quick setup with these.

Although I am not a fan of premature abstraction due to frequent situations where it gets more in the way that helps. Just for the sake of an easier explanation, I will abstract the whole queueing mechanism in a separate package (or two). And the rest of app acts simply as a user of such “queueing library”. This way one can easily see what is application-specific, and what is general queueing code.

We will demonstrate this queueing mechanism of spring boot and jpa on some dummy SMS message-sending application. But let’s first get you introduced to queueing code.

The code and build/run instructions are available at the end of this post.

Queueing library

Whole “library” is present under com.ag04.jpaqueue package.

Since entities that are to be processed in queueing fashion are application-specific, we are going to extract only their queueing state as JPA Embeddableobject (some code omitted for brevity):

public class QueueingState {
	public enum Status {
	private Status status;
	private LocalDateTime nextAttemptTime;
	private int attemptCount;
	private LocalDateTime lastAttemptTime;
	private String lastAttemptErrorMessage;
	public QueueingState() {
		this.status = Status.NOT_ATTEMPTED;
		this.attemptCount = 0;
	}Code language: PHP (php)

Queue producer

As you can guess, just embed this QueueingState object into entity of your choice to prepare it for queueing. Also, there is one important public method in QueueingState class, the one that you have to call when pushing the item entity into queue:

public void scheduleNextAttempt(LocalDateTime nextAttemptTime);Code language: JavaScript (javascript)

So prior to saving the queuing entity, just make sure to call this method with desired processing time, thus populating next_attempt_time column. Usually, this is the current time, which means the item will get processed as soon as possible, but it can even be in the future which effectively makes for delayed/scheduled processing.

So, pushing to queue would typically look something like this:

SomeEntity item = new SomeEntity(....);
save(item);Code language: JavaScript (javascript)

Queue consumer

Now comes the most complex part — the consumer code. It is located in one class — com.ag04.jpaqueue.QueueConsumer

Looking at its constructor, you can see it is configured with item batch size and also with polling period (in seconds). The constructor also requires a few other dependency arguments:

  • QueueConsumerModule — application-specific logic required for item consumption
  • RetryPolicy — strategy for how to handle retries in case of processing failures; there are a couple of implementations available in com.ag04.jpaqueue.retry package
  • PlatformTransactionManager — Spring-provided bean for managing transactions because we need it to open some internal transactions programmatically (this is very rare nowadays in Spring world — other approach would be normally to use @Transactional with additional bean class, but it would require separating code in more classes, so I considered current approach simpler for this demo)

Arguably, the most interesting is QueueConsumerModule which looks like this:

public interface QueueConsumerModule<ID> {
   List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(LocalDateTime time, int limit);
   Optional<QueueingState> getQueueingStateForItem(ID itemId);
   Optional<QueueingState> processItem(ID itemId);
}Code language: HTML, XML (xml)

So, these methods should be implemented by concrete application to provide:

  • IDs of limited list of pending items where next attempt time is before given time value
  • QueueingState instance for specified item entity (sometimes there can be multiple QueueingState embeddables within same entity when there are multiple queued processings present for same entity); return value is optional because it can happen that item is not present in DB for some external reason
  • Processing logic for specific item entity which returns item’s QueueingState in case of success; return value is again optional for same reason described above

Understandably, we need a scheduler for executing processing job periodically, and as explained in previous post, we want to ensure there must be no parallel executions of such processing task, so we will simply use single-thread executor, and schedule processing task upon application start:

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// ...
private void startProcessingTask() {"Starting queue processing task with delay of {} secs", this.pollingPeriodInSecs);
   Runnable command = this::processQueuedItems;
   this.processingTask = this.scheduledExecutorService.scheduleWithFixedDelay(command, pollingPeriodInSecs, pollingPeriodInSecs, TimeUnit.SECONDS);
}Code language: JavaScript (javascript)

Consuming logic is within this method:

public void processQueuedItems() {
	try {
		LocalDateTime now =;
		List<?> itemIds = this.queueConsumerModule.findItemIdsWhereQueueingNextAttemptTimeIsBefore(now, itemsPollSize);
		if (!itemIds.isEmpty()) {"Fetched {} pending queued items", itemIds.size());
			for (Object itemId : itemIds) {
	} catch (Throwable th) {
		logger.error("Error while fetching queued items: " + th.getMessage(), th);
private void processItemAndHandleErrorIfRaised(Object itemId) {
	try {
		executeUnderTransaction(() -> processItem(itemId));
	} catch (Throwable error) {
		executeUnderTransaction(() -> registerProcessingFailure(itemId, error));
}Code language: JavaScript (javascript)

So you can see how consumer first polls for pending items, and then process each of them within its own separate transaction. If the error is caught during the processing, then we handle it in a new transaction. This separate transaction for error handling is required because in some cases JPA’s PersistenceContext cannot be reused for error handling if the exception has already rolled back some nested transaction in processing logic.

The consumer starts its processing job in its initialization method, which in Spring is usually annotated with @PostConstruct, and respectively stops it in its destroy method annotated with @PreDestroy. We will see in next post that we need to change this to make our app cluster-friendly.

Contact block

Contact block

Demo app — SMS sending

OK, so let’s get to business — now we gonna set up simple Spring Boot app that simply enqueues some amount of SMS messages and “sends” them using previously described queueing library. Application code is located under com.ag04.smsqueueing package(s) and starter class is com.ag04.smsqueueing.SmsQueueingApplication (annotated with @SpringBootApplication).

By SMS “sending” I mean just logging SMS content to console, but this dummy sender will also introduce some short delay to simulate real world case, as well as trigger occasional exceptions to show how error handling works (actually errors will be triggered quite often). Sender implementation is located in com.ag04.smsqueueing.sender.SmsSenderImplclass.

Queued entity — SMS message

Our queued item entity is SmsMessage class, stored in SMS_MESSAGE table in the database.

@Table(name = "sms_message", indexes = @Index(name = "idx_sms_msg_queue_polling_fields", columnList = "next_attempt_time"))
public class SmsMessage {
	@Column(name = "id", nullable = false)
	private Long id;
	@Column(name = "uid", nullable = false, unique = true)
	private String uid; // app-assigned unique ID
	@Column(name = "from_address", nullable = false)
	private String fromAddress;
	@Column(name = "to_address", nullable = false)
	private String toAddress;
	@Column(name = "text", nullable = false)
	private String text;
	private QueueingState sendingState = new QueueingState();
  // ...
}Code language: PHP (php)

So you see, this entity has only a few specific fields (fromAddress, toAddress, text…), and also includes QueueingState embeddable previously described. Note that we also used JPA’s @Index annotation to index “next_attempt_time” column, which is of big importance for efficient polling.

SMS production

Upon application start, the application enqueues some amount of SMS messages. This is visible in com.ag04.smsqueueing.MainApplicationRunner#run method.

SMS consumption

We instantiate QueueConsumer within com.ag04.smsqueueing.SmsQueueingApplication#smsSendingQueueConsumer method:

public QueueConsumer smsSendingQueueConsumer(SmsSendingQueueConsumerModule smsSendingQueueConsumerModule, PlatformTransactionManager transactionManager) {
	RetryPolicy retryPolicy = new LimitedRetryPolicy(3, new FixedDelayRetryPolicy(Duration.ofMinutes(1)));
	return new QueueConsumer(smsSendingQueueConsumerModule, retryPolicy, transactionManager, 100, 10);
}Code language: PHP (php)

You can see how we configured this instance to work with batch size 100, and with polling period of 10 seconds. Also, RetryPolicy that we configured will try to process SMS message at most 3 times (LimitedRetryPolicy). Each retry will be delayed from previous one by 1 minute (FixedDelayRetryPolicy). ExponentialDelayRetryPolicy is another interesting RetryPolicy in the package, which makes each subsequent retry delayed by exponentially increasing duration.

Our implementation of QueueConsumerModule is SmsSendingQueueConsumerModule, and uses Spring Data JPA repository to access SmsMessage in database. You will notice that Implementation of processItem(id) is just a call to SmsSender. Also, our findItemIdsWhereQueueingNextAttemptTimeIsBefore(time, limit) doesn’t use SmsMessageRepository because Spring Data JPA currently doesn’t offer parameterized result limiting together with simple projection (ID selection). So we ended up using plain JPA via EntityManager.

Building and running

The project code can be downloaded from BitBucket at:

You can download the sources using the following git command:

git clone language: PHP (php)

Gradle is picked as a build tool. As one can see in, there are only 3 dependencies required for this demo project:

dependencies {
   implementation 'org.springframework.boot:spring-boot-starter'
   implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
   runtimeOnly 'org.postgresql:postgresql'
}Code language: JavaScript (javascript)

To build the project execute following command line:

./gradlew clean build

Now we have built the app package (build/libs/smsqueueing-1.0.jar). But prior to running it, we need to make sure we have some place to store our data.

Database setup

We decided to use PostgreSQL database. But any other SQL database would work too, since we don’t use any DB specifics here.

One has to create appropriate database (“smsqueueing”) and a user (also “smsqueueing”). For PostgreSQL, this “psql” script can come handy:

shell> psql postgres
CREATE USER smsqueueing WITH
PASSWORD 'smsqueueing';
OWNER = smsqueueing
CONNECTION LIMIT = -1;Code language: JavaScript (javascript)

DB connection configuration is done as standard Spring Boot setup in

spring.datasource.password=smsqueueingCode language: JavaScript (javascript)

If not already present, all necessary database objects (tables, sequences, etc…) will get created upon application start. You don’t have to worry about it.


Finally, we can run the app via:

java -jar ./build/libs/smsqueueing-1.0.jar

Of course, if you gonna start it from your favorite IDE, just import it as a Gradle project, and start it via the main application class — com.ag04.smsqueueing.SmsQueueingApplication

When you start it, you are going to see at first log lines about SMS production, something like:

2019-05-10 07:41:25.619  INFO 8074 --- [           main] c.a.s.producer.SmsProducerImpl           : Producing SMS: fromAddress=80111, toAddress=385913344599, text=Hello, this is text generated at 2019-05-10T07:41:25.618, sendTime=2019-05-10T07:41:25.618
2019-05-10 07:41:25.622  INFO 8074 --- [           main] c.a.s.producer.SmsProducerImpl           : Producing SMS: fromAddress=80444, toAddress=385913344606, text=Hello, this is text generated at 2019-05-10T07:41:25.622, sendTime=2019-05-10T07:41:25.622
2019-05-10 07:41:25.625  INFO 8074 --- [           main] c.a.s.producer.SmsProducerImpl Code language: JavaScript (javascript)

And each periodic execution of a consumption task, when it polls some pending items, it will log their count:

2019-05-10 07:45:11.512  INFO 8415 --- [pool-1-thread-1] com.ag04.jpaqueue.QueueConsumer          : Fetched 100 pending queued itemsCode language: CSS (css)

After which it logs individual item processing lines, such as these for successful processing:

2019-05-10 07:45:11.518  INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl  : Sending SMS: SmsMessage[id=311, uid='e2a334af-4398-420a-b493-e5a05549fcc9', fromAddress='80444', toAddress='385913344829', text='Hello, this is text generated at 2019-05-10T07:40:36.360', sendingState=QueueingState[status=NOT_ATTEMPTED, nextAttemptTime=2019-05-10T07:40:36.360, attemptCount=0, lastAttemptTime=null, lastAttemptErrorMessage='null']]
2019-05-10 07:45:11.605  INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl  : Sending SMS: SmsMessage[id=312, uid='dda97da8-af60-4eca-87d7-c406bf1c3161', fromAddress='80555', toAddress='385913344533', text='Hello, this is text generated at 2019-05-10T07:40:36.363', sendingState=QueueingState[status=NOT_ATTEMPTED, nextAttemptTime=2019-05-10T07:40:36.363, attemptCount=0, lastAttemptTime=null, lastAttemptErrorMessage='null']]
2019-05-10 07:45:11.699  INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl  : Sending SMS: SmsMessage[id=313, uid='628b630e-813c-45c6-b889-f0f037d3ec3c', fromAddress='80333', toAddress='385913344634', text='Hello, this is text generated at 2019-05-10T07:40:36.367', sendingState=QueueingState[status=NOT_ATTEMPTED, nextAttemptTime=2019-05-10T07:40:36.367, attemptCount=0, lastAttemptTime=null, lastAttemptErrorMessage='null']]Code language: CSS (css)

And something like this for unsuccessful ones (and these will be frequent in our demo):

2019-05-10 07:45:12.160 ERROR 8415 --- [pool-1-thread-1] com.ag04.jpaqueue.QueueConsumer          : Error while processing item by ID 316: Wrong time picked for send
java.lang.IllegalStateException: Wrong time picked for send
	at com.ag04.smsqueueing.sender.SmsSenderImpl.send( ~[classes/:na]
	at com.ag04.smsqueueing.SmsSendingQueueConsumerModule.processItem( ~[classes/:na]
	at com.ag04.smsqueueing.SmsSendingQueueConsumerModule.processItem( ~[classes/:na]
	at com.ag04.jpaqueue.QueueConsumer.processItem( [classes/:na]
	at com.ag04.jpaqueue.QueueConsumer.lambda$processItemAndHandleErrorIfRaised$0( [classes/:na]
	at com.ag04.jpaqueue.QueueConsumer$1.doInTransactionWithoutResult( ~[classes/:na]
	at ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
	at ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
	at com.ag04.jpaqueue.QueueConsumer.executeUnderTransaction( [classes/:na]
	at com.ag04.jpaqueue.QueueConsumer.processItemAndHandleErrorIfRaised( [classes/:na]
	at com.ag04.jpaqueue.QueueConsumer.processQueuedItems( [classes/:na]
	at java.util.concurrent.Executors$ ~[na:1.8.0_201]
	at java.util.concurrent.FutureTask.runAndReset( ~[na:1.8.0_201]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301( ~[na:1.8.0_201]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ ~[na:1.8.0_201]
	at java.util.concurrent.ThreadPoolExecutor.runWorker( ~[na:1.8.0_201]
	at java.util.concurrent.ThreadPoolExecutor$ ~[na:1.8.0_201]
	at ~[na:1.8.0_201]
Caused by: java.lang.RuntimeException: I'm evil, that's the root cause!
	... 18 common frames omitted
2019-05-10 07:45:12.163  INFO 8415 --- [pool-1-thread-1] com.ag04.jpaqueue.QueueConsumer          : Retry for item by ID 316 scheduled for time: 2019-05-10T07:46:12.163
Code language: PHP (php)

Notice the last line that informs about next attempt time when the retry has been scheduled (if any, maybe it’s the last attempt).

A quick look at database (SMS_MESSAGE table) during app runtime shows all relevant fields:

Of course, you can try restarting the service. This will add a new bunch of SMS messages to our queue. Naturally, if there are old ones that are still in a pending state, they will continue to get processed according to their scheduled attempt time.

Now, with this extremely simple queueing setup, you can do all sorts of things, something like the following, and much, much more:

  • count number of pending items:
    SELECT COUNT(*) FROM sms_message WHERE next_attempt_time <> null
  • count number of pending items aggregated by domain-specific field (eg. fromAddress):
    SELECT from_address, COUNT(*) FROM sms_message WHERE next_attempt_time <> null GROUP BY from_address
  • re-schedule all failed items with retry limit reached, to execute as soon as possible:
    UPDATE sms_message SET next_attempt_time = CURRENT_TIMESTAMP() WHERE status = 'ERROR' AND next_attempt_time = null
  • remove pending items that failed too many times (say 50):
    DELETE FROM sms_message WHERE next_attempt_time <> null and attempt_count > 50
  • remove some specific invalid item which ended up in queue due to a bug:
    DELETE FROM sms_message WHERE ID = 34254642991
  • display failure descriptions for items which last processing attempt was after some specified time:
    SELECT last_error_attempt_message FROM sms_message WHERE status = 'ERROR' and last_attempt_time > to_timestamp('20190523', 'YYYYMMDD')

… and anything else that comes to your mind.

So tell me — how easy would be to implement all those mentioned features with some setup including popular queueing library/server?

To be continued …

And that’s it. In our next post we gonna see how to ensure that only single consumption job is executed in clustered environment, and how to add parallelism to improve the throughput.

Part 1: Rationale and general design
Part 3: Clustering and parallelism


Exceptional ideas need experienced partners.