Solving High Data Contention Problems with a Typical Web Stack

Intro

Most problems we deal with involve little contention. A shopping cart is a great example: we might have very high throughput, but updates to different shopping carts can easily be parallelised. This is why we see so many systems designed around eventual consistency and parallelism, like most of AWS's cloud-native services.

Often we have a field that may be updated simultaneously by multiple processes. In these cases we usually introduce a simple lock that is rarely contended.

Sometimes, though, we hit a very different type of problem. In this article we will look at a data contention problem where all requests are interdependent and act on the same integer value. Here a naive lock would hurt performance too much, because it would be contended on almost every request. Our goal is a humble 10k transactions per second (TPS) at 50ms latency with a typical web stack. This is not a high-performance computing (HPC) problem: HPC operates at least 100x this scale and requires a complete rethink of the whole stack. But some of us may come across this type of problem at medium scale.

To solve this problem we will have to achieve linearizability, so that each command is processed one at a time. We'll also need to find ways other than a naive lock on our database to reach our performance goal.

Two approaches will be presented: command sourcing and a ledger. The command sourcing approach keeps a log of commands in the database and all state in memory. The ledger approach keeps all state in a ledger database table, and processes commands and writes ledger entries while holding a lock. Both approaches meet the performance and consistency requirements. The recommended approach is the ledger, since it is simpler and stays closer to typical web application architectures.

Definitions

  • Data contention - when there are multiple writers competing to access the same bit of data at the same time.
  • Strict total ordering - for every pair of distinct elements x and y, either x precedes y or y precedes x
  • Strictly monotonic - only ever increasing; each value is greater than the one before it
  • Linearizable - each operation appears to take effect atomically at a single point between its request and its response, so operations behave as if run one at a time, in an order consistent with real time
  • Command - an input into a system and a request to take a particular action
  • Event - an output from a system and a statement of fact
  • Throughput - the volume of data or requests that can be processed in a certain amount of time
  • Latency - the time it takes to process a request
  • Commutative - where the order of application doesn't matter, e.g. multiplication is commutative since a × b = b × a
  • Log - a strictly totally ordered sequence of records. It differs from a queue in that a queue discards records after processing, while a log keeps them.
  • Typical web stack - a load balancer, 2 HTTP app servers and a highly available (HA) relational or document database.
  • Advisory lock - a database lock whose meaning is defined by the app servers (e.g. Postgres's pg_advisory_xact_lock); the database does not tie it to any table or row
  • Serial - one after the other with no overlap

Problem Definition

Imagine a single database field that holds a number, which we'll call the counter. Multiple users send requests to a server, on the order of 10k requests per second, and the counter is updated based on the commands they send. All commands are interdependent, as we'll see shortly: a command's position in the queue, the command itself and the current value of the counter may all be used as inputs to determine the next value of the counter. Users expect a prompt response with the counter's new value within 50ms.

Here are the commands for this hypothetical system:

  1. Increment amount - increments the counter by amount and returns the value of the counter after applying the command
  2. Drain - sets the counter to zero and returns the exact value of the counter immediately before the Drain was applied

Consider for a moment the case where only the first command is required. We could then wait for 1k commands to come in, aggregate them into one mega-command, apply that single command to the counter and return the new counter value to all users. This works because Increment is commutative: the order in which increments are applied does not change the result.
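A minimal sketch of that aggregation, assuming increments arrive as a plain list of amounts (the class and method names are hypothetical, not taken from the original implementation). Because addition is commutative, the batch collapses into one delta whatever order the increments arrived in:

```java
import java.util.List;

// Hypothetical sketch: collapse a batch of Increment commands into one update.
final class IncrementBatching {

    // Sum every increment in the batch into a single "mega-command" delta.
    static long aggregate(List<Long> incrementAmounts) {
        long delta = 0;
        for (long amount : incrementAmounts) {
            delta += amount;
        }
        return delta;
    }

    // Apply the whole batch with one write: counter + (sum of increments).
    // The order inside the batch does not matter because addition is commutative.
    static long applyBatch(long counter, List<Long> incrementAmounts) {
        return counter + aggregate(incrementAmounts);
    }
}
```

For example, applyBatch(5, List.of(2L, 7L, 3L)) and applyBatch(5, List.of(3L, 2L, 7L)) both give 17, so only one database write is needed for the whole batch.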

The Drain command is not commutative with Increment, because it must return the counter's value exactly as it was before the Drain was applied. The only way to satisfy this is strict ordering: the Drain has a position amongst all commands, and all commands are run against the counter in that order.

Position | Command     | Counter value after processing | Return value
1        | Increment 2 | 2                              | 2
2        | Increment 7 | 9                              | 9
3        | Drain       | 0                              | 9
4        | Increment 3 | 3                              | 3

The final counter value and the return values depend entirely on the order in which the commands are processed. For example, swapping the Drain with the Increment 3 that follows it makes the Drain return 12 and leaves the counter at 0 instead of 3.
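The table can be reproduced with a small in-memory sketch that applies the commands strictly one at a time (all names are hypothetical). Replaying the same commands in a different order makes the order dependence concrete:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: apply Increment and Drain strictly in order to one counter.
final class CounterReplay {

    static final class Command {
        final boolean drain; // true for Drain, false for Increment
        final long amount;   // increment amount, ignored for Drain

        private Command(boolean drain, long amount) {
            this.drain = drain;
            this.amount = amount;
        }

        static Command increment(long amount) { return new Command(false, amount); }
        static Command drain() { return new Command(true, 0); }
    }

    static final class Result {
        final List<Long> returnValues = new ArrayList<>(); // one per command, in order
        long finalCounter;
    }

    static Result replay(long initialCounter, List<Command> commands) {
        Result result = new Result();
        long counter = initialCounter;
        for (Command command : commands) {
            if (command.drain) {
                result.returnValues.add(counter); // value immediately before the Drain
                counter = 0;
            } else {
                counter += command.amount;
                result.returnValues.add(counter); // value after the Increment
            }
        }
        result.finalCounter = counter;
        return result;
    }
}
```

Replaying Increment 2, Increment 7, Drain, Increment 3 from zero gives return values 2, 9, 9, 3 and a final counter of 3, matching the table; moving the Drain to the end gives 2, 9, 12, 12 and a final counter of 0.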

A naive approach would be to lock the counter value in the database before processing each command and release it afterwards. This produces the correct result but with dismal throughput, because every command pays for acquiring the lock and for reading and writing the value, each of which costs a round trip to the database while every other request waits.
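A rough cost model shows why. Assume, purely for illustration, that each lock hold costs four database round trips (acquire the lock, read, write, release) of 0.5 ms each. With one command per hold the counter is capped at 500 commands per second, far below the 10k TPS goal; processing 1000 commands per hold under the same lock raises the ceiling to 500,000. These figures are hypothetical assumptions, not measurements from this article's benchmark:

```java
// Hypothetical back-of-envelope model, not a measurement: while one app server
// holds the lock nobody else can touch the counter, so throughput is capped at
// (commands processed per lock hold) / (time the lock is held).
final class LockThroughputModel {

    /**
     * @param roundTripsPerHold database round trips made while the lock is held
     * @param roundTripMillis   assumed latency of a single round trip
     * @param commandsPerHold   commands processed per lock acquisition (1 = naive lock)
     * @return upper bound on commands per second, ignoring processing and queueing time
     */
    static double maxCommandsPerSecond(int roundTripsPerHold, double roundTripMillis, int commandsPerHold) {
        double lockHeldMillis = roundTripsPerHold * roundTripMillis;
        return commandsPerHold * 1000.0 / lockHeldMillis;
    }
}
```

The point of the model is that batching amortises the fixed round-trip cost of each lock hold across many commands, which is what both approaches below rely on.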

Approach 1: Ledger Approach

Remember that we have two app servers for availability, both receiving requests from users to update the counter.

Here are the steps executed by a single app server:

  1. An app server receives a number of commands from users
  2. The app server takes out an advisory lock
  3. The app server reads the latest counter value from the counter_ledger table into memory
  4. All commands in the batch are processed in order against the in-memory counter value
  5. The results of the commands are ledger entries, which are inserted as a batch into the counter_ledger table
  6. The advisory lock is released
  7. Users are then notified of results

Notice that the lock is held for steps 3, 4 and 5. Everything inside the lock should be as fast as possible, because the other app server is contending for that lock and user requests are waiting idle. Step 4 takes negligible time; processing is the fastest part. Most of the time goes on reading from disk and transferring data over the network. Batching command processing and ledger entry inserts means we process thousands of commands for only 2 round trips to the database.

In terms of correctness, the current counter value cannot change while the lock is held. And since the commands in a batch are processed one after the other, the in-memory counter value stays 100% consistent throughout execution.
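The ledger steps and the correctness argument can be checked with an in-memory sketch. It is not the author's implementation: a java.util.concurrent ReentrantLock stands in for the Postgres advisory lock, an ArrayList stands in for the counter_ledger table, and the genesis entry and all names are assumptions. Two threads play the two app servers; because the read, the processing and the insert all happen under the lock, the ledger versions come out contiguous and no increment is lost:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// In-memory sketch of the ledger approach. Assumptions: a ReentrantLock stands in
// for the advisory lock and an ArrayList stands in for the counter_ledger table.
final class LedgerSimulation {

    static final class Command {
        final boolean drain;
        final long amount;
        private Command(boolean drain, long amount) { this.drain = drain; this.amount = amount; }
        static Command increment(long amount) { return new Command(false, amount); }
        static Command drain() { return new Command(true, 0); }
    }

    static final class LedgerEntry {
        final long version, counterValue, returnValue;
        LedgerEntry(long version, long counterValue, long returnValue) {
            this.version = version;
            this.counterValue = counterValue;
            this.returnValue = returnValue;
        }
    }

    private final ReentrantLock advisoryLock = new ReentrantLock();
    // Assumed genesis entry: version 0, counter 0.
    private final List<LedgerEntry> ledgerTable = new ArrayList<>(List.of(new LedgerEntry(0, 0, 0)));

    // Steps 2 to 6 for one batch; the caller notifies users afterwards (step 7).
    List<LedgerEntry> processBatch(List<Command> batch) {
        advisoryLock.lock();                                              // step 2
        try {
            LedgerEntry latest = ledgerTable.get(ledgerTable.size() - 1); // step 3
            List<LedgerEntry> newEntries = new ArrayList<>(batch.size());
            for (Command c : batch) {                                     // step 4
                latest = c.drain
                        ? new LedgerEntry(latest.version + 1, 0, latest.counterValue)
                        : new LedgerEntry(latest.version + 1, latest.counterValue + c.amount,
                                          latest.counterValue + c.amount);
                newEntries.add(latest);
            }
            ledgerTable.addAll(newEntries);                               // step 5
            return newEntries;
        } finally {
            advisoryLock.unlock();                                        // step 6
        }
    }

    // True if versions run 0, 1, 2, ... with no gaps or duplicates.
    boolean versionsAreContiguous() {
        advisoryLock.lock();
        try {
            for (int i = 0; i < ledgerTable.size(); i++) {
                if (ledgerTable.get(i).version != i) return false;
            }
            return true;
        } finally {
            advisoryLock.unlock();
        }
    }

    long currentValue() {
        advisoryLock.lock();
        try {
            return ledgerTable.get(ledgerTable.size() - 1).counterValue;
        } finally {
            advisoryLock.unlock();
        }
    }

    // Two "app servers" (threads) submit batches of +1 increments concurrently.
    static LedgerSimulation runTwoServers(int batchesPerServer, int batchSize) {
        LedgerSimulation ledger = new LedgerSimulation();
        Runnable appServer = () -> {
            for (int b = 0; b < batchesPerServer; b++) {
                List<Command> batch = new ArrayList<>();
                for (int i = 0; i < batchSize; i++) batch.add(Command.increment(1));
                ledger.processBatch(batch);
            }
        };
        Thread first = new Thread(appServer);
        Thread second = new Thread(appServer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ledger;
    }
}
```

With 200 batches of 50 increments per server, the final counter is 20,000 and every version from 0 to 20,000 appears exactly once.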

Approach 2: Command Sourcing Approach

  1. An app server receives a number of commands from users
  2. The app server stores all commands in the command_log table with a strict total ordering
  3. The app server reads the commands after its bookmark into memory
  4. The app server processes each command that precedes its own newly stored commands (for example, commands stored by the other app server). After each command the in-memory counter value and the bookmark are updated
  5. Once it reaches its own commands, the counter value is up to date, so it processes them and returns the appropriate counter values to the users, keeping the bookmark up to date

Notice that, thanks to the bookmark, an app server never has to read a command twice. Every command is written once and read once per app server.
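A minimal in-memory sketch of the bookmark mechanism, assuming a synchronized list stands in for the command_log table (list index = sequence number) and each Replica stands in for one app server's in-memory state. All names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of command sourcing with bookmarks (hypothetical names).
final class CommandSourcingSketch {

    static final class Command {
        final boolean drain;
        final long amount;
        private Command(boolean drain, long amount) { this.drain = drain; this.amount = amount; }
        static Command increment(long amount) { return new Command(false, amount); }
        static Command drain() { return new Command(true, 0); }
    }

    // Stands in for the command_log table; list positions give a strict total order.
    static final class CommandLog {
        private final List<Command> entries = new ArrayList<>();

        // Step 2: append a batch of commands.
        synchronized void appendBatch(List<Command> batch) {
            entries.addAll(batch);
        }

        // Step 3: return only the commands whose sequence number is after the bookmark.
        synchronized List<Command> readAfter(long bookmark) {
            return new ArrayList<>(entries.subList((int) (bookmark + 1), entries.size()));
        }
    }

    // One app server's in-memory state.
    static final class Replica {
        long counter = 0;
        long bookmark = -1;     // sequence number of the last applied command
        long commandsRead = 0;  // to show each command is read once per replica
        final List<Long> returnValues = new ArrayList<>(); // indexed by sequence number

        // Steps 3 to 5: catch up from the bookmark, applying commands in log order.
        void catchUp(CommandLog log) {
            for (Command c : log.readAfter(bookmark)) {
                if (c.drain) {
                    returnValues.add(counter);
                    counter = 0;
                } else {
                    counter += c.amount;
                    returnValues.add(counter);
                }
                bookmark++;
                commandsRead++;
            }
        }
    }
}
```

Whether a replica catches up after every batch or only once at the end, it reads each command exactly once and ends with the same counter and the same return values as every other replica.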

Each step operates on a batch of hundreds or thousands of commands at a time. We take the lock once per batch of commands, which gives strictly monotonic auto-increment IDs without spending too much time holding locks.
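One likely reason the lock matters: an auto-increment ID is assigned when a row is inserted, but the row only becomes visible to readers when its transaction commits, and concurrent transactions can commit in a different order from the one in which they received their IDs. A reader that has moved its bookmark past a higher ID will never see a lower one that commits later. The batch insert code later in this article defers the details to a follow-up post, so treat this simulation (all names hypothetical) as a sketch of the failure mode rather than the author's explanation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of why IDs must become visible in order for bookmarks to work.
// Assumption: like a database sequence, IDs are handed out when a row is inserted,
// but the row only becomes visible to readers when its transaction commits.
final class BookmarkGapDemo {
    private final TreeMap<Long, String> committedRows = new TreeMap<>();
    private long nextId = 1;

    long insert() { return nextId++; }                                       // ID assigned at insert time
    void commit(long id, String payload) { committedRows.put(id, payload); } // visible at commit time

    // A reader that only fetches rows with an ID greater than its bookmark.
    static final class Reader {
        private long bookmark = 0;
        final List<String> seen = new ArrayList<>();

        void poll(BookmarkGapDemo table) {
            for (Map.Entry<Long, String> row : table.committedRows.tailMap(bookmark, false).entrySet()) {
                seen.add(row.getValue());
                bookmark = row.getKey();
            }
        }
    }

    // No lock: transaction A gets ID 1, B gets ID 2, and B happens to commit first.
    static List<String> withoutLock() {
        BookmarkGapDemo table = new BookmarkGapDemo();
        Reader reader = new Reader();
        long a = table.insert();
        long b = table.insert();
        table.commit(b, "B");
        reader.poll(table);   // reads B and moves the bookmark to 2
        table.commit(a, "A");
        reader.poll(table);   // A has ID 1, below the bookmark, so it is never read
        return reader.seen;
    }

    // With a lock held from insert until commit, IDs are assigned and committed in order.
    static List<String> withLock() {
        BookmarkGapDemo table = new BookmarkGapDemo();
        Reader reader = new Reader();
        long a = table.insert();
        table.commit(a, "A");
        reader.poll(table);
        long b = table.insert();
        table.commit(b, "B");
        reader.poll(table);
        return reader.seen;
    }
}
```

Without the lock the reader only ever sees B; with it, the reader sees A and then B.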

The Results

Command Log Approach

➜ ./start
ntxs: 300000
max(ms): 258
avg(ms): 35
p50(ms): 28
p90(ms): 48
p95(ms): 89
p99(ms): 224
tps: 17160

Ledger Approach

➜ ./start
ntxs: 300000
max(ms): 297
avg(ms): 32
p50(ms): 27
p90(ms): 48
p95(ms): 62
p99(ms): 167
tps: 17435

Both approaches were implemented in Java using the Javalin framework, with Postgres 11.6 running locally in Docker.

Both approaches produced 100% correct results and didn't drop any user requests. Both achieve our goal of 10k TPS at 50ms p90 latency. Rerunning the test shows that the two approaches are within a small margin of error of each other, so there is no material difference.

This surprised me, as I expected the ledger to perform better since it makes far fewer reads from the database. On the other hand, the ledger approach holds the lock for the whole of processing, whereas the command sourcing approach processes commands without holding a lock. The command sourcing approach would also need some additional complexity to implement snapshotting.
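Snapshotting here would mean periodically saving the in-memory state together with the bookmark it reflects, so that a new or restarted app server replays only the commands after the snapshot instead of the whole log. A minimal in-memory sketch with hypothetical names and no persistence:

```java
import java.util.List;

// Hypothetical sketch of snapshotting for command sourcing: keep the state together
// with the bookmark it reflects, then replay only the commands after that bookmark.
final class SnapshotSketch {

    static final class Command {
        final boolean drain;
        final long amount;
        private Command(boolean drain, long amount) { this.drain = drain; this.amount = amount; }
        static Command increment(long amount) { return new Command(false, amount); }
        static Command drain() { return new Command(true, 0); }
    }

    static final class Snapshot {
        final long counter;
        final long bookmark; // sequence number of the last command included
        Snapshot(long counter, long bookmark) {
            this.counter = counter;
            this.bookmark = bookmark;
        }
    }

    // State before any command has been applied.
    static final Snapshot EMPTY = new Snapshot(0, -1);

    // Rebuild state by applying every command after the snapshot's bookmark.
    static Snapshot replay(Snapshot from, List<Command> log) {
        long counter = from.counter;
        for (int seq = (int) (from.bookmark + 1); seq < log.size(); seq++) {
            Command c = log.get(seq);
            counter = c.drain ? 0 : counter + c.amount;
        }
        return new Snapshot(counter, log.size() - 1);
    }
}
```

Replaying from a snapshot must give exactly the same state as replaying from scratch; the extra complexity in practice lies in storing snapshots and choosing when to take them.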

In real-world problems you may have multiple counters in play, in which case reading from the ledger may require more database lookups to find the latest entry for each counter.
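With several counters, step 3 becomes a lookup of the latest entry for each distinct counter in the batch. In Postgres that could be a single query such as SELECT DISTINCT ON (counter_id) * FROM counter_ledger WHERE counter_id = ANY(?) ORDER BY counter_id, version DESC, assuming counter_ledger has counter_id and version columns. The in-memory part might look like this sketch (names hypothetical; the database read is left out):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: one batch that touches several counters.
final class MultiCounterBatch {

    static final class Command {
        final String counterId;
        final boolean drain;
        final long amount;
        private Command(String counterId, boolean drain, long amount) {
            this.counterId = counterId;
            this.drain = drain;
            this.amount = amount;
        }
        static Command increment(String counterId, long amount) { return new Command(counterId, false, amount); }
        static Command drain(String counterId) { return new Command(counterId, true, 0); }
    }

    // The counters whose latest ledger entry must be read (step 3) before processing.
    static Set<String> countersToRead(List<Command> batch) {
        Set<String> ids = new LinkedHashSet<>();
        for (Command c : batch) ids.add(c.counterId);
        return ids;
    }

    // Step 4 across counters: latestValues holds the value loaded for each counter
    // and is updated in place; returns one return value per command, in order.
    static List<Long> process(Map<String, Long> latestValues, List<Command> batch) {
        List<Long> returnValues = new ArrayList<>(batch.size());
        for (Command c : batch) {
            long current = latestValues.getOrDefault(c.counterId, 0L);
            if (c.drain) {
                returnValues.add(current);
                latestValues.put(c.counterId, 0L);
            } else {
                long next = current + c.amount;
                latestValues.put(c.counterId, next);
                returnValues.add(next);
            }
        }
        return returnValues;
    }
}
```

Commands for different counters are still processed in batch order, so each counter sees its own commands strictly in sequence while sharing a single lock hold.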

Conclusion

The ledger approach is far easier to implement, performed on par with command sourcing in these tests (and may do slightly better elsewhere, since it reads far less from the database), and stays close to how web apps typically use databases. For this reason I recommend it for most cases.

There are some advantages to command sourcing, though. First of all, command sourcing keeps a complete history of all inputs into the system, which can be valuable for debugging and testing. It is also the approach taken by very high-performance systems such as trading platforms, since it can achieve very high throughput and low latency if implemented carefully, and it fits naturally with a Raft-style replicated log, which allows very high availability guarantees.

It also allows us to keep all state in memory without having to map it to database tables. With more complex state this may work better and make it easier to create rich domain models. For example, if you were building a graph database you wouldn't want to write the state of the graph to a database all the time; you would only want to write the commands that changed the graph. Changes to the state's structure also become easier, as they do not need to be coordinated across app servers. It is also possible to separate read and write responsibilities by building different state structures from the same commands for different purposes.

Notice also that the difference between these models is similar to the difference between operation-based and state-based CRDTs.

Everything presented in this article is a tool to keep in your toolbox; only you will know whether it is appropriate for the problem you're trying to solve.

Further Reading

Relevant Code Snippets

Optimise for Batch Inserts

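Always enable reWriteBatchedInserts to get the full benefit of batch inserts: the PostgreSQL JDBC driver then rewrites a batch of single-row INSERTs into multi-row INSERTs. Note that allowMultiQueries, cachePrepStmts, prepStmtCacheSize and prepStmtCacheSqlLimit are MySQL Connector/J settings rather than PostgreSQL JDBC driver settings, so they probably have no effect here; the PostgreSQL driver already accepts multiple statements in one query string.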

HikariConfig config = new HikariConfig();
config.setDriverClassName("org.postgresql.Driver");
config.setJdbcUrl("jdbc:postgresql://"+host+":5432/postgres?reWriteBatchedInserts=true&allowMultiQueries=true");
config.setUsername(username);
config.setPassword(password);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "1024");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "4096");

Batch Inserting Commands into a Command Log

public static void batchInsertSql(List<Command> commands) throws SQLException {
  // Reuse a single write connection with manual transaction control
  if (writeConn == null) {
    writeConn = ds.getConnection();
    writeConn.setAutoCommit(false);
  }

  // The advisory lock ensures auto-increment IDs are monotonic. See the next blog post for details.
  // pg_advisory_xact_lock is released automatically when the transaction commits or rolls back.
  try (var lockStmt = writeConn.prepareStatement("select pg_advisory_xact_lock(898874326372568)");
       var stmt = writeConn.prepareStatement("insert into command_log (command_type, transaction_id, counter_id, increment_amount) values (?,?,?,?)")) {

    for (var command : commands) {
      stmt.setString(1, command.commandType);
      stmt.setString(2, command.transactionId);
      stmt.setString(3, command.counterId);
      stmt.setLong(4, command.incrementAmount);
      stmt.addBatch();
      stmt.clearParameters();
    }

    lockStmt.execute();   // take the lock before inserting so IDs are assigned and committed in order
    stmt.executeBatch();  // sent as multi-row inserts when reWriteBatchedInserts=true
    writeConn.commit();   // committing also releases the transaction-level advisory lock
  } catch (SQLException e) {
    writeConn.rollback(); // releases the lock and discards the partial batch
    throw e;
  }
  System.out.println("Appended " + commands.size() + " commands to the commandlog");
}

Ledger Approach

static ConcurrentLinkedQueue<Command> queue = new ConcurrentLinkedQueue<Command>();
static ConcurrentHashMap<String, CompletableFuture<Long>> futures = new ConcurrentHashMap<String, CompletableFuture<Long>>();

// handleCommand receives commands from user requests and puts them on the queue
// a CompletableFuture is used to notify users of the result from a separate thread
public static CompletableFuture<Long> handleCommand(Command command) {
  CompletableFuture<Long> future = new CompletableFuture<Long>();
  futures.put(command.transactionId, future);
  queue.add(command);
  return future;
}

public static void startLedgerBackgroundThread() {
  new Thread(() -> {
    while(true) {
      var commands = new ArrayList<Command>(10000);
      var start = System.currentTimeMillis();
      while (System.currentTimeMillis() - start < 5 || commands.size() == 0) {
        if (queue.isEmpty()) {
          Thread.onSpinWait();
        } else {
          // Step 1: pull requests from users off the queue
          commands.add(queue.remove());
        }
      }

      // Step 2: Take out an advisory lock
      DbRepo.ledgerLock();
      var ledgerEntries = new ArrayList<LedgerEntry>();
      try {
        // Step 3: pull the latest ledger entry into memory
        // (assumes every command in the batch targets the same counter)
        var latestLedgerEntry = DbRepo.readLatestLedgerEntry(commands.get(0).counterId);

        // Step 4: Process commands one at a time against the in-memory entry
        for (var command : commands) {
          if (command.commandType.equals("INCREMENT")) {
            latestLedgerEntry = new LedgerEntry(
                command.counterId, // which counter
                command.transactionId, // transactionId for idempotency tests
                latestLedgerEntry.version + 1, // version
                latestLedgerEntry.amount + command.amount, // counterValue
                latestLedgerEntry.amount + command.amount // returnValue
            );
            ledgerEntries.add(latestLedgerEntry);
          } else if (command.commandType.equals("DRAIN")) {
            latestLedgerEntry = new LedgerEntry(
                command.counterId, // which counter
                command.transactionId, // transactionId for idempotency tests
                latestLedgerEntry.version + 1, // version
                0L, // counterValue
                latestLedgerEntry.amount // returnValue
            );
            ledgerEntries.add(latestLedgerEntry);
          } else {
            // Fail unknown commands instead of leaving the user waiting forever
            futures.remove(command.transactionId).completeExceptionally(
                new IllegalArgumentException("Unknown command type: " + command.commandType));
          }
        }

        // Step 5: save the ledger entries into the ledger table as one batch
        DbRepo.saveLedgerEntries(ledgerEntries);
      } finally {
        // Step 6: Release the lock, even if a database call throws
        DbRepo.ledgerCancelLock();
      }

      // Step 7: notify users of results, removing completed futures so the map doesn't grow forever
      for (var entry : ledgerEntries) {
        futures.remove(entry.transactionId).complete(entry.returnValue);
      }
    }
  }).start();
}

Command Sourcing Approach

static ConcurrentLinkedQueue<Command> queue = new ConcurrentLinkedQueue<>();
static ConcurrentHashMap<String, CompletableFuture<Long>> futures = new ConcurrentHashMap<>();
static long lastCommandSequenceNumber = -1L;

// handleCommand receives commands from user requests and puts them on the queue
// a CompletableFuture is used to notify users of the result from a separate thread
public static CompletableFuture<Long> handleCommand(Command command) {
  CompletableFuture<Long> future = new CompletableFuture<Long>();
  futures.put(command.transactionId, future);
  queue.add(command);
  return future;
}

public static void startCommandLogPersistenceBackgroundThread() {
  new Thread(() -> {
    while (true) {
      var commands = new ArrayList<Command>(10000);
      var start = System.currentTimeMillis();
      while (System.currentTimeMillis() - start < 10 || commands.size() == 0) {
        if (queue.isEmpty()) {
          Thread.onSpinWait();
        } else {
          // Step 1: pull commands from users off the queue
          commands.add(queue.remove());
        }
      }

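      try {
        // Step 2: Store all commands in the command log
        DbRepo.batchInsert(commands);

        handleCommandsFromLog(); // read from command log, process and resolve futures for users
      } catch (SQLException | IOException e) {
        // A Runnable can't throw checked exceptions, so log the failure and keep the loop running
        e.printStackTrace();
      }
    }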
  }).start();
}

public static synchronized int handleCommandsFromLog() throws SQLException, IOException {
  // Step 3: Read all commands after the bookmark 'lastCommandSequenceNumber'
  var commands = DbRepo.readCommandsAfter(lastCommandSequenceNumber);
  
  // Step 4: Process each command one by one
  for (var command : commands) {
    if (command.commandType.equals("INCREMENT")) {
      counterState = new CounterState(
          command.counterId, // which counter
          command.transactionId, // transactionId for idempotency tests
          counterState.version + 1, // version
          counterState.amount + command.amount, // counterValue
          counterState.amount + command.amount // returnValue
      );
    } else if (command.commandType.equals("DRAIN")) {
      counterState = new CounterState(
          command.counterId, // which counter
          command.transactionId, // transactionId for idempotency tests
          counterState.version + 1, // version
          0L, // counterValue
          counterState.amount // returnValue
      );
    }

    // Step 5: update the bookmark and return the new counter value to users
    lastCommandSequenceNumber = command.sequenceNumber;
    // Only the app server that received this command holds a future for it
    var future = futures.remove(command.transactionId);
    if (future != null) {
      future.complete(counterState.getCounterReturnValue(command.counterId)); // the command's return value for the user
    }
  }
  return commands.size();
}
updated_at 18-05-2021