Rethinking Streaming Workloads with Akka Streams: Part III

This series is focused on recomposing workloads that are not traditionally expressed as streaming workloads. Employing a streaming approach can make a problem simpler, more elegant, and more natural. The first article explored streaming tools for flow control, bounded resource-constraints, and error handling, and demonstrated how these tools can be composed with non-streaming workloads. The second article demonstrated how streaming a large collection can be more straightforward than a request-response approach. In this article, the third of this four-part series, I will show how combining actors and streams can make problems simpler, with less code to test and maintain, and with a more natural separation of concerns.

A Simple Cache

Consider an application that needs to cache a list of device identifiers for a collection of IoT devices. An actor is great for maintaining a cache and making it available to a number of threads in a thread-safe and scalable manner.

The actor below, implemented using the functional style of Akka Typed, maintains a cache and returns the cache when queried. This actor is simple and straightforward. It has only one behaviour and it only responds to one message.

object Cache {
  final case class Get(requestId: RequestId, replyTo: ActorRef[CachedDeviceIds])
  final case class CachedDeviceIds(devices: List[DeviceId])

  def cached(devices: List[DeviceId]): Behavior[Get] =
    Behaviors.receive { (context, message) =>
      context.log.info("Cache request for requestId {}.", message.requestId)
      message.replyTo ! CachedDeviceIds(devices)
      Behaviors.same
    }
}

This trivial program demonstrates how the cache actor is created and queried:

object SimpleCache extends App {
  val deviceIds = List(
    DeviceId("27ba0ec2-bd08-4473-bb3a-759bf76890ca"),
    DeviceId("e3056593-c56d-4fee-8e05-d1d6918a4f21"),
    DeviceId("6fefd14c-c5b2-498e-a669-0b70d414c9cb")
  )

  val cache = ActorSystem[Get](Cache.cached(deviceIds), "cache")

  implicit val timeout: Timeout             = 5.seconds
  implicit val scheduler: Scheduler         = cache.scheduler
  implicit val ec: ExecutionContextExecutor = cache.executionContext

  val requestId = RequestId("12345")

  val result: Future[CachedDeviceIds] = cache.ref.ask(ref => Cache.Get(requestId, ref))

  result.onComplete {
    case Success(_) => println("Devices!")
    case Failure(_) => println("Failure!")
  }
}

Now this program only has one actor and the program does not do anything useful. We know from Carl Hewitt that, "One actor is no actor. Actors come in systems."[1] In a real program, this cache would be shared across a number of threads. The cache would not be defined statically, it would be loaded from the file system, or through a request to another service. Loading the cache would not be deterministic—it could take time and it could fail. The program may also need to share the cache actor with other actors and begin accepting requests or processing messages before the cache is completely loaded. This means the cache may be empty and the cache actor will need to reflect this fact to the caller. Finally, the current implementation has no mechanism to refresh the cache and adapt to changes over the runtime of the application.

A Not-So-Simple Cache

These issues can be addressed by extending the actor to initialize the cache and periodically refresh the cache. The following actor caches a large set of device identifiers streamed from the DevicesSource presented in the previous article in this series. The actor starts in the empty behaviour which is a composition of three behaviours: 1) a setup function that is used to make the initial request to populate the cache, 2) a timer which expires every hour, in order to send a message to refresh the cache, and 3) the main function that handles all of the messages. If the cache has not been initialized yet, a message to get the cache will return the EmptyCache message. Once the cache has been initialized, the actor changes its behaviour and responds to messages to get the cache with the cached device identifiers. When the timer expires, it will refresh the cache.

object Cache {
  sealed trait CacheRequests
  final case class Get(requestId: RequestId, replyTo: ActorRef[CacheResponses]) extends CacheRequests
  final private case class Devices(devices: List[DeviceId])                     extends CacheRequests
  final private case object Timeout                                             extends CacheRequests
  final private case object CacheRefreshFailed                                  extends CacheRequests

  sealed trait CacheResponses
  final case object EmptyCache                            extends CacheResponses
  final case class CachedDevices(devices: List[DeviceId]) extends CacheResponses

  private case object TimerKey

  val empty: Behavior[CacheRequests] =
    Behaviors.withTimers { timer =>
      timer.startTimerWithFixedDelay(TimerKey, Timeout, 1.hour)

      Behaviors.setup { context =>
        refreshCache(context.self, context.system)

        Behaviors.receiveMessage[CacheRequests] {
          case Get(requestId, replyTo) =>
            context.log.info("Empty cache request for requestId {}.", requestId)
            replyTo ! EmptyCache
            Behaviors.same
          case Devices(devices) =>
            context.log.info("Initialized cache.")
            cached(devices)
          case CacheRefreshFailed =>
            context.log.error("Failed to initialize cache. Will retry.")
            Behaviors.same
          case Timeout =>
            context.log.info("Refreshing cache.")
            refreshCache(context.self, context.system)
            Behaviors.same
        }
      }
    }

  private def cached(devices: List[DeviceId]): Behavior[CacheRequests] =
    Behaviors.receive { (context, message) =>
      message match {
        case Get(requestId, replyTo) =>
          context.log.info("Cache request for requestId {}.", requestId)
          replyTo ! CachedDevices(devices)
          Behaviors.same
        case Devices(d) =>
          context.log.info("Updated cache.")
          cached(d)
        case CacheRefreshFailed =>
          context.log.error("Failed to refresh cache.")
          Behaviors.same
        case Timeout =>
          context.log.info("Refreshing cache.")
          refreshCache(context.self, context.system)
          Behaviors.same
      }
    }

  private def refreshCache(self: ActorRef[CacheRequests], system: ActorSystem[Nothing]): Unit = {
    import akka.actor.typed.scaladsl.adapter._
    implicit val untypedSystem: actor.ActorSystem = system.toUntyped
    implicit val ec: ExecutionContextExecutor     = untypedSystem.dispatcher
    implicit val mat: ActorMaterializer           = ActorMaterializer()

    DevicesSource()
      .mapConcat(identity)
      .runWith(Sink.seq)
      .onComplete {
        case Success(devices) => self ! Devices(devices.toList)
        case Failure(_)       => self ! CacheRefreshFailed
      }
  }
}

This actor is doing way too much. Combining the timers for refreshing the cache with the state machine for how to respond to a message, based on whether or not the cache has been populated, makes this actor difficult to test and reason about. Including the concrete refreshCache method, which calls an external service, inside the actor itself, makes this actor even more coupled and difficult to test. Furthermore, if there is a transient failure initializing the cache, the actor will not try again for an hour. It is certainly possible to manipulate the expiration of the timer based on the current state, but this would complicate the implementation and testing even further.

Separating Concerns

When we are involved in a problem, it can be easy to grow functionality incrementally, or piecemeal, and end up with a tightly coupled solution, like the one above. I have certainly been guilty of this. Although this implementation is undesirable, it should be noted that Akka Typed, with its composable behaviours and type safety, does make it relatively straightforward to reason about. The sealed traits for CacheRequests and CacheResponses make explicit the messages that can be sent to this actor and the messages that must be handled in response. However, this implementation can be improved by separating concerns. The first step is to extract the timer responsible for refreshing the cache into its own actor:

object Timer {
  private case object TimerKey

  sealed trait TimerMessage
  private case object Timeout                                   extends TimerMessage
  private case class FailedRequest(ex: Throwable)               extends TimerMessage
  private case class SuccessfulRequest(devices: List[DeviceId]) extends TimerMessage

  def start(cache: ActorRef[CacheRequests], request: => Future[List[DeviceId]]): Behavior[TimerMessage] =
    Behaviors.withTimers { timer =>
      timer.startTimerWithFixedDelay(key = TimerKey, msg = Timeout, delay = 10.seconds)
      Behaviors.receive[TimerMessage] { (context, message) =>
        message match {
          case Timeout =>
            context.log.info("Executing request.")
            implicit val ec: ExecutionContextExecutor = context.executionContext
            request.onComplete {
              case Success(devices) => context.self ! SuccessfulRequest(devices)
              case Failure(ex)      => context.self ! FailedRequest(ex)
            }
            Behaviors.same
          case SuccessfulRequest(devices) =>
            context.log.info("Successful response.")
            cache ! Devices(devices)
            Behaviors.same
          case FailedRequest(ex) =>
            context.log.error("Failed response.")
            throw ex
        }
      }
    }
}

The timer actor holds a reference to the cache actor. It does not accept any external messages. When the timer expires, it executes a request for the list of device identifiers. If this request completes successfully, it sends the result to the cache actor. If retrieving the device identifiers fails, this actor will throw an exception and terminate. It can be restarted by a supervising actor. Note that because the function for retrieving the device identifiers is passed as an argument rather than being included explicitly, it makes the behaviour of this actor easy to test, without relying on an external dependency.

With the timer logic removed, the cache actor becomes much simpler. It starts with an empty cache. Just like before, if it receives a message to get the cache and the cache is empty, it responds with the EmptyCache message. If it receives a list of devices identifiers, it will cache them and change its behaviour. In the cached behaviour, the actor will return the cache in response to a message to retrieve the cache, or update the cache in response to an updated list of device identifiers. The behaviours of this actor are easy reason about and can be exhaustively unit tested.

object Cache {
  sealed trait CacheRequests
  final case class Get(requestId: RequestId, replyTo: ActorRef[CacheResponses]) extends CacheRequests
  final case class Devices(devices: List[DeviceId])                             extends CacheRequests

  sealed trait CacheResponses
  final case object EmptyCache                            extends CacheResponses
  final case class CachedDevices(devices: List[DeviceId]) extends CacheResponses

  val empty: Behavior[CacheRequests] =
    Behaviors.receive[CacheRequests] { (context, message) =>
      message match {
        case Get(requestId, replyTo) =>
          context.log.info("Empty cache request for requestId {}.", requestId)
          replyTo ! EmptyCache
          Behaviors.same
        case Devices(devices) =>
          context.log.info("Initializing cache.")
          cached(devices)
      }
    }

  private def cached(devices: List[DeviceId]): Behavior[CacheRequests] =
    Behaviors.receive { (context, message) =>
      message match {
        case Get(requestId, replyTo) =>
          context.log.info("Cache request for requestId {}.", requestId)
          replyTo ! CachedDevices(devices)
          Behaviors.same
        case Devices(updatedDevices) =>
          context.log.info("Updating cache.")
          cached(updatedDevices)
      }
    }
}

From another actor, the cache actor can be spawned and introduced to the timer actor:

val cache = context.spawn(Cache.empty, "cache")
context.spawn(Timer.start(cache, getDevices), "timer")

With this approach, it is also possible to address the problem of transient failures from the getDevices function when populating the cache, by using a supervision strategy, with exponential backoff, to restart the timer actor:

val cache = context.spawn(Cache.empty, "cache")

val timer = Behaviors
  .supervise {
    Timer.start(cache, getDevices)
  }
  .onFailure[Throwable] {
    SupervisorStrategy.restartWithBackoff(
      minBackoff = 0.seconds,
      maxBackoff = 60.seconds,
      randomFactor = 0.1)
  }

context.spawn(timer, "timer")

This implementation is considerably better. With a focus on interacting through functions and messaging, there is a stronger separation of concerns, it is more loosely coupled, and more composable. By restarting the timer actor, it also solves the problem of retrying transient failures without having to wait an hour. However, stepping back, there is nothing particularly unique about the problems that these two actors are solving. Rather than explicitly caching List[DeviceId], the cache actor could be abstracted to cache any type. Similarly, the timer actor could schedule any generic request that returns a future at a fixed interval.[2]

Akka Streams

After this (rather long) journey into Akka Typed, you are probably wondering what any of this has to do with streaming. Streams can address many of these generic patterns. Problems like this can often be composed more succinctly by interfacing actors and streams. For example, implementing the timer actor is unnecessary because it can be achieved using a Source.tick:

val cache = context.spawn(Cache.empty, "cache")

val stream = Source
  .tick(initialDelay = 0.seconds, interval = 10.seconds, tick = ())
  .mapAsync(parallelism = 1) { _ =>
    getDevices
  }
  .map(devices => cache ! Devices(devices))
  .recover {
    case ex => system.log.error("Failed to get devices : {}", ex)
  }
  .runWith(Sink.ignore)

In this example, the cache actor remains unchanged, continuing to maintain the state of the cache, and it is the only concrete object that we need to implement and test. Supervising failures through exponential backoff, as was implemented in the preceding section, can be achieved by composing the stream with a RestartSource:

val cache = context.spawn(Cache.empty, "cache")

val stream = RestartSource
  .withBackoff(
    minBackoff = 0.seconds,
    maxBackoff = 60.seconds,
    randomFactor = 0.1
  ) { () =>
    Source
      .tick(initialDelay = 0.seconds, interval = 1.hour, tick = ())
      .mapAsync(parallelism = 1) { _ =>
        getDevices
      }
      .map(devices => cache ! Devices(devices))
      .recover {
        case ex => system.log.error("Failed to get devices : {}", ex)
      }
  }
  .runWith(Sink.ignore)

Similar to the other examples that I have explored in this series, on the surface, this may not seem like a streaming workload, but using a stream for the scheduling of messages, handling system dynamics, and expressing the workflow, is more straightforward and digestible. It also involves writing, testing, and maintaining less code. Rather than tracing messages across a set of dependent actors to understanding program flow, describing a linear workflow with a stream—especially when there are multiple, dependent steps—can be easier to understand. Finally, if there is a risk of overloading the cache actor with requests, the stream can use the ask pattern to ensure the stream backpressues when sending messages to the cache actor, providing bounded resource constrains and making the program more resilient in the face of system dynamics.

Conclusion

Actors are great for maintaining state and expressing different behaviours, including state machines. Actors are also great for modelling hierarchies that describe relationships, or handle failure and supervision. But when an actor includes a lot of business logic—handling timers, system dynamics, workflows, or transformations—the actor becomes hard to reason about and difficult to test. Interfacing streams with actors can simplify problems and lead to more robust solutions.

In the final article of this series, I will explore how streams can be used to manage large collections of actors to address system dynamics and provide bounded resource-constraints.


  1. Reactive Microsystems by Jonas Bonér. ↩︎

  2. I will leave these implementations as an exercise for the reader. ↩︎