Java Concurrency Testing: CountDownLatch, CyclicBarrier, and ThreadSanitizer

Java Concurrency Testing: CountDownLatch, CyclicBarrier, and ThreadSanitizer

Concurrent Java code is notoriously difficult to test. The JVM's threading model, JIT compilation, and memory model create bugs that appear only under load, on specific hardware, or with particular GC settings. Good concurrent testing is a combination of the right synchronization primitives, stress testing, and race detection tools.

The Problem with Concurrent Tests

Simple concurrent tests are often flaky. This test for a thread-safe counter has a race:

@Test
void counterShouldIncrementConcurrently() throws Exception {
    AtomicInteger counter = new AtomicInteger(0);
    int threads = 10;

    for (int i = 0; i < threads; i++) {
        new Thread(() -> counter.incrementAndGet()).start();
    }

    // Race condition: threads may not have finished
    assertEquals(10, counter.get()); // may fail
}

The test might pass most of the time because threads often finish before assertEquals. But it's not reliable.

CountDownLatch: Coordinating Test Phases

CountDownLatch lets one thread wait for others to complete:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

@Test
void counterShouldIncrementConcurrently() throws Exception {
    AtomicInteger counter = new AtomicInteger(0);
    int threadCount = 10;
    CountDownLatch latch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        new Thread(() -> {
            try {
                counter.incrementAndGet();
            } finally {
                latch.countDown(); // signal this thread is done
            }
        }).start();
    }

    latch.await(); // wait for all threads
    assertEquals(10, counter.get());
}

Use two latches for more precise control — one to start all threads simultaneously, one to wait for completion:

@Test
void allThreadsStartSimultaneously() throws Exception {
    int threadCount = 10;
    CountDownLatch startSignal = new CountDownLatch(1);  // gate to hold threads
    CountDownLatch doneSignal = new CountDownLatch(threadCount); // wait for completion
    List<Long> startTimes = new CopyOnWriteArrayList<>();

    for (int i = 0; i < threadCount; i++) {
        new Thread(() -> {
            try {
                startSignal.await(); // wait for start signal
                startTimes.add(System.nanoTime());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                doneSignal.countDown();
            }
        }).start();
    }

    startSignal.countDown(); // release all threads at once
    doneSignal.await(5, TimeUnit.SECONDS);

    assertEquals(threadCount, startTimes.size());
}

CyclicBarrier: Synchronization Points

CyclicBarrier makes threads wait for each other at a specific point:

import java.util.concurrent.CyclicBarrier;

@Test
void phaseBasedProcessing() throws Exception {
    int parties = 5;
    List<String> results = new CopyOnWriteArrayList<>();

    CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
        // Runs after all parties reach the barrier
        System.out.println("All threads completed phase, moving to next");
    });

    ExecutorService executor = Executors.newFixedThreadPool(parties);

    for (int i = 0; i < parties; i++) {
        final int id = i;
        executor.submit(() -> {
            // Phase 1
            results.add("phase1-" + id);
            barrier.await(); // wait for all to finish phase 1

            // Phase 2 — all threads start simultaneously after barrier
            results.add("phase2-" + id);
            return null;
        });
    }

    executor.shutdown();
    assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));

    assertEquals(parties * 2, results.size());
    assertEquals(parties, results.stream().filter(s -> s.startsWith("phase1")).count());
}

Stress Testing for Race Conditions

Run operations thousands of times to expose intermittent race conditions:

@RepeatedTest(1000)
void concurrentMapAccessShouldNotThrow() throws Exception {
    ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    int threadCount = 20;
    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
    CountDownLatch latch = new CountDownLatch(threadCount);
    List<Throwable> errors = new CopyOnWriteArrayList<>();

    for (int i = 0; i < threadCount; i++) {
        final int idx = i;
        executor.submit(() -> {
            try {
                map.compute("key", (k, v) -> v == null ? 1 : v + 1);
            } catch (Throwable t) {
                errors.add(t);
            } finally {
                latch.countDown();
            }
        });
    }

    latch.await(5, TimeUnit.SECONDS);
    executor.shutdown();

    assertTrue(errors.isEmpty(), "Concurrent access caused: " + errors);
    assertEquals(threadCount, map.get("key"));
}

JUnit 5's @RepeatedTest runs the test N times. Combine with -Djvm.args to use multiple CPU cores.

ThreadSanitizer (TSan)

ThreadSanitizer is a compile-time instrumentation tool that detects data races at runtime. For Java, the equivalent is a combination of tools:

jcstress — the Java Concurrency Stress test harness developed by the OpenJDK team:

<!-- pom.xml -->
<dependency>
    <groupId>org.openjdk.jcstress</groupId>
    <artifactId>jcstress-core</artifactId>
    <version>0.16</version>
</dependency>

Write a stress test:

import org.openjdk.jcstress.annotations.*;
import org.openjdk.jcstress.infra.results.II_Result;

@JCStressTest
@Outcome(id = "1, 1", expect = Expect.ACCEPTABLE, desc = "Both threads incremented")
@Outcome(id = "0, 0", expect = Expect.FORBIDDEN, desc = "Neither thread incremented — data race")
@State
public class CounterRaceTest {
    int counter = 0;

    @Actor
    public void actor1(II_Result r) {
        r.r1 = ++counter;
    }

    @Actor
    public void actor2(II_Result r) {
        r.r2 = ++counter;
    }
}

Run:

mvn clean install
java -jar target/jcstress.jar -r CounterRaceTest

jcstress exhaustively tests all possible thread interleavings — far more thorough than stress tests.

Testing with ExecutorService

Most production code uses thread pools. Test with them:

@Test
void processAllItemsConcurrently() throws Exception {
    List<Integer> items = IntStream.range(0, 100).boxed().collect(Collectors.toList());
    ExecutorService executor = Executors.newFixedThreadPool(8);
    List<Integer> processed = new CopyOnWriteArrayList<>();
    List<Future<?>> futures = new ArrayList<>();

    for (Integer item : items) {
        futures.add(executor.submit(() -> processed.add(item * 2)));
    }

    // Wait for all and check for exceptions
    for (Future<?> future : futures) {
        future.get(5, TimeUnit.SECONDS); // throws if task threw
    }

    executor.shutdown();
    assertEquals(100, processed.size());
}

Using CompletableFuture:

@Test
void completableFutureChain() throws Exception {
    CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> fetchUser(1))
        .thenApplyAsync(user -> enrichWithProfile(user))
        .thenApply(user -> user.getDisplayName());

    String name = future.get(5, TimeUnit.SECONDS);
    assertNotNull(name);
    assertFalse(name.isEmpty());
}

Testing BlockingQueue

Blocking queues are core to producer-consumer patterns:

@Test
void producerConsumerPattern() throws Exception {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    List<Integer> consumed = new CopyOnWriteArrayList<>();
    CountDownLatch consumerDone = new CountDownLatch(1);

    // Consumer
    Thread consumer = new Thread(() -> {
        try {
            while (true) {
                Integer item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item == null) break; // timeout — producer done
                consumed.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumerDone.countDown();
        }
    });
    consumer.start();

    // Producer
    for (int i = 0; i < 50; i++) {
        queue.put(i);
    }

    consumerDone.await(5, TimeUnit.SECONDS);
    assertEquals(50, consumed.size());
}

Detecting Deadlocks

Java's ThreadMXBean can detect deadlocks programmatically:

@Test
void noDeadlockUnderConcurrentAccess() throws Exception {
    Object lock1 = new Object();
    Object lock2 = new Object();
    CountDownLatch started = new CountDownLatch(2);
    CountDownLatch done = new CountDownLatch(2);
    AtomicBoolean deadlockDetected = new AtomicBoolean(false);

    Thread t1 = new Thread(() -> {
        synchronized (lock1) {
            started.countDown();
            try { started.await(); } catch (InterruptedException e) {}
            synchronized (lock2) { /* work */ }
        }
        done.countDown();
    });

    Thread t2 = new Thread(() -> {
        synchronized (lock2) {
            started.countDown();
            try { started.await(); } catch (InterruptedException e) {}
            synchronized (lock1) { /* work */ }
        }
        done.countDown();
    });

    t1.start(); t2.start();

    boolean completed = done.await(2, TimeUnit.SECONDS);
    if (!completed) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        long[] deadlocked = bean.findDeadlockedThreads();
        if (deadlocked != null) deadlockDetected.set(true);
        t1.interrupt(); t2.interrupt();
    }

    assertFalse(deadlockDetected.get(), "Deadlock detected");
}

Virtual Threads (Java 21+)

Java 21 virtual threads simplify concurrent programming. Test them the same way:

@Test
void virtualThreadsShouldProcessConcurrently() throws Exception {
    int count = 1000;
    CountDownLatch latch = new CountDownLatch(count);
    List<Long> threadIds = new CopyOnWriteArrayList<>();

    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
        for (int i = 0; i < count; i++) {
            executor.submit(() -> {
                threadIds.add(Thread.currentThread().threadId());
                latch.countDown();
            });
        }
        latch.await(10, TimeUnit.SECONDS);
    }

    assertEquals(count, threadIds.size());
}

End-to-End Concurrency Testing

Unit tests verify synchronization primitives. Integration tests verify that services handle concurrent requests correctly at the application level. HelpMeTest lets you test concurrent behavior end-to-end:

Scenario: 50 users submit orders simultaneously
  Given 50 users are logged in
  When all 50 submit an order at the same time
  Then all 50 orders are created successfully
  And inventory is decremented correctly
  And no duplicate order IDs exist

This catches race conditions at the HTTP layer — database locking bugs, cache inconsistencies, and double-spending scenarios that don't appear in unit tests.

Key Takeaways

  • Use CountDownLatch(1) as a start gate to release all threads simultaneously, ensuring real concurrency in tests
  • @RepeatedTest(1000) combined with concurrent access exposes intermittent races better than single runs
  • jcstress exhaustively tests all memory orderings — use it when @RepeatedTest isn't finding races
  • CopyOnWriteArrayList is safe for collecting results from multiple threads without synchronization
  • Future.get(timeout, unit) propagates task exceptions to the test — don't forget the timeout

Read more