Spring AMQP StatefulRetryOperationsInterceptor + cluster = RetryCacheCapacityExceededException

Over the last few months I had a chance to develop a quite complex distributed system, driven by push notifications from Google Calendar. I decided to use Spring AMQP backed by a RabbitMQ broker. Due to scaling and high availability requirements, the main backend service and the broker were deployed to multiple servers.

Spring AMQP is a very nice library which facilitates building message-driven applications. As with most distributed systems, I had to deal with transient issues: DNS resolution problems, network timeouts, transactions failing due to optimistic locking and so on. If you make your processing idempotent, retries are a very efficient way of dealing with this kind of problem.

When processing fails, the message is rejected and by default goes back to the AMQP queue. To handle this properly, Spring AMQP uses Spring Retry to track how many retries were performed and then to pass failed messages to DLQs. You can configure various aspects of the retry policy. StatefulRetryOperationsInterceptor helps to build a workflow in which non-transient exceptions send the message straight to the DLQ, while potentially transient exceptions are retried a given number of times.
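For illustration, this is roughly how such a stateful interceptor can be wired onto a listener container (a minimal sketch, not my actual configuration: the queue name and backoff values are made up, a MessageListener bean is assumed to exist elsewhere, and dead-lettering relies on a DLX being configured on the queue):

import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;

@Configuration
public class RetryConfig {

    @Bean
    public StatefulRetryOperationsInterceptor retryInterceptor() {
        // Stateful retry: the rejected message goes back to the queue, the broker
        // redelivers it and Spring Retry counts the attempts; after maxAttempts the
        // recoverer rejects it without requeueing so the broker can dead-letter it.
        return RetryInterceptorBuilder.stateful()
                .maxAttempts(3)
                .backOffOptions(1000, 2.0, 10000) // initial interval, multiplier, max interval (ms)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }

    @Bean
    public SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory,
                                                            MessageListener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("calendar.notifications"); // hypothetical queue name
        container.setAdviceChain(new Advice[] { retryInterceptor() });
        container.setMessageListener(listener);
        return container;
    }
}

Note that stateful retry needs a stable key per message; by default the message id header is used, so publishers have to set it (or you supply your own message key generator).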

There is, however, one very dangerous aspect of using Spring Retry for this purpose. The AMQP protocol allows storing headers for each message, which seems to be the perfect place to keep a retry counter. Spring Retry, though, is built for a more generic use case, so it has its own storage for the retry context which, by default, is local. If you dig into the documentation/code you can easily see that for each failed message the library computes a key and then stores a RetryContext object in a RetryContextCache.
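For orientation, the cache contract in Spring Retry is small; the interface looks roughly like this (paraphrased from org.springframework.retry.policy.RetryContextCache, comments mine):

public interface RetryContextCache {

    // true if retry state is already being tracked for this key
    boolean containsKey(Object key);

    // the context (attempt count, last throwable, ...) stored for this key
    RetryContext get(Object key);

    // called after each failure to register/refresh the context
    void put(Object key, RetryContext context) throws RetryCacheCapacityExceededException;

    // called when processing finally succeeds or retries are exhausted
    void remove(Object key);
}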

This seems to be a quite reasonable design for a generic retry library. I started thinking: what would happen in a clustered environment? If the RetryContext is not clustered, but each server has its own copy, then in the worst case scenario I would end up with:

 (MAX_RETRY_NUMBER - 1) * NUMBER_OF_NODES + 1

So for a deployment of two machines and max retries equal to 3, in the worst case scenario a message would be rejected after 5 retries instead of 3. I thought for a while about implementing a custom clustered RetryContextCache, but it did not seem justifiable. Exceptions were quite rare and this small difference in the maximum number of retries seemed acceptable. Even the Spring Retry documentation states that a clustered RetryContextCache is overkill in most situations:

The default implementation of the RetryContextCache is in memory, using a simple Map. Advanced usage with multiple processes in a clustered environment might also consider implementing the RetryContextCache with a cluster cache of some sort (though, even in a clustered environment this might be overkill).

It turned out you actually MUST implement a custom RetryContextCache. It does not have to be clustered, but the default Map-based cache will sooner or later blow up in production. The problem with the default implementation is that it can hold at most 4096 keys and there is no expiry. With transient issues it can happen that a message fails on server A and is then correctly consumed by server B; the context registered on server A is never removed, so its effective capacity shrinks to 4095, then 4094, and so on. When the capacity is finally exhausted, Spring Retry starts throwing RetryCacheCapacityExceededException and no messages ever end up in the DLQ.

2014-11-23 01:00:00,017 ERROR [com.XXX.calendarservice.exception.ServiceErrorHandler] (SimpleAsyncTaskExecutor-11539489) c416b403-e2b5-477f-af5f-293627c5837d INTERNAL_ERROR: class org.springframework.retry.TerminatedRetryException: Could not register throwable; nested exception is org.springframework.retry.policy.RetryCacheCapacityExceededException: Retry cache capacity limit breached. Do you need to re-consider the implementation of the key generator, or the equals and hashCode of the items that failed?: org.springframework.retry.TerminatedRetryException: Could not register throwable; nested exception is org.springframework.retry.policy.RetryCacheCapacityExceededException: Retry cache capacity limit breached. Do you need to re-consider the implementation of the key generator, or the equals and hashCode of the items that failed?
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:266) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:188) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.0.4.RELEASE.jar:4.0.4.RELEASE]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) [spring-aop-4.0.4.RELEASE.jar:4.0.4.RELEASE]
        at sun.proxy.$Proxy117.invokeListener(Unknown Source)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1111) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:559) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:904) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:888) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:75) [spring-rabbit-1.3.2.RELEASE.jar:]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:989) [spring-rabbit-1.3.2.RELEASE.jar:]
        at java.lang.Thread.run(Thread.java:722) [rt.jar:1.7.0_09-icedtea]
Caused by: org.springframework.retry.policy.RetryCacheCapacityExceededException: Retry cache capacity limit breached. Do you need to re-consider the implementation of the key generator, or the equals and hashCode of the items that failed?
        at org.springframework.retry.policy.MapRetryContextCache.put(MapRetryContextCache.java:82) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.retry.support.RetryTemplate.registerThrowable(RetryTemplate.java:362) [spring-retry-1.0.3.RELEASE.jar:]
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:264) [spring-retry-1.0.3.RELEASE.jar:]
        ... 12 more


If you take a look at this dashboard, you can immediately see when the retry state storage stopped working correctly:

Grafana graph


Lessons learned: do not always believe the documentation; read the implementation as well. I ended up implementing a simple RetryContextCache backed by a Guava cache with expiration.
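For reference, a minimal sketch of such a cache, assuming Guava is on the classpath (the size limit and the one-hour TTL are arbitrary; the TTL just has to comfortably exceed the total backoff time of a single message):

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.springframework.retry.RetryContext;
import org.springframework.retry.policy.RetryContextCache;

// Retry contexts expire instead of piling up when a message that failed on this
// node is eventually consumed successfully by another node in the cluster.
public class ExpiringRetryContextCache implements RetryContextCache {

    private final Cache<Object, RetryContext> cache = CacheBuilder.newBuilder()
            .maximumSize(4096)                   // same order of magnitude as MapRetryContextCache
            .expireAfterWrite(1, TimeUnit.HOURS) // orphaned entries disappear on their own
            .build();

    @Override
    public boolean containsKey(Object key) {
        return cache.getIfPresent(key) != null;
    }

    @Override
    public RetryContext get(Object key) {
        return cache.getIfPresent(key);
    }

    @Override
    public void put(Object key, RetryContext context) {
        cache.put(key, context);
    }

    @Override
    public void remove(Object key) {
        cache.invalidate(key);
    }
}

It can be plugged in through a RetryTemplate: call setRetryContextCache(new ExpiringRetryContextCache()) on the template and pass the template to the interceptor via RetryInterceptorBuilder.stateful().retryOperations(retryTemplate).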


GraphHopper Docker container


In my most recent project, the travel planning tool tribear.com, we had to optimize routes for independent travelers. After evaluating several alternatives, we decided to go with the open source routing engine GraphHopper.

GraphHopper can be used in several ways: as a webapp, an Android lib or a regular Java lib. We decided to use the webapp deployment and call its REST API to calculate routes between points. Moreover, we wanted to run a separate GraphHopper instance for each city/region. This way we do not need one huge GraphHopper instance for the whole world (it needs over 16 GB of memory), but several small ones.

After playing a bit with the Docker project, it seemed to be a perfect fit for automating the configuration. I wanted to have a simple image with Java and the GraphHopper runtime, with the data directory mounted from the host. You can find my Dockerfile in the GitHub repo.

To run the GraphHopper web app you need to provide map data by passing a .pbf file in the program args. In my Docker image, the file with the .pbf extension is picked up from the directory mounted from the host. So the first step is creating a directory on your host and downloading a .pbf file for a particular region.

$ mkdir -p ~/private/graphhopper-data/berlin/
$ cd ~/private/graphhopper-data/berlin/
$ wget http://download.geofabrik.de/europe/germany/berlin-latest.osm.pbf

Take a look at config.properties. You can customize your GraphHopper instance there (for instance, change the routing profile from the default car to bike). When you are ready, build the image and run a new Docker container:

$ sudo docker build -t sogorkis/graphhopper .
$ sudo docker run \
      -d \
      --name=graphhopper-berlin \
      -v /home/stanislaw/private/graphhopper-data/berlin/:/data \
      -p 8990:8989 \
      sogorkis/graphhopper \
      /graphhopper/start.sh
$ sudo docker logs -f graphhopper-berlin
...
2014-10-04 11:21:30,110 [main] INFO  graphhopper.http.DefaultModule - loaded graph at:/data/berlin-latest.osm-gh, source:/data/berlin-latest.osm.pbf, acceptWay:car, class:LevelGraphStorage
2014-10-04 11:21:30,611 [main] INFO  graphhopper.http.GHServer - Started server at HTTP 8989

Please note that the first run might take some time, as GraphHopper needs to process the .pbf file and create additional working files. Tail the logs until you see that the server has started, then open the following URL in your browser:

http://localhost:8990/?point=52.516534%2C13.381568&point=52.519877%2C13.40493&locale=en-US
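If you want to call the routing API from code rather than the web UI, a plain HTTP GET is enough. A minimal Java client could look like the sketch below; the /route endpoint and the point/vehicle/locale parameters reflect the GraphHopper web API as I remember it, so double-check them against the version you run:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RouteClient {

    public static void main(String[] args) throws IOException {
        // Route between the same two Berlin points as in the browser example;
        // the container started above listens on host port 8990.
        URL url = new URL("http://localhost:8990/route"
                + "?point=52.516534,13.381568"
                + "&point=52.519877,13.40493"
                + "&vehicle=car&locale=en-US");

        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");

        StringBuilder body = new StringBuilder();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), "UTF-8"));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        } finally {
            reader.close();
        }

        // The JSON response contains a "paths" array with distance, time and
        // the route geometry (an encoded polyline by default).
        System.out.println(body);
    }
}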



The last thing that was needed was the ability to override JAVA_OPTS, especially the memory settings. To do that, just create an env.sh file in the mounted directory. For instance, to reduce the max heap size for the graphhopper-berlin container, create the following file:

$ cat /home/stanislaw/private/graphhopper-data/berlin/env.sh
JAVA_OPTS="-Xms128m -Xmx128m -XX:MaxPermSize=64m -Djava.net.preferIPv4Stack=true -server -Djava.awt.headless=true -Xconcurrentio"
