Rethinking Streaming Workloads with Akka Streams: Part I

The Akka Streams API is one of my favourite tools for building reactive, distributed applications. If you are not familiar with it, I published an article on the motivations for using the Akka Streams API, as well as an article on how its powerful semantics address common patterns when streaming telemetry. I also published a series on how to integrate Akka Streams with Akka Actors in order to build sophisticated applications that live at the intersection of streaming data and managing state. These types of applications are especially common when monitoring or controlling physical assets, as in industrial automation or the Internet of Things (IoT).

It is natural to adopt the Akka Streams API for inherently streaming workloads: for applications that extract, transform, and load (ETL) data; for ingesting telemetry from industrial equipment or IoT devices; or for complex event processing (CEP) systems. However, the Akka Streams API is much more broadly applicable and it can be used to address many problems that you may not initially think of as streaming problems. In this article, I will explore some common patterns that can be reimagined as streaming workloads in order to leverage the powerful semantics of the Akka Streams API, as well as asynchronous backpressure for bounded resource constraints.

Materializing Streams is Expensive!

The first thing to address is the notion that materializing a stream is expensive. People often develop the instinct that streams are a heavy-weight approach, since they use actors for the actual execution. They believe streams should be avoided, especially for short-lived operations, in favour of just using a Scala Future. At the end of the day, actors are just classes and most of us are not afraid to instantiate a few classes in order to perform a task. The expense of materializing a stream is relative. If the workload involves a network call, a database insert, or an expensive calculation, the materialization and execution overhead of the stream will be insignificant by comparison. It is worth considering an example.

When comparing relative performance, one must always consider the workload—an IO-bound workload, a CPU-bound workload, a memory-bound workload, and a bandwidth-bound workload will all have very different characteristics. Consider the processJob method which executes a job and returns a Future. It has the following signature:

def processJob(job: Int): Future[Int]

The following program executes 1000 jobs by concurrently executing 1000 futures (using map), then it accumulates the results with Future.foldLeft:

val jobs = 1 to 1000
val work = jobs.map(processJob)
val result = Future.foldLeft(work)(0) {
  case (acc, r) => acc + r
}

To compare just the relative overhead of executing each job using an Akka Stream, the equivalent workload is as follows:

val jobs = 1 to 1000
val work = jobs.map { job =>
  Source.fromFuture(processJob(job)).runWith(Sink.head)
}
val result = Future.foldLeft(work)(0) {
  case (acc, r) => acc + r
}

If the jobs must be executed in sequence, rather than running all of them concurrently, this can be achieved using foldLeft on the collection of jobs:

val jobs = 1 to 1000
val work = jobs.foldLeft(Future.successful(0)) {
  case (result, job) =>
    result.flatMap { acc =>
      processJob(job).map(r => acc + r)
    }
}

Again, to make a relative comparison, the equivalent workload using an Akka Stream to execute each job is as follows:

val jobs = 1 to 1000
val work = jobs.foldLeft(Future.successful(0)) {
  case (result, job) =>
    result.flatMap { acc =>
      Source
        .fromFuture(processJob(job))
        .runWith(Sink.head)
        .map(r => acc + r)
    }
}

To understand the relative performance, consider the extreme case of the processJob function immediately returning a completed Future:[1]

def processJob(job: Int): Future[Int] = {
  Future.successful(job)
}

In both the concurrent execution with Future.foldLeft and the sequential execution with the collection foldLeft, using Akka Streams adds on the order of 100 milliseconds to this workload on the computer that I am using. Spread over 1000 futures, this is only about 100 microseconds per future. This is significant in the case of a future that is completed immediately, but if the future takes more than a few milliseconds to complete, as it would when making a network call or a database insert, this difference becomes negligible. In the sequential case, if the future takes longer than 1 millisecond, there is no difference in execution time between the Akka Streams example and the example that just uses a Scala Future. In the concurrent case, the difference starts to become negligible if the future takes longer than 10 milliseconds for a CPU-bound workload and 100 milliseconds for an IO-bound workload, at least on my four-core computer.
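Numbers like these depend heavily on the machine, so it can be worth reproducing the baseline locally. The following is a minimal sketch of a wall-clock measurement for the plain Future variant, using only the Scala standard library. The names FutureBaseline and timeConcurrent are hypothetical, and a single un-warmed timing like this is only indicative, not a rigorous benchmark:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureBaseline {
  // The extreme case from the text: the job completes immediately.
  def processJob(job: Int): Future[Int] = Future.successful(job)

  // Runs the concurrent workload once for jobs 1 to n and returns
  // the accumulated result together with the elapsed milliseconds.
  def timeConcurrent(n: Int): (Int, Double) = {
    val start = System.nanoTime()
    val work = (1 to n).map(processJob)
    val result = Await.result(Future.foldLeft(work)(0)(_ + _), 1.minute)
    val elapsedMillis = (System.nanoTime() - start) / 1e6
    (result, elapsedMillis)
  }
}
```

Calling FutureBaseline.timeConcurrent(1000) returns the sum of the jobs and the elapsed time. Replacing the body of processJob with a slower job is a quick way to see how a fixed per-future overhead shrinks relative to the work itself.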

If the performance impact is negligible, leveraging the convenience of the Akka Streams API to perform higher-order functions can become advantageous. Throughout the rest of this three-part series, I will explore examples that use the Akka Streams API for workloads that are not traditionally expressed as streaming workloads.

Managing Concurrency

As the examples that I just provided demonstrate, working with a collection of futures is somewhat all or nothing. Either all of the futures are executed concurrently (using Future.foldLeft, Future.sequence, Future.traverse, or similar methods) or each future is executed sequentially. What if we want to execute a number of futures concurrently, but also limit parallelism, to put an upper bound on resources or to avoid overwhelming other systems with too many concurrent requests or computations? This can be somewhat difficult to achieve with just a Scala Future.
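To illustrate the difficulty, one workaround with plain futures is to process the jobs in fixed-size batches. The following is a minimal sketch rather than a recommendation. The names BatchedFutures and runBatched are hypothetical, and processJob is a stand-in that simply returns its input:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BatchedFutures {
  // Stand-in for real work: returns the job number asynchronously.
  def processJob(job: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future(job)

  // Runs at most `parallelism` jobs at a time by processing
  // fixed-size batches one after another and summing the results.
  def runBatched(jobs: Seq[Int], parallelism: Int)(
      implicit ec: ExecutionContext): Future[Int] =
    jobs.grouped(parallelism).foldLeft(Future.successful(0)) { (accF, batch) =>
      accF.flatMap { acc =>
        // The futures of a batch are only created once the
        // previous batch has completed.
        Future.traverse(batch)(job => processJob(job)).map(rs => acc + rs.sum)
      }
    }
}
```

Each batch must wait for its slowest job before the next batch starts, so fewer than the permitted number of jobs are often in flight, and the bookkeeping grows quickly once failures or retries need to be handled as well.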

The Akka Streams API provides a very easy way to limit concurrency using mapAsync. Revisiting the example above, it is possible to limit the execution to no more than four jobs at a time:

val jobs = 1 to 1000
val result = Source(jobs)
  .mapAsync(4)(processJob)
  .runFold(0)(_ + _)

This approach also has the advantage of maintaining order. The results from mapAsync will be emitted downstream in the upstream order, regardless of the order in which the individual futures complete. Maintaining temporal order is important for many workloads, like workloads that rely on event-sourcing techniques. If order is not important, mapAsyncUnordered can be used and can provide a performance benefit in some cases.[2]

Who Knows How Long to Sleep?

Similar to the example that I just presented, quickly iterating over a large number of items by mapping over a list, or using a for comprehension, can be resource intensive. In some cases, it can be important to include some means to throttle the workload. Consider the following example that executes as fast as possible:

for (i <- 1 to 1000) yield processJob(i)

Composing this problem as a for comprehension does not offer an effective means of throttling this workload. Perhaps this workload must honour request quotas imposed by other services or face failures, artificial delays, or premium charges when these quotas are exceeded. Adding a Thread.sleep would be crude and blocking the thread is an inefficient use of resources:

for (i <- 1 to 1000) yield {
  Thread.sleep(10) // Who knows how long to sleep ¯\_(ツ)_/¯?
  processJob(i)
}

Reframing this problem as a streaming workload and throttling the requests with the Akka Streams API throttle element is far more efficient, effective, and elegant. The following example throttles the workload to no more than 100 requests per second:[3]

Source(1 to 1000)
  .throttle(
    elements = 100,
    per = 1.second,
    maximumBurst = 100,
    mode = ThrottleMode.shaping
  )
  .mapAsync(4)(processJob)
  .runWith(Sink.seq)

Retrying Requests

When composing a program with futures, it is important to consider what happens when a future fails. Does the failed future need to be retried? If so, does it need to be retried with exponential backoff, so that dependent systems have time to recover? After retrying the future a certain number of times, should the program give up? If the program is processing a large sequence of futures and one fails, how can it continue from where it left off, rather than starting from the beginning?
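To make these questions concrete, a hand-rolled retry with exponential backoff over a plain Scala Future might look like the following. This is a minimal sketch under stated assumptions, not a production implementation: the names RetrySketch, after, and retry are hypothetical, the delay simply doubles after each failure, and a single-threaded Java scheduler stands in for a proper timer:

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RetrySketch {
  // Daemon thread, so the scheduler does not keep the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true)
        t
      }
    }
  )

  // Completes after `delay` without blocking a thread.
  private def after(delay: FiniteDuration): Future[Unit] = {
    val p = Promise[Unit]()
    scheduler.schedule(
      new Runnable { def run(): Unit = p.success(()) },
      delay.toMillis,
      TimeUnit.MILLISECONDS
    )
    p.future
  }

  // Runs `op`, and on failure retries it up to `retries` more times,
  // doubling the delay before each new attempt. When the retries are
  // exhausted, the last failure is returned.
  def retry[T](retries: Int, delay: FiniteDuration)(op: () => Future[T])(
      implicit ec: ExecutionContext): Future[T] =
    op().recoverWith {
      case NonFatal(_) if retries > 0 =>
        after(delay).flatMap(_ => retry(retries - 1, delay * 2)(op))
    }
}
```

Even this small sketch needs its own scheduler and recursion, and it still does not address a maximum backoff, random jitter, or resuming part-way through a sequence of futures.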

You will find some creative solutions on the Internet for retrying futures, most often using recursion. I find the Akka Streams API really valuable for handling failures, retries, and circuit breaking in a compositional way. To demonstrate, I will modify an example from an earlier essay of mine entitled Backoff and Retry Error-Handling for Akka Streams. The following example makes an HTTP request for a specific identifier and returns a value. It executes the request using a Scala Future:

val id = "bc6b456b-277a-4555-9248-c180762d981a"
val response: Future[Response] =
  Http()
    .singleRequest(HttpRequest(uri = s"http://localhost:8080/$id"))
    .flatMap {
      case HttpResponse(StatusCodes.OK, _, entity, _) =>
        Unmarshal(entity).to[Response]
      case HttpResponse(StatusCodes.InternalServerError, _, _, _) =>
        throw DatabaseBusyException
      case HttpResponse(statusCode, _, _, _) =>
        throw DatabaseUnexpectedException(statusCode)
    }

response.onComplete {
  case Success(r)  => println(s"response : ${r.value}")
  case Failure(ex) => println(s"failure : $ex")
}

I often compose requests like this using the Akka Streams API:

val id = "bc6b456b-277a-4555-9248-c180762d981a"
val responseFuture: Future[HttpResponse] =
  Http().singleRequest(HttpRequest(uri = s"http://localhost:8080/$id"))

Source
  .fromFuture(responseFuture)
  .mapAsync(parallelism = 1) {
    case HttpResponse(StatusCodes.OK, _, entity, _) =>
      Unmarshal(entity).to[Response]
    case HttpResponse(StatusCodes.InternalServerError, _, _, _) =>
      throw DatabaseBusyException
    case HttpResponse(statusCode, _, _, _) =>
      throw DatabaseUnexpectedException(statusCode)
  }

As I already discussed, the cost of materializing a stream to execute a network request is usually negligible in terms of the overall performance. Furthermore, once the request is expressed as an Akka Streams Source, a simple way to retry this request is with RestartSource.withBackoff. This also makes it easy to set a maximum number of retry attempts using the maxRestarts parameter:

val id = "bc6b456b-277a-4555-9248-c180762d981a"
val response: Future[Response] =
  RestartSource
    .withBackoff(
      minBackoff = 10.milliseconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2,
      maxRestarts = 10
    ) { () =>
      val responseFuture: Future[HttpResponse] =
        Http().singleRequest(HttpRequest(uri = s"http://localhost:8080/$id"))

      Source
        .fromFuture(responseFuture)
        .mapAsync(parallelism = 1) {
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
            Unmarshal(entity).to[Response]
          case HttpResponse(StatusCodes.InternalServerError, _, _, _) =>
            throw DatabaseBusyException
          case HttpResponse(statusCode, _, _, _) =>
            throw DatabaseUnexpectedException(statusCode)
        }
    }
    .runWith(Sink.head)

response.onComplete {
  case Success(r)  => println(s"response : ${r.value}")
  case Failure(ex) => println(s"failure : $ex")
}

In addition, if this service is being queried by a number of code paths in the application, it is easy to insert a circuit breaker to fail fast when the service is returning errors:

val breaker = CircuitBreaker(
  system.scheduler,
  maxFailures = 3,
  callTimeout = 30.seconds,
  resetTimeout = 1.minute
)

val id = "bc6b456b-277a-4555-9248-c180762d981a"
val response: Future[Response] =
  RestartSource
    .withBackoff(
      minBackoff = 10.milliseconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2,
      maxRestarts = 10
    ) { () =>
      val responseFuture: Future[HttpResponse] =
        Http().singleRequest(HttpRequest(uri = s"http://localhost:8080/$id"))

      val responseFutureWithBreaker: Future[HttpResponse] =
        breaker.withCircuitBreaker(responseFuture)

      Source
        .fromFuture(responseFutureWithBreaker)
        .mapAsync(parallelism = 1) {
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
            Unmarshal(entity).to[Response]
          case HttpResponse(StatusCodes.InternalServerError, _, _, _) =>
            throw DatabaseBusyException
          case HttpResponse(statusCode, _, _, _) =>
            throw DatabaseUnexpectedException(statusCode)
        }
    }
    .runWith(Sink.head)

response.onComplete {
  case Success(r)  => println(s"response : ${r.value}")
  case Failure(ex) => println(s"failure : $ex")
}

Another benefit of using RestartSource.withBackoff is that if an application is processing a sequence of futures and there is a transient failure part-way through, it can retry the individual future and keep going, rather than having to restart processing from the beginning of the sequence. I provided a detailed example of this case in my essay Backoff and Retry Error-Handling for Akka Streams.

Summary

In this first installment of this series, I demonstrated how materializing an Akka Stream is inexpensive when compared to service requests, database inserts, or CPU-bound workloads. This makes the Akka Streams API a powerful option for executing a diversity of workloads, including workloads that are not continuous or long-lived streams, as well as workloads that are not typically considered streaming workloads. I demonstrated how the Akka Streams API is capable of limiting concurrency, throttling, and handling errors and retries. In the next article in this series, I will demonstrate how managing a large request-response workload can be reimagined using the Akka Streams API.


  1. If you are unclear what the difference is between Future.apply and Future.successful, see my article Scala's Future.successful: Do Not Block Your Future Success. ↩︎

  2. I explored the performance difference between mapAsync and mapAsyncUnordered in my article Maximizing Throughput for Akka Streams. ↩︎

  3. I presented a similar example for starting a large number of WebSocket clients in my article Integrating Akka Streams and Akka Actors: Part II. ↩︎