Description
Who is this feature for?
The user configures automq.zonerouter.channels to get rid of the cross-AZ traffic cost.
What problem are they facing today?
The current zero cross-AZ traffic cost solution of AutoMQ has the following aspects that can be optimized:
- Double write traffic: when data is forwarded across AZs, a copy of the data is first written to S3 in AZ1. The nodes in AZ2 then read it from S3 and write it again to the Partition Leader, which stores the data in its entirety. Writing the data twice doubles the bandwidth consumption and the S3 API calls.
- Consume E2E latency P99: when AZ1 wants to read data written by the AZ2 Partition Leader, it must wait for the 1-second commit interval of the AZ2 node before the data becomes visible to AZ1.
Proposed solution
Main & Proxy
AutoMQ will establish a Main and Proxy relationship among nodes across AZs through consistent hashing. Suppose there are 3 AZs, with 2 nodes in each AZ, namely N1 (AZ1), N2 (AZ1), N3 (AZ2), N4 (AZ2), N5 (AZ3), and N6 (AZ3):
- A node can assume both the Main and Proxy roles simultaneously.
- A Main node has exactly one Proxy node in each of the other AZs (see the sketch after this list). For example, assume the Proxy node of N1 in AZ2 is N3.
- If N1 is the Leader of partitions PN - PM, then N3 will be responsible for message sending and receiving of PN - PM in AZ2.
- Cluster nodes will modify the Metadata response according to the AZ to which the Client belongs (via client.id or CIDR), enabling the Client to communicate only with nodes in its own AZ. For instance, if a Client in AZ2 wants to access PN, the Broker address corresponding to PN in the Metadata response will be modified to N3.
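Below is a minimal, hypothetical Java sketch of how such a consistent-hash Main-to-Proxy mapping could be derived. The class and method names (ProxyPicker, proxyOf) are illustrative only and are not taken from the AutoMQ code base.

```java
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative sketch: pick, for a Main node, its Proxy node in a target AZ by placing
// the target AZ's nodes on a hash ring (consistent hashing).
public class ProxyPicker {

    // Build a hash ring for one AZ: hash(nodeId) -> nodeId.
    private static SortedMap<Integer, Integer> ring(List<Integer> azNodeIds) {
        SortedMap<Integer, Integer> ring = new TreeMap<>();
        for (int nodeId : azNodeIds) {
            ring.put(nodeId * 0x9E3779B9, nodeId); // multiply by a constant to spread the ids
        }
        return ring;
    }

    // The Proxy of mainNodeId in the target AZ is the first ring entry at or after hash(mainNodeId).
    public static int proxyOf(int mainNodeId, List<Integer> targetAzNodeIds) {
        SortedMap<Integer, Integer> ring = ring(targetAzNodeIds);
        SortedMap<Integer, Integer> tail = ring.tailMap(mainNodeId * 0x9E3779B9);
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }

    public static void main(String[] args) {
        // AZ2 holds N3 and N4; each node in AZ1 is mapped to exactly one of them as its Proxy.
        Map<String, List<Integer>> az = Map.of("AZ1", List.of(1, 2), "AZ2", List.of(3, 4), "AZ3", List.of(5, 6));
        System.out.println("Proxy of N1 in AZ2 = N" + proxyOf(1, az.get("AZ2")));
        System.out.println("Proxy of N2 in AZ2 = N" + proxyOf(2, az.get("AZ2")));
    }
}
```

The same ring lookup would be reused per target AZ (AZ2, AZ3, ...), so each Main node gets a stable Proxy in every other AZ, and the Metadata rewrite can point clients at that Proxy.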
Write
- RouterChannel is used to avoid cross-AZ Produce traffic charges, and most of the data is stored in the RouterChannel.
- ConfirmWAL is used to store the sequencing results of the data in the RouterChannel and holds only a small amount of data.
- The underlying implementation of ConfirmWAL can be S3 or NFS (an interface sketch of both components follows below).
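For reference, here is a hedged Java sketch of what the two components' interfaces might look like. Only RouterChannel#get, RouterChannel#nextEpoch and RouterChannel#trim are referenced later in this proposal; the append signature, the AppendResult type and the ConfirmWAL method are assumptions added for illustration.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Illustrative interface sketch only; signatures are assumptions, not AutoMQ's actual API.
public interface RouterChannel {

    // Persist produce data to the shared storage backing the channel. The returned
    // channelOffset allows any node in the cluster to read the data back later.
    CompletableFuture<AppendResult> append(long routeEpoch, int partitionId, ByteBuffer data);

    // Read back the data previously appended at channelOffset (used when resolving Link Records).
    CompletableFuture<ByteBuffer> get(long channelOffset);

    // Switch subsequent appends to the new route epoch issued by the Controller Leader.
    void nextEpoch(long newRouteEpoch);

    // Drop all data belonging to epochs <= committedRouteEpoch.
    void trim(long committedRouteEpoch);

    record AppendResult(long channelOffset) {
    }
}

// ConfirmWAL stores only the small sequencing records (Link Records) plus the data that has
// no linked copy (Time/Txn Index, MetaStream); it can be backed by S3 or NFS/EFS.
interface ConfirmWAL {
    CompletableFuture<Void> append(ByteBuffer record);
}
```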
- After the KafkaApis layer of node N3 receives the ProduceRequest, it first splits the request by partition and writes each partition's data into the RouterChannel.
- The data is written into the RouterChannel regardless of whether it needs to be forwarded. Because the majority of the data lives in the RouterChannel, the data volume of ConfirmWAL stays very small. This enables ConfirmWAL to return within 10+ ms even when using S3 as the underlying storage. Furthermore, EFS can be used as the underlying storage to further reduce latency and cost (EFS charges according to the amount of data transferred).
- The RouterChannel persists the data to shared storage and returns a channelOffset; the written data can be read on any other node through that channelOffset.
- N3 then sends a ZoneRouteRequest to node N1, where the Partition Leader is located; the request carries the channelOffset returned by the previous write to the RouterChannel.
- Node N1 reads the corresponding data from the RouterChannel according to the channelOffset and writes the data into the partition.
- In addition to the normal partition write process, it carries extra information such as the channelOffset.
- For data persistence, the underlying LogCache holds the complete MemoryRecords data, while ConfirmWAL (the original WAL) records only the corresponding Link Record.
- The Link Record contains the data index and channelOffset, plus information added during the write process: storeTimestamp and partitionOffset.
- Besides Link Records, ConfirmWAL still stores data that has no linked copy, such as the Time/Txn Index and MetaStream, in the original direct way.
- Return success to the client (a write-path sketch follows this list).
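Below is a hypothetical Java sketch of the Proxy-side forwarding step, reusing the RouterChannel interface sketched earlier. The shape of ZoneRouteRequest (sourceNodeId, routeEpoch, per-partition channelOffsets) is an assumption; the proposal only states that the request carries the channelOffset.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the forwarding step on the Proxy node (N3).
public class ZoneRouterProxy {

    // One entry per partition: the channelOffset returned by the RouterChannel append.
    record ZoneRouteRequest(int sourceNodeId, long routeEpoch, Map<Integer, Long> partitionToChannelOffset) {
    }

    private final RouterChannel routerChannel;
    private final int nodeId;
    private volatile long routeEpoch;

    public ZoneRouterProxy(RouterChannel routerChannel, int nodeId, long routeEpoch) {
        this.routerChannel = routerChannel;
        this.nodeId = nodeId;
        this.routeEpoch = routeEpoch;
    }

    // Split the ProduceRequest by partition, write every partition's records into the
    // RouterChannel, then build a ZoneRouteRequest that only carries channelOffsets.
    public CompletableFuture<ZoneRouteRequest> forward(Map<Integer, ByteBuffer> recordsByPartition) {
        long epoch = routeEpoch;
        Map<Integer, Long> channelOffsets = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> appends = new ArrayList<>();
        for (Map.Entry<Integer, ByteBuffer> entry : recordsByPartition.entrySet()) {
            appends.add(routerChannel.append(epoch, entry.getKey(), entry.getValue())
                .thenAccept(result -> channelOffsets.put(entry.getKey(), result.channelOffset())));
        }
        // Once every partition's data is persisted in the RouterChannel, the lightweight
        // ZoneRouteRequest can be sent to the Main node (N1) holding the Partition Leaders.
        return CompletableFuture.allOf(appends.toArray(new CompletableFuture[0]))
            .thenApply(v -> new ZoneRouteRequest(nodeId, epoch, channelOffsets));
    }
}
```

On the Main side, N1 would resolve each channelOffset via RouterChannel#get, append the records to the partition, and write only a Link Record into ConfirmWAL before acknowledging.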
Read
- The Proxy node N3 sends GetPartitionSnapshotRequest to the Main node N1 every 2ms.
- N1 returns the incrementally changed PartitionSnapshot (segments, endOffsets, streams, ...) and the endOffset of the current ConfirmWAL.
- N3 calls replay(lastReplayedOffset, endOffset) and puts N1's ConfirmWAL data into the Snapshot-Read Cache.
- If a record is a Link Record, the real data is additionally parsed from the RouterChannel.
- N3 applies the PartitionSnapshot to the Partition and automatically wakes up the DelayFetch (Fetch requests parked by LongPolling because no data was available).
- The Fetch process is basically the same as before: it first tries to get data from the Snapshot-Read Cache and, if the data is not there, falls back to the BlockCache (a sketch of the polling loop follows this list).
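A minimal Java sketch of this polling loop follows, assuming placeholder types for the RPC and partition plumbing. Only the 2 ms cadence and the replay(lastReplayedOffset, endOffset) call come from this proposal; SnapshotPoller, SnapshotResponse and the helper methods are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the Proxy-side (N3) snapshot polling loop.
public class SnapshotPoller {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private long lastReplayedOffset = -1L;

    public void start() {
        // Poll the Main node (N1) every 2 ms for incremental partition snapshots.
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, 2, TimeUnit.MILLISECONDS);
    }

    private void pollOnce() {
        SnapshotResponse resp = sendGetPartitionSnapshotRequest();
        // Replay N1's ConfirmWAL from the last replayed offset into the Snapshot-Read Cache;
        // Link Records would be resolved against the RouterChannel inside replay().
        replay(lastReplayedOffset, resp.confirmWalEndOffset());
        lastReplayedOffset = resp.confirmWalEndOffset();
        // Apply the incremental PartitionSnapshot (segments, endOffsets, streams, ...) and
        // wake up any DelayFetch parked by long polling.
        applySnapshotAndWakeDelayedFetches(resp);
    }

    // --- placeholders standing in for the real RPC and partition plumbing ---
    record SnapshotResponse(long confirmWalEndOffset) {
    }

    private SnapshotResponse sendGetPartitionSnapshotRequest() {
        return new SnapshotResponse(lastReplayedOffset); // stub
    }

    private void replay(long fromOffsetExclusive, long toOffsetInclusive) {
        // stub: load ConfirmWAL records in (fromOffsetExclusive, toOffsetInclusive] into the cache
    }

    private void applySnapshotAndWakeDelayedFetches(SnapshotResponse resp) {
        // stub
    }
}
```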
Link Record
epoch = -1 indicates that the StreamRecordBatch is a Link Record. The Payload of the Link Record is as follows:
magic | sourceNodeId | channelOffset | partitionOffset | timestamp
The parsing process is as follows:
- Obtain the corresponding RouterChannel through the sourceNodeId.
- Call RouterChannel#get(channelOffset) to read and parse the corresponding MemoryRecord.
- Insert the partitionOffset and timestamp into the MemoryRecord and return it (an encode/decode sketch follows this list).
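For illustration, here is a hedged Java sketch of one possible encoding of this payload. The field order follows the layout above; the field widths (1-byte magic, 4-byte sourceNodeId, 8-byte channelOffset, partitionOffset and timestamp) are assumptions.

```java
import java.nio.ByteBuffer;

// Sketch of a possible Link Record payload encoding: magic | sourceNodeId | channelOffset | partitionOffset | timestamp.
public final class LinkRecord {

    public static final byte MAGIC_V0 = 0;
    // epoch = -1 on the StreamRecordBatch marks it as a Link Record.
    public static final long LINK_RECORD_EPOCH = -1L;

    public static ByteBuffer encode(int sourceNodeId, long channelOffset, long partitionOffset, long timestamp) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 8 + 8 + 8);
        buf.put(MAGIC_V0)
            .putInt(sourceNodeId)
            .putLong(channelOffset)
            .putLong(partitionOffset)
            .putLong(timestamp);
        buf.flip();
        return buf;
    }

    public record Decoded(byte magic, int sourceNodeId, long channelOffset, long partitionOffset, long timestamp) {
    }

    public static Decoded decode(ByteBuffer payload) {
        ByteBuffer buf = payload.duplicate();
        return new Decoded(buf.get(), buf.getInt(), buf.getLong(), buf.getLong(), buf.getLong());
    }
}
```

During parsing, the decoded sourceNodeId selects the RouterChannel, RouterChannel#get(channelOffset) returns the MemoryRecords, and the decoded partitionOffset and timestamp are patched back into them.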
RouterChannel Cleanup
- Every 10 s, the Controller Leader updates two Records, newRouteEpoch and fencedRouteEpoch, where fencedRouteEpoch = newRouteEpoch - 1. The epochs are broadcast to each node through KRaft.
- After Proxy N3 receives newRouteEpoch, it sets RouterChannel#nextEpoch(newRouteEpoch), and all subsequent forwarded requests will use newRouteEpoch.
- After Main N1 receives fencedRouteEpoch, it rejects any ZoneRouteRequest whose epoch is less than or equal to fencedRouteEpoch.
- Main N1 waits for all requests with fencedRouteEpoch to be persisted. When committing, it carries RouteEpoch = fencedRouteEpoch.
- To prevent the RouterChannel from going uncleaned because a node has low traffic and delays its commits, a commit is forced every 10 s.
- The Controller Leader checks the RouteEpoch of all Brokers, takes the minimum value and issues committedRouteEpoch.
- Nodes that are Fenced and have no open Streams are not included in the calculation.
- Proxy N3 calls RouterChannel#trim(committedRouteEpoch) to clean up the committed data (see the epoch-handling sketch below).
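To tie the epoch flow together, here is a hypothetical Java sketch of how a node might react to these epoch records, again using the RouterChannel interface sketched earlier; the handler class and its method names are illustrative.

```java
// Hypothetical handler for the route-epoch records broadcast through KRaft.
public class RouteEpochHandler {

    private final RouterChannel routerChannel;
    private volatile long fencedRouteEpoch = -1L;

    public RouteEpochHandler(RouterChannel routerChannel) {
        this.routerChannel = routerChannel;
    }

    // Proxy side: all subsequently forwarded requests use the new epoch.
    public void onNewRouteEpoch(long newRouteEpoch) {
        routerChannel.nextEpoch(newRouteEpoch);
    }

    // Main side: remember the fenced epoch so stale ZoneRouteRequests can be rejected.
    public void onFencedRouteEpoch(long epoch) {
        this.fencedRouteEpoch = epoch;
    }

    // Main side: reject any ZoneRouteRequest whose epoch is <= fencedRouteEpoch.
    public boolean acceptZoneRouteRequest(long requestRouteEpoch) {
        return requestRouteEpoch > fencedRouteEpoch;
    }

    // Proxy side: once the Controller Leader has taken the minimum RouteEpoch across brokers
    // and issued committedRouteEpoch, data up to that epoch can be trimmed.
    public void onCommittedRouteEpoch(long committedRouteEpoch) {
        routerChannel.trim(committedRouteEpoch);
    }
}
```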