Akka Streams: A Motivating Example

I expand on these concepts in my ScalaDays presentation in Berlin and New York.

I have been using the Akka Streams API for about a year now and I have been extremely impressed with it. It is a framework for efficiently describing and executing streaming processes while maintaining bounded resource usage. In this article, I will provide a motivating example for adopting the Akka Streams API. I would like to demonstrate how natural and powerful it is to develop applications in this framework, especially when contrasted with using the Akka actor model alone, or a typical multithreaded model in a systems-programming language like C++, Java, or C#. The example I will use here is somewhat contrived, but it does parallel my experiences.

For the purposes of demonstration, consider a prototype application that serves WebSocket connections. Clients use the WebSocket connection to stream measurement data to the server. The server parses the incoming messages and writes the measurements to a database. The rate at which messages are sent to the server is extremely variable. There are bursts where the message rate is extremely high, but there are also periods where very few messages are sent to the server. The server is not in control of the rate at which a client sends messages to the server.

I have been impressed with the Akka actor-model framework for building scalable, concurrent, fault-tolerant, message-based applications, so I adopt it for this prototype. My initial implementation receives a message over the WebSocket, parses it, and sends it to a Database Actor, which writes it to the database asynchronously.

class DatabaseActor extends Actor {
  val database = new Database()

  def receive = {
    case InsertMessage(message) =>
      database.insertAsync(message)
  }
}

val database = system.actorOf(Props[DatabaseActor], "database")

val measurementsWebSocketService =
  Flow[Message]
    .collect {
      case TextMessage.Strict(text) =>
        val message = InsertMessage.parse(text)
        database ! message
        InsertMessage.ack(message)
    }

val route = path("measurements") {
  get {
    handleWebSocketMessages(measurementsWebSocketService)
  }
}

val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

This implementation is naive in that the server may not receive the entire message at once—it may be streamed—and I have yet to consider any error handling. However, for a prototype, it works.

After experimenting with this implementation, I discover that the performance is poor, given the large number of calls the server is making to the database, processing a single message at a time. To improve the performance, I will buffer 1000 messages and write them to the database in aggregate.

class DatabaseActor extends Actor {
  val database = new Database()

  var messages: Seq[String] = Nil
  var count: Int = 0

  def receive = {
    case InsertMessage(message) =>
      messages = message +: messages
      count += 1
      if (count == 1000) {
        insert()
      }
  }

  private def insert() = {
    database.bulkInsertAsync(messages)
    messages = Nil
    count = 0
  }
}

This change was easy to make and is nicely encapsulated within the Database Actor. A drawback of this approach, however, is that since new messages arrive at a variable rate, there are periods where messages sit in the buffer and the samples in the database are not current for minutes or more. This can be addressed by scheduling a database insert at least once a second, even if 1000 messages have not been received.

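import scala.concurrent.duration._
import scala.language.postfixOps // allows the `1 second` syntax

object DatabaseActor {
  case object Insert
}

class DatabaseActor extends Actor {
  import DatabaseActor._
  import context.dispatcher // ExecutionContext required by the scheduler

  val database = new Database()

  var messages: Seq[String] = Nil
  var count: Int = 0

  // Explicit Unit: scheduleOnce returns a Cancellable, not Unit
  override def preStart(): Unit = {
    context.system.scheduler.scheduleOnce(1 second) {
      self ! Insert
    }
  }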

  def receive = {
    case InsertMessage(message) =>
      messages = message +: messages
      count += 1
      if (count == 1000) {
        insert()
      }
    case Insert =>
      insert()
      context.system.scheduler.scheduleOnce(1 second) {
        self ! Insert
      }
  }

  private def insert() = {
    if (count > 0) {
      database.bulkInsertAsync(messages)
      messages = Nil
      count = 0
    }
  }
}

The actor model makes implementing this change very straightforward. Someone familiar with traditional systems-programming will appreciate that the actor makes mutating variables thread-safe and the scheduling easy. This approach is somewhat inefficient in that the Database Actor will write messages to the database every second, as long as there are queued messages, even if a write to the database has just occurred. To address this, we could keep track of whether or not a write has occurred within the last second, and only write to the database if it is necessary.

class DatabaseActor extends Actor {
  val database = new Database()

  var messages: Seq[String] = Nil
  var count: Int = 0
  var flush = true

  // Explicit Unit: scheduleOnce returns a Cancellable, not Unit
  override def preStart(): Unit = {
    context.system.scheduler.scheduleOnce(1 second) {
      self ! Insert
    }
  }

  def receive = {
    case InsertMessage(message) =>
      messages = message +: messages
      count += 1
      if (count == 1000) {
        insert()
        flush = false
      }
    case Insert =>
      if (flush) insert() else flush = true
      context.system.scheduler.scheduleOnce(1 second) {
        self ! Insert
      }
  }

  private def insert() = {
    if (count > 0) {
      database.bulkInsertAsync(messages)
      messages = Nil
      count = 0
    }
  }
}

All of these changes are still conveniently encapsulated within the Database Actor, but the implementation is getting more complex. Experimenting more with this prototype demonstrates that its performance has improved, so much so that when the client message-rate is high, the application overwhelms the database with asynchronous writes, negatively impacting the performance of the database for other applications. To guard against this, the Database Actor must limit the number of simultaneous, asynchronous writes to the database.

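object DatabaseActor {
  case object Insert
  case object Decrement // sent to self when an asynchronous write completes
}

class DatabaseActor extends Actor {
  import DatabaseActor._
  import context.dispatcher // ExecutionContext for the scheduler and andThen

  val database = new Database()

  var messages: Seq[String] = Nil
  var count = 0
  var flush = true
  var outstanding = 0 // number of bulk writes currently in flight

  // Explicit Unit: scheduleOnce returns a Cancellable, not Unit
  override def preStart(): Unit = {
    context.system.scheduler.scheduleOnce(1 second) {
      self ! Insert
    }
  }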

  def receive = {
    case InsertMessage(message) =>
      messages = message +: messages
      count += 1
      if (count >= 1000) {
        insert()
        flush = false
      }
    case Insert =>
      if (flush) insert() else flush = true
      context.system.scheduler.scheduleOnce(1 second) {
        self ! Insert
      }
    case Decrement =>
      outstanding -= 1
      if (count >= 1000) {
        insert()
        flush = false
      }
  }

  private def insert() = {
    if (count > 0 && outstanding < 10) {
      outstanding += 1
      val (insert, remaining) = messages.splitAt(1000)
      messages = remaining
      count = remaining.size
      database.bulkInsertAsync(insert) andThen {
        case _ => self ! Decrement
      }
    }
  }
}
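
One detail of the listing above can be checked in isolation, without Akka. The actor prepends each new message to the buffer and then takes a batch from the front with splitAt, so the batch holds the newest messages in reverse arrival order, and older messages wait behind them. The following is only a sketch that mirrors those two operations with the standard library; the names BatchOrderSketch, buffer, nextBatch, and demo are hypothetical and do not appear in the prototype.

```scala
object BatchOrderSketch {
  // Mirrors the actor's buffering step: each new message is prepended.
  def buffer(incoming: Seq[String]): Seq[String] =
    incoming.foldLeft(Seq.empty[String])((messages, message) => message +: messages)

  // Mirrors insert(): take up to batchSize messages from the front of the buffer.
  def nextBatch(messages: Seq[String], batchSize: Int): (Seq[String], Seq[String]) =
    messages.splitAt(batchSize)

  // Five messages arrive in order m1..m5 and a batch of three is taken.
  def demo(): (Seq[String], Seq[String]) =
    nextBatch(buffer(Seq("m1", "m2", "m3", "m4", "m5")), 3)
}

// BatchOrderSketch.demo() returns (List(m5, m4, m3), List(m2, m1)):
// the newest messages are written first, and in reverse order.
```

If arrival order matters, appending to the buffer (for example with an immutable Queue) or reversing the buffer before taking a batch would be one way to preserve it.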

The actor model remains powerful for handling the thread safety and scheduling, but the implementation of the Database Actor is now very complex. I would not be surprised if this implementation has some subtle bugs. Testing all of the edge cases will be difficult. To complicate matters further, I haven't even considered error handling. Error handling will add significant complexity, especially if the ordering of the messages is important and must be maintained. From what started out as a promising actor-model implementation, I've ended up having to address all of the same problems that I typically face in systems programming: concurrency, synchronization, scheduling, and so on. On top of all of this, now that the Database Actor throttles writes to the database, when the rate of incoming messages is high, messages will accumulate in memory, eventually exceeding the memory limits of the process:

Uncaught error from thread [default-akka.actor.default-dispatcher-15] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[default]
java.lang.OutOfMemoryError: GC overhead limit exceeded

Implementing some form of flow control with the client is a difficult task and it will require a significant amount of work and testing. In addition, the problems I'm addressing here—batching requests, limiting concurrency, scheduling events, flow control—are common problems when dealing with streaming data, especially unbounded streams, and are not specific to this application. These are the motivations behind the Akka Streams API.
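
The memory problem comes from buffering without a bound. As a minimal sketch of what a bound implies, the following uses only the JDK's ArrayBlockingQueue rather than anything from Akka: once the buffer is full, offer refuses new elements instead of growing, and that refusal is the signal that would somehow have to travel back to the WebSocket client. The names BoundedBufferSketch and run are hypothetical, and nothing drains the buffer here, which stands in for a database that cannot keep up.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedBufferSketch {
  // Offers `total` messages to a buffer of the given capacity and returns
  // (accepted, refused). Nothing consumes from the buffer in this sketch.
  def run(capacity: Int, total: Int): (Int, Int) = {
    val buffer = new ArrayBlockingQueue[String](capacity)
    // offer returns false, rather than blocking or growing, when the queue is full
    val accepted = (1 to total).count(i => buffer.offer(s"measurement-$i"))
    (accepted, total - accepted)
  }
}

// BoundedBufferSketch.run(1000, 2500) returns (1000, 1500)
```

Memory stays bounded, but 1500 messages now have to be dropped, retried, or held back at the source, and deciding which is the flow-control work described above.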

Akka Streams

The Akka Streams API is a higher-level API that builds on top of the Akka actor model and implements many of the things that I have just addressed, including batching messages, rate-limiting requests, and handling flow control. The following Akka Streams implementation receives a message, parses it, batches writes to the database at 1000 messages (without buffering messages for more than one second), and limits the number of outstanding, asynchronous writes to the database to 10.

val database = new Database()

val measurementsWebSocketService =
  Flow[Message]
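    .collect {
      // A complete message: wrap it so that both cases yield a Future[String]
      case TextMessage.Strict(text) =>
        Future.successful(text)
      // A streamed message: concatenate the chunks into a single string
      case TextMessage.Streamed(textStream) =>
        textStream.runFold("")(_ + _)
    }
    // Resolve each Future in order, one at a time
    .mapAsync(1)(identity)
    .map(InsertMessage.parse)
    // Emit a batch at 1000 messages or after one second, whichever comes first
    .groupedWithin(1000, 1 second)
    // Allow at most 10 bulk writes to be in flight at once
    .mapAsync(10)(database.bulkInsertAsync)
    .map(messages => InsertMessage.ack(messages.last))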

val route = path("measurements") {
  get {
    handleWebSocketMessages(measurementsWebSocketService)
  }
}

val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

This implementation does everything that the previous implementation does, and more, but it is so much more concise, expressive, and maintainable. Unlike the first implementation, this one handles the fact that incoming messages may be streamed, rather than received as a complete message. This implementation also benefits from backpressure, a form of flow control which limits the incoming messages until there is demand downstream. This means that even when the message rate is extremely high and the writes to the database are saturated, the server will keep functioning and not crash as a result of exhausting memory.
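
The batching and throttling stages can be tried on their own, away from the WebSocket and the database. The following is a sketch under stated assumptions: it assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer), and bulkInsertAsync here is a hypothetical stand-in that only reports the size of each batch. BatchingSketch and run are likewise illustrative names.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BatchingSketch {
  // Hypothetical stand-in for database.bulkInsertAsync: it pretends to
  // write the batch and completes with the number of rows written.
  def bulkInsertAsync(batch: Seq[Int])(implicit system: ActorSystem): Future[Int] = {
    import system.dispatcher
    Future(batch.size)
  }

  // Runs 2500 elements through the same two stages as the service above
  // and returns the size of each batch that was "written".
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("batching-sketch")
    try {
      val written = Source(1 to 2500)
        .groupedWithin(1000, 1.second)                 // batch by count or by time
        .mapAsync(10)(batch => bulkInsertAsync(batch)) // at most 10 writes in flight
        .runWith(Sink.seq)
      Await.result(written, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

No batch returned by run() exceeds 1000 elements and the sizes add up to 2500; exactly how the elements are split can depend on timing, since a batch is also emitted whenever the one-second window elapses.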

Think about the amount of code underlying this simple implementation that I can just leverage and don't have to write. This greatly accelerates product development, while also improving robustness and reliability. Consider the testing burden. It may not change the end-to-end functional tests that are required, but it significantly reduces the number of unit tests that I need to write. In my original implementation, I would have to unit test all of the edge cases for the batching and throttling logic. When I use a groupedWithin or mapAsync stream element, they just work as expected, with the unit testing already handled by the Akka Streams API developers.

The Reactive Streams model of programming is very powerful and it naturally embraces the semantics and dynamics of message-based, streaming-data systems. When I worked in a different Reactive Streams framework in the past, it required the developer to understand the low-level Reactive Streams semantics (onSubscribe, onNext, onError, onComplete, and so on). It was powerful, but also complex and error-prone, and it didn't accelerate product development. Being a higher-level abstraction than Reactive Streams and handling many of the common challenges of streams-based programming models, Akka Streams is a great framework for building scalable and resilient streaming-data systems. From the Akka Streams documentation:

The Akka Streams API is completely decoupled from the Reactive Streams interfaces. While Akka Streams focus on the formulation of transformations on data streams the scope of Reactive Streams is just to define a common mechanism of how to move data across an asynchronous boundary without losses, buffering or resource exhaustion.

The relationship between these two is that the Akka Streams API is geared towards end-users while the Akka Streams implementation uses the Reactive Streams interfaces internally to pass data between the different processing stages. For this reason you will not find any resemblance between the Reactive Streams interfaces and the Akka Streams API. This is in line with the expectations of the Reactive Streams project, whose primary purpose is to define interfaces such that different streaming implementation can interoperate; it is not the purpose of Reactive Streams to describe an end-user API.
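
To give a sense of the low-level semantics mentioned earlier (onSubscribe, onNext, onError, onComplete), here is a minimal sketch of a hand-written subscriber. It is not taken from any framework I used; it assumes JDK 9 or later and uses the java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces, together with the JDK's SubmissionPublisher. The names OneAtATimeSubscriber and ReactiveStreamsSketch are hypothetical.

```scala
import java.util.concurrent.{CountDownLatch, Flow, SubmissionPublisher, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// A subscriber that signals demand for exactly one element at a time.
final class OneAtATimeSubscriber extends Flow.Subscriber[String] {
  private var subscription: Flow.Subscription = null
  val received = ArrayBuffer.empty[String]
  val finished = new CountDownLatch(1)

  override def onSubscribe(s: Flow.Subscription): Unit = {
    subscription = s
    s.request(1) // nothing is delivered until demand is signalled
  }

  override def onNext(item: String): Unit = {
    received += item
    subscription.request(1) // ask for the next element only when ready
  }

  override def onError(t: Throwable): Unit = finished.countDown()

  override def onComplete(): Unit = finished.countDown()
}

object ReactiveStreamsSketch {
  // Publishes three elements and returns what the subscriber received.
  def run(): List[String] = {
    val publisher = new SubmissionPublisher[String]()
    val subscriber = new OneAtATimeSubscriber
    publisher.subscribe(subscriber)
    Seq("a", "b", "c").foreach(item => publisher.submit(item))
    publisher.close() // triggers onComplete once pending items are delivered
    subscriber.finished.await(5, TimeUnit.SECONDS)
    subscriber.received.toList
  }
}

// ReactiveStreamsSketch.run() returns List(a, b, c)
```

Even this small subscriber has to track its subscription and decide when to request more, which is the kind of bookkeeping that operators like groupedWithin and mapAsync take care of.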

On a final note, notice how adopting the Akka Streams API didn't require writing new code; I was able to reuse the code I already had for parsing messages and interacting with the database. I've been through the process of converting a purely actor-based application into a streaming implementation a few times now. Each time it was a fairly straightforward process of reorganizing the code that we already had, in addition to removing a lot of code that we no longer needed to maintain.

Summary

The Akka actor-model framework is a great framework. I have used it extensively and it has served me very well. It remains complementary to the Akka Streams API, handling concerns like supervision, error handling, and distribution of workloads within a cluster, that I hope to explore in some future articles. When dealing with streaming data, however, especially unbounded streams, the actor model can suffer from many of the same challenges that typical systems-programming approaches do. As I've demonstrated here, the Akka Streams API addresses many of these challenges that are essential to building scalable, robust, and resilient streaming-data systems.

Both Akka and Akka Streams have a somewhat steep learning curve, but nothing compared to the learning curve of attacking these distributed-systems and streaming-data fundamentals from scratch. These frameworks represent a lot of accumulated knowledge. As my understanding of the problems I work on expands, I have found that these frameworks naturally grow with me, continually addressing the challenges that I face.