Spring AMQP StatefulRetryOperationsInterceptor + cluster = RetryCacheCapacityExceededException

Over the last few months I had a chance to develop a quite complex distributed system, driven by push notifications from Google Calendar. I decided to use Spring AMQP backed by a RabbitMQ broker. Due to scaling and high-availability requirements, the main backend service and the broker were deployed to multiple servers.

Spring AMQP is a very nice library which facilitates building message-driven applications. As with most distributed systems, I had to deal with transient issues: DNS resolution problems, network timeouts, transactions failing due to optimistic locking, and so on. If you make your processing idempotent, retries are a very efficient way of dealing with this kind of problem.

When processing fails, the message is rejected and by default it goes back to the AMQP queue. To handle this properly, Spring AMQP uses Spring Retry to track how many retries have been performed and then passes failed messages to DLQs. You can configure various aspects of the retry policy. StatefulRetryOperationsInterceptor helps to build a workflow in which non-transient exceptions send the message straight to the DLQ, while potentially transient exceptions are retried a given number of times.
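
For reference, a stateful retry interceptor like this can be assembled with Spring AMQP's RetryInterceptorBuilder and installed in the listener container's advice chain. The sketch below is only illustrative (the attempt count is made up, and it assumes the queue has a dead-letter exchange configured so that rejected messages actually land in a DLQ):

    import org.aopalliance.aop.Advice;
    import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
    import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;

    public class RetryConfigSketch {

        public void configureRetries(SimpleMessageListenerContainer container) {
            // Redeliver failed messages up to 3 times; after that, reject without requeue
            // so the broker routes the message to the dead-letter exchange.
            StatefulRetryOperationsInterceptor retryInterceptor = RetryInterceptorBuilder.stateful()
                    .maxAttempts(3)
                    .recoverer(new RejectAndDontRequeueRecoverer())
                    .build();

            container.setAdviceChain(new Advice[] { retryInterceptor });
        }
    }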

There is, however, one very dangerous aspect of using Spring Retry for this purpose. The AMQP protocol allows storing headers on each message, which seems to be the perfect place to keep a retry counter. Spring Retry, though, is built for a more generic use case, so it has its own storage for retry context, which by default is local. If you dig into the documentation/code, you can easily see that for each failed message the library computes a key and then stores a RetryContext object in a RetryContextCache.
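
The key is what ties redeliveries of the same message to the same RetryContext in that cache. Spring AMQP lets you control it with a MessageKeyGenerator; a minimal sketch, assuming the producer sets a unique messageId property on every message, could look like this:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.retry.MessageKeyGenerator;

    // Identifies a failed message by its messageId property, so every redelivery
    // of the same message maps to the same entry in the RetryContextCache.
    public class MessageIdKeyGenerator implements MessageKeyGenerator {

        @Override
        public Object getKey(Message message) {
            return message.getMessageProperties().getMessageId();
        }
    }

Such a generator can be plugged in via RetryInterceptorBuilder.stateful().messageKeyGenerator(...).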

This seems to be quite a reasonable solution for a generic retry library. I started thinking: what would happen in a clustered environment? If the RetryContext is not clustered, but each server has its own copy, then in the worst-case scenario a message could be attempted this many times before being rejected:

 (MAX_RETRY_NUMBER - 1) * NUMBER_OF_NODES + 1

So for a deployment of two machines and max retries equal to 3, in the worst-case scenario a message would be rejected after 5 retries instead of 3. I thought for a while about implementing a custom clustered RetryContextCache, but it did not seem justifiable. Exceptions were quite rare and this small difference in the maximum number of retries seemed acceptable. Even the Spring Retry documentation states that a clustered RetryContextCache is overkill in most situations:

The default implementation of the RetryContextCache is in memory, using a simple Map. Advanced usage with multiple processes in a clustered environment might also consider implementing the RetryContextCache with a cluster cache of some sort (though, even in a clustered environment this might be overkill).

It turned out you actually MUST implement a custom RetryContextCache. It does not have to be clustered, but the default Map-based cache will sooner or later blow up in production. The problem with the default implementation is that it can store at most 4096 keys and entries never expire. With transient issues it can happen that a message fails on server A (leaving a context entry in A's cache) and is then consumed correctly by server B, so the entry on server A is never cleaned up. Now you are effectively down to a limit of 4095... When you reach 0, Spring Retry starts throwing RetryCacheCapacityExceededException and no messages ever end up in the DLQ.
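
To make the limit concrete, here is a tiny artificial illustration of the failure mode, exercising MapRetryContextCache directly (the key values are obviously made up):

    import org.springframework.retry.context.RetryContextSupport;
    import org.springframework.retry.policy.MapRetryContextCache;

    public class CacheLimitDemo {

        public static void main(String[] args) {
            // Default capacity is 4096 entries and nothing ever expires.
            MapRetryContextCache cache = new MapRetryContextCache();
            for (int i = 0; i <= 4096; i++) {
                // The 4097th put throws RetryCacheCapacityExceededException.
                cache.put("message-" + i, new RetryContextSupport(null));
            }
        }
    }

This is exactly the exception that started showing up in our logs: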

2014-11-23 01:00:00,017 ERROR [com.XXX.calendarservice.exception.ServiceErrorHandler] (SimpleAsyncTaskExecutor-11539489) c416b403-e2b5-477f-af5f-293627c5837d INTERNAL_ERROR: class org.springframework.retry.TerminatedRetryException: Could not register throwable; nested exception is org.springframework.retry.policy.RetryCacheCapacityExceededException: Retry cache capacity limit breached. Do you need to re-consider the implementation of the key generator, or the equals and hashCode of the items that failed?: org.springframework.retry.TerminatedRetryException: Could not register throwable; nested exception is org.springframework.retry.policy.RetryCacheCapacityExceededException: Retry cache capacity limit breached. Do you need to re-consider the implementation of the key generator, or the equals and hashCode of the items that failed?
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:266) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:188) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.0.4.RELEASE.jar:4.0.4.RELEASE]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) [spring-aop-4.0.4.RELEASE.jar:4.0.4.RELEASE]
        at sun.proxy.$Proxy117.invokeListener(Unknown Source)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1111) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:559) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:904) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:888) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:75) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:989) [spring-rabbit-1.3.2.RELEASE.jar:]
        at java.lang.Thread.run(Thread.java:722) [rt.jar:1.7.0_09-icedtea]
Caused by: org.springframework.retry.policy.RetryCacheCapacityExceededException: Retry cache capacity limit breached. Do you need to re-consider the implementation of the key generator, or the equals and hashCode of the items that failed?
        at org.springframework.retry.policy.MapRetryContextCache.put(MapRetryContextCache.java:82) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.retry.support.RetryTemplate.registerThrowable(RetryTemplate.java:362) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:264) [spring-retry-1.0.3.RELEASE.jar:]
        ... 12 more


If you take a look at this dashboard, you can immediately see when the retry state storage stopped working correctly:

[Grafana graph]


Lessons learned: do not always trust the documentation; read the implementation as well. I ended up implementing a simple RetryContextCache backed by a Guava cache with expiration.
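
A minimal sketch of such a cache, assuming Guava is on the classpath (the class name, expiry time, and maximum size below are illustrative, not a recommendation):

    import java.util.concurrent.TimeUnit;

    import org.springframework.retry.RetryContext;
    import org.springframework.retry.policy.RetryContextCache;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    // A RetryContextCache whose entries expire, so contexts left behind by messages
    // that were eventually consumed on another node do not accumulate forever.
    public class ExpiringRetryContextCache implements RetryContextCache {

        private final Cache<Object, RetryContext> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(30, TimeUnit.MINUTES)
                .maximumSize(4096)
                .build();

        @Override
        public RetryContext get(Object key) {
            return cache.getIfPresent(key);
        }

        @Override
        public void put(Object key, RetryContext context) {
            cache.put(key, context);
        }

        @Override
        public void remove(Object key) {
            cache.invalidate(key);
        }

        @Override
        public boolean containsKey(Object key) {
            return cache.asMap().containsKey(key);
        }
    }

The custom cache can then be set on the RetryTemplate backing the interceptor (retryTemplate.setRetryContextCache(...)), with the template passed to the interceptor via RetryInterceptorBuilder.stateful().retryOperations(retryTemplate).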

Stanisław Ogórkis

Software engineer specializing in delivering highly interactive web applications and complex enterprise systems.
