Rethinking Streaming Workloads with Akka Streams: Part II

In the first installment of this series, I demonstrated how materializing an Akka Stream is relatively inexpensive and I explored how to express a number of workloads—workloads that you may not initially think of as streaming workloads—using the Akka Streams API to provide concurrency control, throttling, circuit breaking, and retry with exponential backoff. In this article, I will develop a more in-depth example to show how a request-response workload that enumerates a large collection can be reimagined with the Akka Streams API in order to provide a more convenient and powerful abstraction.

Streaming a Paginated HTTP API

If an HTTP API returns a large collection of resources, it becomes impractical to return all of the resources in a single response. Streaming the body of a large HTTP response can help in terms of both performance and resource consumption, but the client may still need an efficient way to resume the request if it encounters a failure part way through the response. To make each request efficient and to make it possible to arbitrarily enumerate the entire collection, a common approach is to paginate the API.

Consider an HTTP API that returns the unique set of device identifiers for a large collection of IoT devices. Since the collection is large, the API is paginated. A request for the first page is as follows:

curl "http://localhost:8080/devices?pagenumber=1&pagesize=10"

The request returns the following JSON response, which follows the JSON API specification. It includes sections for the data (the device identifiers), metadata (the number of pages), and links to related pages (like the next page of the paginated API).

{
  "meta": {
    "pages": 10000
  },
  "data": [
    {
      "id": "27ba0ec2-bd08-4473-bb3a-759bf76890ca"
    },
    {
      "id": "e3056593-c56d-4fee-8e05-d1d6918a4f21"
    },
    {
      "id": "6fefd14c-c5b2-498e-a669-0b70d414c9cb"
    },
    {
      "id": "6d3fcf52-b0a9-4104-b059-1323e0147100"
    },
    {
      "id": "f6a257e7-b63e-44d9-a388-ef9d6a256d76"
    },
    {
      "id": "f5df2f12-5fe9-4a86-b761-aa15ad7a20ab"
    },
    {
      "id": "cdc941be-9496-485d-98a2-def297c742e6"
    },
    {
      "id": "48e353a2-95fe-4ac2-a175-47af19f0cf04"
    },
    {
      "id": "3ecdcf88-b767-4784-ae25-19276cc1863b"
    },
    {
      "id": "16e8c382-2b5e-413e-a558-ca8f1c59e791"
    }
  ],
  "links": {
    "self": "http://localhost:8080/devices?pagenumber=1&pagesize=10",
    "first": "http://localhost:8080/devices?pagenumber=1&pagesize=10",
    "prev": null,
    "next": "http://localhost:8080/devices?pagenumber=2&pagesize=10",
    "last": "http://localhost:8080/devices?pagenumber=10000&pagesize=10"
  }
}
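
For reference, the response above could be modeled with a few case classes. This is only a sketch: the names DevicesResponse, data, links, self, next, and last mirror how the custom source later in this article accesses the response, while Meta, Links, the field types, the DeviceId wrapper, and the isLastPage helper are assumptions. The JSON unmarshalling itself is left out.

```scala
object DevicesModel {
  // Assumed wrapper for a device identifier; the article only uses the type name.
  final case class DeviceId(id: String)

  // "meta" section: the total number of pages.
  final case class Meta(pages: Int)

  // "links" section: prev and next are null at the ends of the collection,
  // so they are modeled as Option.
  final case class Links(
    self: String,
    first: String,
    prev: Option[String],
    next: Option[String],
    last: String
  )

  final case class DevicesResponse(meta: Meta, data: List[DeviceId], links: Links)

  // Hypothetical helper: true when the self and last links are the same page.
  def isLastPage(response: DevicesResponse): Boolean =
    response.links.self == response.links.last
}
```

Modeling prev and next as Option lets the source treat a missing next link as a natural end-of-collection signal.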

On the surface, paginated request-response does not seem like a streaming workload. But iterating over the entire collection of devices from the paginated API—perhaps to generate a status report or an inventory of devices with certain properties—can actually be expressed more naturally as a streaming workload. Rather than managing requests, interpreting responses, handling errors, and managing concurrency, a custom Akka Streams Source can be constructed that abstracts the paginated API and simply emits a stream of device identifiers.

A custom source for streaming the device identifiers is included below. The code is not trivial, but you do not need to understand every detail. I have included the custom source for completeness and I will describe what it is doing, in broad terms, over the course of the next few paragraphs. Feel free to skip this example and keep reading to appreciate the advantages of this approach when applying business logic to the elements streamed from this source. If you are convinced of this streaming approach, you can return to study the details of this example later.

object DevicesSource {
  def apply(pageSize: Int = 1000)(implicit system: ActorSystem, mat: ActorMaterializer): Source[DeviceId, NotUsed] =
    Source.fromGraph(new DevicesSource(pageSize)(system, mat))
}

final class DevicesSource(pageSize: Int)(implicit system: ActorSystem, mat: ActorMaterializer)
  extends GraphStage[SourceShape[DeviceId]] {
  val out: Outlet[DeviceId]                 = Outlet(Logging.simpleName(this) + ".out")
  override val shape: SourceShape[DeviceId] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var currentPageUri: Option[String]                        = None
      private var lastPageUri: Option[String]                           = None
      private var nextPageUri: Option[String]                           = None
      private var deviceIds: List[DeviceId]                             = List.empty[DeviceId]
      private var responseCallback: AsyncCallback[Try[DevicesResponse]] = _

      private val requestPending = new AtomicBoolean(false)
      private val maxDeviceIds   = 5 * pageSize
      private val firstPage      = s"http://localhost:8080/devices?pagenumber=1&pagesize=$pageSize"

      private def getNextPage(
        currentPageUri: Option[String],
        nextPageUri: Option[String],
        lastPageUri: Option[String],
        deviceIdsCount: Int
      ): Unit = {
        implicit val ec = mat.executionContext

        // If the last page has been requested, there are no more pages
        val noMorePages = currentPageUri.isDefined && currentPageUri == lastPageUri

        // Do not exceed the maximum buffer if the downstream backpressures
        val maxBufferCount = deviceIdsCount >= maxDeviceIds

        // Do not allow more than one outstanding request at a time. The pending
        // flag is claimed last, so it is only set when a request is actually made;
        // claiming it first would leave it stuck whenever this method returns early
        if (noMorePages || maxBufferCount || !requestPending.compareAndSet(false, true)) {
          return
        }

        val uri = nextPageUri.getOrElse(firstPage)

        Http()
          .singleRequest(HttpRequest(uri = uri))
          .map {
            case response if response.status != StatusCodes.OK =>
              response.discardEntityBytes()
              responseCallback.invoke(Failure(HttpResponseException(response.status)))
            case response =>
              Unmarshal(response)
                .to[DevicesResponse]
                .map(r => responseCallback.invoke(Success(r)))
                .recover {
                  case ex =>
                    responseCallback.invoke(Failure(UnmarshalResponseException(ex.getMessage)))
                }
          }
          .recover {
            case ex => responseCallback.invoke(Failure(HttpConnectionException(ex.getMessage)))
          }
      }

      override def preStart(): Unit = {
        responseCallback = getAsyncCallback[Try[DevicesResponse]](tryPushAfterResponse)
        getNextPage(currentPageUri, nextPageUri, lastPageUri, deviceIds.size)
      }

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            if (currentPageUri.isDefined && currentPageUri == lastPageUri && deviceIds.isEmpty) {
              completeStage()
            } else if (deviceIds.isEmpty) {
              () // do nothing, waiting for first request from preStart() to complete
            } else {
              getNextPage(currentPageUri, nextPageUri, lastPageUri, deviceIds.size)
              push(out, deviceIds.head)
              deviceIds = deviceIds.tail
            }
          }
        }
      )

      private def tryPushAfterResponse(result: Try[DevicesResponse]): Unit = result match {
        case Success(response) =>
          requestPending.set(false)

          currentPageUri = Some(response.links.self)
          nextPageUri = response.links.next
          lastPageUri = Some(response.links.last)

          if (deviceIds.isEmpty) {
            deviceIds = response.data
          } else {
            deviceIds ++= response.data
          }

          if (isAvailable(out)) {
            push(out, deviceIds.head)
            deviceIds = deviceIds.tail
          }

          getNextPage(currentPageUri, nextPageUri, lastPageUri, deviceIds.size)

        case Failure(failure) => failStage(failure)
      }
    }
}

The basic idea of the custom source is as follows. When it starts, the preStart method will make a request for the first page of the paginated API by calling the getNextPage method. It will parse the HTTP response into a list of device identifiers that are cached in an in-memory buffer by the custom source. When there is downstream demand, the source will emit one device identifier at a time, in order, from the in-memory buffer to the downstream stage, removing that device identifier from the in-memory buffer. At the same time, once the source has processed one page of the HTTP response, it will asynchronously request the next page, by following the links returned by the paginated API, in an attempt to saturate the stream with device identifiers from the in-memory buffer, rather than having the downstream wait on requesting and processing the next page once the in-memory buffer is empty.

To preserve order, the source will only allow one outstanding API request at a time. To provide resource constraints, if the downstream backpressures, the source will stop requesting new pages if the in-memory buffer of device identifiers reaches the maximum (five times the page size). Similarly, if the API is temporarily busy, it will not make the situation worse by initiating a huge number of requests. The stream will naturally backpressure and gracefully adjust to the dynamics of the system, both the upstream and the downstream.
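
The conditions that gate a new request can be read as a single predicate. The sketch below restates them as a pure function, separate from the stage. The object, function, and parameter names are hypothetical, and the actual source keeps this state in mutable fields rather than passing it in.

```scala
object PageRequestPolicy {
  // Decide whether the source may ask for another page.
  //   requestPending: a request is already in flight (one at a time preserves order)
  //   onLastPage:     the page most recently received is the last one
  //   buffered:       device identifiers currently waiting in the in-memory buffer
  //   pageSize:       page size; the buffer is capped at five pages
  def shouldRequestNextPage(
    requestPending: Boolean,
    onLastPage: Boolean,
    buffered: Int,
    pageSize: Int
  ): Boolean = {
    val maxBuffered = 5 * pageSize
    !requestPending && !onLastPage && buffered < maxBuffered
  }
}
```

With a page size of 1000, for example, the source would keep requesting pages until 5000 identifiers are buffered, and resume once the downstream drains the buffer below that level.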

It is critical that the HTTP response is processed in a thread-safe manner. The stage's variables are mutated only from its own handlers: the onPull handler and the tryPushAfterResponse method, which is registered as the asynchronous callback. The Akka Streams API ensures that the registered callback is invoked in a thread-safe manner, never concurrently with the stage's other handlers.

Custom graphs are powerful, but, as you have just seen, implementing them is not trivial.[1] You need to manage mutable state and you must ensure that your implementation is thread-safe. You must also respect the state machine that ensures the graph follows the Reactive Streams protocol, like not pushing an element until the downstream has pulled. There are a number of edge cases to test to ensure you have the implementation correct, especially around error conditions and stream completion. However, the intricacies of creating a custom source are not something that every application developer needs to understand. Custom sources like this could be maintained by a libraries or APIs team—people who are experts in this domain—providing clean interfaces to the majority of application developers.

The Payoff: Focusing on the Business Logic

With the intricacies of implementing the custom source out of the way, when it comes to applying business logic on the collection, it could not be more straightforward:

DevicesSource(pageSize = 1000)
  .map(id => processJob(id))
  .runWith(Sink.ignore)

Rather than worrying about making requests, parsing responses, and handling errors, you can just focus on applying your business logic to the collection of identifiers emitted by this source. After all, isn't this all you want to do? Enumerating the set of identifiers from this paginated API is just a means to this end. The convenience of this abstraction cannot be overstated.[2] The remainder of this article will explore some of the additional advantages.

Throttling

The custom source makes it possible to take full advantage of the Akka Streams API to provide high-level constructs like filtering, partitioning, group-by, or grouping within a time-frame. For example, it is now trivial to limit concurrency and throughput. Revisiting two of the examples from the first article in this series, when processing the collection of device identifiers, it is possible to limit the concurrent jobs to four using mapAsync:

DevicesSource(pageSize = 1000)
  .mapAsync(4)(processJob)
  .runWith(Sink.ignore)

It is also straightforward to limit the overall throughput to no more than 100 jobs per second using throttle:

DevicesSource(pageSize = 1000)
  .throttle(
    elements = 100,
    per = 1.second,
    maximumBurst = 100,
    mode = ThrottleMode.shaping
  )
  .mapAsync(4)(processJob)
  .runWith(Sink.ignore)

The ability to focus on the business logic, composing it with the Akka Streams API to provide flow control and manage the dynamics of the workload, is an extremely powerful abstraction.

Handling Changes in the Collection

One challenge with iterating over a large collection using a paginated API is that the elements in the collection may change. For example, after requesting the first few pages, a set of devices could be added or deleted. When it is important to process the entire collection atomically, an entity tag can be used to make a conditional HTTP request.

The HTTP ETag header can be used to return the current entity tag on the first request. When making requests for subsequent pages, the client can compare the cached entity tag with the entity tag from subsequent responses to determine if the collection has changed. The mechanics of this can again be encapsulated within the custom source:

private def tryPushAfterResponse(result: Try[(DevicesResponse, EntityTag)]): Unit = result match {
  case Success((response, responseEntityTag)) =>
    requestPending.set(false)

    // Store the entity tag after getting the first page
    if (entityTag.isEmpty) {
      entityTag = Some(responseEntityTag)
    }

    // If entity tag validation is enabled, fail the stage if the entity tag changes
    if (entityTagValidation && entityTag.get != responseEntityTag) {
      // Pass the exception to failStage rather than throwing it
      failStage(EntityTagMismatch(expected = entityTag.get, actual = responseEntityTag))
    } else {
      currentPageUri = Some(response.links.self)
      nextPageUri = response.links.next
      lastPageUri = Some(response.links.last)

      if (deviceIds.isEmpty) {
        deviceIds = response.data
      } else {
        deviceIds ++= response.data
      }

      if (isAvailable(out)) {
        push(out, deviceIds.head)
        deviceIds = deviceIds.tail
      }

      getNextPage(currentPageUri, nextPageUri, lastPageUri, deviceIds.size)
    }

  case Failure(failure) => failStage(failure)
}
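
Stripped of the stage mechanics, the entity tag check is a small decision: store the tag from the first page, then compare every later tag against it. The following sketch expresses that decision as a pure function. It uses plain strings in place of an HTTP entity tag type, and the object, the Mismatch type, and the check function are hypothetical names for illustration.

```scala
object EntityTagCheck {
  // Hypothetical error type carrying the expected and actual tags.
  final case class Mismatch(expected: String, actual: String)

  // Returns the tag to keep cached, or a Mismatch if the collection changed.
  //   cached:   tag stored from the first page, None before any response
  //   received: tag taken from the current response
  def check(cached: Option[String], received: String): Either[Mismatch, String] =
    cached match {
      case None                         => Right(received) // first page: store the tag
      case Some(tag) if tag == received => Right(tag)      // collection is unchanged
      case Some(tag)                    => Left(Mismatch(expected = tag, actual = received))
    }
}
```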

The application developer can continue to focus solely on the business logic:

DevicesSource(pageSize = 1000, entityTagValidation = true)
  .mapAsync(4)(processJob)
  .runWith(Sink.ignore)

Resuming in the Middle

Another challenge with iterating over a large collection using a paginated API is that you could encounter a transient error—from a service restarting or a network partition—part way through. Rather than making single requests against the paginated API in the custom source, the robustness of this approach could be improved by incorporating some of the techniques that I explored in the previous article in this series, techniques like using RestartSource.withBackoff. These mechanics could again be encapsulated in the custom source, allowing the application developer to continue to focus on the business logic, rather than the system dynamics. A complementary approach would be allowing the stream to start at a specific page, so that processing can continue from where it left off:

DevicesSource(pageSize = 1000, firstPageNumber = 42)
  .mapAsync(4)(processJob)
  .runWith(Sink.ignore)

One way to manage this state is to update an actor as jobs complete—the actor can maintain the overall state of the stream. I will not provide an example here, but refer to my series on integrating actors and streams, or see the next article in this series, where I include examples of interfacing streams with actors that maintain state.[3]
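
As a rough illustration of the bookkeeping involved, the page to resume from can be derived from the number of device identifiers processed so far. This sketch assumes that pages are numbered from one, that the page size is the same across runs, and that the count reflects elements completed in order. The object and function names are hypothetical.

```scala
object ResumePoint {
  // Page number to pass as firstPageNumber when restarting the stream.
  // A partially processed page is requested again from its start, so some
  // elements may be processed twice; jobs would need to tolerate that.
  def resumePage(processedCount: Long, pageSize: Int): Int = {
    require(processedCount >= 0, "processedCount must not be negative")
    require(pageSize > 0, "pageSize must be positive")
    (processedCount / pageSize).toInt + 1
  }
}
```

For example, after 41,500 identifiers have been processed with a page size of 1000, the stream would restart at page 42.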

PagedSource

As we have seen, custom graphs are powerful but implementing them is not trivial. The akka-stream-contrib project has a number of extensions to the Akka Streams API. It includes a paged source that can be used to stream items from any paginated API. This source can be used to stream from the paginated HTTP API in the previous example, without the need to write a custom graph stage:

val firstPageNumber = 1
val pageSize        = 100
val firstPageUri    = s"http://localhost:8080/devices?pagenumber=$firstPageNumber&pagesize=$pageSize"

val pagedSource =
  PagedSource[DeviceId, String](firstKey = firstPageUri) { nextPageUri =>
    Http()
      .singleRequest(HttpRequest(uri = nextPageUri))
      .flatMap {
        case httpResponse if httpResponse.status != StatusCodes.OK =>
          // Drain the entity so the connection is released
          httpResponse.discardEntityBytes()
          throw HttpResponseException(httpResponse.status)
        case httpResponse =>
          Unmarshal(httpResponse)
            .to[DevicesResponse]
            .map { response =>
              PagedSource.Page(
                response.data,
                if (response.links.self == response.links.last) None
                else response.links.next
              )
            }
            .recover {
              case ex =>
                throw UnmarshalResponseException(ex.getMessage)
            }
      }
      .recover {
        case ex: HttpResponseException      => throw ex
        case ex: UnmarshalResponseException => throw ex
        case ex                             => throw HttpConnectionException(ex.getMessage)
      }
  }

The source can then be used just like the custom graph stage in the previous example:

pagedSource
  .mapAsync(4)(processJob)
  .runWith(Sink.ignore)

Source.unfoldAsync

Under the hood, the paged source is implemented using Source.unfoldAsync from the Akka Streams API, which is a powerful primitive for streaming large collections of data. Sometimes the complexity of the problem requires a custom source, which is why I included an example at the beginning of this article. But in many cases, a primitive like Source.unfoldAsync, which unfolds a sequence of futures, is all that is required:[4]

object DevicesSource {
  def apply(firstPageNumber: Int = 1, pageSize: Int = 100)(
    implicit system: ActorSystem,
    mat: ActorMaterializer
  ): Source[DeviceId, NotUsed] = {
    implicit val ec = mat.executionContext

    val firstPageUri = Some(s"http://localhost:8080/devices?pagenumber=$firstPageNumber&pagesize=$pageSize")

    Source
      .unfoldAsync[Option[String], List[DeviceId]](firstPageUri) {
        case Some(uri) =>
          Http()
            .singleRequest(HttpRequest(uri = uri))
            .flatMap {
              case httpResponse if httpResponse.status != StatusCodes.OK =>
                // Drain the entity so the connection is released
                httpResponse.discardEntityBytes()
                throw HttpResponseException(httpResponse.status)
              case httpResponse =>
                Unmarshal(httpResponse)
                  .to[DevicesResponse]
                  // The next state is the link to the next page; None ends the stream
                  .map(response => Some((response.links.next, response.data)))
                  .recover {
                    case ex =>
                      throw UnmarshalResponseException(ex.getMessage)
                  }
            }
            .recover {
              case ex: HttpResponseException      => throw ex
              case ex: UnmarshalResponseException => throw ex
              case ex                             => throw HttpConnectionException(ex.getMessage)
            }
        case None =>
          Future.successful(None)
      }
      // Flatten each page so the source emits individual device identifiers
      .mapConcat(identity)
  }
}

This source can be used just like before:

DevicesSource(pageSize = 1000, firstPageNumber = 42)
  .mapAsync(4)(processJob)
  .runWith(Sink.ignore)
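
To see the unfold pattern in isolation, the sketch below applies the same idea to an in-memory stand-in for a paginated API, using Iterator.unfold from the Scala standard library (Scala 2.13 or later is assumed). It is a synchronous analogy, not the Akka Streams operator: the page keys, the Page type, and the data are made up for illustration.

```scala
object UnfoldPagesExample {
  // One page of results plus the key of the next page, if any.
  final case class Page(items: List[String], next: Option[String])

  // An in-memory stand-in for the paginated API: page key -> page.
  val pages: Map[String, Page] = Map(
    "page-1" -> Page(List("a", "b"), Some("page-2")),
    "page-2" -> Page(List("c", "d"), Some("page-3")),
    "page-3" -> Page(List("e"), None)
  )

  // The state is the key of the next page to fetch, or None once the last
  // page has been read. Returning None from the function ends the iteration.
  // Note: Iterator.unfold expects (element, nextState), whereas
  // Source.unfoldAsync expects (nextState, element) wrapped in a Future.
  def deviceIds(firstKey: String): Iterator[String] =
    Iterator
      .unfold[List[String], Option[String]](Option(firstKey)) {
        case Some(key) =>
          val page = pages(key)
          Some((page.items, page.next))
        case None =>
          None
      }
      .flatMap(items => items) // flatten each page into individual items
}
```

Calling UnfoldPagesExample.deviceIds("page-1").toList walks all three pages in order, and starting from "page-3" yields only the final page, which mirrors starting the stream at a specific page.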

If performance demands require prefetching a number of futures, similar to the implementation in the custom source, a downstream buffer can be added to signal a large demand, a technique I described in my article on maximizing throughput for Akka Streams.

Summary

When you think of enumerating a large collection by performing request-response queries against a paginated HTTP API, you may not immediately think of this as a streaming workload. However, as I demonstrated in this article, reimagining this workload using the Akka Streams API can help build systems that are more reliable and resilient, through bounded resource constraints and flow control, and allow you to focus on the problem at hand, without having to focus on the mechanics of composing queries, handling responses, retrying errors, or adapting to system dynamics. This concept applies to almost any workload that iterates over a large collection, not just paginated HTTP APIs.

In the final installment in this series, I will explore how the Akka Streams API can be used to provide a better separation of concerns as compared to using actors alone.


  1. I did not get anyone to code review this article, so it would not surprise me if there is a bug in my implementation. However, this should not distract from the narrative of this article.

  2. The Slick API, which is part of the Alpakka project and can be used for interfacing with relational databases using the Akka Streams API, is an example of a similar abstraction. For example, it can be used to stream rows from a relational database table.

  3. Note that my series of articles on integrating actors and streams predates Akka Typed. While Akka Streams have always been strongly typed, actors were untyped. The general patterns for interfacing streams and actors remain the same, but some of the syntax changed with Akka Typed. In the next installment of this series on rethinking streaming workloads, I include an example of interfacing an Akka Stream with a typed actor using Akka Typed.

  4. Thanks to @gabfssilva for the suggestion to include this example for completeness.