Tuesday, February 26, 2013

Synchronizing Snapshots and Incrementals With Single Threading

Code available on: https://github.com/jaycfields/snapshot-incremental-synchronize

Many of the applications that I write these days have a lot of data - so much that there's no reasonable way to continually send all of it. Instead, most of the applications I work with will have the ability to receive a snapshot of the current state, and the ability to receive deltas (incrementals) that must be applied to the previous snapshot. To further complicate things, incomplete data is unacceptable and ordering matters. This type of environment breeds many solutions for synchronizing snapshots and incrementals. This entry is about using single threading (via jetlang) for synchronization and guaranteed accuracy.

Let's take a very simple example, you have two processes a client and server. The server has a list and the client needs to display that list - completely and in order. The list on the client also needs to be updated whenever the list on the server is updated.

There are several issues that you could encounter in a multithreaded environment.
  • If you request a snapshot and then start listening to incrementals, you may miss data that isn't in the snapshot, but was broadcast before you started listening to incrementals
  • If you start listening to incrementals and request a snapshot at the same time, you may apply an incremental to the snapshot, even though the snapshot already reflects the incremental.
  • If you start listening to the incrementals first, you'll need some way to throw away the incrementals that are already reflected in the snapshot.
It's time to get into some code.

Here's some simple server code.


The above code contains a server-list, which is a list that represents the ordered random numbers being generated on the server side. Our task is to mirror this list in our client. The appending scheduled task and appending fiber are stored to allow for easy starting and stopping of appending. The server-start and server-stop functions are provided for convenience, should you choose to run this example locally.

The subscriber atom and the subscribe function are a simple way for a client to subscribe to snapshots and incrementals. The publish-to-client function derefs a fn and immediately calls it with a snapshot or incremental. In a prod application, publish and subscribe logic would probably involve a socket or messaging system - our solution is purposefully naive, to focus on the point of the post: synchronization.

The get-snapshot function publishes the current state of the the server-list to a client. The append-to-list function is removing elements so it's easy to see the server-list changing - without the data growing to an unmanageable size, in prod this would (likely) not exist; however, the rest of the code in append-to-list is fairly representative of a common practice - generate a delta, apply it to the local list and publish it out to clients.

Looking at this code, it's easy to see that one fiber is appending to the list and publishing to the client, while another fiber would return the value of get-snapshot. This code can work, but the way it's currently written data accuracy cannot be guaranteed.

Let's look at some client code.


The client-start function subscribes to server updates, and then requests a snapshot. The handle update function resets a client-list on snapshot and conjs an incremental to the existing list. (note: the client list is kept at 10 elements for simplicity, just like the server - I would not expect this type of code to be in prod).

Below is a full snapshot of the current code.


The client and server code is the same as above, but this example also contains some function calls in a comment. At this point you can paste this code into your favorite editor, start the client and the server and inspect both lists. The update frequency is so large that you can even compare the two lists, and it's highly likely that they are equal.

For a lot of problems this code may be sufficient; however, as we noted above, there is definitely an opportunity for you to see invalid state. With this specific code the append fiber could update the atom with an incremental X, on the main fiber get-snapshot could deref a snapshot with X included (and publish it) and then the append fiber could also publish the incremental X. Luckily there's a simple solution, publish the snapshot, update the server-list, and publish the incrementals all on the same fiber.

The code below shows how easy it is to create a jetlang fiber and execute an anonymous function.


As you can see, very little changed with the code. We've defined another fiber, synchro-fiber, which we will use to single thread our updates to server-list and our publishes to the client. The synchro-fiber will execute the runnables (in our example, anonymous functions) that are put on it's queue, in order. The body of get-snapshot and append-to-list were slightly modified to call the execute function with their previous body as an anonymous function. Other technical differences are also true - the code isn't immediately run, it's no longer blocking, and the return value has been altered. While all of these observations are true, they are irrelevant with respect to what we were trying to accomplish.

Using jetlang fibers we've accomplished our goal - we can guarantee that snapshots and incrementals will be easy to synchronize (without sequence ids), accurate, and in order. Of course, you'll need to consume both of these messages on a single fiber as well, but that should be equally easy to accomplish.