Revisiting Global Data Consistency in Distributed (Microservice) Architectures

Back in 2015 I wrote a couple of articles about how you can piggyback a standard Java EE Transaction Manager to get data consistency across distributed services (here is the original article and here is an article about doing it with Spring Boot, Tomcat or Jetty).

Last year I was fortunate enough to work on a small project where we questioned data consistency from the ground up. Our conclusion was that there is another way of getting data consistency guarantees, one that I had not considered in another article that I wrote about patterns for binding resources into transactions. This other solution is to change the architecture from a synchronous one to an asynchronous one. The basic idea is to save business data together with “commands” within a single database transaction. Commands are simply facts that other systems still need to be called.
By reducing the number of concurrent transactions to just one, it is possible to guarantee that data will never be lost. Commands which have been committed are then executed as soon as possible and it is the command execution (in a new transaction) which then makes calls to remote systems. Effectively it is an implementation of the BASE consistency model, because from a global point of view, data is only eventually consistent.

Imagine the situation where updating an insurance case should result in creating a task in a workflow system so that a person gets a reminder to do something, for example write to the customer. The code to handle a request to update an insurance case might look like this:

    @Inject
    EntityManager em;

    @PUT
    @Path("case")
    @Produces("application/json")
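    public void updateCase(Case insuranceCase) {
        // "case" is a reserved word in Java, hence the longer variable name
        insuranceCase = em.merge(insuranceCase);

        if(anEmployeeShouldWriteToTheCustomer(insuranceCase)){
            long taskId = taskService
                .createTask(insuranceCase.getNr(),
                            "Write to customer...");
            insuranceCase.addTask(taskId);
        }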
    }

The call to the task service results in a remote call to the task application, which is a microservice responsible for workflow and human tasks (work that needs to be done by a human).

There are two problems with our service as described above. First of all, imagine that the task application is offline at the time of the call. That reduces the availability of our application. For every additional remote application that our application connects to, there is a reduction in availability of our system. Imagine one of those applications has an allowed downtime of 4 hours per month and a second application has one of 8 hours. In the worst case that could cause our application to be offline for 12 hours per month, in addition to our own downtimes, since there is no guarantee that the downtimes will occur at the same time.
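To put numbers on that worst case, here is a small sketch. It assumes a 30-day month (720 hours) and that the downtimes never overlap. The class and method names are made up purely for illustration.

```java
import java.util.List;

/** Illustrative only: worst-case availability when remote downtimes never overlap. */
final class AvailabilityEstimate {

    static final double HOURS_PER_MONTH = 30 * 24; // assumption: a 30-day month

    /** Sums the allowed downtimes, i.e. assumes they never happen at the same time. */
    static double worstCaseDowntimeHours(List<Double> allowedDowntimesInHours) {
        return allowedDowntimesInHours.stream().mapToDouble(Double::doubleValue).sum();
    }

    /** Fraction of the month during which every remote application is reachable. */
    static double worstCaseAvailability(List<Double> allowedDowntimesInHours) {
        return 1.0 - worstCaseDowntimeHours(allowedDowntimesInHours) / HOURS_PER_MONTH;
    }

    public static void main(String[] args) {
        List<Double> downtimes = List.of(4.0, 8.0); // the two applications from the text
        System.out.printf("downtime: %.1f h, availability: %.2f%%%n",
                worstCaseDowntimeHours(downtimes),
                100 * worstCaseAvailability(downtimes));
    }
}
```

With 4 and 8 hours that is 12 hours out of 720, so the remote calls alone cap our availability at roughly 98.3%, before counting our own downtime.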

The second problem with the service design above comes when there is a problem committing the data to the database after the call to the task application is made. The code above uses JPA, which may choose to flush the SQL statements generated by the call to the merge method or by the updates to the entity at some time after those calls, and at the latest at commit time. That means a database error could occur after the call to the task application. The database call might even fail for other reasons such as the network not being available. So conceptually we have the problem that we might have created a task asking an employee to send a letter to the customer, but it wasn’t possible to update the case, so the employee might not even have the information necessary to write the letter.

If the task application were transaction aware, i.e. capable of being bound into a transaction so that the transaction manager in our application could deal with the remote commit/rollback, it would certainly help to avoid the second problem described above (data consistency). But the increase in downtime wouldn’t be handled.

Changing the architecture so that the call to the task application occurs asynchronously will however solve both of those problems. Note that I am not talking about simple asynchronous method invocation but instead I am talking about calling the task application after our application commits the database transaction. It is only at that point that we have a guarantee that no data will be lost. We can then attempt the remote call as often as is necessary until the task is created successfully. At that stage global data is consistent. Being able to retry failed attempts means that the system as a whole becomes more reliable and our downtime is reduced. Note that I am also not talking about non-blocking methods which are often referred to as being asynchronous.

To make this work, I have created a simple library which requires the developer to do two things. First of all, the developer needs to make a call to the CommandService, passing in the data which is required when the actual command is executed. Secondly, the developer needs to provide an implementation of the command, which the framework will execute. More information about the rudimentary implementation used in the demo application is available here.
The first part looks like this:

public class TaskService {

    @Inject
    CommandService commandService;

    /** will create a command which causes a task to be
     *  created in the task app, asynchronously, but robustly. */
    public void createTask(long caseNr, String textForTask) {
        String context = createContext(caseNr, textForTask);

        Command command = new Command(CreateTaskCommand.NAME, context);

        commandService.persistCommand(command);
    }

    private String createContext(long nr, String textForTask) {
        //TODO use object mapper rather than build string ourselves...
        return "{\"caseNr\": " + nr + ", \"textForTask\": \"" + textForTask + "\"}";
    }
}

The command service shown here takes a command object which contains two pieces of information: the name of the command and a JSON string containing data which the command will need. A more mature implementation which I have written for my customer takes an object as input rather than a JSON string, and the API uses generics.

The command implementation supplied by the developer looks as follows:

public class CreateTaskCommand implements ExecutableCommand {

    public static final String NAME = "CreateTask";

    @Override
    public void execute(String idempotencyId, JsonNode context) {
        long caseNr = context.get("caseNr").longValue();

        // call the task microservice here (a plain HTTP call, omitted for brevity)
    }

    @Override
    public String getName() { return NAME; }
}

The execute method of the command is where the developer implements the work which needs to be done. I haven’t shown the code used to call the task application since it isn’t really relevant here; it’s just an HTTP call.

The interesting part of such an asynchronous design isn’t in the above two listings, rather in the framework code which ensures that the command is executed. The algorithm is a lot more complicated than you might first think because it has to be able to deal with failures, which causes it to also have to deal with locking. When the call to the command service is made, the following happens:

  • The command is persisted to the database
  • A CDI event is fired
  • When the application commits the transaction, the framework is called since it
    observes the transaction success
  • The framework “reserves” the command in the database, so that multiple instances of the application wouldn’t attempt to execute the same command at the same time
  • The framework uses an asynchronous EJB call to execute the command
  • Executing the command works by using the container to search for implementations of the
    ExecutableCommand interface and using any which have the name saved in the command
  • All matching commands are executed by calling their execute method, passing them the input that was saved in the database
  • Successfully executed commands are removed from the database
  • Commands which fail are updated in the database, so that the number of execution attempts is incremented
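The steps above can be sketched roughly as follows. This is not the framework code: a Map stands in for the command table, the CDI event and the asynchronous EJB call are replaced by a direct method call, and all class and method names are hypothetical. It is also single threaded, so the "reservation" is only a flag rather than a real database lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory sketch of the command lifecycle; a Map stands in for the database table. */
final class CommandLifecycleSketch {

    /** Hypothetical row of the command table. */
    static final class StoredCommand {
        final String name;
        final String context;
        int attempts;      // number of failed execution attempts so far
        boolean reserved;  // true while some instance is executing the command

        StoredCommand(String name, String context) {
            this.name = name;
            this.context = context;
        }
    }

    private final Map<Long, StoredCommand> table = new HashMap<>();
    private final Map<String, Consumer<String>> implementations = new HashMap<>();
    private long nextId = 1;

    /** Stands in for the container lookup of ExecutableCommand implementations by name. */
    void register(String name, Consumer<String> implementation) {
        implementations.put(name, implementation);
    }

    /** In the real design this happens in the same transaction as the business data. */
    long persist(String name, String context) {
        long id = nextId++;
        table.put(id, new StoredCommand(name, context));
        return id;
    }

    /** Marks the command as taken so that no other instance executes it at the same time. */
    boolean reserve(long id) {
        StoredCommand command = table.get(id);
        if (command == null || command.reserved) {
            return false;
        }
        command.reserved = true;
        return true;
    }

    /** Called once the surrounding transaction has committed successfully. */
    void executeAfterCommit(long id) {
        if (!reserve(id)) {
            return;
        }
        StoredCommand command = table.get(id);
        try {
            implementations.get(command.name).accept(command.context);
            table.remove(id);          // success: the command is removed
        } catch (RuntimeException e) {
            command.attempts++;        // failure: count the attempt ...
            command.reserved = false;  // ... and release it for a later retry
        }
    }

    boolean isPending(long id) { return table.containsKey(id); }

    int attempts(long id) { return table.get(id).attempts; }
}
```

A command whose implementation throws stays in the table with an incremented attempt count, which is what the periodic retry relies on.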

As well as that fairly complex algorithm, the framework also needs to do some housekeeping:

  • Periodically check to see if there are commands which need to be executed. Criteria are:
    • The command has failed, but has not been attempted more than 5 times
    • The command is not currently being executed
    • The command is not hanging
    • (a more complex implementation might also restrict how quickly the retry is attempted, for example after a minute, two minutes, then four, etc.)
  • Periodically check to see if there are commands which are hanging, and unlock them so that they will be reattempted
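The selection criteria can be expressed as a couple of predicates. The following is only a sketch under assumptions: the field names are invented, a command counts as "hanging" once it has been locked for longer than an arbitrary ten minutes, and the doubling retry delay is the optional refinement mentioned in the list rather than something the demo implementation does.

```java
import java.time.Duration;
import java.time.Instant;

/** Sketch of the housekeeping checks; names and the hang timeout are assumptions. */
final class RetryPolicySketch {

    static final int MAX_ATTEMPTS = 5;                           // limit used in the text
    static final Duration FIRST_DELAY = Duration.ofMinutes(1);   // one minute, then two, four, ...
    static final Duration HANG_TIMEOUT = Duration.ofMinutes(10); // assumption, pick your own

    /** Hypothetical view of a command row; lockedAt is null when nobody is executing it. */
    static final class CommandState {
        final int attempts;
        final Instant lastAttemptAt;
        final Instant lockedAt;

        CommandState(int attempts, Instant lastAttemptAt, Instant lockedAt) {
            this.attempts = attempts;
            this.lastAttemptAt = lastAttemptAt;
            this.lockedAt = lockedAt;
        }
    }

    /** Delay before the next retry: doubles with every failed attempt (attempts >= 1). */
    static Duration backoff(int attempts) {
        return FIRST_DELAY.multipliedBy(1L << (attempts - 1));
    }

    /** A command hangs if it has been locked for longer than the timeout. */
    static boolean isHanging(CommandState c, Instant now) {
        return c.lockedAt != null && c.lockedAt.plus(HANG_TIMEOUT).isBefore(now);
    }

    /** True if the command has failed, is not locked, is not exhausted and has waited long enough. */
    static boolean isDueForRetry(CommandState c, Instant now) {
        if (c.attempts < 1 || c.attempts >= MAX_ATTEMPTS) {
            return false; // never failed, or given up and left for an administrator
        }
        if (c.lockedAt != null) {
            return false; // currently executing, or hanging and waiting to be unlocked
        }
        return !c.lastAttemptAt.plus(backoff(c.attempts)).isAfter(now);
    }
}
```

A scheduled job would first clear the lock on every command for which isHanging is true, and then re-execute every command for which isDueForRetry is true.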

Commands might hang if for example the application crashes during execution. So as you can see, the solution isn’t trivial and as such belongs in framework code, so that the wheel doesn’t keep getting reinvented. Unfortunately the implementation very much depends on the environment in which it is supposed to run and so that makes writing a portable library very difficult (which is why I have not done more than publishing the classes in the commands package of the demo application). Interestingly it even depends on the database being used because for example select for update isn’t properly supported by Hibernate when used with Oracle. For completeness’ sake, commands which fail 5 times should be monitored so that an administrator can resolve the problem and update the commands so that they are reattempted.

The right question at this stage is whether or not changing the architecture to an asynchronous one is the best solution. On the surface it certainly looks as though it solves all our data consistency problems. But in reality there are a few things that need to be considered in detail. Here are a few examples.

A) Imagine that after updating the insurance case, the user wants to close it, and part of the business rules dictating whether or not a case may be closed includes checking whether any tasks are incomplete. The best place to check whether any tasks are incomplete is the task application! So the developer adds a few lines of code to call it. At this stage it already gets complicated, because should the developer make a synchronous call to the task application, or use a command? Advice is given below, and for simplicity, let’s assume the call is made synchronously. But what if, three seconds ago, the task application was down, so that an incomplete command is still in our database which, when executed, will create a task? If we just relied on the task application, we’d close the case and at the next attempt to execute the incomplete command we’d save the task even though the case is already closed. It gets messy, because we’d have to build extra logic to re-open the case when a user clicks the task to deal with it. A more proper solution would be to first ask the task application and then check commands in our database. Even then, because commands are executed asynchronously, we could end up with timing issues, where we miss something. The general problem that we have here is one of ordering. It is well known that eventually consistent systems suffer from ordering problems and can require extra compensatory mechanisms, like the one described above where the case gets reopened. These kinds of things can have quite complex impacts on the overall design, so be careful!

B) Imagine an event occurs in the system landscape which results in the case application being called in order to create an insurance case. Imagine then that a second event occurs which should cause that case to be updated. Imagine that the application wishing to create and update the case was implemented asynchronously using the commands framework. Finally, imagine that the case application was unavailable during the first event, so that the command to create the case stayed in the database in an incomplete state. What happens if the second command is executed before the first one, i.e. the case is updated before it even exists? Sure, we could design the case application to be smart and if the case doesn’t exist, it simply creates it in the updated state. But what do we then do when the command to create the case is executed? Do we update it to its original state? That would be bad. Do we ignore the second command? That could be bad if some business logic depended on a delta, i.e. a change in the case. I have heard that systems like Elasticsearch use timestamps in requests to decide if they were sent before the current state, and ignore such calls. Do we create a second case? That might happen if we don’t have idempotency under control, and that would also be bad. One could implement some kind of complex state machine for tracking commands and for example only allow the update command to be executed after the creation command. But that needs an extra place to store the update command until the creation command has been executed. So as you can see, ordering problems strike again!
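To illustrate just the timestamp idea mentioned above, here is a minimal sketch of a receiver which ignores requests that were sent before the state it already holds. It is my own illustration, not how any particular product works. The names are hypothetical, and it assumes that senders put a comparable timestamp (or sequence number) into every request.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: ignore requests that are older than the state already stored. */
final class StaleRequestFilterSketch {

    /** Hypothetical stored state, tagged with the send time of the request that produced it. */
    static final class Versioned {
        final long sentAtMillis;
        final String state;

        Versioned(long sentAtMillis, String state) {
            this.sentAtMillis = sentAtMillis;
            this.state = state;
        }
    }

    private final Map<Long, Versioned> cases = new HashMap<>();

    /** Applies the request unless a newer (or equally new) one was already applied. */
    boolean apply(long caseNr, long sentAtMillis, String newState) {
        Versioned current = cases.get(caseNr);
        if (current != null && current.sentAtMillis >= sentAtMillis) {
            return false; // stale: sent before the state we already have
        }
        cases.put(caseNr, new Versioned(sentAtMillis, newState));
        return true;
    }

    /** Returns the stored state, or null if the case is unknown. */
    String stateOf(long caseNr) {
        Versioned current = cases.get(caseNr);
        return current == null ? null : current.state;
    }
}
```

If the update (sent later) arrives first, it creates the case in the updated state and the late creation request is dropped. Note that this throws the older request away entirely, so it does not help if business logic depends on the delta between the two.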

C) When do we need to use commands, and when can we get away with synchronous calls to remote applications? The general rule appears to be that as soon as we need to write to more than one resource, we should use commands, if global data consistency is important to us. So if a certain call requires lots of data to be read from multiple remote applications, so that we can update our database, it isn’t necessary to use commands, although it may be necessary to implement idempotency or for the caller to implement some kind of retry mechanism, or indeed use a command to call our system. If, on the other hand, we want to write to a remote application and our database in a consistent manner, then we need to use a command to call the remote application.

D) What do we do if we want to call multiple remote applications? If they all offer idempotent APIs, there doesn’t appear to be a problem in calling them all from a single command. Otherwise it might be necessary to use one command per remote application call. If they need to be called in a certain order, it will be necessary that one command implementation creates the command that should be called next in the chain. A chain of commands reminds me of choreography. It might be easier or more maintainable to implement a business process as an orchestration. See here for more details.

E) Thread Local Storage (TLS) can cause headaches because commands are not executed on the same thread that creates the command. As such, mechanisms like the injection of @RequestScoped CDI beans also no longer work as you might expect. The normal Java EE rules which apply to @Asynchronous EJB calls also apply here, precisely because the framework code uses that mechanism in its implementation. If you need TLS or scoped beans then you should consider adding the data from such places into the input which is saved with the command in the database, and as soon as the command is executed, restore the state before calling any local service/bean which relies on it.
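A minimal sketch of that capture-and-restore idea, using a plain ThreadLocal holding a user name. The class, the methods and the choice of "current user" as the thread-local value are all assumptions for the sake of the example. In the real thing the captured value would be written into the command's context and read back when the command is executed.

```java
/** Sketch: carry a thread-local value across to the thread which executes the command. */
final class ContextPropagationSketch {

    // stands in for whatever thread-bound state the application relies on
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    static void setCurrentUser(String user) { CURRENT_USER.set(user); }

    static String currentUser() { return CURRENT_USER.get(); }

    /** Called on the request thread: the result is saved together with the command. */
    static String captureForCommand() {
        return CURRENT_USER.get();
    }

    /** Called on the executing thread: restore the value, run the command, then clean up. */
    static void runWithRestoredContext(String savedUser, Runnable commandBody) {
        String previous = CURRENT_USER.get();
        CURRENT_USER.set(savedUser);
        try {
            commandBody.run();
        } finally {
            // never leak the restored value into whatever the pooled thread does next
            if (previous == null) {
                CURRENT_USER.remove();
            } else {
                CURRENT_USER.set(previous);
            }
        }
    }
}
```

The cleanup in the finally block matters because container threads are pooled, so a value left behind would be visible to the next, unrelated piece of work on that thread.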

F) What do we do if the response from a remote application is required? Most of the time we call remote systems and need response data from them in order to continue processing. Sometimes it is possible to separate reads and writes, for example with CQRS. One solution is to break up the process into smaller steps, so that each time a remote system needs to be called it is handled by a new command, and that command not only makes the remote call, but also updates the local data when the response arrives. We have however noticed that if an optimistic locking strategy is in place it can result in errors when the user wants to persist changes that they have made to their data, which is now “stale” compared to the version in the database, even though they might only want to change certain attributes which the command did not change. One solution to this problem is to propagate events from the backend over a web socket to the client so that it can do a partial update to the attributes affected by the command, so that the user is still able to save their data later on. A different solution is to question why you need the response data. In the example above, I put the task ID into the case. That could be one way to track tasks relating to the case. A better way is to pass the case ID to the task application, and get it to store the case ID in the task. If you need a list of tasks related to the case, you query them using *your* ID, rather than tracking their ID. By doing this you eliminate the dependency on the response data (other than to check that the task is created without an error), and as such there is no need to update your data based upon the response from the remote application.

Hopefully I have been able to demonstrate that an asynchronous architecture using commands as described above offers a suitable alternative to the patterns for guaranteeing global data consistency, which I wrote about a few years ago.

Please note that after implementing the framework and applying it to several of our applications we learned that we are not the only ones to have such ideas. Although I have not read up about Eventuate Tram and its transactional commands, it appears to be very similar. It would be interesting to compare the implementations.

Finally, as well as commands, we added “events” on top of the commands. Events in this case are messages sent via JMS, Kafka, choose your favourite messaging system, in a consistent and guaranteed manner. Both sides, namely publication and consumption of the event, are implemented as commands, which provides very good at-least-once delivery guarantees. Events inform 1..n applications in the landscape that something has happened, whereas commands tell a single remote application to do something. These, together with websocket technology and the ability to inform clients of asynchronous changes in the backend, complete the architecture required to guarantee global data consistency. Whether or not such an asynchronous architecture is better than say piggybacking a transaction manager in order to guarantee global data consistency, is something that I am still learning about. Both have their challenges, advantages and disadvantages. Probably, the best solution relies on a mix, as is normally the case with complex software systems 🙂
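One consequence of at-least-once delivery is that a receiver may see the same command or event more than once, for example when a retry happens after a response was lost. Below is a minimal sketch of a receiver which uses an idempotency ID to cope with that. The names are hypothetical and the set of processed IDs is kept in memory here. In practice it would need to be a database table, written in the same transaction as the work itself.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Sketch: a receiver which ignores deliveries it has already processed. */
final class IdempotentReceiverSketch {

    private final Set<String> processedIds = new HashSet<>(); // a database table in real life
    private final Consumer<String> handler;

    IdempotentReceiverSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Returns true if the payload was handled, false if it was a duplicate delivery. */
    boolean receive(String idempotencyId, String payload) {
        if (processedIds.contains(idempotencyId)) {
            return false; // already done, a redelivery must have no further effect
        }
        handler.accept(payload);
        // only remember the ID once the handler succeeded, so that failures are retried
        processedIds.add(idempotencyId);
        return true;
    }
}
```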

Copyright ©2018, Ant Kutschera