Improved Internode Messaging
Apache Cassandra 4.0 has added several new improvements to internode messaging.
Optimized Internode Messaging Protocol
The internode messaging protocol has been optimized
(CASSANDRA-14485).
Previously the IPAddressAndPort
of the sender was included with each
message that was sent even though the IPAddressAndPort
had already
been sent once when the initial connection/session was established. In
Cassandra 4.0 IPAddressAndPort
has been removed from every separate
message sent and only sent when connection/session is initiated.
Another improvement is that at several instances (listed) a fixed 4-byte
integer value has been replaced with vint
as a vint
is almost always
less than 1 byte:
-
The
paramSize
(the number of parameters in the header) -
Each individual parameter value
-
The
payloadSize
NIO Messaging
In Cassandra 4.0 peer-to-peer (internode) messaging has been switched to non-blocking I/O (NIO) via Netty (CASSANDRA-8457).
As serialization format, each message contains a header with several fixed fields, an optional key-value parameters section, and then the message payload itself. Note: the IP address in the header may be either IPv4 (4 bytes) or IPv6 (16 bytes).
The diagram below shows the IPv4 address for brevity.
1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 6 6 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | PROTOCOL MAGIC | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Message ID | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Timestamp | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Addr len | IP Address (IPv4) / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ / | Verb / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ / | Parameters size / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ / | Parameter data / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ / | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Payload size | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | / / Payload / / | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
An individual parameter has a String key and a byte array value. The key is serialized with its length, encoded as two bytes, followed by the UTF-8 byte encoding of the string. The body is serialized with its length, encoded as four bytes, followed by the bytes of the value.
Resource limits on Queued Messages
System stability is improved by enforcing strict resource limits
(CASSANDRA-15066)
on the number of outbound messages that are queued, measured by the
serializedSize
of the message. There are three separate limits imposed
simultaneously to ensure that progress is always made without any
reasonable combination of failures impacting a node’s stability.
-
Global, per-endpoint and per-connection limits are imposed on messages queued for delivery to other nodes and waiting to be processed on arrival from other nodes in the cluster. These limits are applied to the on-wire size of the message being sent or received.
-
The basic per-link limit is consumed in isolation before any endpoint or global limit is imposed. Each node-pair has three links: urgent, small and large. Any given node may have a maximum of
N*3 * (internode_application_send_queue_capacity in bytes + internode_application_receive_queue_capacity in bytes)
of messages data queued without any coordination between them although in practice, with token-aware routing, only RF*tokens nodes should need to communicate with significant bandwidth. -
The per-endpoint limit is imposed on all messages exceeding the per-link limit, simultaneously with the global limit, on all links to or from a single node in the cluster. The global limit is imposed on all messages exceeding the per-link limit, simultaneously with the per-endpoint limit, on all links to or from any node in the cluster. The following configuration settings have been added to
cassandra.yaml
for resource limits on queued messages.
internode_application_send_queue_capacity: 4MiB internode_application_send_queue_reserve_endpoint_capacity: 128MiB internode_application_send_queue_reserve_global_capacity: 512MiB internode_application_receive_queue_capacity: 4MiB internode_application_receive_queue_reserve_endpoint_capacity: 128MiB internode_application_receive_queue_reserve_global_capacity: 512MiB
Virtual Tables for Messaging Metrics
Metrics is improved by keeping metrics using virtual tables for
inter-node inbound and outbound messaging
(CASSANDRA-15066).
For inbound messaging a virtual table (internode_inbound
) has been
added to keep metrics for:
-
Bytes and count of messages that could not be serialized or flushed due to an error
-
Bytes and count of messages scheduled
-
Bytes and count of messages successfully processed
-
Bytes and count of messages successfully received
-
Nanos and count of messages throttled
-
Bytes and count of messages expired
-
Corrupt frames recovered and unrecovered
A separate virtual table (internode_outbound
) has been added for
outbound inter-node messaging. The outbound virtual table keeps metrics
for:
-
Bytes and count of messages pending
-
Bytes and count of messages sent
-
Bytes and count of messages expired
-
Bytes and count of messages that could not be sent due to an error
-
Bytes and count of messages overloaded
-
Active Connection Count
-
Connection Attempts
-
Successful Connection Attempts
Hint Messaging
A specialized version of hint message that takes an already encoded in a
ByteBuffer
hint and sends it verbatim has been added. It is an
optimization for when dispatching a hint file of the current messaging
version to a node of the same messaging version, which is the most
common case. It saves on extra ByteBuffer
allocations one redundant
hint deserialization-serialization cycle.
Internode Application Timeout
A configuration setting has been added to cassandra.yaml
for the
maximum continuous period a connection may be unwritable in application
space.
# internode_application_timeout_in_ms = 30000
Some other new features include logging of message size to trace message for tracing a query.
Paxos prepare and propose stage for local requests optimized
In pre-4.0 Paxos prepare and propose messages always go through entire
MessagingService
stack in Cassandra even if request is to be served
locally, we can enhance and make local requests severed w/o involving
MessagingService
. Similar things are done elsewhere in Cassandra which
skips MessagingService
stage for local requests.
This is what it looks like in pre 4.0 if we have tracing on and run a light-weight transaction:
Sending PAXOS_PREPARE message to /A.B.C.D [MessagingService-Outgoing-/A.B.C.D] | 2017-09-11 21:55:18.971000 | A.B.C.D | 15045 … REQUEST_RESPONSE message received from /A.B.C.D [MessagingService-Incoming-/A.B.C.D] | 2017-09-11 21:55:18.976000 | A.B.C.D | 20270 … Processing response from /A.B.C.D [SharedPool-Worker-4] | 2017-09-11 21:55:18.976000 | A.B.C.D | 20372
Same thing applies for Propose stage as well.
In version 4.0 Paxos prepare and propose stage for local requests are optimized (CASSANDRA-13862).
Quality Assurance
Several other quality assurance improvements have been made in version 4.0 (CASSANDRA-15066).
Framing
Version 4.0 introduces framing to all internode messages, i.e. the grouping of messages into a single logical payload with headers and trailers; these frames are guaranteed to either contain at most one message, that is split into its own unique sequence of frames (for large messages), or that a frame contains only complete messages.
Corruption prevention
Previously, intra-datacenter internode messages would be unprotected from corruption by default, as only LZ4 provided any integrity checks. All messages to post 4.0 nodes are written to explicit frames, which may be:
-
LZ4 encoded
-
CRC protected
The Unprotected option is still available.
Resilience
For resilience, all frames are written with a separate CRC protected header, of 8 and 6 bytes respectively. If corruption occurs in this header, the connection must be reset, as before. If corruption occurs anywhere outside of the header, the corrupt frame will be skipped, leaving the connection intact and avoiding the loss of any messages unnecessarily.
Previously, any issue at any point in the stream would result in the connection being reset, with the loss of any in-flight messages.
Efficiency
The overall memory usage, and number of byte shuffles, on both inbound and outbound messages is reduced.
Outbound the Netty LZ4 encoder maintains a chunk size buffer (64KiB), that is filled before any compressed frame can be produced. Our frame encoders avoid this redundant copy, as well as freeing 192KiB per endpoint.
Inbound, frame decoders guarantee only to copy the number of bytes necessary to parse a frame, and to never store more bytes than necessary. This improvement applies twice to LZ4 connections, improving both the message decode and the LZ4 frame decode.
Inbound Path
Version 4.0 introduces several improvements to the inbound path.
An appropriate message handler is used based on whether large or small
messages are expected on a particular connection as set in a flag.
NonblockingBufferHandler
, running on event loop, is used for small
messages, and BlockingBufferHandler
, running off event loop, for large
messages. The single implementation of InboundMessageHandler
handles
messages of any size effectively by deriving size of the incoming
message from the byte stream. In addition to deriving size of the
message from the stream, incoming message expiration time is proactively
read, before attempting to deserialize the entire message. If it’s
expired at the time when a message is encountered the message is just
skipped in the byte stream altogether. And if a message fails to be
deserialized while still on the receiving side - say, because of table
id or column being unknown - bytes are skipped, without dropping the
entire connection and losing all the buffered messages. An immediately
reply back is sent to the coordinator node with the failure reason,
rather than waiting for the coordinator callback to expire. This logic
is extended to a corrupted frame; a corrupted frame is safely skipped
over without dropping the connection.
Inbound path imposes strict limits on memory utilization. Specifically, the memory occupied by all parsed, but unprocessed messages is bound - on per-connection, per-endpoint, and global basis. Once a connection exceeds its local unprocessed capacity and cannot borrow any permits from per-endpoint and global reserve, it simply stops processing further messages, providing natural backpressure - until sufficient capacity is regained.
Outbound Connections
Opening a connection
A consistent approach is adopted for all kinds of failure to connect, including: refused by endpoint, incompatible versions, or unexpected exceptions;
-
Retry forever, until either success or no messages waiting to deliver.
-
Wait incrementally longer periods before reconnecting, up to a maximum of 1s.
-
While failing to connect, no reserve queue limits are acquired.
Closing a connection
-
Correctly drains outbound messages that are waiting to be delivered (unless disconnected and fail to reconnect).
-
Messages written to a closing connection are either delivered or rejected, with a new connection being opened if the old is irrevocably closed.
-
Unused connections are pruned eventually.
Reconnecting
We sometimes need to reconnect a perfectly valid connection, e.g. if the preferred IP address changes. We ensure that the underlying connection has no in-progress operations before closing it and reconnecting.
Message Failure
Propagates to callbacks instantly, better preventing overload by reclaiming committed memory.
Expiry
-
No longer experiences head-of-line blocking (e.g. undroppable message preventing all droppable messages from being expired).
-
While overloaded, expiry is attempted eagerly on enqueuing threads.
-
While disconnected we schedule regular pruning, to handle the case where messages are no longer being sent, but we have a large backlog to expire.
Serialization Errors
-
Do not result in the connection being invalidated; the message is simply completed with failure, and then erased from the frame.
-
Includes detected mismatch between calculated serialization size to actual.
Failures to flush to network, perhaps because the connection has been reset are not currently notified to callback handlers, as the necessary information has been discarded, though it would be possible to do so in future if we decide it is worth our while.
Added a Message size limit
Cassandra pre-4.0 doesn’t protect the server from allocating huge buffers for the inter-node Message objects. Adding a message size limit would be good to deal with issues such as a malfunctioning cluster participant. Version 4.0 introduced max message size config param, akin to max mutation size - set to endpoint reserve capacity by default.
Recover from unknown table when deserializing internode messages
As discussed in
(CASSANDRA-9289)
it would be nice to gracefully recover from seeing an unknown table in a
message from another node. Pre-4.0, we close the connection and
reconnect, which can cause other concurrent queries to fail. Version 4.0
fixes the issue by wrapping message in-stream with
TrackedDataInputPlus
, catching UnknownCFException
, and skipping the
remaining bytes in this message. TCP won’t be closed and it will remain
connected for other messages.