Backoff and Retry Error-Handling for Akka Streams

Services that incorporate streaming workloads often need to perform asynchronous requests to other services, or execute long-running, asynchronous operations. Examples include, inserting time-series data into a database, retrieving metadata for data enrichment, and executing complex business-workflows.

Executing asynchronous operations in a streaming context, while maintaining backpressure in the stream, is easy to do with the Akka Streams API using the mapAsync element.1 But what if one of these asynchronous operations fails? How does one recover—especially when processing an unbounded stream, like a continual stream of measurements from an industrial asset or an IoT device?

Recovering asynchronous operations within a streaming context is important to ensure complete data processing. It is also important for constructing services that are robust in the event of failure, while also respecting the integrity of the broader system. In this article, I will review techniques for reliably and efficiently handling failure and recovering asynchronous operations in an Akka Stream.

Basic Error Handling in Akka Streams

The following HTTP server, implemented with Akka HTTP, when queried for a resource identifier, will return a random value, half of the time. The other half of the time, at random, it will return an HTTP 500 Internal Server Error. This is a completely contrived example, but it will aide in demonstrating the error handling and retry techniques that I want to explore in this article.

object HttpServer extends App {  
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val random = Random

  def body(value: Int): String = s"""{"value":$value}"""

  val Id = """[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}""".r

  val route =
    path(Id) { id =>
      get {
        if (random.nextInt % 2 == 0) {
          complete(StatusCodes.InternalServerError)
        }
        else {
          complete(
            StatusCodes.OK,
            HttpEntity(ContentTypes.`application/json`,
              body(random.nextInt(101)))
          )
        }
      }
    }

  val bindingFuture = Http().bindAndHandle(route, "0.0.0.0", 8080)

  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ => system.terminate())
}

A successful response will return the HTTP status code 200 OK and it will have a JSON body with the following format:

{  
  "value": 42
}

Consider the following Akka Stream that processes a discrete set of ten identifiers—using mapAsync to execute the asynchronous HTTP calls to the server—computing the sum across all responses. Given the behaviour of the server, this stream has a less than 0.1 percent chance of completing without encountering an HTTP 500 Internal Server Error.2

implicit val system = ActorSystem()  
implicit val ec = system.dispatcher  
implicit val mat = ActorMaterializer()

val ids = List(  
  Id("bc6b456b-277a-4555-9248-c180762d981a"),
  Id("7676230e-f6e7-482e-8554-a53857a98ac7"),
  Id("ca182877-8c2a-4e88-8b1c-84d0e1da64c9"),
  Id("1d00f3af-200f-4606-bfa9-1201ecc6eb90"),
  Id("35e1f019-2ca7-434d-bbe9-e7f4cb4356ba"),
  Id("7bd746c7-333c-4358-97ae-89e6828adec3"),
  Id("1e26b82d-a922-4ce2-9129-b38011e0cd44"),
  Id("7db79f5c-a2dd-433d-807a-1e8af17031f9"),
  Id("ddf5e981-88f0-4dc3-99b2-e6abbd05309b"),
  Id("007597b8-1abd-484c-adf1-a0ea09837d1e")
)

val aggregate = Source(ids)  
  .mapAsync(parallelism = 4) { id =>
    Http().singleRequest(HttpRequest(uri = s"http://localhost:8080/$id"))
  }
  .mapAsync(parallelism = 4) {
    case HttpResponse(StatusCodes.OK, _, entity, _) =>
      Unmarshal(entity).to[Response]
    case HttpResponse(StatusCodes.InternalServerError, _, _, _) =>
      throw DatabaseBusyException
    case HttpResponse(statusCode, _, _, _) =>
      throw DatabaseUnexpectedException(statusCode)
  }
  .map(_.value)
  .runWith(Sink.fold(0)(_ + _))

aggregate.onComplete {  
  case Success(sum) => println(s"Sum : $sum")
  case Failure(error) => sys.error(s"Failure : $error")
}

Basic error handling for the Akka Streams API is performed using a stream supervision-strategy to decide what to do when the stream encounters an error processing a message. As of this writing, there are three options: 1) drop the message and continue processing, 2) drop the message and restart the stage, discarding any accumulated state, or 3) stop the entire stream, completing it with a failure.

The following is an example of a stream supervision-strategy that will discard each failed request and continue processing the rest of the messages in the stream:

val decider: Supervision.Decider = {  
  case DatabaseBusyException => Supervision.Resume
  case _ => Supervision.Stop
}

implicit val materializer = ActorMaterializer(  
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))

To calculate the sum across all ten identifiers, this supervision strategy is not satisfactory, since it would drop failed requests and the sum would be incomplete. In fact, none of the stream supervision strategies are well-suited to this workload, because none of them would ensure that all of the asynchronous requests in the stream complete successfully.

Backoff and Retry for Akka Streams

In my previous articles on integrating actors and streams, I showed how an actor can be used to manage a streaming workload and restart it in the event of failure. When the server is not just failing randomly—like in my contrived example above—but is failing systemically, for a period of time, simply restarting the stream and retrying the request as fast as possible, might only make things worse. I demonstrated how actor backoff-supervision can be used in such cases to protect the integrity of other services, by restarting the stream using an exponential-backoff strategy.

Since I published those articles, the Akka Streams API has added a RestartSource.withBackoff that can be used to supervise a stream and restart it with exponential backoff. Using actor backoff-supervision for streams is still useful if one also needs the actor to maintain state, execute a state machine, or manage side-effects, but, if not, the RestartSource.withBackoff is excellent for managing an individual stream and requires very little code.

Revisiting the previous example, the following program uses RestartSource.withBackoff to restart the stream when it fails:

implicit val system = ActorSystem()  
implicit val ec = system.dispatcher  
implicit val mat = ActorMaterializer()

val ids = List(  
  Id("bc6b456b-277a-4555-9248-c180762d981a"),
  Id("7676230e-f6e7-482e-8554-a53857a98ac7"),
  Id("ca182877-8c2a-4e88-8b1c-84d0e1da64c9"),
  Id("1d00f3af-200f-4606-bfa9-1201ecc6eb90"),
  Id("35e1f019-2ca7-434d-bbe9-e7f4cb4356ba"),
  Id("7bd746c7-333c-4358-97ae-89e6828adec3"),
  Id("1e26b82d-a922-4ce2-9129-b38011e0cd44"),
  Id("7db79f5c-a2dd-433d-807a-1e8af17031f9"),
  Id("ddf5e981-88f0-4dc3-99b2-e6abbd05309b"),
  Id("007597b8-1abd-484c-adf1-a0ea09837d1e")
)

val aggregate = RestartSource.withBackoff(  
  minBackoff = 10 milliseconds,
  maxBackoff = 30 seconds,
  randomFactor = 0.2
) { () =>
  Source(ids)
    .mapAsync(parallelism = 4) { id =>
      Http().singleRequest(HttpRequest(uri = s"http://localhost:8080/$id"))
    }
    .mapAsync(parallelism = 4) {
      case HttpResponse(StatusCodes.OK, _, entity, _) =>
        Unmarshal(entity).to[Response]
      case HttpResponse(StatusCodes.InternalServerError, _, _, _) =>
        throw DatabaseBusyException
      case HttpResponse(statusCode, _, _, _) =>
        throw DatabaseUnexpectedException(statusCode)
    }
}
.map(_.value)
.runWith(Sink.fold(0)(_ + _))

aggregate.onComplete {  
  case Success(sum) => println(s"Sum : $sum")
  case Failure(error) => sys.error(s"Failure : $error")
}

However, when the stream is restarted, it is just as likely to fail again, since this approach will restart the entire stream from the beginning after each failure. Probabilistically, the stream will need to be retried many times in order to complete. This is a non-deterministic and wasteful approach. When combined with exponential-backoff, the chances of this stream completing in a reasonable time is exceedingly small.

For a stream that processes an unbounded set of messages, restarting the stream on each failure can be prohibitively expensive. For example, for a stream that consumes messages from a Kafka topic-partition, restarting the stream can result in an expensive consumer-group partition-rebalance and impact the overall throughput of the stream. Restarting a stream can also be very expensive for applications that maintain large in-memory caches that need to be invalidated and reconstructed each time the stream is restarted.

We need a better approach for handling errors that will allow the stream to make progress after transient failures, avoiding the expense of re-initialization and re-computation, while still employing a backoff strategy to respect the integrity of the entire system.

Backoff and Retry with mapAsync

Instead of initiating asynchronous calls to an external service directly from a mapAsync element, we can achieve far more granular error-handling by initiating these calls from an independent stream that uses RestartSource.withBackoff to process only one request. The independent stream will return a Future and can continue to be called from the mapAsync element, in order to control the overall parallelism of the aggregate stream.

implicit val system = ActorSystem()  
implicit val ec = system.dispatcher  
implicit val mat = ActorMaterializer()

def getResponse(id: Id): Future[Response] = {  
  RestartSource.withBackoff(
    minBackoff = 10 milliseconds,
    maxBackoff = 30 seconds,
    randomFactor = 0.2,
    maxRestarts = 10
  ) { () =>
    val responseFuture: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = s"http://localhost:8080/$id"))

    Source.fromFuture(responseFuture)
      .mapAsync(parallelism = 1) {
        case HttpResponse(StatusCodes.OK, _, entity, _) =>
          Unmarshal(entity).to[Response]
        case HttpResponse(StatusCodes.InternalServerError, _, _, _) =>
          throw DatabaseBusyException
        case HttpResponse(statusCode, _, _, _) =>
          throw DatabaseUnexpectedException(statusCode)
      }
  }
  .runWith(Sink.head)
  .recover {
    case _ => throw StreamFailedAfterMaxRetriesException
  }
}

val ids = List(  
  Id("bc6b456b-277a-4555-9248-c180762d981a"),
  Id("7676230e-f6e7-482e-8554-a53857a98ac7"),
  Id("ca182877-8c2a-4e88-8b1c-84d0e1da64c9"),
  Id("1d00f3af-200f-4606-bfa9-1201ecc6eb90"),
  Id("35e1f019-2ca7-434d-bbe9-e7f4cb4356ba"),
  Id("7bd746c7-333c-4358-97ae-89e6828adec3"),
  Id("1e26b82d-a922-4ce2-9129-b38011e0cd44"),
  Id("7db79f5c-a2dd-433d-807a-1e8af17031f9"),
  Id("ddf5e981-88f0-4dc3-99b2-e6abbd05309b"),
  Id("007597b8-1abd-484c-adf1-a0ea09837d1e")
)

val aggregate = Source(ids)  
  .mapAsync(parallelism = 4)(getResponse)
  .map(_.value)
  .runWith(Sink.fold(0)(_ + _))

aggregate.onComplete {  
  case Success(sum) => println(s"Sum : $sum")
  case Failure(error) => sys.error(s"Stream something wrong : $error")
}

This approach is very likely to succeed, since the backoff and retry error-handling strategy is implemented per request, rather than for the stream as a whole. Once a request has been competed successfully, it does not need to be retried and the overall stream can make progress. This approach also provides fine-grained control over how many times an individual request will be retried, using the maxRestarts parameter, before failing the stream.

Note this approach introduces the overhead of creating a new stream to process each element. This could be relatively expensive if the asynchronous task completes very quickly, like some local data-processing. However, the majority of the time, these asynchronous operations involve network calls, database inserts, or workflow executions, and the overhead of creating a new stream per operation is completely insignificant compared to the time it takes to execute the operation itself.

Expressing an individual asynchronous-operation as an independent Akka Stream allows for very simple and granular error handling, that can include backoff and retry. The need for this kind of error-handling strategy is very common in streaming applications and the Akka Streams API provides a very elegant solution to this intricate problem, without having to maintain a lot of code.

  1. For more background, I detailed the motivations for using mapAsync in my article Patterns for Streaming Measurement Data with Akka Streams and I demonstrated how executing workloads in parallel can improve the overall throughput of a stream in my article Maximizing Throughput for Akka Streams.

  2. There are ten identifiers. One request will be made for each identifier. The requests are independent and each request has a fifty-percent chance of succeeding. The cumulative probability of all ten requests succeeding is (110)/(210), which is approximately 0.1 percent of the time.