Patterns for Streaming Measurement Data with Akka Streams

I have spent my entire career building infrastructures for streaming data to support the monitoring and control of industrial applications. I have experience building distributed systems for data acquisition, durable message-queuing, publish-subscribe messaging, and the efficient storage and query of time-series data. I have developed systems on Unix, Linux, Windows, Azure, and AWS, while programming in C, C++, C#, and Scala. I have implemented proprietary systems, leveraged open-source solutions, and adopted platform-as-a-service components from cloud-platform providers. Regardless of the programming language or the platform, common patterns emerge that are integral to the domain of streaming measurement data. The Akka Streams API is by far the most natural, powerful, and productive framework that I have used to develop these systems.

In previous articles, I provided a motivating example for the Akka Streams API and I examined the challenges of time in streaming-data systems. In this article, I will explore patterns that are fundamental to developing systems for streaming measurement data, and show how the Akka Streams API makes these patterns straightforward and reliable to implement. To demonstrate these patterns, I will provide simple, self-contained examples in Scala. In practice, the measurement messages would be streamed from a message queue, a message broker, or over something like a WebSocket connection, like I used in my motivating example for the Akka Streams API.

Batching Messages

When working with streaming measurements, it is often necessary to batch them. This is particularly true for unbounded streams, like a set of sensors that transmit an endless stream of samples. When committing measurements to a database, a message queue, or other systems, it is common to write a batch of measurements, rather than writing a single measurement at a time, in order to reduce the number of requests, and gain superior performance. Grouping messages with the Akka Streams API is as easy as adding a grouped element. The following example simulates an infinite stream of samples and writes them to a database, efficiently, 1000 elements at at time.

Source.tick(0 milliseconds, 10 milliseconds, ())  
  .map(_ => Sample(System.currentTimeMillis(), random.nextFloat()))
  .grouped(1000)
  .map(database.bulkInsert)
  .runWith(Sink.ignore)

A challenge inherent to streaming-data systems is that the data must often be processed and transmitted to other systems in a timely manner. For example, it can be important that the latest measurement from a sensor be available downstream for near real-time aggregations, generating alerts, or display in a user-interface, within seconds of being sampled. Grouping often introduces an unacceptable latency. To address this, the Akka Streams API has the groupedWithin mechanism to group events, but also emit events within a bounded time-frame, even if the maximum number of events has not been satisfied. This leads to the efficient batching of events, without the introduction of unacceptable latency.

Source.tick(0 milliseconds, 10 milliseconds, ())  
  .map(_ => Sample(System.currentTimeMillis(), random.nextFloat()))
  .groupedWithin(1000, 100 milliseconds)
  .map(database.bulkInsert)
  .runWith(Sink.ignore)

Decomposing Messages

Equally as common as batching measurements is the opposite: decomposing an aggregate into its constituents. Consider the following JSON message, where samples are reported, in aggregate, for a set of signals associated with an asset, in this case, a wind turbine. The message includes the unique asset-identifier and the time the signals were sampled.

{  
    "id": "c75cb448-df0e-4692-8e06-0321b7703992",
    "timestamp": 1486925114,
    "measurements": {
        "power": 1.7,
        "rotor_speed": 3.9,
        "wind_speed": 10.1
    }
}

It is common to parse a message like this and emit individual samples downstream, where they might be included in aggregate calculations, stored in a database, or displayed on a user-interface. Individual samples are described by the following Scala case class, which includes the signal name and value, along with the asset-identifier and measurement timestamp.

case class Measurement(id: String, timestamp: Long, signal: String, value: Long)  

People new to Akka Streams will often, logically, do the following: parse the message into a sequence of signals, then flatten the sequence, emitting individual elements into the stream. The following example handles a single message, but, in practice, this would be an unbounded stream of JSON messages.

val json =  
  """{
    |    "id": "c75cb448-df0e-4692-8e06-0321b7703992",
    |    "timestamp": 1486925114,
    |    "measurements": {
    |        "power": 1.7,
    |        "rotor_speed": 3.9,
    |        "wind_speed": 10.1
    |    }
    |}""".stripMargin

Source.single(json)  
  .map(MessageParser.parse)
  .mapConcat(identity)
  .runWith(Sink.foreach(println))

This works, but decomposing messages is the perfect use-case for flatMapConcat. I find this element is often misunderstood, and it certainly doesn't have the most discoverable or approachable name, but it is the right choice for decomposing lists of elements, and it handles the example above in a single stage.

Source.single(json)  
  .flatMapConcat { message =>
    val measurements = MessageParser.parse(message)
    Source(measurements)
  }
  .runWith(Sink.foreach(println))

Rate Limiting Requests

In streaming-data systems, it is important to limit the number of concurrent requests to other services, to avoid overwhelming these services and degrading their performance, as well as to balance workloads and maintain service-level agreements, particularly when the streams are unbounded and the message rates are dynamic. Akka Streams provides a seamless way to do this, with the added benefit that if the number of outstanding requests is saturated, the stream will backpressure and not request additional events, rather than exhausting memory.

The following example batches samples, then writes the batched samples to a database, asynchronously. It limits the number of outstanding requests to the database from this client to a maximum of 10.

Source.tick(0 milliseconds, 10 milliseconds, ())  
  .map(_ => Sample(System.currentTimeMillis(), random.nextFloat()))
  .groupedWithin(1000, 100 milliseconds)
  .mapAsync(10)(database.bulkInsertAsync)
  .runWith(Sink.ignore)

This example preserves the order of the elements downstream, which can be important for adjusting watermarks, or generating acknowledgments. If the downstream order of elements is not important, Akka Streams also provides the mapAsyncUnordered mechanism to limit the rate, without necessarily preserving downstream order.

Source.tick(0 milliseconds, 10 milliseconds, ())  
  .map(_ => Sample(System.currentTimeMillis(), random.nextFloat()))
  .groupedWithin(1000, 100 milliseconds)
  .mapAsyncUnordered(10)(database.bulkInsertAsync)
  .runWith(Sink.ignore)

Throttling Messages

Traditionally, in multi-threaded programming, throttling messages effectively can be a challenging task involving counters, timers, and synchronization across threads. Throttling the rate of messages can be important for respecting the integrity of external systems—similar to the rate limiting example that I just explored—but throttling messages is also essential for preserving system integrity in the face of unintended (e.g., resulting from a software defect), or malicious denial-of-service attacks, where a client is sustaining an unusually high rate of messages. For example, if a client is not expected to send more than one measurement message per minute to the server, then why should the server allow a client to send more messages than this?

If the upstream exceeds the specified rate, the throttle element can fail the stream, or shape the stream by backpressuring. The following example limits the upstream to sending only one message per second, even though the upstream is attempting to emit elements faster than this.

Source.tick(0 milliseconds, 10 milliseconds, ())  
  .map(_ => Sample(System.currentTimeMillis(), random.nextFloat()))
  .throttle(elements = 1, per = 1 second, maximumBurst = 1, mode = ThrottleMode.shaping)
  .runWith(Sink.foreach(println))

Depending on the application, message rates are not always uniform. For example, a client may buffer samples when it loses connectivity with the server, ultimately transmitting these samples once connectivity is restored. The maximumBurst parameter can be used to allow the client to send a burst of messages, while still respecting the throttle once this upper bound has been reached. Refer to the Akka Streams documentation for a complete description of this token-bucket algorithm.

I also find the throttle element valuable when backfilling large amounts of data, for example, backfilling a year's worth of measurement data from one database to another. It can rate-limit the backfill to allow it to proceed in the background, without overwhelming the database, or impacting other services that are dependent on it. The impact can be minimal enough that the backfill can be completed on-line, without requiring a maintenance window, or downtime.

Concurrency

To construct efficient, scalable, and low-latency streaming-data systems, it is very important to perform tasks concurrently. For instance, consider a process that reads messages from a message queue, performs a CPU-intensive task to transform these messages, then writes the results, asynchronously, to a database. To maximize performance, while writing the most recent result to the database on one thread, at a minimum, this process can be reading the next message from the message queue, concurrently, on a separate thread. It may even be beneficial to perform the CPU-intensive data transformation, concurrently, on a third thread.

By default, Akka Streams executes streams sequentially, on a single thread. For many streams, like one that just performs a series of data transformations using map, this is a good default, since it avoids the overhead of passing messages over an asynchronous boundary, and it maximizes inexpensive, young-generation garbage-collection. With traditional multi-threaded programming, parallel processing of disparate tasks can be extremely complicated, involving the synchronization of data across threads. With Akka Streams, however, this is as simple as describing asynchronous boundaries between elements using async.

As a simple demonstration of the benefits of defining asynchronous boundaries, consider the following stream, where a large JSON message is encoded and decoded using gzip—both CPU-intensive operations—one million times.

Source(1 to 1000000)  
  .map(_ => Gzip.encode(ByteString(json)))
  .map(gzipped => Gzip.decode(gzipped))
  .runWith(Sink.ignore)

This implementation will execute on a single thread and take approximately 40 percent longer than the following implementation, which will consume two threads, performing the encoding and decoding operations concurrently.

Source(1 to 1000000)  
  .map(_ => Gzip.encode(ByteString(json)))
  .async
  .map(gzipped => Gzip.decode(gzipped))
  .runWith(Sink.ignore)

With Akka Streams, defining asynchronous boundaries still requires a sophisticated understanding of the data processing pipeline—which operations are CPU-bound, which ones are memory-bound, which ones can be executed asynchronously, etc.—but unlike traditionally multi-threaded programming, implementing asynchronous boundaries is as simple, and as thread-safe, as adding an async element. This also makes experimenting with asynchronous operations, in order to find the most efficient implementation, trivial, whereas, with traditional multi-threaded programming, it would require writing a lot of complex code, just to perform the experiments.

Idle Timeouts

I have found idle timeouts to be particularly useful in two situations, both of which highlight unexpected behaviours. The first is reclaiming resources when a stream is idle. This can be common for something like a WebSocket server, or a data aggregation pipeline, that stops receiving elements from an upstream component. Perhaps this component is just temporarily off-line, or perhaps it has been permanently decommissioned, or replaced.

The following example will fail the stream, with a timeout exception, after one minute, logging an error to the application log, with the unique identifier associated with this device.

Source.tick(0 milliseconds, 10 minutes, ())  
  .map(_ => Sample(System.currentTimeMillis(), random.nextFloat()))
  .idleTimeout(1 minute)
  .runWith(Sink.foreach(println))
  .recover {
    case ex: java.util.concurrent.TimeoutException =>
      logger.error(s"Device $id has been idle for 1 minute")
  }

The idleTimeout is a simple technique for reclaiming idle resources by failing the stream, but it almost always highlights broader issues. For example, why hasn't a client communicated with the server in over 10 minutes, when it is still connected, and we expect it so send a message every minute? If idle timeouts are not enforced, these behaviours are rarely surfaced and, therefore, they are not addressed. Issues like this can be hard to track down on an individual basis, like one client or customer reporting communication issues. When examined in-aggregate, however, by analyzing application logs for these errors, patterns often emerge that allow one to identify the root cause based on a particular software version, unique message-routing, or security context.

The other situation in which I find enforcing idle timeouts enlightening is when developing functional tests for streaming-data systems. Testing streaming-data systems is no easy task, something which I plan to expand on in a future article. I often find myself writing tests to validate the outputs of streams, be they data transformations, writes to a database, or updates to intermediate streaming-data calculations. These tests focus on the business logic and the correctness of the output. But what if no elements are passed through the stream within the expected time frame, perhaps the assertions in the test are never even applied? Idle timeouts are great for codifying and asserting expectations regarding messaging latency, testing for this in addition to the correctness of the output, when designing complex functional tests for a distributed system.

Periodic Events

Streaming-data systems often use periodic messages, such as heartbeats, to provide status, or to communicate watermarks, that can be used to trigger dependent events, like resolving intermediate calculations. Often, security tokens, or application metadata, also need to be refreshed periodically.

Scheduling periodic events is quite easy with Akka Streams. The following example combines a stream of regular measurement samples with a status message, generated once a minute.

val status =  
  Source.tick(0 minute, 1 minute, ())
    .map(_ => Status())

Source.tick(0 milliseconds, 1 second, ())  
  .map(_ => Sample(System.currentTimeMillis(), random.nextFloat()))
  .merge(status)
  .runWith(Sink.foreach(println))

Note that the groupedWithin element, demonstrated previously in the example of batching messages, is another way to perform periodic tasks, like persisting offsets when consuming messages from a message queue.

Watching for Termination

It is common to take an action, like logging a message or updating counters, when a stream terminates. The watchTermination element can be used to take an action when the upstream competes with success or failure. The following example extends the Greeter WebSocket Service example from the Akka documentation with a watchTermination element, to log a message and update session counters when the WebSocket is closed.

def greeter: Flow[Message, Message, Any] =  
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(Source.single("Hello ") ++ tm.textStream ++ Source.single("!")) :: Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }

val websocketRoute =  
  path("greeter") {
    val startTime = System.currentTimeMillis()
    val greeterRoute = greeter.watchTermination() { (_, done) =>
      done.onComplete {
        case Success(_) =>
          logger.info("Completed successfully")
          Stats.stats.updateAverageSessionLength(System.currentTimeMillis() - startTime)
        case Failure(ex) =>
          logger.error(s"Completed with failure : $ex")
          Stats.stats.updateAverageSessionLength(System.currentTimeMillis() - startTime)
      }
    }
    handleWebSocketMessages(greeterRoute)
  }

val bindingFuture = Http().bindAndHandle(websocketRoute, "localhost", 8080)  

Summary

The patterns that I've provided here are fundamental to processing streaming measurements for industrial systems. The concepts of batching measurements, decomposing measurements, rate-limiting requests, throttling requests, performing tasks concurrently, and so on, are conceptually simple, but their implementation in traditional, multi-threaded programming models can be challenging, complex, and error-prone. I hope that I've demonstrated how easy it is to implement these fundamentals using Akka Streams. These fundamentals can be easily composed to construct sophisticated, reliable, and resilient systems. For me, this is what makes Akka Streams so powerful and natural for constructing streaming-data systems for processing measurement data.