I recently read the first part of the book Streaming Systems and learned many high-level concepts and useful tips about streaming systems. Many of the topics the book discusses are instructive for TiCDC, the project I focused on during the past year: a change data capture system that replicates change data from a distributed NewSQL database to various downstreams. In this article I will talk about some core concepts that a streaming system should pay attention to, comparing the opinions expressed in Streaming Systems with practical experience from the CDC project I participated in.
The book Streaming Systems introduces many concepts that describe a streaming system from different dimensions. I will roughly map these concepts to the change data capture system.
Event time is the time at which events actually occurred, and processing time is the time at which events are observed in the system.
In the real world, the skew between these two values always exists and can be affected by many factors. Which one the system uses when processing an event depends heavily on the usage scenario. In the CDC of TiDB, we can use both timestamps of a row change: the commit-ts serves as the event time, and it can roughly be treated as the prewrite-and-commit timestamp (in fact TiDB follows a strict two-phase-commit protocol, and we can simplify the model so that each transaction has a unique, monotonically increasing global logical timestamp, called the timestamp oracle, or TSO for short, in TiDB). In TiCDC the start-ts of a row change is used during data processing. With the help of event time, we can easily restore the sequence of row changes in the upstream, which meets the requirement of consistent replication.
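As a rough illustration of restoring the upstream sequence from these timestamps, here is a minimal Python sketch. The `RowChange` type and its fields are hypothetical stand-ins, not TiCDC's actual data structures:

```python
from dataclasses import dataclass

@dataclass
class RowChange:
    start_ts: int   # transaction start (prewrite) timestamp from the TSO
    commit_ts: int  # transaction commit timestamp from the TSO
    key: str
    value: str

def restore_order(events):
    """Restore the upstream commit sequence of row changes.

    Sorting by (commit_ts, start_ts) reproduces the order in which
    transactions committed upstream, which is what consistent
    replication requires.
    """
    return sorted(events, key=lambda e: (e.commit_ts, e.start_ts))
```

Because the TSO hands out globally unique, monotonically increasing timestamps, a sort on commit-ts is enough to recover the commit order even when events arrive out of order.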
Windowing is the notion of taking a data source (either unbounded or bounded), and chopping it up along temporal boundaries into finite chunks for processing.
Windowing is a common approach to coping with unbounded data; in my opinion it is a logical concept that splits data along deterministic boundaries. How a streaming system cuts data into windows leads to different data processing strategies. In TiCDC, the window is more like an adaptively sized window, whose size is determined by the watermark and the trigger.
A trigger is a mechanism for declaring when the output for a window should be materialized relative to some external signal.
A streaming system can contain multiple components that form a data-flow chain, where each component has an upstream and a downstream. Here, materialized means output to the downstream of the current component. The trigger decides when the output for a window is produced, and the choice of trigger affects processing quality in many aspects, including latency, correctness, and throughput. In the real world, streaming data is not stable all the time: there can be late data, traffic peaks, data re-delivery, data source crashes and recoveries, and so on. All of these abnormal conditions should be taken into consideration when designing a proper trigger mechanism. In TiCDC, the trigger mechanism relies heavily on the watermark, specifically a perfect watermark that can guarantee a window is closed. Since TiCDC must preserve the upstream transaction sequence (which enables the sink or downstream consumer to recover transactions and keep data consistent), it adopts the same mechanism used in CockroachDB CDC, which in turn comes from the paper Naiad: A Timely Dataflow System. The core idea of this mechanism is that the trigger in each component must not materialize data later than the perfect watermark to the downstream; in other words, to output data as eagerly as possible, the trigger should materialize all data before the perfect watermark.
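The core idea above can be sketched in a few lines of Python. This is a simplified model, not TiCDC's implementation: events are buffered, and each advance of the perfect watermark materializes exactly the events at or below it, in timestamp order:

```python
class WatermarkTrigger:
    """Buffers events and materializes only those at or below the
    perfect watermark, preserving event-time order."""

    def __init__(self):
        self.buffer = []  # list of (commit_ts, payload)

    def ingest(self, commit_ts, payload):
        self.buffer.append((commit_ts, payload))

    def advance(self, watermark):
        # Materialize everything at or below the watermark, in order;
        # nothing later than the watermark may be emitted downstream.
        ready = sorted((t for t in self.buffer if t[0] <= watermark),
                       key=lambda t: t[0])
        self.buffer = [t for t in self.buffer if t[0] > watermark]
        return ready
```

With a perfect watermark, a window closed by `advance` is final: no event below the watermark can arrive afterwards, so the downstream never sees a retraction.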
Watermarks are temporal notions of input completeness in the event-time domain. Worded differently, they are the way the system measures progress and completeness relative to the event times of the records being processed in a stream of events. Depending upon the type of watermark, perfect or heuristic, that assertion may be a strict guarantee or an educated guess, respectively.
How to choose a suitable watermark depends on the usage scenario; it is a tradeoff between performance and correctness, as well as between scalability and availability. Suppose we have a system that requires low-latency subscription to streaming data and can tolerate late data (data outside the scope of a closed window; tolerate means the system can either ignore the late data or has a way to amend correctness when it arrives). Then a heuristic watermark is better than a perfect watermark. In another situation, a system may need to keep the event sequence strictly, for example a database replication system that must keep transactions serializable when replicating to the downstream. (Transaction isolation is a complicated topic in database systems; here serializable does not mean the serializable isolation level, but that the sequence in which transactions are applied downstream matches the event-time sequence upstream. This serializability can be strict or loose, such as table-level or transaction-level serializability.) In such a system a heuristic watermark could lead to an inconsistent state between upstream and downstream; if that inconsistency is not acceptable, a perfect watermark is the better choice.
An accumulation mode specifies the relationship among multiple results that are observed for the same window. There exist multiple ways to refine or relate results, including discarding, accumulating, and accumulating and retracting.
Different accumulation modes have different semantics and produce different results under the same window/watermark strategy. However, no specific accumulation mode is used in CDC, since row change events in CDC are atomic; the only result relation that exists is transaction-oriented event grouping, which means events in the same window can be grouped by transaction identity (events with the same start-ts are treated as belonging to the same transaction).
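The transaction-oriented grouping can be sketched as follows; the event shape is a hypothetical simplification, assuming each event carries its transaction's start-ts:

```python
from collections import defaultdict

def group_into_transactions(events):
    """Group row-change events of a closed window by start_ts.

    Events sharing a start_ts belong to the same upstream
    transaction, so the groups can be applied downstream as
    whole transactions.
    """
    txns = defaultdict(list)
    for e in events:
        txns[e["start_ts"]].append(e)
    return dict(txns)
```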
In this part I will focus on some key indicators of streaming systems and how to make good tradeoffs among them.
In theory, the watermark strategy has a direct impact on latency. Besides that, the number of stages in the data flow and how watermark propagation is implemented also have a big impact on latency. Precision, on the other hand, is mostly decided by the watermark mechanism.
As discussed in part one, a perfect watermark can ensure 100% precision of the materialized results, but a perfect watermark is impractical for many real-world distributed input sources. In some known scenarios, however, it is possible to define a perfect watermark, for example:
- A distributed system with a single, monotonically increasing timestamp allocator (the TSO in TiDB and the HLC in CockroachDB both follow this mechanism)
- A statically sized input source of time-ordered data, such as an Apache Kafka topic with a static set of partitions, where each partition can be consumed in monotonically increasing event-time order
If a streaming system adopts one of the above perfect watermarks, its latency is determined by the latency of the input source. Suppose there are N nodes in the input source and the logical timestamp has advanced to T; the streaming system must wait until each of the N nodes has sent an event with a timestamp equal to or greater than T. Consequently, if the upstream input source suffers an accident, say some of the N nodes crash and restart, events from the faulty nodes will be delayed and the latency of the streaming system will increase remarkably until the upstream recovers. In the real world, most upstreams have rebalance strategies for disasters: the data flow of the failed nodes is transferred to healthy nodes to reduce the recovery time, which is also known as the RTO in distributed systems. To reduce latency as much as possible, the streaming system must be aware of upstream fault recovery and rebalancing; even so, the latency lag will not be less than the upstream RTO.
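The waiting rule above reduces to taking a minimum. Here is a sketch under the assumption that each input node reports a resolved timestamp below which it will emit no more events (the function name is illustrative):

```python
def output_watermark(resolved_ts_by_node):
    """The perfect watermark over N input nodes.

    Each node promises to emit no further events below its own
    resolved timestamp, so everything below the minimum across
    all nodes is complete and safe to materialize.
    """
    return min(resolved_ts_by_node.values())
```

This also makes the failure mode concrete: a single crashed or slow node pins the minimum, stalling the watermark for the whole system until that node recovers or its data is rebalanced.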
In the real world, most streaming systems contain multiple stages, and watermarks are propagated across each independent stage. Each stage has an input watermark and an output watermark; the output watermark is later than the input watermark, and the difference is contributed by the processing time of that stage. So it is obvious that the more stages a pipeline has, the more latency accumulates. Within a stage, processing is not always monolithic: we can segment the processing of one stage into a flow of several conceptual components, each of which contributes to the output watermark. This is a common optimization, which aims to decrease processing time by dividing jobs into parallel sub-jobs.
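The per-stage relationship can be written down directly. This is a simplified model, assuming a stage can report the event times of its in-flight (not yet processed) work:

```python
def stage_output_watermark(input_watermark, inflight_event_times):
    """A stage's output watermark.

    A stage may not claim more progress than its input watermark,
    and it is further held back by the oldest unfinished work
    buffered inside the stage.
    """
    if not inflight_event_times:
        return input_watermark
    return min(input_watermark, min(inflight_event_times))
```

Chaining this function stage by stage shows why each extra stage adds latency: every stage's unfinished work can only pull the watermark backwards, never push it forwards.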
Latency and throughput sometimes look like two incompatible goals; however, high throughput together with low latency is often declared as a feature of many streaming systems. So what is the real situation? To clarify this complicated problem, we must define latency and throughput precisely and understand how these metrics affect the performance of a streaming system. The paper Benchmarking Distributed Stream Data Processing Systems, published at ICDE 2018, gives a good explanation of the key indicators of distributed streaming systems. It defines two types each of latency and throughput: event-time latency and processing-time latency, and maximum throughput and sustainable throughput, respectively. The finer the granularity of latency measurement, the better the observability of the system, which also makes message tracking and stuckness diagnosis easier. In fact, both latency and throughput belong to the scope of performance, and whether they interact with each other depends on many factors. A well-designed streaming system should achieve linear sustainable throughput as processing nodes are added, provide reasonable latency for data processing and materializing, and, what's more, allow throughput and latency to be tuned based on requirements (such as increasing throughput at the cost of an acceptable latency increase, or sacrificing throughput to reduce latency).
Guaranteeing fault-tolerant and performant stream processing is hard because most stream processing is stateful (which means it is expensive to replay from an old point) and distributed (which means node failures happen a lot). Due to the complexity of the problem, there are many approaches to fault tolerance in the open source ecosystem, such as the following strategies used by the open source projects compared in this article:
- Record acknowledgements (Apache Storm)
- Micro batching (Apache Storm Trident, Apache Spark Streaming)
- Transactional updates (Google Cloud Dataflow)
- Distributed Snapshots (Apache Flink)
In the actual implementation of TiCDC, we chose the at-least-once delivery strategy, for two reasons:
- The sink is robust in the face of replay: for message queue sinks, TiCDC can output a perfect watermark, and within each watermark-bounded window, data integrity is guaranteed even if redundant data exists; for relational database sinks, TiCDC can output idempotent SQL statements even if data is re-delivered.
- We adopt a checkpoint mechanism in the sink: the sink maintains a global output watermark and saves it to persistent storage periodically. With the checkpoint we can replay whenever an error happens, which makes the system fault tolerant and fast to recover.
Combined with the checkpoint mechanism, the system can also achieve better performance, since the looser delivery guarantee makes it easy to apply batching and fast delivery strategies.
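The interplay of idempotent writes and checkpointing can be sketched as follows. This is a toy model, not TiCDC's sink: a dict stands in for persistent storage, and keyed overwrites stand in for idempotent SQL:

```python
class CheckpointedSink:
    """At-least-once sink sketch.

    Writes are idempotent (keyed upserts), so re-delivered batches
    are harmless; the persisted watermark is the checkpoint from
    which replay starts after a failure.
    """

    def __init__(self, storage):
        self.storage = storage  # stands in for persistent storage

    def flush(self, batch, watermark):
        rows = self.storage.setdefault("rows", {})
        for key, value in batch:
            rows[key] = value  # idempotent: applying twice is a no-op
        self.storage["checkpoint"] = watermark

    def replay_from(self):
        # After a crash, resume from the last persisted watermark.
        return self.storage.get("checkpoint", 0)
```

Because correctness survives redundant delivery, the sink is free to batch aggressively and acknowledge quickly, which is exactly where the performance benefit comes from.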
Designing a general streaming system is tough, mostly because workflows and data processing patterns can be varied and complex. In many scenarios, a general streaming system may not work better than a customized one, and how to choose an appropriate framework is always controversial in software engineering. This article offers only a limited view of streaming systems; the evolution of streaming architectures never stops, and what we should do is keep learning, thinking, and practising.