Considering Time in a Streaming Data System
For over a decade, I have worked on infrastructure software for streaming data systems. My experience includes developing and supporting time-series databases, publish-subscribe messaging systems for streaming data, and software for reliable event-data collection. Systems that manage event data are becoming increasingly popular as web companies embrace them for making operational and business decisions on the order of minutes or seconds rather than hours or days. Streaming data systems have actually been around for years, in securities exchanges and in the monitoring and control of industrial processes. But there is no doubt that a great deal of engineering and innovation in this area is resulting in systems that are more useful, reliable, and scalable. Ben Lorica recently provided an excellent summary of the state of the industry, although he did omit some important players offering proprietary solutions, including Microsoft.
Because streaming data systems deal with event data, time is a fundamental concern. By time, I mean time as perceived by the end user of the system, not logical time or any other means of coordination in a distributed system. I want to describe a number of considerations for time that I have encountered. Not all streaming data systems handle all of these aspects of time, but constructing a system that handles the aspects important for the given application will lead to a much more effective system.
I will not talk about the challenges of time synchronization and localization, assuming that the events in a single stream are generally ordered; that the timestamps are generated from computers that are time-synchronized within a tolerance; and that the timestamps are recorded in UTC, and only localized when displayed to an end user. For a discussion of some of the challenges of synchronizing time in a distributed system, as well as a list of references, refer to Justin Sheehy's article There is No Now: Problems with simultaneity in distributed systems.
The source time is the time that the event occurred. This is the cardinal time in most streaming data systems. The event may be timestamped by the originating system — a sensor, a server, a control system, a cellular phone — or a timestamp may be applied by an external system when the variable was sampled.
For an event like a log message — perhaps a message logging information regarding the invocation of a web service method for usage monitoring — there generally are no issues with sampling. For a continuous signal, however, like a temperature, there can be problems related to sampling from aliasing, noise, and jitter. For example, sampling a sinusoidal signal exactly once per period lands on the same phase every time, so the signal will look constant. If a signal undergoes a step change, periodic sampling can also introduce uncertainty as to when exactly the signal changed; the only thing that can be deduced is that the signal changed between two samples.
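The aliasing problem above can be demonstrated in a few lines. This is a minimal sketch, not tied to any particular system: sampling a 1 Hz sinusoid at 1 Hz always lands on the same phase, so the samples are indistinguishable from a constant signal, while a faster rate reveals the oscillation.

```python
import math

def sample(signal, rate_hz, n_samples):
    """Sample a continuous signal at a fixed rate, returning (t, value) pairs."""
    return [(n / rate_hz, signal(n / rate_hz)) for n in range(n_samples)]

# A 1 Hz sinusoid.
sine_1hz = lambda t: math.sin(2 * math.pi * 1.0 * t)

# Sampling at 1 Hz (once per period) lands on the same phase every time,
# so the sinusoid is indistinguishable from a constant (zero) signal.
aliased = sample(sine_1hz, rate_hz=1.0, n_samples=5)

# Sampling well above the Nyquist rate reveals the oscillation.
faithful = sample(sine_1hz, rate_hz=16.0, n_samples=5)
```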
Many telemetry systems will transmit a new event at a regular interval, even if the underlying signal has not changed, as a means of determining latency in the system. If the expectation is that a new event will be transmitted every minute, then it is easy to reason that if the system has not recorded a new event in over a minute, that the value is not current and should be used accordingly. Making operational decisions on a measurement that is not current could be costly or even hazardous for many applications.
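The staleness check described above is simple to reason about once the heartbeat interval is fixed. The following is a sketch under assumed names (the one-minute interval and the jitter tolerance are illustrative parameters, not from any particular system):

```python
import time

HEARTBEAT_INTERVAL_S = 60.0  # expected: a new event at least once per minute

def is_stale(last_event_time, now=None, tolerance=1.5):
    """Flag a measurement as stale if no event has arrived within the
    expected heartbeat interval, plus some tolerance for jitter.
    Times are epoch seconds."""
    now = time.time() if now is None else now
    return (now - last_event_time) > HEARTBEAT_INTERVAL_S * tolerance
```

A dashboard or alarming layer could use such a flag to grey out or annotate measurements before an operator acts on them.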
Ingestion time is the time that the event was received by the sink system, like a message queue or a stream processing system. Events can often be buffered on the source system, or in transit to the sink system, for minutes, hours, or days, due to partial failures, network partitions, or scheduled downtime. When bandwidth is limited or connectivity is intermittent, measurements can also be buffered and sent in batches as part of normal operation, trading off increased latency for improved performance or reduced cost, as a result of fewer round trips or data compression. It may not be practical to record the ingestion time for every stream, but it may be practical to record the ingestion time for a subset of streams, or in a messaging system, the ingestion time for a message containing a batch of events.
Since the ingestion time can be significantly different from the source time, this difference is an indication of latency and a measure of the near real-time performance of the system. This is of particular interest for monitoring the operational performance of the system itself, but as already mentioned, it may be important to surface a measure of this latency to the end user to indicate how recent the data are. In certain applications, it can be very important to give the end user an indication that the data are out-of-date so as to not make decisions on data that are not current. Making decisions on stale data can lead to bad business decisions. Monitoring latency can also be important for applications where the data are only relevant within a certain operational window. For example, a web advertisement recommendation system may need to serve the correct advertisements within the time-frame of the user's browsing session.
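Measuring this latency amounts to differencing the two timestamps per event. A minimal sketch, assuming events are carried as (source time, ingestion time, value) tuples — the tuple layout is an assumption for illustration:

```python
def ingestion_latency(source_ts, ingest_ts):
    """Latency between when an event occurred and when the sink received it."""
    return ingest_ts - source_ts

def max_latency(events):
    """Worst-case latency across a batch of (source_ts, ingest_ts, value)
    events, useful both for monitoring the pipeline itself and for
    surfacing data freshness to an end user."""
    return max((ingest - source for source, ingest, _ in events), default=0.0)
```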
With publish-subscribe messaging systems being integral components of many streaming data systems, it is often necessary to maintain an index of ingestion time or source time, to an offset in a message queue. This allows applications to start processing data from a certain point in time and avoid having to read the entire queue from the beginning.
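Such a time-to-offset index can be kept sparse — one entry per batch or per interval — and searched with a binary search. A sketch, assuming timestamps are recorded in non-decreasing order (the class and method names are illustrative, not from any particular messaging system):

```python
import bisect

class TimeIndex:
    """Sparse index from ingestion (or source) time to a queue offset.
    Timestamps must be recorded in non-decreasing order."""

    def __init__(self):
        self._times = []
        self._offsets = []

    def record(self, timestamp, offset):
        self._times.append(timestamp)
        self._offsets.append(offset)

    def offset_for(self, timestamp):
        """Return the earliest recorded offset at or after `timestamp`, so a
        consumer can start reading there instead of from the beginning."""
        i = bisect.bisect_left(self._times, timestamp)
        return self._offsets[i] if i < len(self._offsets) else None
```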
System time is the time that the event is visible to an end user or application. For a metric that takes some processing or indexing, it may not be available immediately. This time is most likely only useful retrospectively, when one needs to understand what data were available at a certain point in time. For example, in regulated environments like pharmaceutical manufacturing, electrical utilities, and emission control in process industries, it can be important to know what data were visible to an operator or an alarming system when a decision was made. I don't know of any system that records this time, and it could be complex to account for, but theoretically, it is an interesting time to record. Most streaming systems implicitly assume that the source time or the ingestion time is the system time. Some systems may augment this by recording the system time when the stream is modified, for example, if an event is updated or replaced.
A derived time is the time applied to an event that has been derived from one or more other events. Essentially, a timestamp applied to a materialized view of the source data. A derived event can be as simple as the running total of a single stream, or it could be as complex as aggregating data across streams and time to perform a more complex calculation. The derived event may be timestamped using the source timestamp of the latest input to the calculation — this would be appropriate for a running total of one or more streams, like a webpage view count — or the event may be timestamped relative to when the calculation was performed, when sampling, aggregating, or windowing streams. Beyond assigning the timestamp of the derived event, there are many other considerations related to time for derived events.
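The first timestamping convention above — stamping the derived event with the source timestamp of the latest input — can be sketched for the running-total case. A minimal illustration, with an assumed (source timestamp, value) tuple layout:

```python
def running_total(events):
    """Derive a running-total stream from (source_ts, value) events, stamping
    each derived event with the source timestamp of its latest input, as one
    might for a webpage view count."""
    total = 0
    derived = []
    for source_ts, value in events:
        total += value
        derived.append((source_ts, total))
    return derived
```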
One big challenge with derived events is what to do when the source data change. The source data may change because they are delayed, missing, out-of-order, revised, or even wrong. In addition to the source data changing, it is possible that the calculation itself may change, due to a bug in the code or a modification to the calculation. In many systems, it is ideal to re-derive the calculated data from the modified source data. Recomputation is often very challenging, as many systems are not designed for it as a principal feature, only considering it as an afterthought. Recomputation may also be expensive and time-consuming depending on the structure, indexing, and the sheer volume of data. Jay Kreps shares some important considerations for recalculating derived events in his article Questioning the Lambda Architecture, and considers recalculation an integral design consideration of his proposed Kappa Architecture.
For many applications, however, it is not acceptable to simply re-derive the events. For compliance in regulated industries, often the original events must be preserved as a record of the data available for decision making at the time, or to prevent tampering, in addition to the recalculated data. In streaming data systems used for billing, it can also be important to retain the original derived data, along with annotations as to why it was revised, when, and by whom. This can pose challenges for data indexing, query, and presentation, because there is now more than one derived event at a given timestamp. In addition, if the calculation itself is revised, it may also be important to retain the original and the revised calculation.
Having to maintain more than one derivation of an event is one of the criticisms of the Lambda Architecture. This architecture relies on a batch layer as the system of record, but augments it with a speed layer to meet the performance requirements of near real-time queries that the batch layer cannot support. The speed layer and the batch layer will both have derivations of the same event. There can be discrepancies between the speed and batch layers for both the value and the timestamp. For some applications, like in the regulated industries previously mentioned, it can also be problematic that the batch layer is the system of record, but decisions may have been made based on the ephemeral speed layer, which is not recorded permanently. Along with these criticisms, however, it should also be noted that for some applications the whole point of the Lambda Architecture is to run different calculations in the speed layer as compared to the batch layer.
Actually computing the derived event itself also has many considerations regarding time. For example, intermediate calculations might have to be persisted, perhaps even in a distributed manner, to handle service restarts, node failover, or topology changes. Another consideration when aggregating measurements over a time interval, say to calculate a 10 minute average, is when to actually complete the calculation, as input measurements can be delayed as part of normal or abnormal operations, beyond the 10 minute window. The Apache Samza project has considerations for both state management and windowing, and provides a good discussion of these topics in the documentation.
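The windowing question — when to close a 10-minute average given that inputs can arrive late — can be sketched with a simple watermark: a window is finalized only once the stream has advanced past the window's end plus an allowed lateness. This is a toy illustration of the idea, not how Samza or any particular engine implements it; the window size and lateness values are assumed parameters:

```python
from collections import defaultdict

WINDOW_S = 600.0           # 10-minute tumbling windows
ALLOWED_LATENESS_S = 60.0  # keep a window open this long past its end

def window_averages(events):
    """Assign (source_ts, value) events to tumbling windows and emit a
    window's average only once an event arrives whose timestamp is past the
    window's end plus the allowed lateness (a simple watermark). Returns the
    finalized (window_start, average) pairs and the still-open windows."""
    sums = defaultdict(lambda: [0.0, 0])  # window_start -> [sum, count]
    emitted = []
    for ts, value in events:
        start = ts - (ts % WINDOW_S)
        sums[start][0] += value
        sums[start][1] += 1
        # Finalize any window the stream has now moved safely past.
        for w in sorted(w for w in sums if ts >= w + WINDOW_S + ALLOWED_LATENESS_S):
            s, n = sums.pop(w)
            emitted.append((w, s / n))
    return emitted, dict(sums)
```

A real implementation would also have to persist the open windows (the `sums` state) so that a restart or failover does not lose partial aggregates — the state-management concern mentioned above.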
The derived time can also be in the future for predictive calculations like an hour-ahead or a day-ahead forecast. This can create challenges, especially for indexing. These types of calculations can also be revised. For example, the hour-ahead prediction may be updated once a minute. These streams are unique in publish-subscribe scenarios — like an updating graph on a dashboard or a calculation that relies on a predictive input — in that the events have already been derived with timestamps in the future, but the application needs to populate the graph or the calculation as the relevant data come into the active time interval. This can require simulating "real-time" updates to the predictive stream as the time window moves into the present.
A final consideration for derived time, when dealing with continuous signals, is what to do when an event does not actually exist at a specific query time. For example, perhaps the query time is slightly after the most recent event. Should the system extrapolate the last event to the query time? Or should the system return an error indicating that there is no event at this time? There is also the case where the query time falls between two events. Should the system interpolate between the two events or simply extrapolate the first event? One might encounter this when extracting events at evenly spaced intervals for an ETL operation. The answer, of course, depends on the application and the underlying signal, but providing this flexibility in the system and the query language can be an important consideration.
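The choices above can be made concrete in a small query helper. A sketch, assuming sorted (time, value) samples; the `mode` parameter is an illustrative way a query language might expose the interpolate-versus-hold choice:

```python
import bisect

def value_at(samples, query_time, mode="interpolate"):
    """Evaluate a continuous signal at an arbitrary query time from its sorted
    (time, value) samples. Between two samples, either linearly interpolate
    or hold the earlier value ("previous"); after the last sample, hold the
    last value; before the first sample, raise an error."""
    times = [t for t, _ in samples]
    i = bisect.bisect_right(times, query_time)
    if i == 0:
        raise ValueError("no event at or before the query time")
    if i == len(samples) or samples[i - 1][0] == query_time:
        return samples[i - 1][1]   # exact hit, or extrapolate the last event
    if mode == "previous":
        return samples[i - 1][1]   # step-wise hold of the earlier event
    (t0, v0), (t1, v1) = samples[i - 1], samples[i]
    return v0 + (v1 - v0) * (query_time - t0) / (t1 - t0)
```

Resampling at evenly spaced intervals for an ETL job is then just repeated calls to `value_at` with the chosen mode.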
Many measurements are described not only by their source time or derived time, but also by the time interval in which they occurred — a collection or sequence of events that is defined by a start time and an end time. Time intervals are often defined interactively, as end users make queries for data for reports, calculations, graphs, and dashboards. But the time intervals can also be intrinsic to the system — persisted, indexed, and queryable — such that users and applications can all reference the same time interval.
The time interval may be fixed — a day, a year, the lifetime of a service contract, or defined by the start and end of a shift for an operator in an industrial plant — or it may be defined by the start and end of a dynamic operational event like an outage, a spike in activity, or a control variable excursion. Time intervals can also be defined by seasonality in the data or natural operational time intervals. For example, consider the operation of an airplane, which is defined by time intervals for taxiing, ascent, descent, and cruising at altitude, among others. One might be interested in studying the steady-state behaviour of the system while the plane is cruising at altitude, while ignoring the dynamics from the ascent or descent. Time intervals can also be used to describe episodes of particular interest or questionable data quality.
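A persisted, named interval need not be more than a start, an end, and a name that users and applications can all reference. A minimal sketch, with illustrative names and epoch-second timestamps:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TimeInterval:
    """A named, queryable time interval — an operational phase, an outage,
    or an episode of questionable data quality (times in epoch seconds)."""
    name: str
    start: float
    end: float

def events_during(events, interval):
    """Select the (timestamp, value) events that occurred within the interval,
    e.g. to study steady-state behaviour while cruising at altitude."""
    return [(ts, v) for ts, v in events if interval.start <= ts < interval.end]
```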
For many applications, when an event is deleted it can simply be discarded. Some applications may perform a soft delete, making the event no longer visible for normal operation, but not actually deleting it for a period of time, in order to support undo operations. For compliance in regulated environments, however, some measurements cannot be discarded. The deleted event may no longer be visible for normal operation of the system, but it must be recorded somewhere such that it is possible to audit what the event was, when it was deleted, and who deleted it.
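The audited delete described above can be sketched as a store that removes the event from normal queries while appending what was deleted, when, and by whom to an audit log. The class and field names here are illustrative, not drawn from any real system:

```python
import time

class AuditedStream:
    """Event store where deletes hide the event from normal queries but keep
    an audit record of what was deleted, when, and by whom."""

    def __init__(self):
        self._events = {}     # event_id -> (timestamp, value)
        self._audit_log = []

    def append(self, event_id, timestamp, value):
        self._events[event_id] = (timestamp, value)

    def delete(self, event_id, deleted_by, deleted_at=None):
        event = self._events.pop(event_id)
        self._audit_log.append({
            "event_id": event_id,
            "event": event,
            "deleted_by": deleted_by,
            "deleted_at": time.time() if deleted_at is None else deleted_at,
        })

    def visible_events(self):
        """Only undeleted events are visible for normal operation."""
        return sorted(self._events.values())

    def audit_log(self):
        return list(self._audit_log)
```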
Constructing a streaming data system to handle the important aspects of time for reliable system operation, compliance, and end user applications is important for creating an effective platform. Not every infrastructure or application requires all of the time considerations that I have discussed here, and some of these times are only needed on an exceptional basis for analysis after the fact (the Northeast blackout in 2003 would be an example). Some of the considerations affect traditionally regulated industries much more than modern web applications, but I suspect that as these systems are used increasingly for near real-time decision making, the number of industries that fall under regulation, and thus require more of these considerations, will grow. An infrastructure that can handle these aspects of time while at the same time remaining flexible, generally applicable, and easy to use and operate, would make for a very powerful streaming data platform.