Lessons learned from TiCDC

TiCDC is a change data capture framework developed for the TiDB ecosystem. It replicates change data to various downstream systems, including MySQL protocol-compatible databases, message queue systems like Kafka, and object storage services like S3. As the top code contributor and the former tech lead of this project, I witnessed it grow from a concept into a mature system running in many production environments, and I also led several of its important architecture evolutions. Now that I am leaving the project to explore new opportunities, I’d like to look back and summarize the lessons I have learned from it.

Highlights of this system

In this part I will talk about the highlights of TiCDC from the perspectives of system architecture, core features, smart design, and engineering implementation.

A well-designed, plug-and-play monolithic architecture

TiCDC stands out for its well-designed, plug-and-play monolithic architecture. Setting up a synchronization path for replicating row-based change data from an existing TiDB cluster is a breeze. Users only need to deploy one or more TiCDC processes connected to the same TiDB cluster, and these processes form a unified cluster automatically, without any topology configuration, meta-system deployment, or separate components. The different compute components, including the data source part (puller), data sort part (sorter), data encapsulation part (mounter), and data sink part (sink), all run in the same process. That is what I mean by a plug-and-play system with a classic monolithic architecture. In addition to its straightforward deployment, TiCDC is designed to handle system maintenance effectively, in particular by managing many high-availability scenarios automatically, including node crashes, upstream failures, network partitions, and so on. TiCDC provides high availability at both the process and data levels.

  • Process level: process-level availability means there is always a running entity replicating data from upstream to downstream. Its downtime is measured by the RTO (Recovery Time Objective).
  • Data level: data-level availability means TiCDC loses no data when the system meets a disaster. The system guarantees zero RPO (Recovery Point Objective) in most scenarios ("most" because there are still corner cases where zero RPO can’t be met).

To ensure process-level availability, TiCDC uses a leader-follower role model in the cluster and relies heavily on etcd's election feature for leader election. A nearly always-online leader manages the synchronization tasks of the entire cluster, including rescheduling tasks when a node crashes or experiences a network partition. In addition, a built-in two-phase scheduling mechanism reduces latency in many scenarios, including graceful restart, table rescheduling, and more.

To achieve data-level availability, TiCDC doesn’t persist any data itself. Instead, it reuses the MVCC data in upstream TiKV to avoid data loss during a disaster. The basic idea is to delay upstream GC of MVCC data when TiCDC's synchronization progress has fallen behind, so that TiCDC can re-scan the historical data from TiKV as long as it has not been garbage-collected. This explains why TiCDC guarantees zero RPO in most scenarios, the exception being when upstream TiKV GC runs without respecting TiCDC's synchronization progress.
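
The GC-delay idea can be sketched in a few lines of Go. This is a minimal illustration under stated assumptions, not TiCDC's actual code: the function name `gcSafePoint`, the map of changefeed checkpoints, and the use of plain `uint64` values as stand-ins for TSOs are all hypothetical.

```go
package main

import "fmt"

// gcSafePoint returns the timestamp up to which the upstream may safely
// garbage-collect MVCC versions. It is the smaller of the upstream's own
// candidate safe point and the slowest changefeed checkpoint, so data that
// a lagging changefeed still needs to re-scan is never collected.
// (Hypothetical helper; timestamps are plain uint64 stand-ins for TSOs.)
func gcSafePoint(candidate uint64, checkpoints map[string]uint64) uint64 {
	safe := candidate
	for _, ts := range checkpoints {
		if ts < safe {
			safe = ts
		}
	}
	return safe
}

func main() {
	checkpoints := map[string]uint64{"feed-a": 120, "feed-b": 95}
	fmt.Println(gcSafePoint(110, checkpoints)) // 95: feed-b lags, so GC is held back
	fmt.Println(gcSafePoint(90, checkpoints))  // 90: no changefeed is behind the candidate
}
```

The corner case mentioned above corresponds to a GC run that ignores this minimum and collects versions newer than the slowest checkpoint.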

Well-tuned components

As described in the previous section, there are several core components in the TiCDC data flow: the puller, sorter, mounter, and sink. Data flows through these components as a pipeline, via gRPC bidirectional streaming or golang built-in channels, including:

  • TiKV pushes data to TiCDC puller via gRPC stream
  • puller pushes data to sorter via golang channel
  • sink pulls data from sorter via golang channel

To guarantee second-level synchronization latency, each component must keep pace with the others, with sub-second processing latency and low latency when passing data between components. At the same time, low latency also requires high throughput, since upstream traffic can vary widely. To meet these latency and throughput requirements, each TiCDC component applies many optimization techniques. These techniques have been discussed in detail in several blog posts, including:

  • The puller implementation
  • The sorter implementation, where Pebble, an LSM-tree-based KV storage engine, is used for data sorting
  • The pull-based sink. The sink maintains a memory quota module and pulls data from the sorter on demand. This way, if the downstream's performance regresses, the data accumulated in the sink is bounded and won’t cause OOM. Meanwhile, data keeps being pushed from TiKV to the puller, and most of it accumulates in the sorter, where it is cached on disk in the multiple levels of the LSM tree.
  • The underlying sink, which is a simple client model: a MySQL client, Kafka client, or S3 client. It uses common optimization tricks such as concurrent workers, batching, asynchronous acknowledgement in the Kafka producer, multi-statement optimization in the MySQL protocol, etc.
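
To make the pull-based sink concrete, here is a minimal sketch of a sink that pulls from a sorter only while it has memory quota left. All type and field names (`event`, `sorter`, `sink`, `quota`) are hypothetical and chosen for illustration; the real TiCDC sink is considerably more involved, and the in-memory slice here merely stands in for the disk-backed sorter.

```go
package main

import "fmt"

// event is a hypothetical change record; size is its in-memory footprint.
type event struct {
	commitTs uint64
	size     int
}

// sorter stands in for the disk-backed sorter: it buffers everything the
// puller pushes and hands events out only when asked.
type sorter struct{ pending []event }

// sink pulls from the sorter on demand and never holds more than quota bytes.
type sink struct {
	quota int // maximum bytes the sink may hold
	used  int // bytes currently held
	batch []event
}

// pull moves events from the sorter into the sink until the next event would
// exceed the memory quota, and reports how many events were taken.
func (s *sink) pull(src *sorter) int {
	n := 0
	for len(src.pending) > 0 && s.used+src.pending[0].size <= s.quota {
		e := src.pending[0]
		src.pending = src.pending[1:]
		s.batch = append(s.batch, e)
		s.used += e.size
		n++
	}
	return n
}

// flush simulates a successful downstream write and releases the quota.
func (s *sink) flush() {
	s.batch = s.batch[:0]
	s.used = 0
}

func main() {
	src := &sorter{pending: []event{{1, 40}, {2, 40}, {3, 40}}}
	sk := &sink{quota: 100}
	fmt.Println(sk.pull(src), len(src.pending)) // 2 1: the third event would exceed the quota
	sk.flush()
	fmt.Println(sk.pull(src), len(src.pending)) // 1 0: quota was released by the flush
}
```

When the downstream is slow, `flush` is called less often, so the sink simply stops pulling and the backlog stays in the sorter instead of in sink memory. Note that this sketch would stall on a single event larger than the quota; a real implementation needs a policy for that case.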

Shortcomings of the system

There is no silver bullet in software engineering, especially as requirements and scenarios become more and more complex. There are still scenarios where the current system doesn’t behave well, including:

  • The multi-tenant scenario. In this scenario, a single TiCDC cluster needs to serve tens of thousands of changefeeds belonging to different tenants, which requires elastic scheduling and resource isolation. Unfortunately, the current monolithic architecture doesn’t fit this scenario well, for two reasons:

    • The leader runs a tick function periodically, in which it iterates over the changefeeds one by one to maintain each changefeed's status, including synchronization progress, scheduling information, etc.
    • Each compute component (puller, sorter, mounter, and sink) runs in background goroutines and shares the compute resources of the whole host. There is no resource isolation mechanism.
  • The fast recovery requirement. Suppose an error happens in the synchronization path, for example a write to MySQL returns an error, and the whole changefeed restarts. Ideally, TiCDC would reuse the existing computing results and restart only the necessary component. In this case it would be better to restart only the sink and reuse the existing data, including the change logs received by the puller, the sorted data in the sorter, and the encapsulated data in the mounter. However, in the current design TiCDC has to ask TiKV to re-scan historical data from the existing checkpoint. Given the watermark that TiCDC had processed before the error happened, the data in (checkpoint, processed-watermark] is processed twice, and if the checkpoint has fallen far behind, that data can be very large.

  • The requirements of flexible integration and computational decoupling. These are two different requirements, but both require TiCDC to behave like a framework instead of an independent product. More specifically:

    • For flexible integration, suppose users want to collect the encapsulated change logs from TiKV directly and send them to an existing computing platform. It is difficult to do this with a simple TiKV CDC client that collects raw change logs, encapsulates them into a standard format such as SQL, and writes the data out via a pre-defined RPC format. It is just as difficult to use the TiCDC code as a golang library, integrate it into a program, and implement a CDC adapter.
    • As for computational decoupling, in the current architecture it is impossible to limit the resource usage of each compute component, scale out resources based on the workload of individual components, or implement asynchronous computing.
  • The need for a decoupled testing system. Although TiCDC has a mature testing system, which covers unit tests, end-to-end integration tests, and various chaos test scenarios, it still lacks deterministic, component-based testing. Component-based testing can be treated as module-level end-to-end (E2E) testing. For example, for the sorter component, there can be predefined input data flows and a downstream sink module that retrieves the output, allowing quantitative testing of the sorter module’s performance and capability. This type of testing is highly beneficial for data consistency and performance testing. Compared to E2E testing across the entire system, it has finer granularity and greater reproducibility.

As the description above shows, data replication covers many different scenarios:

  • On premises vs. on the cloud
  • Building an independent data link for a single TiDB cluster vs. building a data transformation platform
  • Using pre-defined resources vs. adaptive resource control with auto scaling
  • Using a plug-and-play system vs. secondary development

We can’t build a single system that meets the requirements of all these scenarios, so TiCDC has made many tradeoffs to meet the requirements of its target scenarios. In the next chapter, I will further analyze the tradeoffs made while building TiCDC.

Lessons learned from this project

In this part I will tell the stories behind the development of several core features, including how technology decisions were made, how the implementations evolved, and what I have learned from these stories.

The old value feature

In the first version of the TiKV CDC component, only put and delete KV operations were captured. For a delete or update SQL statement in the upstream, TiKV CDC output only the after image of the row, making it impossible for TiCDC to output the before image of a row change. The before image is useful in many scenarios, such as the old field in the Maxwell protocol, or using the old image for row positioning when consuming messages. After struggling without this feature for several months, we decided to implement it in the TiKV CDC component. The implementation involved creating a cache for fast old value retrieval; on a cache miss, the CDC component seeks the key-value pair from a snapshot of the KV engine. This feature has become the foundation of the CDC data source and achieves data completeness. With it, TiCDC doesn’t need to subscribe to the change logs of index regions and has more flexibility in data processing.
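
The cache-then-snapshot lookup described above can be sketched as follows. This is an illustrative Go sketch only (the real component lives inside TiKV, which is not written in Go), and every name in it (`oldValueReader`, `cache`, `snapshot`, `misses`) is hypothetical.

```go
package main

import "fmt"

// oldValueReader resolves the previous value (before image) of a key.
// cache and snapshot are hypothetical stand-ins for the in-memory cache and
// the KV engine snapshot read; misses counts how often the slow path is used.
type oldValueReader struct {
	cache    map[string]string
	snapshot func(key string) (string, bool)
	misses   int
}

// get returns the old value for key, preferring the cache and falling back
// to a snapshot read on a miss. ok is false when the key had no prior value,
// for example when the row change is an insert.
func (r *oldValueReader) get(key string) (value string, ok bool) {
	if v, hit := r.cache[key]; hit {
		return v, true
	}
	r.misses++
	return r.snapshot(key)
}

func main() {
	store := map[string]string{"row2": "v1"}
	r := &oldValueReader{
		cache:    map[string]string{"row1": "v0"},
		snapshot: func(k string) (string, bool) { v, ok := store[k]; return v, ok },
	}
	fmt.Println(r.get("row1")) // cache hit
	fmt.Println(r.get("row2")) // cache miss, served from the snapshot
	fmt.Println(r.get("row3")) // no old value exists: ok is false
	fmt.Println("snapshot reads:", r.misses)
}
```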

Initially, there were concerns about the performance and complexity of implementing the old value feature in TiKV CDC, and we overlooked how necessary it was functionally. Yet the need for this feature is reasonable, fundamental, and important, which made it worth a significant investment of labor and resources. Fortunately, we finally decided to solve the problem. To our surprise, once we started working on it, we didn’t run into many of the problems we had worried about, and it did not consume much manpower. Overall, adding the old value feature to TiKV CDC was a significant improvement that solved a crucial problem and provided more flexibility in data processing. It was an essential investment that proved worthwhile.

The history of sorter

At the very beginning, TiCDC only had an in-memory sorter, which ran very fast but was prone to OOM errors. To avoid OOM we had to restrict the size of the input buffer used for pending events in the memory sorter, which made it impossible to handle the incremental scan scenario. (The incremental scan scenario is when TiCDC starts to synchronize data from an old TSO: TiKV scans a large amount of data from a snapshot of the KV engine, and all of this data needs to be sorted together.)

To address this limitation, we designed the unified sorter, which combines a memory sorter and a disk-based merge sorter. The unified sorter solves both the OOM problem and the problem of incremental scans over large historical data, thanks to the caching capability provided by disk storage. However, in the replication model, each table needs to start a table pipeline for data processing, and each table pipeline needs to initialize a UnifiedSorter object. When TiCDC needs to replicate a large number of tables, for example 60M tables, it has to open several files for each table pipeline and runs into a "no space left on device" error (ref: pingcap/tiflow/issues/2793).

To address the file descriptor problem, two solutions were proposed: optimizing the unified sorter, or using a different sorting mechanism that can sort across multiple tables and provide a per-table read API. After discussion, an LSM-tree-based DB sorter was adopted, and it took about half a year for this feature to mature.

As usage scenarios multiply and the data scale continues to expand, TiCDC’s performance faces increasingly stringent challenges. The evolution of this feature closely reflects the evolution of scenarios and requirements.

The buggy history of flow control

As a monolithic architecture, TiCDC runs all its components in the same process, and the internal data flow is connected via golang channels. For quite a long time we had no detailed rules between components to restrict the rate of data input and output, nor an overall flow control and back pressure design. We met many bugs, mainly OOM and deadlocks, such as pingcap/tiflow/issues/1675, pingcap/tiflow/issues/3531, and pingcap/tiflow/issues/3338. To solve these problems we made many bug fixes, which can be grouped into several categories:

  • Split critical tasks from secondary tasks, so that starvation of a critical task does not slow down the whole system or even cause a deadlock. A real example of this fix is running the DDL EventFeed stream in an independent gRPC connection.
  • Replace blocking APIs with non-blocking APIs where the blocking code could trigger a chain reaction and cause a deadlock.
  • Choose the push model or pull model carefully, especially when handling complex data flows and situations full of uncertainty, such as large performance swings in either the producer or the consumer, or multiple data flows that affect each other. That was exactly the old situation in TiCDC with the push model and table-based memory control: each table had its own memory quota, but the progress of each table was controlled by a global watermark, while the global watermark was in turn decided by the resolved watermark of each table.
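
As an illustration of the second category, the sketch below shows the standard Go idiom for a non-blocking channel send, using `select` with a `default` branch. The helper name `trySend` is hypothetical and is not taken from the TiCDC code base.

```go
package main

import "fmt"

// trySend attempts to enqueue v without blocking. It reports false when the
// channel buffer is full, so the caller can retry later, apply back pressure,
// or spill elsewhere instead of stalling and possibly joining a deadlock chain.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer had room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full, but the caller is not blocked
	<-ch
	fmt.Println(trySend(ch, 3)) // true: the consumer made room
}
```

A plain `ch <- v` in the second call would block the sending goroutine until a consumer reads, which is how one slow component can freeze the components upstream of it.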

Apart from the bug fixes, TiCDC also went through several architecture refactors, changing from the original channel-driven model to the table pipeline model, then to the actor model, and finally to a pull-based model (in the sink part only, not in the puller and sorter). Each refactor consumed a lot of manpower, yet most of the code of the pipeline model and actor model is no longer used. As a matter of fact, we used to discuss a microservice architecture for TiCDC and tried to build each computing component into an independent service, such as a puller service, sorter service, and sink service, connected through explicit APIs and under clear flow control. However, these plans were not fully realized due to product strategy and a lack of strong execution. Despite this, the TiCDC team is currently considering evolving the project to accommodate the multi-tenancy scenario. This is an exciting evolution; let’s wait and see it happen.

Whether to adopt a microservices architecture, and how to divide the services, is always a controversial topic. There has recently been a heated discussion about microservices: the Prime Video audio/video monitoring service reduced costs by 90% by migrating from microservices to a single process. Another, similar article provides a high-level summary of serverless, microservices, and monolithic architectures, and concludes with several principles for dividing microservices:

  1. Bounded context. The granularity of microservices cannot be larger than the Bounded Context in Domain-Driven Design, which is a business domain.
  2. Single responsibility, high cohesion, and loose coupling. Combine what changes for the same reason (cohesion) and separate what changes for different reasons (decoupling).
  3. Transactions and consistency. For two heavily dependent functions that require completing a transaction and ensuring strong consistency, it is best not to separate them and keep them together.
  4. Match the organizational structure. Keep things from the same team together and separate those from different teams.

Regarding the first three points above, TiCDC is well suited for service-oriented decomposition:

  1. The puller and sink are highly abstract source and sink components, very similar to the abstraction of Kafka Connectors; the sorter is a data-consistency-oriented component; and the mounter is a computing component used for data processing and transformation.
  2. Following from the first point, these four components are independent in the business domain, and their computing responsibilities have the characteristics of high cohesion and low coupling.
  3. Each data change in TiCDC carries a global timestamp (TSO), and TiCDC provides a watermark with a linearizability guarantee (called resolved-ts in TiCDC). Sorting by timestamp ensures data consistency across components, so there is no need for transaction-level constraints between components.
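
The role of the watermark in the third point can be sketched as follows: a component buffers events, and whenever the resolved-ts advances it can release, in timestamp order, everything at or below that watermark. The function `emitUpTo` and the use of bare `uint64` commit timestamps are assumptions made for illustration, not TiCDC's implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// emitUpTo sorts the buffered commit timestamps and splits them at resolvedTs.
// Everything with commitTs <= resolvedTs is safe to hand downstream in order,
// because the watermark promises that no event at or below it will arrive later.
// The input slice is left untouched; ready and kept share one backing array.
func emitUpTo(buffered []uint64, resolvedTs uint64) (ready, kept []uint64) {
	sorted := append([]uint64(nil), buffered...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	// First index whose timestamp is beyond the watermark.
	i := sort.Search(len(sorted), func(i int) bool { return sorted[i] > resolvedTs })
	return sorted[:i], sorted[i:]
}

func main() {
	ready, kept := emitUpTo([]uint64{7, 3, 9, 5}, 6)
	fmt.Println(ready, kept) // [3 5] [7 9]
}
```

Because each component only needs the events and the watermark to make this decision, the ordering guarantee travels with the data rather than depending on a transaction shared between services.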

In fact, as TiCDC moves toward the cloud and a service-oriented architecture in the future, there are bound to be many known problems: the cost and binding issues of integrating with cloud computing components, and the impact on system latency, cost, SLA, and operations caused by inter-service data flow after splitting into multiple services. All of these need to be addressed, and there is a long way to go.

Summary

Although TiCDC's development included mistakes and some long detours, this is an inevitable part of the process, and what matters is learning from the mistakes and improving upon them.

As for the ideal evolution route, I think TiCDC should abstract the key computing components and define two types of interfaces for interaction between components. By using golang channels for intra-process communication and RPC for inter-process communication, TiCDC could become a flexible and scalable system.
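
One way to picture this is a single component-facing interface with interchangeable transports. The sketch below is purely illustrative: `Transport`, `Event`, `chanTransport`, and `drain` are hypothetical names, only the channel-backed (intra-process) implementation is shown, and an RPC-backed one is assumed to satisfy the same interface.

```go
package main

import "fmt"

// Event is a hypothetical change event passed between components.
type Event struct {
	CommitTs uint64
	Payload  string
}

// Transport is the hypothetical boundary between two components.
// An in-process build can back it with a channel; a service build could back
// it with an RPC stream without the component code changing.
type Transport interface {
	Send(Event) error
	Recv() (Event, bool) // ok is false once the transport is closed and drained
	Close()
}

// chanTransport is the intra-process implementation on top of a Go channel.
type chanTransport struct{ ch chan Event }

func newChanTransport(buffer int) *chanTransport {
	return &chanTransport{ch: make(chan Event, buffer)}
}

func (t *chanTransport) Send(e Event) error  { t.ch <- e; return nil }
func (t *chanTransport) Recv() (Event, bool) { e, ok := <-t.ch; return e, ok }
func (t *chanTransport) Close()              { close(t.ch) }

// drain is component code written only against the interface.
func drain(t Transport) []string {
	var out []string
	for {
		e, ok := t.Recv()
		if !ok {
			return out
		}
		out = append(out, e.Payload)
	}
}

func main() {
	var t Transport = newChanTransport(2)
	t.Send(Event{CommitTs: 1, Payload: "insert"})
	t.Send(Event{CommitTs: 2, Payload: "update"})
	t.Close()
	fmt.Println(drain(t)) // [insert update]
}
```

With this shape, the choice between a monolithic deployment and separate services becomes a wiring decision at startup rather than a rewrite of the components.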

In addition, being able to run TiCDC either as a monolithic application or as separate computing services would provide more flexibility in deployment. For example, TiCDC could be deployed quickly on-premises as a monolithic application, or run in a cloud environment as a microservice architecture, where it can leverage general-purpose capabilities such as S3 or message queues.

Overall, these changes would make TiCDC a more powerful and versatile tool for data integration and processing, providing users with more options to adapt to their specific requirements.