Cassandra Testing Guide: DataStax Driver, CassandraUnit, Testcontainers & CQL Assertions

Apache Cassandra's wide-column model, tunable consistency, and partition-key-driven design create testing challenges that relational databases do not have. This guide covers the full testing stack, from unit testing data-access code against a mocked driver session to integration testing against real Cassandra instances via Testcontainers.

Understanding Cassandra Testing Challenges

Cassandra doesn't work like a relational database, and your tests need to reflect that:

  • Partition key design determines data co-location and query performance—test it explicitly
  • Consistency levels affect what your application reads back immediately after a write
  • Schema changes require careful coordination: ALTER TABLE can add or drop columns, but it cannot change the type of an existing column
  • Denormalized tables mean the same data lives in multiple tables—test all write paths
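
The consistency-level point above can be made concrete with the usual overlap rule: a read is guaranteed to see the latest acknowledged write when the number of replicas contacted for the read plus the number contacted for the write exceeds the replication factor. The following is a minimal sketch of that arithmetic; ConsistencyMath and its methods are hypothetical names for illustration, not part of the DataStax driver.

```java
/** Hypothetical helper (not a driver API) for reasoning about consistency levels. */
final class ConsistencyMath {

    private ConsistencyMath() {}

    /** Replicas a QUORUM operation waits for: floor(RF / 2) + 1. */
    static int quorum(int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replicationFactor must be >= 1");
        }
        return replicationFactor / 2 + 1;
    }

    /** True when every read replica set overlaps every write replica set (R + W > RF). */
    static boolean readsSeeLatestWrite(int readReplicas, int writeReplicas, int replicationFactor) {
        return readReplicas + writeReplicas > replicationFactor;
    }

    public static void main(String[] args) {
        int rf = 3;
        int q = quorum(rf);
        System.out.println("QUORUM at RF=" + rf + " waits for " + q + " replicas");
        System.out.println("QUORUM write + QUORUM read overlap: " + readsSeeLatestWrite(q, q, rf));
        System.out.println("ONE write + ONE read overlap: " + readsSeeLatestWrite(1, 1, rf));
    }
}
```

On a single-node test keyspace with a replication factor of 1, every consistency level contacts the same replica, so read-after-write always succeeds. Consistency-level bugs therefore tend to surface only against multi-node clusters.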

Approach 1: DataStax Driver Testing with Mocks

For unit testing business logic that happens to use Cassandra, mock the DataStax session:

@ExtendWith(MockitoExtension.class)
class UserRepositoryTest {

    @Mock
    CqlSession session;

    @Mock
    PreparedStatement preparedStatement;

    @Mock
    BoundStatement boundStatement;

    @Mock
    ResultSet resultSet;

    @Mock
    Row row;

    UserRepository repository;

    @BeforeEach
    void setUp() {
        // The repository prepares its statement in the constructor via
        // session.prepare(String), so that call must be stubbed before construction
        when(session.prepare(anyString())).thenReturn(preparedStatement);
        repository = new UserRepository(session);
    }

    @Test
    void shouldFindUserById() {
        UUID userId = UUID.randomUUID();

        // Set up the mock chain: bind -> execute -> row
        when(preparedStatement.bind(userId)).thenReturn(boundStatement);
        when(session.execute(boundStatement)).thenReturn(resultSet);
        when(resultSet.one()).thenReturn(row);
        when(row.getUuid("id")).thenReturn(userId);
        when(row.getString("email")).thenReturn("user@example.com");
        when(row.getString("name")).thenReturn("John Doe");

        Optional<User> user = repository.findById(userId);

        assertTrue(user.isPresent());
        assertEquals("user@example.com", user.get().getEmail());
        verify(session).execute(boundStatement);
    }

    @Test
    void shouldReturnEmptyWhenUserNotFound() {
        UUID userId = UUID.randomUUID();

        when(preparedStatement.bind(userId)).thenReturn(boundStatement);
        when(session.execute(boundStatement)).thenReturn(resultSet);
        when(resultSet.one()).thenReturn(null);

        Optional<User> user = repository.findById(userId);

        assertFalse(user.isPresent());
    }
}

The Repository Implementation

@Repository
public class UserRepository {

    private final CqlSession session;
    private final PreparedStatement findById;

    public UserRepository(CqlSession session) {
        this.session = session;
        this.findById = session.prepare(
            "SELECT id, email, name FROM users WHERE id = ?"
        );
    }

    public Optional<User> findById(UUID id) {
        Row row = session.execute(findById.bind(id)).one();
        if (row == null) return Optional.empty();

        return Optional.of(new User(
            row.getUuid("id"),
            row.getString("email"),
            row.getString("name")
        ));
    }
}

Approach 2: Testcontainers with Real Cassandra

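For integration tests that validate CQL queries, schema, and actual behavior, add the Testcontainers Cassandra module (the @Testcontainers and @Container annotations come from the separate org.testcontainers:junit-jupiter artifact):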

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>cassandra</artifactId>
    <scope>test</scope>
</dependency>

@Testcontainers
class CassandraIntegrationTest {

    @Container
    static CassandraContainer<?> cassandra =
        new CassandraContainer<>(DockerImageName.parse("cassandra:4.1"))
            .withInitScript("schema.cql")
            .withStartupTimeout(Duration.ofMinutes(3));

    static CqlSession session;

    @BeforeAll
    static void setupSession() {
        session = CqlSession.builder()
            .addContactPoint(cassandra.getContactPoint())
            .withLocalDatacenter(cassandra.getLocalDatacenter())
            .build();
    }

    @AfterAll
    static void closeSession() {
        if (session != null) session.close();
    }

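    @BeforeEach
    void truncateTables() {
        // Reset every table the tests write to, not just users
        session.execute("TRUNCATE test_keyspace.users");
        session.execute("TRUNCATE test_keyspace.users_by_email");
        session.execute("TRUNCATE test_keyspace.orders_by_user");
    }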
}

The init script (src/test/resources/schema.cql):

CREATE KEYSPACE IF NOT EXISTS test_keyspace
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

USE test_keyspace;

CREATE TABLE IF NOT EXISTS users (
    id UUID PRIMARY KEY,
    email TEXT,
    name TEXT,
    created_at TIMESTAMP
);

CREATE TABLE IF NOT EXISTS users_by_email (
    email TEXT PRIMARY KEY,
    user_id UUID,
    name TEXT
);

CREATE TABLE IF NOT EXISTS orders_by_user (
    user_id UUID,
    order_id TIMEUUID,
    total DECIMAL,
    status TEXT,
    PRIMARY KEY (user_id, order_id)
) WITH CLUSTERING ORDER BY (order_id DESC);

CQL Assertions: Testing Queries Directly

Testing Partition Key Queries

@Test
void shouldInsertAndRetrieveUserByPartitionKey() {
    UUID userId = UUID.randomUUID();
    Instant now = Instant.now();

    // Insert
    session.execute(
        "INSERT INTO test_keyspace.users (id, email, name, created_at) VALUES (?, ?, ?, ?)",
        userId, "alice@example.com", "Alice Smith", now
    );

    // Retrieve by partition key
    Row row = session.execute(
        "SELECT * FROM test_keyspace.users WHERE id = ?", userId
    ).one();

    assertNotNull(row);
    assertEquals("alice@example.com", row.getString("email"));
    assertEquals("Alice Smith", row.getString("name"));
    assertEquals(now.truncatedTo(ChronoUnit.MILLIS),
        row.getInstant("created_at").truncatedTo(ChronoUnit.MILLIS));
}
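
The truncatedTo(ChronoUnit.MILLIS) calls in the assertion above matter because a CQL timestamp column stores milliseconds, while Instant.now() can carry finer precision. A small sketch of the effect follows, using only the JDK; TimestampPrecision and asStoredInTimestampColumn are hypothetical names that model the round trip and do not call Cassandra.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Hypothetical model of a round trip through a millisecond-precision timestamp column. */
final class TimestampPrecision {

    private TimestampPrecision() {}

    /** Drops everything below millisecond precision, as a CQL timestamp column does. */
    static Instant asStoredInTimestampColumn(Instant value) {
        return Instant.ofEpochMilli(value.toEpochMilli());
    }

    public static void main(String[] args) {
        Instant written = Instant.parse("2024-01-01T12:00:00.123456789Z");
        Instant readBack = asStoredInTimestampColumn(written);

        // A naive equality check fails because the sub-millisecond digits are gone
        System.out.println("equal without truncation: " + written.equals(readBack));
        // Comparing at millisecond precision succeeds
        System.out.println("equal after truncation: "
            + written.truncatedTo(ChronoUnit.MILLIS).equals(readBack));
    }
}
```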

Testing Clustering Key Ordering

@Test
void shouldReturnOrdersInReverseChronologicalOrder() throws InterruptedException {
    UUID userId = UUID.randomUUID();

    // Insert orders with different timestamps (TIMEUUID encodes time)
    for (int i = 1; i <= 5; i++) {
        session.execute(
            "INSERT INTO test_keyspace.orders_by_user " +
            "(user_id, order_id, total, status) VALUES (?, now(), ?, ?)",
            userId, new BigDecimal(i * 100), "completed"
        );
        Thread.sleep(10); // Keep timestamps a few ms apart; the check below compares milliseconds
    }

    // Retrieve all orders
    List<Row> orders = session.execute(
        "SELECT * FROM test_keyspace.orders_by_user WHERE user_id = ?", userId
    ).all();

    assertEquals(5, orders.size());

    // Verify descending order (most recent first due to CLUSTERING ORDER BY ... DESC)
    for (int i = 0; i < orders.size() - 1; i++) {
        UUID current = orders.get(i).getUuid("order_id");
        UUID next = orders.get(i + 1).getUuid("order_id");
        // In driver 4.x the helper class is com.datastax.oss.driver.api.core.uuid.Uuids
        assertTrue(Uuids.unixTimestamp(current) > Uuids.unixTimestamp(next),
            "Orders should be in descending time order");
    }
}
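
The ordering assertion above relies on a TIMEUUID being a version 1 UUID whose 60-bit timestamp counts 100-nanosecond intervals since 1582-10-15. The sketch below shows that encoding with the JDK alone. TimeUuidSketch and its methods are hypothetical names; in real tests the driver's own helper does this work, and the timeBased factory here exists only to produce deterministic inputs.

```java
import java.util.UUID;

/** Hypothetical sketch of how a version 1 (time-based) UUID encodes its timestamp. */
final class TimeUuidSketch {

    // Milliseconds between the UUID epoch (1582-10-15T00:00:00Z) and the Unix epoch
    private static final long UUID_EPOCH_OFFSET_MILLIS = 12_219_292_800_000L;

    private TimeUuidSketch() {}

    /** Builds a version 1 UUID for the given Unix time; the low bits are caller-supplied. */
    static UUID timeBased(long unixMillis, long clockSeqAndNode) {
        long ts = (unixMillis + UUID_EPOCH_OFFSET_MILLIS) * 10_000L; // 100-ns units
        long msb = 0L;
        msb |= (ts & 0xFFFFFFFFL) << 32;       // time_low
        msb |= ((ts >>> 32) & 0xFFFFL) << 16;  // time_mid
        msb |= 0x1000L;                        // version 1
        msb |= (ts >>> 48) & 0x0FFFL;          // time_hi
        long lsb = 0x8000000000000000L | (clockSeqAndNode & 0x3FFFFFFFFFFFFFFFL); // variant bits 10
        return new UUID(msb, lsb);
    }

    /** Extracts the Unix time in milliseconds; UUID.timestamp() rejects non-version-1 UUIDs. */
    static long unixMillis(UUID timeUuid) {
        return timeUuid.timestamp() / 10_000L - UUID_EPOCH_OFFSET_MILLIS;
    }

    public static void main(String[] args) {
        UUID earlier = timeBased(1_700_000_000_000L, 1L);
        UUID later = timeBased(1_700_000_000_010L, 2L);
        System.out.println(earlier + " -> " + unixMillis(earlier));
        System.out.println(later + " -> " + unixMillis(later));
    }
}
```

Note that java.util.UUID.compareTo compares the raw bits, where the low part of the timestamp comes first, so it does not sort version 1 UUIDs chronologically. Comparing the extracted timestamps, as the test does, avoids that trap.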

Testing Allow Filtering (and Why to Avoid It)

@Test
void shouldQueryWithAllowFilteringButSlowly() {
    // Seed data
    for (int i = 0; i < 100; i++) {
        session.execute(
            "INSERT INTO test_keyspace.users (id, email, name) VALUES (?, ?, ?)",
            UUID.randomUUID(), "user" + i + "@example.com", "User " + i
        );
    }

    // ALLOW FILTERING works, but without a partition key restriction it
    // scans every partition in the table. This test only verifies that the
    // query returns correct results; in production, replace this pattern
    // with a lookup table or a secondary index
    List<Row> users = session.execute(
        "SELECT * FROM test_keyspace.users WHERE name = 'User 42' ALLOW FILTERING"
    ).all();

    assertEquals(1, users.size());
    assertEquals("user42@example.com", users.get(0).getString("email"));
}

Testing the Dual-Write Pattern

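A common Cassandra pattern is maintaining multiple tables for different query patterns. Every write must then reach all of them. A logged batch guarantees that either all of its statements are eventually applied or none are, although it does not provide isolation between them: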

@Test
void shouldMaintainConsistencyAcrossLookupTables() {
    UUID userId = UUID.randomUUID();
    String email = "bob@example.com";
    String name = "Bob Jones";

    // Write to both tables atomically with a LOGGED batch, as the service under test would
    session.execute(BatchStatement.builder(BatchType.LOGGED)
        .addStatement(SimpleStatement.builder(
            "INSERT INTO test_keyspace.users (id, email, name) VALUES (?, ?, ?)")
            .addPositionalValues(userId, email, name)
            .build())
        .addStatement(SimpleStatement.builder(
            "INSERT INTO test_keyspace.users_by_email (email, user_id, name) VALUES (?, ?, ?)")
            .addPositionalValues(email, userId, name)
            .build())
        .build()
    );

    // Verify both tables have consistent data
    Row byId = session.execute(
        "SELECT * FROM test_keyspace.users WHERE id = ?", userId).one();
    Row byEmail = session.execute(
        "SELECT * FROM test_keyspace.users_by_email WHERE email = ?", email).one();

    assertNotNull(byId);
    assertNotNull(byEmail);
    assertEquals(byId.getUuid("id"), byEmail.getUuid("user_id"));
    assertEquals(byId.getString("name"), byEmail.getString("name"));
}

Approach 3: CassandraUnit for Lightweight Tests

CassandraUnit embeds a Cassandra instance directly (no Docker required):

<dependency>
    <groupId>org.cassandraunit</groupId>
    <artifactId>cassandra-unit</artifactId>
    <version>4.3.1.0</version>
    <scope>test</scope>
</dependency>

public class UserDaoTest {

    // CassandraCQLUnit is CassandraUnit's JUnit 4 rule for CQL data sets. As a
    // @ClassRule it starts the embedded server once and loads the data set.
    // Assumes users-dataset.cql creates the tables and inserts the seed rows.
    @ClassRule
    public static CassandraCQLUnit cassandraUnit =
        new CassandraCQLUnit(new ClassPathCQLDataSet("users-dataset.cql", "test_keyspace"));

    private CqlSession session;
    private UserDao dao;

    @Before
    public void setUp() {
        // 9142 is the native port of CassandraUnit's default embedded configuration
        session = CqlSession.builder()
            .addContactPoint(new InetSocketAddress("localhost", 9142))
            .withLocalDatacenter("datacenter1")
            .build();
        dao = new UserDao(session);
    }

    @After
    public void tearDown() {
        // Close the per-test session so connections do not leak
        session.close();
    }

    @Test
    public void shouldFindUsersCreatedToday() {
        List<User> users = dao.findCreatedAfter(LocalDate.now().atStartOfDay(ZoneOffset.UTC).toInstant());
        assertFalse(users.isEmpty());
    }
}

Note: CassandraUnit is less actively maintained than Testcontainers. For new projects, prefer Testcontainers.

Testing Prepared Statements

Prepared statements improve performance and safety—test that your repository uses them correctly:

@Test
void shouldUseParameterizedQueriesNotStringConcatenation() {
    // Verify CQL injection is prevented
    String maliciousEmail = "'; DROP TABLE users; --";

    // This should NOT throw and should not affect the table
    Optional<User> result = userRepository.findByEmail(maliciousEmail);

    assertFalse(result.isPresent());

    // Table should still exist and be usable
    userRepository.save(new User(UUID.randomUUID(), "safe@example.com", "Safe User"));
    Optional<User> safeUser = userRepository.findByEmail("safe@example.com");
    assertTrue(safeUser.isPresent());
}
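
To see why the test above distinguishes the two approaches, it helps to look at the query text each one produces. The sketch below uses plain strings only; CqlInjectionSketch, concatenated, and PARAMETERIZED are hypothetical names, and the table name is borrowed from the schema earlier in this guide.

```java
/** Hypothetical illustration of concatenated versus parameterized CQL text. */
final class CqlInjectionSketch {

    /** With a bind marker the CQL text never changes; the value travels separately. */
    static final String PARAMETERIZED =
        "SELECT * FROM test_keyspace.users_by_email WHERE email = ?";

    private CqlInjectionSketch() {}

    /** Anti-pattern: user input becomes part of the CQL text itself. */
    static String concatenated(String email) {
        return "SELECT * FROM test_keyspace.users_by_email WHERE email = '" + email + "'";
    }

    public static void main(String[] args) {
        String malicious = "'; DROP TABLE users; --";
        // The input closes the string literal and appends its own CQL
        System.out.println(concatenated(malicious));
        // The parameterized text is identical for every input
        System.out.println(PARAMETERIZED);
    }
}
```

With concatenation the input rewrites the statement, so the server receives text the author never intended. With a bind marker the same input is only ever compared against the email column as a value.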

Testing Schema Migrations

Use Cassandra Migrations or a custom migration runner:

@Test
void shouldApplyMigrationsInOrder() throws Exception {
    // Start from a fresh container with no application keyspace;
    // try-with-resources stops it even when an assertion fails
    try (CassandraContainer<?> fresh = new CassandraContainer<>("cassandra:4.1")) {
        fresh.start();

        try (CqlSession freshSession = CqlSession.builder()
                .addContactPoint(fresh.getContactPoint())
                .withLocalDatacenter(fresh.getLocalDatacenter())
                .build()) {

            // Apply migrations. The runner calls below are illustrative;
            // substitute the API of the migration library you actually use
            CassandraMigration migration = new CassandraMigration();
            migration.setKeyspaceConfiguration(new KeyspaceConfiguration(
                "test_keyspace",
                1,
                false
            ));
            migration.migrate(freshSession);

            // Verify final schema state
            KeyspaceMetadata keyspace = freshSession.getMetadata()
                .getKeyspace("test_keyspace")
                .orElseThrow();

            assertTrue(keyspace.getTable("users").isPresent());
            assertTrue(keyspace.getTable("orders_by_user").isPresent());
            // The name of the version-tracking table depends on the migration tool
            assertTrue(keyspace.getTable("schema_version").isPresent());
        }
    }
}

Performance Testing

@Test
void shouldHandleHighThroughputWrites() throws InterruptedException {
    UUID testPartition = UUID.randomUUID();
    int writeCount = 1000;
    CountDownLatch latch = new CountDownLatch(writeCount);
    AtomicInteger errors = new AtomicInteger(0);

    PreparedStatement insert = session.prepare(
        "INSERT INTO test_keyspace.orders_by_user (user_id, order_id, total, status) " +
        "VALUES (?, now(), ?, ?)"
    );

    long start = System.currentTimeMillis();

    for (int i = 0; i < writeCount; i++) {
        int finalI = i;
        CompletableFuture.runAsync(() -> {
            try {
                session.execute(insert.bind(
                    testPartition,
                    new BigDecimal(finalI * 10),
                    "completed"
                ));
            } catch (Exception e) {
                errors.incrementAndGet();
            } finally {
                latch.countDown();
            }
        });
    }

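    // Fail if the writes did not all finish, instead of silently timing out
    assertTrue(latch.await(30, TimeUnit.SECONDS), "Writes should finish within 30 seconds");
    long duration = System.currentTimeMillis() - start;

    assertEquals(0, errors.get(), "No write errors expected");
    // The 10-second budget depends on the machine running the container; tune it for CI
    assertThat(duration).isLessThan(10_000L);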

    long count = session.execute(
        "SELECT COUNT(*) FROM test_keyspace.orders_by_user WHERE user_id = ?",
        testPartition
    ).one().getLong(0);

    assertEquals(writeCount, count);
}
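
The test above issues blocking session.execute calls from the common fork-join pool, so its concurrency is capped by the pool size. One alternative, sketched here under assumptions, is to issue asynchronous requests and bound the number in flight with a semaphore. ThrottledWrites and runThrottled are hypothetical names; the operation is any function that returns a CompletionStage, which is the return type of executeAsync in driver 4.x.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Hypothetical helper that caps the number of asynchronous operations in flight. */
final class ThrottledWrites {

    private ThrottledWrites() {}

    /** Runs operations 0..total-1 with at most maxInFlight pending; returns the failure count. */
    static int runThrottled(int total, int maxInFlight, IntFunction<CompletionStage<?>> operation) {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger failures = new AtomicInteger();

        for (int i = 0; i < total; i++) {
            // Blocks the submitting thread while maxInFlight operations are pending
            permits.acquireUninterruptibly();
            CompletionStage<?> stage;
            try {
                stage = operation.apply(i);
            } catch (RuntimeException e) {
                // The operation failed before it became asynchronous
                failures.incrementAndGet();
                permits.release();
                continue;
            }
            stage.whenComplete((result, error) -> {
                if (error != null) {
                    failures.incrementAndGet();
                }
                permits.release();
            });
        }

        // Once every permit is back, nothing is in flight any more
        permits.acquireUninterruptibly(maxInFlight);
        permits.release(maxInFlight);
        return failures.get();
    }

    public static void main(String[] args) {
        // Stand-in for an asynchronous write; a real test would call the driver here
        int failures = runThrottled(100, 8, i -> CompletableFuture.runAsync(() -> { }));
        System.out.println("failures: " + failures);
    }
}
```

In the performance test this could be called as runThrottled(writeCount, 64, i -> session.executeAsync(insert.bind(testPartition, new BigDecimal(i * 10), "completed"))). The limit of 64 is an arbitrary starting point, not a recommendation.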

Cassandra Testing with HelpMeTest

For applications where Cassandra backs a user-facing API or dashboard, HelpMeTest tests the complete user flow:

*** Test Cases ***
User Order History Loads Correctly
    As    Admin
    Go To    https://app.example.com/users/123/orders
    Wait Until Element Is Visible    [data-testid=order-list]    timeout=10s
    Element Count Should Be Greater Than    [data-testid=order-item]    0
    First Order Should Be Most Recent

This catches issues that unit tests miss—pagination not working, timestamps displaying incorrectly, or partial data from consistency-level mismatches.

Key Takeaways

Cassandra testing requires a layered approach:

  1. Mock tests for business logic using DataStax driver mocks
  2. Testcontainers for CQL assertion and integration testing
  3. Schema validation tests for migration correctness
  4. Performance tests for write throughput and query latency
  5. Dual-write consistency tests for denormalized table patterns

The partition key is the foundation of everything in Cassandra—model your tests around it.
