Apache Cassandra 4.0 adds several improvements to internode messaging.
The internode messaging protocol has been optimized (CASSANDRA-14485). Previously, the IPAddressAndPort of the sender was included with every message sent, even though it had already been sent once when the initial connection/session was established. In Cassandra 4.0, IPAddressAndPort is no longer included in each individual message; it is sent only when the connection/session is initiated.
Another improvement is that, in several places (listed below), a fixed 4-byte integer has been replaced with a vint (variable-length integer), because for these values a vint almost always occupies only a single byte:
paramSize (the number of parameters in the header)
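To make the size saving concrete, here is a minimal Java sketch of a prefix-length unsigned vint: the number of leading 1-bits in the first byte says how many extra bytes follow, so any value below 128 fits in a single byte. The class `VIntSketch` and its methods are hypothetical; the scheme is modeled on, but not guaranteed identical to, Cassandra's own vint coding.

```java
/** Hypothetical sketch of a prefix-length unsigned variable-length integer (vint). */
final class VIntSketch {
    private VIntSketch() {}

    /** Bytes needed: 7 payload bits per byte for up to 8 bytes, 9 bytes for a full 64-bit value. */
    static int encodedSize(long value) {
        for (int size = 1; size <= 8; size++) {
            if ((value >>> (7 * size)) == 0) {
                return size;
            }
        }
        return 9;
    }

    /** Big-endian encoding; the first byte starts with (size - 1) one-bits followed by a zero bit. */
    static byte[] encode(long value) {
        int size = encodedSize(value);
        byte[] out = new byte[size];
        long remaining = value;
        for (int i = size - 1; i >= 0; i--) {
            out[i] = (byte) remaining;
            remaining >>>= 8;
        }
        int extraBytes = size - 1;
        out[0] |= (byte) ~(0xFF >> extraBytes); // set the length-prefix bits
        return out;
    }

    /** Decodes a value produced by encode(). */
    static long decode(byte[] in) {
        int first = in[0] & 0xFF;
        int extraBytes = Integer.numberOfLeadingZeros(~first & 0xFF) - 24; // count leading one-bits
        long value = first & (0xFF >> extraBytes); // strip the prefix bits
        for (int i = 1; i <= extraBytes; i++) {
            value = (value << 8) | (in[i] & 0xFF);
        }
        return value;
    }
}
```

For example, a paramSize of 2 encodes as the single byte `0x02`, where a fixed-width int would always take four bytes.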
In Cassandra 4.0, peer-to-peer (internode) messaging has been switched to non-blocking I/O (NIO) via Netty (CASSANDRA-8457).
In the serialization format, each message consists of a header with several fixed fields, an optional key-value parameters section, and then the message payload itself. Note: the IP address in the header may be either IPv4 (4 bytes) or IPv6 (16 bytes); the diagram below shows an IPv4 address for brevity.
          1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 6 6
0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                        PROTOCOL MAGIC                         |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                          Message ID                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                          Timestamp                            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|  Addr len |           IP Address (IPv4)                       /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/           |                 Verb                              /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/           |            Parameters size                        /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/           |             Parameter data                        /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         Payload size                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                                                               /
/                            Payload                            /
/                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
An individual parameter has a String key and a byte-array value. The key is serialized as its length, encoded in two bytes, followed by the UTF-8 byte encoding of the string. The value is serialized as its length, encoded in four bytes, followed by the bytes of the value.
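A minimal Java sketch of the parameter layout just described, using `java.io.DataOutputStream` (which writes integers big-endian). The class and method names are hypothetical, and big-endian byte order is an assumption of this sketch rather than something stated above.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: 2-byte key length, UTF-8 key, 4-byte value length, value bytes. */
final class ParamSketch {
    private ParamSketch() {}

    static void writeParam(DataOutputStream out, String key, byte[] value) throws IOException {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        if (keyBytes.length > 0xFFFF) {
            throw new IllegalArgumentException("key does not fit a two-byte length");
        }
        out.writeShort(keyBytes.length); // two-byte key length
        out.write(keyBytes);             // UTF-8 bytes of the key
        out.writeInt(value.length);      // four-byte value length
        out.write(value);                // raw value bytes
    }

    static byte[] serialize(String key, byte[] value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeParam(out, key, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }
}
```

Serializing key `"k"` with value `{1, 2, 3}` produces 10 bytes: a 2-byte length, 1 key byte, a 4-byte length, and 3 value bytes.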
System stability is improved by enforcing strict resource limits (CASSANDRA-15066) on the number of outbound messages that are queued, measured by the serializedSize of the message. Three separate limits (per-connection, per-endpoint, and global) are imposed simultaneously to ensure that progress is always made without any reasonable combination of failures impacting a node’s stability.
In a cluster of N nodes, each node may have up to N*3 * (internode_application_send_queue_capacity_in_bytes + internode_application_receive_queue_capacity_in_bytes) bytes of messages queued without any coordination between them, although in practice, with token-aware routing, only RF*tokens nodes should need to communicate with significant bandwidth.
The following settings in cassandra.yaml control the resource limits on queued messages:
internode_application_send_queue_capacity_in_bytes: 4194304 #4MiB
internode_application_send_queue_reserve_endpoint_capacity_in_bytes: 134217728 #128MiB
internode_application_send_queue_reserve_global_capacity_in_bytes: 536870912 #512MiB
internode_application_receive_queue_capacity_in_bytes: 4194304 #4MiB
internode_application_receive_queue_reserve_endpoint_capacity_in_bytes: 134217728 #128MiB
internode_application_receive_queue_reserve_global_capacity_in_bytes: 536870912 #512MiB
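As a worked example of the uncoordinated N*3 * (send + receive) bound, take the default per-connection capacities from these settings (4 MiB each for send and receive) and a hypothetical cluster of 100 nodes:

$$
N \times 3 \times (4\,\mathrm{MiB} + 4\,\mathrm{MiB}) = 24N\,\mathrm{MiB}, \qquad N = 100 \implies 2400\,\mathrm{MiB} \approx 2.34\,\mathrm{GiB}
$$

This figure assumes every connection fills its queue at the same time; with token-aware routing, far fewer peers carry significant traffic in practice.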
Metrics have been improved by exposing inter-node inbound and outbound messaging metrics through virtual tables (CASSANDRA-15066). For inbound messaging, a virtual table (internode_inbound) has been added to keep metrics for:
A separate virtual table (internode_outbound) has been added for outbound inter-node messaging. The outbound virtual table keeps metrics for:
A specialized version of the hint message has been added that takes a hint already encoded in a ByteBuffer and sends it verbatim. It is an optimization for dispatching a hint file of the current messaging version to a node of the same messaging version, which is the most common case. It saves extra ByteBuffer allocations and one redundant hint deserialization-serialization cycle.
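The version check behind this optimization can be sketched in Java as follows. `HintDispatchSketch`, the integer version numbers, and the re-serialization callback are all hypothetical stand-ins; the point is only that matching versions let the stored bytes go out unchanged.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

/** Hypothetical sketch: send an already-encoded hint verbatim when messaging versions match. */
final class HintDispatchSketch {
    private HintDispatchSketch() {}

    static ByteBuffer payloadFor(ByteBuffer encodedHint, int hintFileVersion, int peerVersion,
                                 Function<ByteBuffer, ByteBuffer> reserializeForPeer) {
        if (hintFileVersion == peerVersion) {
            // Same version: reuse the stored bytes; duplicate() shares content, nothing is copied.
            return encodedHint.duplicate();
        }
        // Different version: fall back to a (hypothetical) decode and re-encode for the peer.
        return reserializeForPeer.apply(encodedHint.duplicate());
    }
}
```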
A configuration setting has been added to cassandra.yaml for the maximum continuous period a connection may be unwritable in application space:
# internode_application_timeout_in_ms = 30000
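One way to enforce such a limit is to record when a connection becomes unwritable and compare the elapsed time against the configured period, as in this hypothetical Java sketch. In a Netty pipeline the writability events could come from `channelWritabilityChanged`, but that wiring, and all names here, are assumptions rather than Cassandra's implementation.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of tracking a maximum continuous unwritable period. */
final class UnwritableTimeoutSketch {
    private final long timeoutNanos;
    private boolean unwritable;        // true while the connection cannot accept writes
    private long unwritableSinceNanos; // start of the current continuous unwritable period

    UnwritableTimeoutSketch(long timeoutMillis) {
        this.timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    }

    /** Record a writability change observed at nowNanos (e.g. from System.nanoTime()). */
    void onWritabilityChanged(boolean writable, long nowNanos) {
        if (writable) {
            unwritable = false;
        } else if (!unwritable) {
            unwritable = true;
            unwritableSinceNanos = nowNanos;
        }
    }

    /** True once the connection has been continuously unwritable for at least the timeout. */
    boolean limitExceeded(long nowNanos) {
        return unwritable && nowNanos - unwritableSinceNanos >= timeoutNanos;
    }
}
```

With the 30000 ms value from the commented-out setting, `limitExceeded` turns true after 30 seconds of continuous unwritability and resets as soon as the connection becomes writable again.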
Other new features include logging the message size in the trace message when tracing a query.
Before 4.0, Paxos prepare and propose messages always go through the entire MessagingService stack in Cassandra, even if the request is to be served locally. Local requests can instead be served without involving MessagingService, as is already done elsewhere in Cassandra, which skips the MessagingService stage for local requests.
This is what it looks like before 4.0 with tracing on when running a lightweight transaction:
Sending PAXOS_PREPARE message to /A.B.C.D [MessagingService-Outgoing-/A.B.C.D] | 2017-09-11 21:55:18.971000 | A.B.C.D | 15045
…
REQUEST_RESPONSE message received from /A.B.C.D [MessagingService-Incoming-/A.B.C.D] | 2017-09-11 21:55:18.976000 | A.B.C.D | 20270
…
Processing response from /A.B.C.D [SharedPool-Worker-4] | 2017-09-11 21:55:18.976000 | A.B.C.D | 20372
The same applies to the propose stage.
In version 4.0, the Paxos prepare and propose stages are optimized for local requests (CASSANDRA-13862).
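The idea behind the local shortcut can be sketched as a dispatcher that calls the handler directly when the target replica is the local node. `LocalShortcutSketch` and its callbacks are hypothetical, and the sketch is synchronous for brevity, whereas real internode messaging is asynchronous.

```java
import java.net.InetSocketAddress;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical sketch: serve a request in-process when the replica is the local node. */
final class LocalShortcutSketch<Req, Resp> {
    private final InetSocketAddress localAddress;
    private final Function<Req, Resp> localHandler;                          // e.g. a prepare handler
    private final BiFunction<InetSocketAddress, Req, Resp> sendViaMessaging; // remote path

    LocalShortcutSketch(InetSocketAddress localAddress, Function<Req, Resp> localHandler,
                        BiFunction<InetSocketAddress, Req, Resp> sendViaMessaging) {
        this.localAddress = localAddress;
        this.localHandler = localHandler;
        this.sendViaMessaging = sendViaMessaging;
    }

    Resp dispatch(InetSocketAddress replica, Req request) {
        if (replica.equals(localAddress)) {
            return localHandler.apply(request); // local: no serialization, no messaging stack
        }
        return sendViaMessaging.apply(replica, request);
    }
}
```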
Several other quality assurance improvements have been made in version 4.0 (CASSANDRA-15066).
Version 4.0 introduces framing to all internode messages, i.e. the grouping of messages into a single logical payload with headers and trailers. Each frame is guaranteed either to contain only complete messages, or to contain part of a single large message that is split into its own unique sequence of frames.
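The framing rule can be illustrated with a small packing routine: small messages are batched whole into a frame up to a maximum payload size, while a message too large for one frame gets its own run of frames. The class, the maximum frame payload, and the omission of frame headers and trailers are simplifying assumptions of this Java sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the framing rule (headers and trailers omitted). */
final class FramingSketch {
    private FramingSketch() {}

    /** One frame: either some complete messages, or one slice of a single large message. */
    static final class Frame {
        final List<Integer> wholeMessages; // indexes of complete messages in this frame
        final int largeMessage;            // index of the sliced large message, or -1
        final int payloadBytes;

        Frame(List<Integer> wholeMessages, int largeMessage, int payloadBytes) {
            this.wholeMessages = wholeMessages;
            this.largeMessage = largeMessage;
            this.payloadBytes = payloadBytes;
        }
    }

    static List<Frame> frame(int[] messageSizes, int maxFramePayload) {
        List<Frame> frames = new ArrayList<>();
        List<Integer> batch = new ArrayList<>();
        int batchBytes = 0;
        for (int i = 0; i < messageSizes.length; i++) {
            int size = messageSizes[i];
            boolean large = size > maxFramePayload;
            // Close the current batch if this message cannot join it.
            if (!batch.isEmpty() && (large || batchBytes + size > maxFramePayload)) {
                frames.add(new Frame(batch, -1, batchBytes));
                batch = new ArrayList<>();
                batchBytes = 0;
            }
            if (large) {
                // A large message gets its own sequence of frames and never shares them.
                for (int remaining = size; remaining > 0; remaining -= maxFramePayload) {
                    frames.add(new Frame(List.of(), i, Math.min(remaining, maxFramePayload)));
                }
            } else {
                batch.add(i);
                batchBytes += size;
            }
        }
        if (!batch.isEmpty()) {
            frames.add(new Frame(batch, -1, batchBytes));
        }
        return frames;
    }
}
```

For message sizes {100, 200, 5000, 50} and a 1024-byte limit, this yields seven frames: one holding messages 0 and 1, five slices of message 2 (the last one 904 bytes), and one holding message 3.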
Previously, intra-datacenter internode messages were unprotected from corruption by default, as only LZ4 provided any integrity checks. All messages to post-4.0 nodes are written to explicit frames, which may be:
The Unprotected option is still available.
For resilience, all frames are written with a separate CRC-protected header, of 8 and 6 bytes respectively. If corruption occurs in this header, the connection must be reset, as before. If corruption occurs anywhere outside of the header, the corrupt frame will be skipped, leaving the connection intact and avoiding the unnecessary loss of any messages.
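The following Java sketch shows why a separately checksummed header allows a corrupt payload to be skipped: once the header (and therefore the frame length) is verified, a payload checksum failure only discards that frame. The frame layout here is hypothetical and uses the JDK's `java.util.zip.CRC32` for both checksums; it is not Cassandra's wire format.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical frame layout: [length:int][crc(length):int][payload][crc(payload):int]. */
final class CrcFrameSketch {
    private CrcFrameSketch() {}

    /** Header corruption: frame boundaries are unknown, so the connection must be reset. */
    static final class CorruptHeaderException extends RuntimeException {
        CorruptHeaderException(String message) { super(message); }
    }

    static int crc(ByteBuffer buf) {
        CRC32 crc = new CRC32();
        crc.update(buf.duplicate()); // duplicate so the caller's position is not consumed
        return (int) crc.getValue();
    }

    static ByteBuffer encode(byte[] payload) {
        ByteBuffer lengthField = ByteBuffer.allocate(4).putInt(0, payload.length);
        ByteBuffer out = ByteBuffer.allocate(4 + 4 + payload.length + 4);
        out.putInt(payload.length).putInt(crc(lengthField));
        out.put(payload).putInt(crc(ByteBuffer.wrap(payload)));
        out.flip();
        return out;
    }

    /** Returns the payload, or null if the payload was corrupt and the frame was skipped. */
    static byte[] decode(ByteBuffer in) {
        int length = in.getInt();
        int headerCrc = in.getInt();
        if (crc(ByteBuffer.allocate(4).putInt(0, length)) != headerCrc) {
            throw new CorruptHeaderException("header CRC mismatch; reset the connection");
        }
        byte[] payload = new byte[length];
        in.get(payload);
        int payloadCrc = in.getInt();
        // The verified header says where the frame ends, so a bad payload is simply skipped.
        return crc(ByteBuffer.wrap(payload)) == payloadCrc ? payload : null;
    }
}
```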
Previously, any issue at any point in the stream would result in the connection being reset, with the loss of any in-flight messages.
Overall memory usage, and the number of byte shuffles, is reduced for both inbound and outbound messages.
Outbound, the Netty LZ4 encoder maintains a chunk-size buffer (64KiB) that is filled before any compressed frame can be produced. Our frame encoders avoid this redundant copy, as well as freeing 192KiB per endpoint.
Inbound, frame decoders guarantee only to copy the number of bytes necessary to parse a frame, and to never store more bytes than necessary. This improvement applies twice to LZ4 connections, improving both the message decode and the LZ4 frame decode.
Version 4.0 introduces several improvements to the inbound path.
The appropriate message handler is chosen based on whether large or small messages are expected on a particular connection, as indicated by a flag.
NonblockingBufferHandler, running on the event loop, is used for small messages, and BlockingBufferHandler, running off the event loop, for large messages. The single implementation of InboundMessageHandler handles messages of any size effectively by deriving the size of the incoming message from the byte stream. In addition to deriving the size from the stream, the incoming message's expiration time is read proactively, before attempting to deserialize the entire message. If the message has already expired when it is encountered, it is skipped in the byte stream altogether.
If a message fails to deserialize on the receiving side, say because the table id or column is unknown, its bytes are skipped without dropping the entire connection and losing all the buffered messages. An immediate reply is sent back to the coordinator node with the failure reason, rather than waiting for the coordinator callback to expire. This logic extends to corrupted frames: a corrupted frame is safely skipped over without dropping the connection.
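A hypothetical Java sketch of this inbound behaviour, assuming a simplified message layout of `[expiresAtMillis:long][size:int][payload]`: because the size and expiration are read before the payload, expired messages and messages that fail to deserialize can both be skipped while the reader stays aligned with the stream. The class, layout, and failure-reply list are illustrative assumptions.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of skipping expired or undeserializable messages in a byte stream. */
final class InboundSkipSketch {
    private InboundSkipSketch() {}

    static List<String> process(ByteBuffer stream, long nowMillis,
                                Function<ByteBuffer, String> deserialize,
                                List<String> failureReplies) {
        List<String> delivered = new ArrayList<>();
        while (stream.remaining() >= Long.BYTES + Integer.BYTES) {
            int start = stream.position();
            long expiresAtMillis = stream.getLong();
            int size = stream.getInt();
            if (size > stream.remaining()) {
                stream.position(start); // incomplete message: wait for more bytes
                break;
            }
            int next = stream.position() + size; // boundary known without deserializing
            if (expiresAtMillis > nowMillis) {
                ByteBuffer payload = stream.slice();
                payload.limit(size);
                try {
                    delivered.add(deserialize.apply(payload));
                } catch (RuntimeException e) {
                    // e.g. unknown table: report to the coordinator instead of dropping the connection
                    failureReplies.add("failure: " + e.getMessage());
                }
            }
            stream.position(next); // skip expired, failed, or consumed payloads alike
        }
        return delivered;
    }
}
```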
The inbound path imposes strict limits on memory utilization. Specifically, the memory occupied by all parsed but unprocessed messages is bounded on a per-connection, per-endpoint, and global basis. Once a connection exceeds its local unprocessed capacity and cannot borrow any permits from the per-endpoint and global reserves, it simply stops processing further messages, providing natural backpressure until sufficient capacity is regained.
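The borrowing scheme can be sketched as three byte budgets, where a connection first uses its own capacity and otherwise borrows from both the endpoint and the global reserve. This Java sketch is hypothetical and simplified (it borrows a whole message from the reserves rather than only the excess); a `DENIED` result corresponds to the connection pausing until capacity is released.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of per-connection capacity backed by shared endpoint and global reserves. */
final class TieredLimitSketch {
    /** A byte budget that may be shared (e.g. one per endpoint, one per node). */
    static final class Limit {
        private final long capacity;
        private final AtomicLong used = new AtomicLong();

        Limit(long capacity) { this.capacity = capacity; }

        boolean tryAllocate(long bytes) {
            while (true) {
                long current = used.get();
                if (current + bytes > capacity) {
                    return false;
                }
                if (used.compareAndSet(current, current + bytes)) {
                    return true;
                }
            }
        }

        void release(long bytes) { used.addAndGet(-bytes); }
    }

    enum Grant { LOCAL, RESERVE, DENIED }

    private final Limit connection;
    private final Limit endpointReserve;
    private final Limit globalReserve;

    TieredLimitSketch(Limit connection, Limit endpointReserve, Limit globalReserve) {
        this.connection = connection;
        this.endpointReserve = endpointReserve;
        this.globalReserve = globalReserve;
    }

    Grant tryAcquire(long bytes) {
        if (connection.tryAllocate(bytes)) {
            return Grant.LOCAL;
        }
        if (!endpointReserve.tryAllocate(bytes)) {
            return Grant.DENIED;
        }
        if (!globalReserve.tryAllocate(bytes)) {
            endpointReserve.release(bytes); // undo the partial borrow
            return Grant.DENIED;
        }
        return Grant.RESERVE;
    }

    void release(long bytes, Grant grant) {
        if (grant == Grant.LOCAL) {
            connection.release(bytes);
        } else if (grant == Grant.RESERVE) {
            endpointReserve.release(bytes);
            globalReserve.release(bytes);
        }
    }
}
```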
A consistent approach is adopted for all kinds of failure to connect, including connections refused by the endpoint, incompatible versions, and unexpected exceptions.
We sometimes need to reconnect a perfectly valid connection, e.g. if the preferred IP address changes. We ensure that the underlying connection has no in-progress operations before closing it and reconnecting.
Propagates to callbacks instantly, better preventing overload by reclaiming committed memory.
Failures to flush to the network, perhaps because the connection has been reset, are not currently notified to callback handlers, as the necessary information has been discarded, though it would be possible to do so in future if we decide it is worthwhile.
“Gossip” connection has been replaced with a general purpose “Urgent” connection, for any small messages impacting system stability.
We track, and expose via Virtual Table and JMX, the number of messages and bytes that: we could not serialize or flush due to an error, we dropped due to overload or timeout, are pending, and have successfully sent.
Cassandra before 4.0 doesn’t protect the server from allocating huge buffers for inter-node Message objects. A message size limit helps deal with issues such as a malfunctioning cluster participant. Version 4.0 introduced a max message size config param, akin to max mutation size, set to the endpoint reserve capacity by default.
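A minimal Java sketch of such a guard, checking the declared size before any buffer is allocated. The class name and method are hypothetical; the default of 134217728 bytes simply mirrors the 128MiB endpoint reserve capacity shown in the settings earlier.

```java
/** Hypothetical sketch: reject an oversized message before allocating a buffer for it. */
final class MaxMessageSizeSketch {
    /** Hypothetical default mirroring the 128MiB endpoint reserve capacity. */
    static final long DEFAULT_MAX_MESSAGE_SIZE = 134_217_728L;

    private MaxMessageSizeSketch() {}

    /** Allocates a buffer only if the declared size is non-negative and within the limit. */
    static byte[] allocateFor(long declaredSize, long maxMessageSize) {
        if (declaredSize < 0 || declaredSize > maxMessageSize) {
            throw new IllegalArgumentException(
                "declared size " + declaredSize + " is outside the limit " + maxMessageSize);
        }
        return new byte[Math.toIntExact(declaredSize)];
    }
}
```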
As discussed in CASSANDRA-9289, it would be nice to recover gracefully from seeing an unknown table in a message from another node. Before 4.0, the connection is closed and reopened, which can cause other concurrent queries to fail.
Version 4.0 fixes the issue by wrapping the message in-stream with UnknownCFException and skipping the remaining bytes of that message. The TCP connection is not closed and remains available for other messages.