Calling Blocking Code: There Is No Free Lunch
Making the transition from procedural, synchronous programming to fully-asynchronous programming, can be challenging. Or, it can be quite easy. If all of the languages, frameworks, and services that you are using naturally support asynchronous programming, it can be straightforward to adopt, making it easy to build scalable applications. The biggest challenges arise when integrating blocking code into otherwise asynchronous systems. Unfortunately, this is particularly common as we evolve applications that involve legacy libraries and services that do not support asynchronous interfaces.
For someone new to asynchronous programming, there is often the hope—or the illusion—that there is a simple construct to bridge this gap. But, in the end, there isn't. To effectively construct systems that intermingle synchronous and asynchronous models, it ultimately comes down to requiring a deep understanding of the frameworks you are using and the trade-offs you are making. Balancing these trade-offs will often challenge even the most experienced programmers, especially for systems at scale.
Converting synchronous code to asynchronous code is described as contagious, in that it works best if asynchronous code calls, and is called by, other asynchronous code. Basically, we need turtles all the way down. For the purposes of demonstration, I'll extend this useful example from StackOverflow—in the Scala programming language—but most of the concepts I will discuss apply to any framework that allows one to mix blocking and non-blocking code.
No Free Lunch
First, consider the following program that makes 100 blocking calls to a service. Each blocking call takes approximately one second to complete. In an effort to make this code concurrent and asynchronous, it uses the naive approach of wrapping each blocking call in a future. The futures are executed on a fixed-sized thread-pool with 8 threads.
import com.typesafe.scalalogging.StrictLogging import java.util.concurrent.Executors import scala.concurrent._ import scala.io.StdIn object FixedSizeThreadPoolBlocking extends App with StrictLogging { implicit val ec = ExecutionContext.fromExecutorService { Executors.newFixedThreadPool(8) } (1 to 100) foreach { n => Future { logger.info("Starting Future: " + n) Service.blockingCall() logger.info("Ending Future: " + n) } } StdIn.readLine() sys.exit() }
Running the program generates the following output:
14:25:05 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 8 14:25:05 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 3 14:25:05 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 7 14:25:05 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 6 14:25:05 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 2 14:25:05 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 5 14:25:05 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 4 14:25:05 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 1 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 3 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 5 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 2 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 6 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 4 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 7 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 8 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 1 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 9 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 10 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 11 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 12 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 13 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 14 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 15 14:25:06 INFO FixedSizeThreadPoolBlocking$ - Starting Future: 16 14:25:07 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 13 14:25:07 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 15 14:25:07 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 12 14:25:07 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 9 14:25:07 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 10 14:25:07 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 11 14:25:07 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 16 14:25:07 INFO FixedSizeThreadPoolBlocking$ - Ending Future: 14 ...
Because the futures are executing on a fixed-size thread-pool with 8 threads, only 8 futures are ever executing simultaneously. They complete in batches. It takes over 12 seconds to execute all 100 blocking calls. This is important to digest. Blocking code is blocking code. When it executes, it must block the thread that it is running on. There is no free lunch. Wrapping blocking code in a future does not magically make it non-blocking.
There are, however, more options for handling blocking calls. Instead of instantiating our own fixed-size thread-pool for the execution context, let's use the commonly used global execution context. This is the execution context the compiler will suggest if you use a future without specifying an execution context:
Cannot find an implicit ExecutionContext. You might pass an (implicit ec: ExecutionContext) parameter to your method or import scala.concurrent.ExecutionContext.Implicits.global.
The global execution context will default the number of threads to the number of available processors. In my case, this is also 8.
import com.typesafe.scalalogging.StrictLogging import scala.concurrent.Future import scala.io.StdIn object GlobalExecutionContextWithoutBlockingDirective extends App with StrictLogging { implicit val ec = scala.concurrent.ExecutionContext.Implicits.global (1 to 100) foreach { n => Future { logger.info("Starting Future: " + n) Service.blockingCall() logger.info("Ending Future: " + n) } } StdIn.readLine() sys.exit() }
The output of this program, using the global execution context, is no different than the program above, that used the fixed-size thread-pool, and it still takes over 12 seconds to execute.
14:38:58 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 8 14:38:58 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 2 14:38:58 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 6 14:38:58 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 7 14:38:58 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 4 14:38:58 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 3 14:38:58 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 1 14:38:58 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 5 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 6 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 4 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 2 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 1 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 8 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 5 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 3 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 7 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 9 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 10 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 11 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 12 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 13 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 14 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 15 14:38:59 INFO GlobalExecutionContextWithoutBlockingDirective$ - Starting Future: 16 14:39:00 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 9 14:39:00 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 16 14:39:00 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 14 14:39:00 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 13 14:39:00 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 15 14:39:00 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 11 14:39:00 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 10 14:39:00 INFO GlobalExecutionContextWithoutBlockingDirective$ - Ending Future: 12 ...
The global execution context can, however, exceed the default number of threads if the blocking code is wrapped in the scala.concurrent.blocking
method.
import com.typesafe.scalalogging.StrictLogging import scala.concurrent.{Future, blocking} import scala.io.StdIn object GlobalExecutionContextWithBlockingDirective extends App with StrictLogging { implicit val ec = scala.concurrent.ExecutionContext.Implicits.global (1 to 100) foreach { n => Future { if (n <= 3 || n >= 97) logger.info("Starting Future: " + n) blocking { Service.blockingCall() } if (n <= 3 || n >= 97) logger.info("Ending Future: " + n) } } StdIn.readLine() sys.exit() }
If the threads in the global execution context are exhausted, a new thread will be created to execute the code in the blocking
method. Running this program generates the following output:
14:50:35 INFO GlobalExecutionContextWithBlockingDirective$ - Starting Future: 3 14:50:35 INFO GlobalExecutionContextWithBlockingDirective$ - Starting Future: 1 14:50:35 INFO GlobalExecutionContextWithBlockingDirective$ - Starting Future: 2 14:50:35 INFO GlobalExecutionContextWithBlockingDirective$ - Starting Future: 97 14:50:35 INFO GlobalExecutionContextWithBlockingDirective$ - Starting Future: 98 14:50:35 INFO GlobalExecutionContextWithBlockingDirective$ - Starting Future: 99 14:50:35 INFO GlobalExecutionContextWithBlockingDirective$ - Starting Future: 100 14:50:36 INFO GlobalExecutionContextWithBlockingDirective$ - Ending Future: 99 14:50:36 INFO GlobalExecutionContextWithBlockingDirective$ - Ending Future: 100 14:50:36 INFO GlobalExecutionContextWithBlockingDirective$ - Ending Future: 97 14:50:36 INFO GlobalExecutionContextWithBlockingDirective$ - Ending Future: 98 14:50:36 INFO GlobalExecutionContextWithBlockingDirective$ - Ending Future: 2 14:50:36 INFO GlobalExecutionContextWithBlockingDirective$ - Ending Future: 1 14:50:36 INFO GlobalExecutionContextWithBlockingDirective$ - Ending Future: 3
All of the futures are executed concurrently and all 100 futures complete within 1 second. This seems ideal, but, again, there is no free lunch. Blocking code still comes at a cost. This solution works great in the above example with only 100 blocking calls, but consider a service that must process thousands of simultaneous, blocking requests.
import com.typesafe.scalalogging.StrictLogging import scala.concurrent.{Future, blocking} import scala.io.StdIn object GlobalExecutionContextWithBlockingDirective extends App with StrictLogging { implicit val ec = scala.concurrent.ExecutionContext.Implicits.global (1 to 10000) foreach { n => Future { if (n <= 3 || n >= 9997) logger.info("Starting Future: " + n) blocking { Service.blockingCall() } if (n <= 3 || n >= 9997) logger.info("Ending Future: " + n) }.onFailure { case _ => logger.info(s"Failed : " + n) } } StdIn.readLine() sys.exit() }
The output of this program, with 10,000 blocking requests, makes it clear that it is impossible to keep creating a new thread for each concurrent blocking call without limit:
15:07:17 INFO GlobalExecutionContextWithBlockingDirective2$ - Starting Future: 1 15:07:17 INFO GlobalExecutionContextWithBlockingDirective2$ - Starting Future: 3 15:07:17 INFO GlobalExecutionContextWithBlockingDirective2$ - Starting Future: 2 java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) at scala.concurrent.forkjoin.ForkJoinPool.tryCompensate(ForkJoinPool.java:2343) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3638) at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:45) at scala.concurrent.package$.blocking(package.scala:123) at GlobalExecutionContextWithBlockingDirective2$$anonfun$1$$anonfun$apply$mcVI$sp$1.apply$mcV$sp(GlobalExecutionContextWithBlockingDirective.scala:12) at GlobalExecutionContextWithBlockingDirective2$$anonfun$1$$anonfun$apply$mcVI$sp$1.apply(GlobalExecutionContextWithBlockingDirective.scala:10) at GlobalExecutionContextWithBlockingDirective2$$anonfun$1$$anonfun$apply$mcVI$sp$1.apply(GlobalExecutionContextWithBlockingDirective.scala:10) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Turtles All The Way Down
To complete this discussion and highlight the benefits of avoiding blocking code, let's return to the fixed-sized thread-pool, used in the first example, but let's make a non-blocking service call, rather than the blocking call.
import com.typesafe.scalalogging.StrictLogging import java.util.concurrent.Executors import scala.concurrent._ import scala.io.StdIn object FixedSizeThreadPoolNonBlocking extends App with StrictLogging { implicit val ec = ExecutionContext.fromExecutorService { Executors.newFixedThreadPool(8) } (1 to 10000) foreach { n => Future { if (n <= 3 || n >= 9997) logger.info("Starting Future: " + n) Service.nonBlockingCall() if (n <= 3 || n >= 9997) logger.info("Ending Future: " + n) } } StdIn.readLine() sys.exit() }
The output from this program is striking. All 10,000 futures run concurrently and complete within 1 second. Furthermore, this is accomplished with only 8 threads in the underlying thread-pool. This uses significantly fewer resources than creating a thread to handle each concurrent, blocking call, and is far more scalable. Using the global execution context would also achieve the same result in this case. Note that each individual non-blocking call still takes close to a second to complete, it is just that while the call is being made, it is not blocking a thread.
15:20:30 INFO FixedSizeThreadPoolNonBlocking$ - Starting Future: 1 15:20:30 INFO FixedSizeThreadPoolNonBlocking$ - Starting Future: 3 15:20:30 INFO FixedSizeThreadPoolNonBlocking$ - Starting Future: 2 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Ending Future: 2 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Ending Future: 1 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Ending Future: 3 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Starting Future: 9999 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Starting Future: 9997 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Starting Future: 10000 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Starting Future: 9998 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Ending Future: 9999 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Ending Future: 9997 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Ending Future: 10000 15:20:31 INFO FixedSizeThreadPoolNonBlocking$ - Ending Future: 9998
It is actually possible to execute 1 million concurrent, asynchronous calls on this 8-core machine within 1 second. Note that this is not infinitely scalable. At some point, there will be so many concurrent calls that scheduling and completing the futures will exhaust the capacity of the underlying thread-pool and take longer than a second to execute, or potentially exhaust memory resources with the creation of so many futures.
Handling Blocking Code
Ideally, avoid it. Choose libraries, frameworks, and services that natively support asynchronicity. Overall, technologies that do not support asynchronous interactions are going to fall out of favour.
If there is an option to use HTTP, use it. I recently ran into problems trying to scale an application that made blocking calls to a database. The native client for this database uses blocking calls exclusively. Fortunately, the database also has an HTTP API. This allowed us to make asynchronous HTTP calls to avoid the blocking calls altogether. Interfacing with this database with HTTP was somewhat less convenient, but the scalability benefits more than justified this choice.
If you have to make blocking calls, make them on an execution context backed by a fixed number of threads, dedicated to executing these blocking calls. This won't necessarily make your application more scalable, but it will make it easier to reason about—for example, knowing when it needs to be scaled out—and it will also allow other aspects of the application to remain responsive—like a metrics endpoint that gives critical visibility into the state of the application—even when the blocking execution context is saturated.