Over the years of working on Apache Cassandra, whether writing tests or trying to reproduce issues, I’ve found myself repeating the same procedure over and over again: creating a schema, writing loops that generate data, then either manually reconciling the data to check the results or comparing the result set against some predetermined expected result. Not only is this approach tedious and time-consuming, but it also does not scale: if some set of operations works for one schema, there’s no way to know whether it will also work for an arbitrary schema, whether it will work if the operations are executed in a different order, or whether it will work if the operations themselves are slightly different.
While preparing Apache Cassandra for the 4.0 release, we’ve made extensive progress in how we test. The new in-tree, in-JVM distributed test framework enables us to easily write tests that exercise coordinated query execution code paths while giving us flexibility and control that was previously offered only by CQLTester, a tool for exercising node-local query paths. Many subsystems were audited and covered with tests. Cassandra users tried the new version out in their clusters and reported their findings. All of these things are useful and important, but we still needed a tool that would give us the same or a higher degree of confidence for every commit, so that we could know that the database is working as expected not only for the exact set of operations exercised by unit and integration tests, but potentially for any use case and combination of operations under circumstances comparable to production.
This all led us to develop Harry, a tool that combines properties of stress- and integration-testing tools: it can generate data for an arbitrary schema, execute data modification queries against the cluster, track the progress of operation execution, and make sure that responses to read queries are correct.
After reading this post, you will understand:
- how Harry generates the data
- how Harry performs verification
- which properties of the values generated by Harry make verification not only possible but also efficient
The primary audience for this post is Cassandra contributors, so you will need to be familiar with Apache Cassandra and its tooling.
Fuzz testing
Since most bugs are reproduced by executing a sequence of actions that follows some pattern, we need a way to specify which actions lead to a given state. However, there’s a lot of flexibility regarding exactly which values are written to specific columns.
For example, consider CASSANDRA-16453, which was reproduced using Harry. The code to reproduce the issue with in-JVM dtests looks something like this:
try (Cluster cluster = init(builder().withNodes(2).start()))
{
    cluster.schemaChange(withKeyspace("CREATE TABLE distributed_test_keyspace.table_0 (pk0 bigint,ck0 bigint,regular0 bigint,regular1 bigint,regular2 bigint, PRIMARY KEY (pk0, ck0)) WITH CLUSTERING ORDER BY (ck0 ASC);"));
    cluster.coordinator(1).execute("DELETE FROM distributed_test_keyspace.table_0 USING TIMESTAMP 1 WHERE pk0=1 AND ck0>2;", ConsistencyLevel.ALL);
    cluster.get(2).executeInternal("DELETE FROM distributed_test_keyspace.table_0 USING TIMESTAMP 1 WHERE pk0=1;");
    cluster.coordinator(1).execute("SELECT * FROM distributed_test_keyspace.table_0 WHERE pk0=? AND ck0>=? AND ck0<?;",
                                   ConsistencyLevel.ALL, 1L, 1L, 3L);
}
You can see that, at first glance, there are only three things that are important to reproduce the issue:
- The table has to have at least one clustering column.
- Two actions are executed against the cluster: a range deletion and a partition deletion.
- Both operations have the same timestamp.
The rest of the details do not matter: size of the cluster, number of replicas, clustering order, consistency level with which operations are executed, types of clustering keys and values written, and so on.
The simplest way to cover a case like this with a test is to hardcode the schema and then execute a partition deletion and a range deletion hardcoding the values, much as we did above. This might work, but there’s still a chance that the proposed fix may not work for some other schema or some combination of values.
To improve the situation, we can express the test in more abstract terms and, instead of writing a repro using specific statements, use only the constraints we’ve specified above:
test(new SchemaGenerators.Builder("harry")
                         .partitionKeySpec(1, 5)
                         .clusteringKeySpec(1, 5)
                         .regularColumnSpec(1, 10)
                         .generator(),
     historyBuilder -> {
         historyBuilder.nextPartition()
                       .simultaneously()
                       .randomOrder()
                       .partitionDeletion()
                       .rangeDeletion()
                       .finish();
     });
This spec can be used to generate clusters of different sizes, configured with different schemas, executing the given sequence of actions both in isolation and combined with other randomly generated ones, with failure injection. Best of all, this test will not only ensure that such a sequence of actions does not produce an exception, but also that the cluster responds with correct results to any allowed read query.
Generating data
Generating random values and sequences of actions and reconciling them during verification is in itself not a difficult task. Making this process time- and memory-efficient is what makes it more interesting.
For space efficiency, the log of actions generated by Harry is not kept in memory or saved anywhere on disk, since any generated operation can be reproduced from its sequence number. In Harry, a sequence number consists of two parts: the logical timestamp (LTS), which has a 1:1 mapping to a real-time timestamp, and the modification ID, which allows having multiple uniquely identifiable operations for each logical timestamp. For the sake of simplicity, we’ll just say that each operation is represented by its sequence number / LTS.
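Conceptually, an operation is therefore addressed by such a pair. The following is purely an illustration of that idea, not necessarily Harry’s internal representation:

// Illustrative only: an operation is fully addressed by its logical timestamp
// plus a modification id distinguishing operations that share the same LTS.
final class OperationId {
    final long lts;            // logical timestamp; maps 1:1 to a real-time timestamp
    final long modificationId; // disambiguates operations within one LTS
    OperationId(long lts, long modificationId) {
        this.lts = lts;
        this.modificationId = modificationId;
    }
}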
In the example above, the operation order is determined by the seed for the given run. Let’s say that the partition deletion is executed first. To produce a DELETE statement from it, we now need to generate a partition key and get a timestamp. Similarly, to generate a range deletion, we will need a partition key, two clustering keys that will serve as the lower and upper bounds for the range tombstone, and a timestamp.
Using the sequence number and knowing the operation type, we can now produce descriptors that are used as the compact internal representation of data in Harry. No matter how many parts it consists of, any partition key is represented by a single long. The same is true for clustering keys: any clustering key, single-part or composite, is represented using a single long descriptor. If we were to generate an INSERT or UPDATE operation, each value for a regular or static column would have its own descriptor, since we would want to distinguish between two writes made by two different operations.
To summarise, every operation has a sequence number, which determines everything that is required to fully reproduce this operation, including descriptors that we will later use to generate values themselves:
- a partition deletion has only a partition descriptor
- a range deletion has a partition descriptor and two clustering descriptors specifying the tombstone bounds
- an insert or update operation has a partition descriptor, a clustering descriptor, and a set of value descriptors, one for each regular and static column
Using descriptors rather than specific values for verification can be extremely useful for efficiency. Instead of comparing potentially large values, we can just compare the two longs that uniquely identify them. This means we need a way not only to generate a value from a descriptor, but also to compute the descriptor a value was generated from.
In Harry, we call such a generator a Bijection<T>. Every bijection can inflate a descriptor into a value of type T and deflate a value of type T back into the descriptor it was originally generated from.
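In code, that contract looks roughly like the following sketch. It follows the terminology used in this post and is simplified; it is not necessarily the exact Harry API:

interface Bijection<T> {
    T inflate(long descriptor);  // produce the value a descriptor represents
    long deflate(T value);       // recover the descriptor the value was generated from
}

Later sections add one more requirement on implementations of this contract: values must sort in the same order as the descriptors they were generated from.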
Validating results
Applying a predetermined sequence of operations against a single partition produces some partition state. Knowing the status of execution of each operation, we can deterministically compute the state of each node in the cluster and validate the results of executing any SELECT query.
Since we can represent any operation as a sequence of descriptors, and the timestamp determines the order of operations, then, assuming we know the status of each operation (whether or not it has been executed against some node), we can deterministically produce the partition state for any given point in time. Partition state is nothing but a sorted map, where the key is a clustering descriptor and the value is a row state. A row state, in this case, holds value descriptors for each column and the logical timestamps at which they were written:
public class PartitionState implements Iterable<RowState> {
    long partitionDescriptor;
    // rows of the partition, sorted by clustering descriptor
    NavigableMap<Long, RowState> rowStates;

    public static class RowState {
        long[] valueDescriptors;   // value descriptor for each column
        long[] logicalTimestamps;  // LTS of the operation that wrote each value
    }
}
Similarly, since any value written to the database is generated using a bijection, we can produce the partition state from the result set by deflating every value returned by the database into the descriptor that it was generated from.
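A rough sketch of that deflation step, written as if it were a method on PartitionState, might look like the following. Here ckGen is assumed to be the clustering-key bijection and valueGens to hold one bijection per regular column, in result-set order; both names are illustrative:

// Hypothetical sketch: turn one fetched row back into descriptors and add it to the model.
void addDeflatedRow(Bijection<Object[]> ckGen, List<Bijection<Object>> valueGens,
                    Object[] clusteringKey, Object[] values) {
    long cd = ckGen.deflate(clusteringKey);   // clustering key -> clustering descriptor
    RowState row = new RowState();
    row.valueDescriptors = new long[values.length];
    row.logicalTimestamps = new long[values.length];
    for (int i = 0; i < values.length; i++)
        row.valueDescriptors[i] = valueGens.get(i).deflate(values[i]); // value -> value descriptor
    rowStates.put(cd, row);
}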
Generating descriptors
Reproducible operation sequences can be generated from a set of rules that determines what the sequence is going to look like. For example, we can specify probability distributions for each operation type, or give operations relative weights that are internally turned into a distribution. A configuration for an insert/update/delete workload in which an insert operation (100/251) is twice as probable as a row deletion (50/251) and one hundred times as probable as a partition deletion (1/251) would look like:
INSERT: 100
UPDATE: 100
DELETE_ROW: 50
DELETE_PARTITION: 1
Since each operation is uniquely determined by its sequence number, we can deterministically compute its operation type from these probability distributions. One way to do this is by using a PCG random number generator, which has some useful properties we’re going to use for generating our pseudorandom values.
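As a rough sketch, mapping a logical timestamp to an operation kind from the weights above could look like this. It relies on the addressable randomNumber(sequenceNumber, stream) method described below, and the stream id constant and enum here are illustrative, not Harry’s actual API:

enum OperationKind { INSERT, UPDATE, DELETE_ROW, DELETE_PARTITION }

OperationKind operationKindFor(long lts) {
    long[] cumulative = { 100, 200, 250, 251 };          // cumulative weights: INSERT, UPDATE, DELETE_ROW, DELETE_PARTITION
    long rnd = rng.randomNumber(lts, OPERATION_KIND_STREAM);
    long bucket = Long.remainderUnsigned(rnd, 251);      // map the random number onto the total weight
    OperationKind[] kinds = OperationKind.values();
    for (int i = 0; i < cumulative.length; i++)
        if (bucket < cumulative[i])
            return kinds[i];
    throw new AssertionError();
}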
If you’d like to learn more about the mathematical underpinnings of PCG, you should read this paper (https://www.pcg-random.org/pdf/hmc-cs-2014-0905.pdf). However, it is not necessary to know any of its internals to use PCG. We need a random number generator with the following properties:
- Long period: the sequence of numbers it produces does not repeat frequently; ideally, the period should be 2^64 when generating a random number from 64 bits of entropy.
- Stream selection: the ability to produce different random sequences from the same seed, identified by some stream id.
- Addressability: any number produced by the generator can be reproduced from the seed and its sequence number. Ideally, we’d like to have methods such as long randomNumber(long sequenceNumber, long stream) and long sequenceNumber(long randomNumber, long stream). In other words, we should be able to determine the sequence number of a random number in the given stream. Using this method, we can also determine distance(long x, long y): how many random numbers we should skip to get y after seeing x.
- Walkability: the ability to produce the number immediately following, long next(long randomNumber, long stream), or preceding, long prev(long randomNumber, long stream), the given random number in the random sequence.
You might have noticed that there are two ways to achieve the same thing: we can get a pseudorandom number from some number known by the system by using randomNumber(i, stream), or by using prev(i, stream). Both variants are valid, and both operations can be inverted. We have a slight preference toward using prev, since its inverse can be computed in constant time.
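Put together, the generator we rely on can be sketched as the following interface. Method names follow the post; the actual implementation details in Harry may differ:

interface AddressableRng {
    long randomNumber(long sequenceNumber, long stream); // i-th number of the given stream
    long sequenceNumber(long randomNumber, long stream); // inverse: position of a number in its stream
    long next(long randomNumber, long stream);           // number following the given one
    long prev(long randomNumber, long stream);           // number preceding the given one
    long distance(long x, long y);                       // how many numbers separate x and y
}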
These properties allow us to reproduce partition state from just configuration (i.e., known distributions, schema, size of the partition, etc) and a seed:
- The partition descriptor for the Nth operation can be picked as the Mth random number in the stream of partition descriptors, where the relation between N and M is determined by the chosen pattern for visiting partitions.
- The clustering descriptor for the Nth operation can be picked as the Mth random number in the stream of clustering descriptors for the given partition, where the maximum M is determined by the maximum partition size, so there can be no more than max(M) rows in any generated partition.
One of the simplest useful ways to represent a pattern for picking a descriptor from the sequence is a sliding window. The sliding window begins with a preset number of items in it and visits each item in the current window one or several times in a round-robin fashion. After this, it cycles one of the items out and adds a new one in its place.
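As an illustration, under the assumption that each item in the window is visited a fixed number of times before the window slides by one, the position of the descriptor to use for a given operation could be computed like this (a simplified sketch, not Harry’s exact implementation):

// windowSize: how many descriptors are "in play" at any point in time
// visitsPerItem: how many times each item is visited before the oldest one is cycled out
long positionFor(long sequenceNumber, int windowSize, int visitsPerItem) {
    long perWindow = (long) windowSize * visitsPerItem;
    long slides = sequenceNumber / perWindow;        // how many items have been cycled out so far
    long withinWindow = sequenceNumber % perWindow;  // round-robin offset inside the current window
    return slides + (withinWindow % windowSize);
}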
Once the operation type, partition descriptor, and clustering descriptors are determined, all we have left to cover is how to generate value descriptors for INSERT and UPDATE operations. The value descriptor for a column is uniquely identified by its sequence number and is bound by the partition descriptor, clustering descriptor, and column.
To summarise, all operations in Harry are deterministic and are represented using their descriptors. Descriptors can be computed hierarchically using the following rules:
- The partition descriptor is picked from the stream of partition descriptors. Its position in that stream is determined by some rule (for example, a sliding window):
  long pd = rng.randomNumber(positionFor(sequenceNumber), PARTITION_DESCRIPTOR_STREAM_ID)
- The clustering descriptor is picked from the stream of clustering descriptors for the given partition:
  long cd = rng.prev(positionInPartition, pd);
- The value descriptor is picked from the stream of descriptors identified by which partition, clustering, and column the value belongs to:
  long vd = rng.randomNumber(sequenceNumber, pd ^ cd ^ col);
Inflation and deflation
We’ve mentioned before that one reason Harry’s state is so compact and can be validated so efficiently is that every value read from the database can be traced back to the descriptor it was generated from. To achieve this, we generate all values using order-preserving bijections. In other words, for any value generated from a descriptor, it should be possible to quickly find the descriptor the value was generated from, and two values generated from two distinct descriptors should sort in the same order as the descriptors themselves.
Implementing an order-preserving bijection for 64-bit longs is trivial and can be achieved by using an identity function. Essentially, any long descriptor is the value it represents:
long inflate(long descriptor) {
return descriptor;
}
long deflate(long value) {
return value;
}
There are many ways to make a bijection for strings. One of them is to keep a set of 256 short strings of the same length in a sorted array. When inflating a 64-bit long descriptor into a string, we iterate over these 64 bits, taking 8 bits (one byte) at a time and using the value of that byte as an index into the array of 256 strings.
String inflate(long descriptor) {
    StringBuilder builder = new StringBuilder();
    for (int i = 0; i < Long.BYTES; i++) {
        // take the i-th byte of the descriptor and use it as an index
        // into the sorted array of 256 "nibble" strings
        int idx = getByte(descriptor, i);
        builder.append(nibbles[idx]);
    }
    return builder.toString();
}
One thing we should take into account here is that strings are compared byte-wise, while longs use signed comparison. To make sure generated strings have the same order as descriptors, we need to XOR the sign bit.
Since any two strings produced by this generator are unique, and we can produce at most 2^64 values using it, we do not need larger nibbles to generate longer strings: we can simply append random data of arbitrary length to the end of the string. This does not change the order, since the order is determined by the prefix generated from the nibbles, which is unique to each value.
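For illustration, the matching deflate could be sketched as follows. It assumes nibbles is the same sorted array of 256 equal-length strings used by inflate, that getByte above reads bytes starting from the most significant one (which is what preserves ordering), and it simply ignores any random suffix appended after the first Long.BYTES nibbles; the sign-bit flip mentioned above is omitted for brevity:

long deflate(String value) {
    long descriptor = 0;
    int nibbleLength = nibbles[0].length();
    for (int i = 0; i < Long.BYTES; i++) {
        // recover the byte encoded by the i-th nibble of the string
        String nibble = value.substring(i * nibbleLength, (i + 1) * nibbleLength);
        long idx = java.util.Arrays.binarySearch(nibbles, nibble);
        descriptor |= idx << (8 * (Long.BYTES - 1 - i));
    }
    return descriptor;
}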
Such simple bijections can represent data types used for regular and static columns. We’ve previously mentioned that partition and clustering keys are also represented using 64-bit descriptors. Partition and clustering keys are composite: they consist of multiple distinct parts. One way to implement bijection for a composite type is to “slice” 64 bits of entropy into smaller chunks, each chunk giving some entropy to generate a different part of the key. Each slice is then inflated using a bijection that corresponds to the part of the key it represents. To convert the value back to the descriptor, we must deflate each part of the key and then “stitch” the values back together into a 64-bit descriptor.
To summarise, key generators are just bijections that can generate multiple values for a single 64-bit descriptor instead of one. A simplified and generalized version of such bijection may look something like this:
Object[] inflate(long descriptor) {
    // split 64 bits of entropy into one slice per key part
    long[] slices = slice(descriptor);
    Object[] key = new Object[slices.length];
    for (int i = 0; i < slices.length; i++) {
        // children[i] is the bijection for the i-th part of the key
        key[i] = children[i].inflate(slices[i]);
    }
    return key;
}

long deflate(Object[] value) {
    long[] slices = new long[value.length];
    for (int i = 0; i < value.length; i++) {
        slices[i] = children[i].deflate(value[i]);
    }
    // recombine the slices into a single 64-bit descriptor
    return stitch(slices);
}
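For illustration, the slice and stitch helpers used above could be sketched as follows. This version assumes the key has between two and eight parts and splits the entropy evenly, dropping any leftover low-order bits; real generators may distribute bits differently and handle the sign bit specially to preserve ordering:

long[] slice(long descriptor, int parts) {
    int bits = Long.SIZE / parts;
    long mask = (1L << bits) - 1;
    long[] slices = new long[parts];
    for (int i = 0; i < parts; i++)
        slices[i] = (descriptor >>> (Long.SIZE - bits * (i + 1))) & mask;
    return slices;
}

long stitch(long[] slices) {
    int bits = Long.SIZE / slices.length;
    long descriptor = 0;
    for (int i = 0; i < slices.length; i++)
        descriptor |= slices[i] << (Long.SIZE - bits * (i + 1));
    return descriptor;
}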
Values generated by key generators preserve the order of descriptors they were generated from, which allows efficiently checking the order of results, comparing clustering descriptors, and validating range deletions.
Putting it all together
In this post, we’ve learned how the various parts of Harry work, from how a sequence of operations is reproduced to how the values themselves are generated. Using this information, we can create a quiescent model checker that can validate the state of the database in the absence of in-flight operations, assuming we know the state of all operations up to this moment.
As we’ve discussed, Harry is working with reproducible histories of operations, where the following information identifies each operation:
class Operation {
    long lts;           // logical timestamp of the operation
    long pd;            // partition descriptor, derived from LTS
    long cd;            // clustering descriptor, derived from LTS and PD
    OperationKind kind; // operation type, derived from LTS and PD
}
Now, all we need to do is produce a sequence of operations. For example, each operation of the INSERT kind is going to be represented by:
class Insert {
    long lts;    // logical timestamp
    long pd;     // partition descriptor
    long cd;     // clustering descriptor
    long[] vds;  // value descriptors for each column
}
We can compile this information into an INSERT statement by inflating partition, clustering, and value descriptors, and deriving the real-time timestamp from the logical timestamp. This statement can then be executed against the system under test (our Cassandra cluster).
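As a rough illustration, executing such an operation against an in-JVM dtest cluster could look like the sketch below. The helpers pkGen, ckGen, valueGens, toRealTimestamp, buildInsertCql, and concat are hypothetical stand-ins for schema-specific plumbing; only the coordinator execute call mirrors the dtest API used earlier in this post:

void execute(Insert op) {
    Object[] partitionKey = pkGen.inflate(op.pd);         // partition descriptor -> key parts
    Object[] clusteringKey = ckGen.inflate(op.cd);        // clustering descriptor -> key parts
    Object[] values = new Object[op.vds.length];
    for (int i = 0; i < op.vds.length; i++)
        values[i] = valueGens.get(i).inflate(op.vds[i]);  // value descriptors -> column values
    long writeTimestamp = toRealTimestamp(op.lts);        // derive real-time timestamp from LTS
    // buildInsertCql produces "INSERT INTO ks.table (...) VALUES (?, ...) USING TIMESTAMP <ts>"
    cluster.coordinator(1).execute(buildInsertCql(schema, writeTimestamp),
                                   ConsistencyLevel.QUORUM,
                                   concat(partitionKey, clusteringKey, values));
}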
We know from the history of operations which partitions were visited and which operations were executed against them. To validate the state of any given partition, all we need to do is query the database for that partition and deflate every row in the result set: clustering keys into clustering descriptors and values into the corresponding value descriptors, producing an internal PartitionState.
To verify that this partition state is correct, we replay the sequence of operations again. But instead of going all the way to a generated INSERT statement, we operate only on descriptors, applying the operations sequentially to the PartitionState following the usual Cassandra reconciliation rules (i.e. last-write-wins, partition tombstone > tombstone > write) and computing logical timestamps and value descriptors for each row.
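For example, applying a single write to the model could look roughly like this simplified sketch, which only handles last-write-wins for regular columns; tombstones and static columns are handled analogously:

static void applyWrite(PartitionState partition, long cd, long[] vds, long lts) {
    PartitionState.RowState row = partition.rowStates.get(cd);
    if (row == null) {
        row = new PartitionState.RowState();
        row.valueDescriptors = new long[vds.length];
        row.logicalTimestamps = new long[vds.length];
        java.util.Arrays.fill(row.logicalTimestamps, Long.MIN_VALUE); // "never written"
        partition.rowStates.put(cd, row);
    }
    for (int i = 0; i < vds.length; i++) {
        // last-write-wins: keep the value with the newest (or equal) logical timestamp
        if (lts >= row.logicalTimestamps[i]) {
            row.valueDescriptors[i] = vds[i];
            row.logicalTimestamps[i] = lts;
        }
    }
}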
Now, having two partition states, one “deflated” from the result set returned by the system under test and one “inflated” by replaying the operations up to the current logical timestamp, we can directly compare them. If there are any inconsistencies between the two, we can conclude there has been an error.
Validating query results by reproducing the sequence of events is more efficient than working with full operation logs that hold all values for every operation executed against the database. This process can be made even more efficient by validating rows individually, keeping only enough state in memory to validate a single row at a time.
Implementing other Cassandra features, such as partition deletions, writes without primary-key liveness, static columns, and column deletions, does not require any additional information and follows the same rules, so for the sake of brevity it is not covered in this post.
The quiescent checker is a very useful tool that can validate data sets generated by sequential or concurrent runners, as long as the state of each operation is known at the moment of validation. Since we simply replay the sequence of events to reproduce the state, we cannot have any events whose state is unknown to the system.
Closing words
Harry is a tool that allows testing databases in ways we weren’t able to before. Instead of creating test cases expressed as specific schemas and statements, it lets us describe the sequences of events we’d like to test and generates schemas and possible interleavings of statements corresponding to the given specifications. Creating exhaustive test suites takes a lot of time; this way, contributor creativity can be poured into patches and features rather than into hand-written test examples. This approach also allows testing different features in combination and checking old behaviors in the presence of new ones without explicitly creating new tests.
That said, integration testing is not always enough, and often we do not know in advance which areas of the codebase are going to be problematic. Harry can be helpful here, too, since it generates data, executes the workload against the system, and validates the results. Contributors can focus on implementing test scenarios such as behavior in the presence of unexpected failures, cluster expansions, or node replacements.
Harry is a productivity tool that encapsulates the properties of a stress test and a model checker, allowing us to find issues and improve the quality of Cassandra in ways that were not possible before.