Cassandra Testing Guide: DataStax Driver, CassandraUnit, Testcontainers & CQL Assertions
Apache Cassandra's wide-column model, tuneable consistency, and partition-key-driven design create unique testing challenges. This guide covers the full testing stack—from unit testing data model code with mock drivers to integration testing with real Cassandra instances via Testcontainers.
Understanding Cassandra Testing Challenges
Cassandra doesn't work like a relational database, and your tests need to reflect that:
- Partition key design determines data co-location and query performance—test it explicitly
- Consistency levels affect what your application reads back immediately after a write
- Schema changes require careful coordination (no ALTER TABLE for existing data types)
- Denormalized tables mean the same data lives in multiple tables—test all write paths
Approach 1: DataStax Driver Testing with Mocks
For unit testing business logic that happens to use Cassandra, mock the DataStax session:
@ExtendWith(MockitoExtension.class)
class UserRepositoryTest {
@Mock
CqlSession session;
@Mock
PreparedStatement preparedStatement;
@Mock
BoundStatement boundStatement;
@Mock
ResultSet resultSet;
@Mock
Row row;
UserRepository repository;
@BeforeEach
void setUp() {
repository = new UserRepository(session);
}
@Test
void shouldFindUserById() {
UUID userId = UUID.randomUUID();
// Setup mock chain
when(session.prepare(any(SimpleStatement.class))).thenReturn(preparedStatement);
when(preparedStatement.bind(userId)).thenReturn(boundStatement);
when(session.execute(boundStatement)).thenReturn(resultSet);
when(resultSet.one()).thenReturn(row);
when(row.getUuid("id")).thenReturn(userId);
when(row.getString("email")).thenReturn("user@example.com");
when(row.getString("name")).thenReturn("John Doe");
Optional<User> user = repository.findById(userId);
assertTrue(user.isPresent());
assertEquals("user@example.com", user.get().getEmail());
verify(session).execute(boundStatement);
}
@Test
void shouldReturnEmptyWhenUserNotFound() {
UUID userId = UUID.randomUUID();
when(session.prepare(any(SimpleStatement.class))).thenReturn(preparedStatement);
when(preparedStatement.bind(userId)).thenReturn(boundStatement);
when(session.execute(boundStatement)).thenReturn(resultSet);
when(resultSet.one()).thenReturn(null);
Optional<User> user = repository.findById(userId);
assertFalse(user.isPresent());
}
}The Repository Implementation
@Repository
public class UserRepository {
private final CqlSession session;
private final PreparedStatement findById;
public UserRepository(CqlSession session) {
this.session = session;
this.findById = session.prepare(
"SELECT id, email, name FROM users WHERE id = ?"
);
}
public Optional<User> findById(UUID id) {
Row row = session.execute(findById.bind(id)).one();
if (row == null) return Optional.empty();
return Optional.of(new User(
row.getUuid("id"),
row.getString("email"),
row.getString("name")
));
}
}Approach 2: Testcontainers with Real Cassandra
For integration tests that validate CQL queries, schema, and actual behavior:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<scope>test</scope>
</dependency>@Testcontainers
class CassandraIntegrationTest {
@Container
static CassandraContainer<?> cassandra =
new CassandraContainer<>(DockerImageName.parse("cassandra:4.1"))
.withInitScript("schema.cql")
.withStartupTimeout(Duration.ofMinutes(3));
static CqlSession session;
@BeforeAll
static void setupSession() {
session = CqlSession.builder()
.addContactPoint(cassandra.getContactPoint())
.withLocalDatacenter(cassandra.getLocalDatacenter())
.build();
}
@AfterAll
static void closeSession() {
if (session != null) session.close();
}
@BeforeEach
void truncateTables() {
session.execute("TRUNCATE test_keyspace.users");
}
}The init script (src/test/resources/schema.cql):
CREATE KEYSPACE IF NOT EXISTS test_keyspace
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE test_keyspace;
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
email TEXT,
name TEXT,
created_at TIMESTAMP
);
CREATE TABLE IF NOT EXISTS users_by_email (
email TEXT PRIMARY KEY,
user_id UUID,
name TEXT
);
CREATE TABLE IF NOT EXISTS orders_by_user (
user_id UUID,
order_id TIMEUUID,
total DECIMAL,
status TEXT,
PRIMARY KEY (user_id, order_id)
) WITH CLUSTERING ORDER BY (order_id DESC);CQL Assertions: Testing Queries Directly
Testing Partition Key Queries
@Test
void shouldInsertAndRetrieveUserByPartitionKey() {
UUID userId = UUID.randomUUID();
Instant now = Instant.now();
// Insert
session.execute(
"INSERT INTO test_keyspace.users (id, email, name, created_at) VALUES (?, ?, ?, ?)",
userId, "alice@example.com", "Alice Smith", now
);
// Retrieve by partition key
Row row = session.execute(
"SELECT * FROM test_keyspace.users WHERE id = ?", userId
).one();
assertNotNull(row);
assertEquals("alice@example.com", row.getString("email"));
assertEquals("Alice Smith", row.getString("name"));
assertEquals(now.truncatedTo(ChronoUnit.MILLIS),
row.getInstant("created_at").truncatedTo(ChronoUnit.MILLIS));
}Testing Clustering Key Ordering
@Test
void shouldReturnOrdersInReverseChronologicalOrder() throws InterruptedException {
UUID userId = UUID.randomUUID();
// Insert orders with different timestamps (TIMEUUID encodes time)
for (int i = 1; i <= 5; i++) {
session.execute(
"INSERT INTO test_keyspace.orders_by_user " +
"(user_id, order_id, total, status) VALUES (?, now(), ?, ?)",
userId, new BigDecimal(i * 100), "completed"
);
Thread.sleep(10); // Ensure different timestamps
}
// Retrieve all orders
List<Row> orders = session.execute(
"SELECT * FROM test_keyspace.orders_by_user WHERE user_id = ?", userId
).all();
assertEquals(5, orders.size());
// Verify descending order (most recent first due to CLUSTERING ORDER BY ... DESC)
for (int i = 0; i < orders.size() - 1; i++) {
UUID current = orders.get(i).getUuid("order_id");
UUID next = orders.get(i + 1).getUuid("order_id");
assertTrue(UUIDs.unixTimestamp(current) > UUIDs.unixTimestamp(next),
"Orders should be in descending time order");
}
}Testing Allow Filtering (and Why to Avoid It)
@Test
void shouldQueryWithAllowFilteringButSlowly() {
// Seed data
for (int i = 0; i < 100; i++) {
session.execute(
"INSERT INTO test_keyspace.users (id, email, name) VALUES (?, ?, ?)",
UUID.randomUUID(), "user" + i + "@example.com", "User " + i
);
}
// ALLOW FILTERING works but scans entire partition
// This test verifies the query returns correct results
// but in production, this pattern should be replaced with
// a lookup table or secondary index
List<Row> users = session.execute(
"SELECT * FROM test_keyspace.users WHERE name = 'User 42' ALLOW FILTERING"
).all();
assertEquals(1, users.size());
assertEquals("user42@example.com", users.get(0).getString("email"));
}Testing the Dual-Write Pattern
A common Cassandra pattern is maintaining multiple tables for different query patterns:
@Test
void shouldMaintainConsistencyAcrossLookupTables() {
UUID userId = UUID.randomUUID();
String email = "bob@example.com";
String name = "Bob Jones";
// Service that writes to both tables atomically using BATCH
session.execute(BatchStatement.builder(BatchType.LOGGED)
.addStatement(SimpleStatement.builder(
"INSERT INTO test_keyspace.users (id, email, name) VALUES (?, ?, ?)")
.addPositionalValues(userId, email, name)
.build())
.addStatement(SimpleStatement.builder(
"INSERT INTO test_keyspace.users_by_email (email, user_id, name) VALUES (?, ?, ?)")
.addPositionalValues(email, userId, name)
.build())
.build()
);
// Verify both tables have consistent data
Row byId = session.execute(
"SELECT * FROM test_keyspace.users WHERE id = ?", userId).one();
Row byEmail = session.execute(
"SELECT * FROM test_keyspace.users_by_email WHERE email = ?", email).one();
assertNotNull(byId);
assertNotNull(byEmail);
assertEquals(byId.getUuid("id"), byEmail.getUuid("user_id"));
assertEquals(byId.getString("name"), byEmail.getString("name"));
}Approach 3: CassandraUnit for Lightweight Tests
CassandraUnit embeds a Cassandra instance directly (no Docker required):
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>4.3.1.0</version>
<scope>test</scope>
</dependency>@RunWith(CassandraUnitRunner.class)
@CassandraDataSet(value = "users-dataset.cql", keyspace = "test_keyspace")
public class UserDaoTest {
@ClassRule
public static CassandraUnit cassandraUnit =
new CassandraUnit(new ClassPathCQLDataSet("schema.cql", "test_keyspace"));
private UserDao dao;
@Before
public void setUp() {
CqlSession session = CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", 9142))
.withLocalDatacenter("datacenter1")
.build();
dao = new UserDao(session);
}
@Test
public void shouldFindUsersCreatedToday() {
List<User> users = dao.findCreatedAfter(LocalDate.now().atStartOfDay(ZoneOffset.UTC).toInstant());
assertFalse(users.isEmpty());
}
}Note: CassandraUnit is less actively maintained than Testcontainers. For new projects, prefer Testcontainers.
Testing Prepared Statements
Prepared statements improve performance and safety—test that your repository uses them correctly:
@Test
void shouldUseParameterizedQueriesNotStringConcatenation() {
// Verify SQL injection is prevented
String maliciousEmail = "'; DROP TABLE users; --";
// This should NOT throw and should not affect the table
Optional<User> result = userRepository.findByEmail(maliciousEmail);
assertFalse(result.isPresent());
// Table should still exist and be usable
userRepository.save(new User(UUID.randomUUID(), "safe@example.com", "Safe User"));
Optional<User> safeUser = userRepository.findByEmail("safe@example.com");
assertTrue(safeUser.isPresent());
}Testing Schema Migrations
Use Cassandra Migrations or a custom migration runner:
@Test
void shouldApplyMigrationsInOrder() throws Exception {
// Start with empty keyspace
CassandraContainer<?> fresh = new CassandraContainer<>("cassandra:4.1");
fresh.start();
CqlSession freshSession = CqlSession.builder()
.addContactPoint(fresh.getContactPoint())
.withLocalDatacenter(fresh.getLocalDatacenter())
.build();
// Apply migrations
CassandraMigration migration = new CassandraMigration();
migration.setKeyspaceConfiguration(new KeyspaceConfiguration(
"test_keyspace",
1,
false
));
migration.migrate(freshSession);
// Verify final schema state
KeyspaceMetadata keyspace = freshSession.getMetadata()
.getKeyspace("test_keyspace")
.orElseThrow();
assertTrue(keyspace.getTable("users").isPresent());
assertTrue(keyspace.getTable("orders_by_user").isPresent());
assertTrue(keyspace.getTable("schema_version").isPresent());
freshSession.close();
fresh.stop();
}Performance Testing
@Test
void shouldHandleHighThroughputWrites() throws InterruptedException {
UUID testPartition = UUID.randomUUID();
int writeCount = 1000;
CountDownLatch latch = new CountDownLatch(writeCount);
AtomicInteger errors = new AtomicInteger(0);
PreparedStatement insert = session.prepare(
"INSERT INTO test_keyspace.orders_by_user (user_id, order_id, total, status) " +
"VALUES (?, now(), ?, ?)"
);
long start = System.currentTimeMillis();
for (int i = 0; i < writeCount; i++) {
int finalI = i;
CompletableFuture.runAsync(() -> {
try {
session.execute(insert.bind(
testPartition,
new BigDecimal(finalI * 10),
"completed"
));
} catch (Exception e) {
errors.incrementAndGet();
} finally {
latch.countDown();
}
});
}
latch.await(30, TimeUnit.SECONDS);
long duration = System.currentTimeMillis() - start;
assertEquals(0, errors.get(), "No write errors expected");
assertThat(duration).isLessThan(10_000L);
long count = session.execute(
"SELECT COUNT(*) FROM test_keyspace.orders_by_user WHERE user_id = ?",
testPartition
).one().getLong(0);
assertEquals(writeCount, count);
}Cassandra Testing with HelpMeTest
For applications where Cassandra backs a user-facing API or dashboard, HelpMeTest tests the complete user flow:
*** Test Cases ***
User Order History Loads Correctly
As Admin
Go To https://app.example.com/users/123/orders
Wait Until Element Is Visible [data-testid=order-list] timeout=10s
Element Count Should Be Greater Than [data-testid=order-item] 0
First Order Should Be Most RecentThis catches issues that unit tests miss—pagination not working, timestamps displaying incorrectly, or partial data from consistency-level mismatches.
Key Takeaways
Cassandra testing requires a layered approach:
- Mock tests for business logic using DataStax driver mocks
- Testcontainers for CQL assertion and integration testing
- Schema validation tests for migration correctness
- Performance tests for write throughput and query latency
- Dual-write consistency tests for denormalized table patterns
The partition key is the foundation of everything in Cassandra—model your tests around it.