FreeRTOS Advanced Unit Testing: Tasks, Queues, and Semaphores

FreeRTOS Advanced Unit Testing: Tasks, Queues, and Semaphores

FreeRTOS applications are notoriously hard to unit test because they depend on a real-time scheduler, hardware interrupts, and timing. With the right approach—mocking the FreeRTOS kernel API—you can test task logic, queue interactions, and semaphore behavior deterministically on your host machine.

The Problem with Testing FreeRTOS Code

A typical FreeRTOS task looks like this:

void temperature_task(void *pvParameters) {
    TickType_t xLastWakeTime = xTaskGetTickCount();
    
    for (;;) {
        // Read sensor
        float temp = adc_read_temperature();
        
        // Send to queue
        if (xQueueSend(temp_queue, &temp, pdMS_TO_TICKS(10)) != pdPASS) {
            // Log queue full error
            error_handler(ERR_QUEUE_FULL);
        }
        
        // Wait for next period
        vTaskDelayUntil(&xLastWakeTime, pdMS_TO_TICKS(100));
    }
}

This function calls xTaskGetTickCount, xQueueSend, vTaskDelayUntil—all FreeRTOS kernel functions. Running this on a host requires either the FreeRTOS POSIX port or mocking all kernel calls.

Approach 1: CMock for FreeRTOS Kernel APIs

Generate mocks from FreeRTOS headers using CMock:

# Generate mocks from FreeRTOS headers
ruby vendor/cmock/lib/cmock.rb \
    --mock_path=mocks \
    FreeRTOS/include/queue.h \
    FreeRTOS/include/semphr.h \
    FreeRTOS/include/task.h \
    FreeRTOS/include/timers.h

This generates mock_queue.h, mock_semphr.h, etc. with stub functions and record/playback capabilities.

Project Structure

firmware/
├── FreeRTOS/include/     # Real FreeRTOS headers
├── src/
│   ├── tasks/
│   │   ├── temperature_task.c
│   │   └── motor_task.c
│   └── drivers/
│       └── adc_driver.c
└── tests/
    ├── unity/            # Unity test framework
    ├── cmock/            # CMock
    ├── mocks/            # Generated: mock_queue.c, mock_task.c, etc.
    ├── test_temperature_task.c
    └── test_motor_task.c

Writing Tests with CMock Mocks

// tests/test_temperature_task.c
#include "unity.h"
#include "mock_queue.h"     // Generated mock
#include "mock_task.h"      // Generated mock
#include "mock_adc_driver.h"
#include "tasks/temperature_task.h"

// The queue handle used by the task under test
extern QueueHandle_t temp_queue;

void setUp(void) {
    // Reset all CMock internal state
    mock_queue_Init();
    mock_task_Init();
    mock_adc_driver_Init();
}

void tearDown(void) {
    mock_queue_Verify();
    mock_queue_Destroy();
    mock_task_Verify();
    mock_task_Destroy();
    mock_adc_driver_Verify();
    mock_adc_driver_Destroy();
}

// Test: task sends valid temperature to queue
void test_TemperatureTask_SendsReadingToQueue(void) {
    float expected_temp = 23.5f;
    
    // Expect: task gets tick count for timing
    xTaskGetTickCount_ExpectAndReturn(0);
    
    // Expect: ADC read returns our temperature
    adc_read_temperature_ExpectAndReturn(expected_temp);
    
    // Expect: temperature sent to queue
    // Match any pointer to float, return pdPASS, with 10ms timeout
    xQueueSend_ExpectAndReturn(temp_queue, &expected_temp, 
                                pdMS_TO_TICKS(10), pdPASS);
    xQueueSend_IgnoreArg_pvItemToQueue();  // Don't check pointer value
    
    // Expect: task delays for 100ms
    vTaskDelayUntil_Expect(NULL, pdMS_TO_TICKS(100));
    vTaskDelayUntil_IgnoreArg_pxPreviousWakeTime();
    
    // Run one iteration of the task loop
    temperature_task_step();  // Refactored to expose single-step
}

// Test: queue full triggers error handler
void test_TemperatureTask_QueueFullCallsErrorHandler(void) {
    float expected_temp = 50.0f;
    
    xTaskGetTickCount_ExpectAndReturn(1000);
    adc_read_temperature_ExpectAndReturn(expected_temp);
    
    // Queue send fails (full)
    xQueueSend_ExpectAndReturn(temp_queue, NULL, 
                                pdMS_TO_TICKS(10), errQUEUE_FULL);
    xQueueSend_IgnoreArg_pvItemToQueue();
    
    // Expect error handler called
    error_handler_Expect(ERR_QUEUE_FULL);
    
    vTaskDelayUntil_Expect(NULL, pdMS_TO_TICKS(100));
    vTaskDelayUntil_IgnoreArg_pxPreviousWakeTime();
    
    temperature_task_step();
}

Refactoring Tasks for Testability

The for(;;) loop is untestable. Refactor to expose a single step:

// tasks/temperature_task.c

// Internal step function—testable
void temperature_task_step(void) {
    static TickType_t xLastWakeTime = 0;
    
    if (xLastWakeTime == 0) {
        xLastWakeTime = xTaskGetTickCount();
    }
    
    float temp = adc_read_temperature();
    
    if (xQueueSend(temp_queue, &temp, pdMS_TO_TICKS(10)) != pdPASS) {
        error_handler(ERR_QUEUE_FULL);
    }
    
    vTaskDelayUntil(&xLastWakeTime, pdMS_TO_TICKS(100));
}

// FreeRTOS task entry point
void temperature_task(void *pvParameters) {
    for (;;) {
        temperature_task_step();
    }
}

Testing Semaphore-Based Synchronization

// src/tasks/motor_task.c
// Motor task waits for a semaphore before processing commands

void motor_task(void *pvParameters) {
    for (;;) {
        if (xSemaphoreTake(motor_cmd_semaphore, portMAX_DELAY) == pdPASS) {
            MotorCommand cmd;
            if (xQueueReceive(motor_cmd_queue, &cmd, 0) == pdPASS) {
                motor_execute_command(&cmd);
            }
        }
    }
}
// tests/test_motor_task.c
#include "unity.h"
#include "mock_semphr.h"
#include "mock_queue.h"
#include "mock_motor_driver.h"
#include "tasks/motor_task.h"

void test_MotorTask_ExecutesCommandWhenSemaphoreAvailable(void) {
    MotorCommand cmd = {
        .type = CMD_MOVE,
        .speed = 50,
        .direction = DIR_FORWARD
    };
    
    // Semaphore is available immediately
    xSemaphoreTake_ExpectAndReturn(motor_cmd_semaphore, 
                                    portMAX_DELAY, pdPASS);
    
    // Command waiting in queue
    xQueueReceive_ExpectAndReturn(motor_cmd_queue, NULL, 0, pdPASS);
    xQueueReceive_ReturnThruPtr_pvBuffer(&cmd);
    
    // Motor executes the command
    motor_execute_command_Expect(&cmd);
    
    motor_task_step();
}

void test_MotorTask_SkipsExecutionWhenQueueEmpty(void) {
    // Semaphore taken but queue is empty
    xSemaphoreTake_ExpectAndReturn(motor_cmd_semaphore, 
                                    portMAX_DELAY, pdPASS);
    xQueueReceive_ExpectAndReturn(motor_cmd_queue, NULL, 0, errQUEUE_EMPTY);
    
    // motor_execute_command should NOT be called
    // (no expectation set for it, CMock will fail if it's called)
    
    motor_task_step();
}

void test_MotorTask_WaitsWhenSemaphoreNotAvailable(void) {
    // Semaphore times out (shouldn't happen with portMAX_DELAY but validates handling)
    xSemaphoreTake_ExpectAndReturn(motor_cmd_semaphore, 
                                    portMAX_DELAY, pdFALSE);
    
    // No queue access, no motor commands when semaphore not taken
    motor_task_step();
}

Testing Software Timers

// src/watchdog_manager.c
// Manages a software watchdog that resets the system if not fed

static TimerHandle_t watchdog_timer;
static bool watchdog_triggered = false;

static void watchdog_callback(TimerHandle_t xTimer) {
    watchdog_triggered = true;
    system_reset();
}

void watchdog_init(void) {
    watchdog_timer = xTimerCreate(
        "Watchdog",
        pdMS_TO_TICKS(5000),
        pdFALSE,  // one-shot
        NULL,
        watchdog_callback
    );
    xTimerStart(watchdog_timer, 0);
}

void watchdog_feed(void) {
    xTimerReset(watchdog_timer, 0);
}
// tests/test_watchdog_manager.c
#include "unity.h"
#include "mock_timers.h"
#include "mock_system.h"
#include "watchdog_manager.h"

static TimerHandle_t fake_timer = (TimerHandle_t)0xDEADBEEF;

void test_WatchdogInit_CreatesAndStartsTimer(void) {
    // Expect timer creation with 5000ms period
    xTimerCreate_ExpectAndReturn(
        "Watchdog",
        pdMS_TO_TICKS(5000),
        pdFALSE,
        NULL,
        NULL,  // callback—ignore for now
        fake_timer
    );
    xTimerCreate_IgnoreArg_pxCallbackFunction();
    
    // Expect timer started
    xTimerStart_ExpectAndReturn(fake_timer, 0, pdPASS);
    
    watchdog_init();
}

void test_WatchdogFeed_ResetsTimer(void) {
    // Setup: init without checking creation details
    xTimerCreate_IgnoreAndReturn(fake_timer);
    xTimerStart_IgnoreAndReturn(pdPASS);
    watchdog_init();
    
    // Feed should reset the timer
    xTimerReset_ExpectAndReturn(fake_timer, 0, pdPASS);
    watchdog_feed();
}

Testing Inter-Task Communication Patterns

// tests/test_producer_consumer.c
// Test the full producer-consumer chain between tasks

#include "unity.h"
#include "mock_queue.h"
#include "tasks/sensor_producer.h"
#include "tasks/data_consumer.h"

// Simulate passing data from producer to consumer
void test_ProducerConsumer_DataFlowsCorrectly(void) {
    SensorData data = {
        .temperature = 25.3f,
        .humidity = 60.0f,
        .timestamp = 12345
    };
    
    // Producer: reads sensor, sends to queue
    sensor_read_ExpectAndReturn(data);
    
    SensorData sent_data;
    xQueueSend_Stub(queue_capture_stub);  // Capture what was sent
    
    sensor_producer_step();
    
    // Verify captured data matches
    TEST_ASSERT_FLOAT_WITHIN(0.01f, data.temperature, captured_send.temperature);
    TEST_ASSERT_FLOAT_WITHIN(0.01f, data.humidity, captured_send.humidity);
    TEST_ASSERT_EQUAL_UINT32(data.timestamp, captured_send.timestamp);
    
    // Consumer: receives from queue, processes
    xQueueReceive_Stub(queue_return_stub);  // Return captured data
    process_sensor_data_Expect(&data);
    
    data_consumer_step();
}

CMakeLists for Tests

# tests/CMakeLists.txt
cmake_minimum_required(VERSION 3.16)
project(freertos_tests C)

# Unity + CMock
add_subdirectory(unity)
add_subdirectory(cmock)

# Generated mocks
add_library(freertos_mocks STATIC
    mocks/mock_queue.c
    mocks/mock_semphr.c
    mocks/mock_task.c
    mocks/mock_timers.c
)
target_include_directories(freertos_mocks PUBLIC
    mocks/
    ${CMAKE_SOURCE_DIR}/../FreeRTOS/include
)
target_link_libraries(freertos_mocks cmock)

# Test executable
add_executable(freertos_unit_tests
    test_temperature_task.c
    test_motor_task.c
    test_watchdog_manager.c
    test_producer_consumer.c
    # Source files under test
    ${CMAKE_SOURCE_DIR}/../src/tasks/temperature_task.c
    ${CMAKE_SOURCE_DIR}/../src/tasks/motor_task.c
    ${CMAKE_SOURCE_DIR}/../src/watchdog_manager.c
)

target_include_directories(freertos_unit_tests PRIVATE
    ${CMAKE_SOURCE_DIR}/../src
    ${CMAKE_SOURCE_DIR}/../FreeRTOS/include
)

target_link_libraries(freertos_unit_tests
    unity
    freertos_mocks
)

add_test(NAME freertos_unit_tests 
         COMMAND freertos_unit_tests -v)

CI Integration

# .github/workflows/freertos-tests.yml
name: FreeRTOS Unit Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: recursive
      
      - name: Install dependencies
        run: |
          sudo apt-get install -y cmake gcc ruby
          gem install cmock
      
      - name: Generate mocks
        run: |
          ruby vendor/cmock/lib/cmock.rb \
              --mock_path=tests/mocks \
              FreeRTOS/include/queue.h \
              FreeRTOS/include/semphr.h \
              FreeRTOS/include/task.h \
              FreeRTOS/include/timers.h
      
      - name: Configure
        run: cmake -B build tests/
      
      - name: Build
        run: cmake --build build
      
      - name: Test
        run: ctest --test-dir build -V --output-junit test_results.xml
      
      - name: Publish test results
        uses: dorny/test-reporter@v1
        if: always()
        with:
          name: FreeRTOS Tests
          path: build/test_results.xml
          reporter: java-junit

Key Takeaways

Testing FreeRTOS code effectively requires:

  1. Refactor tasks — extract _step() functions from infinite loops so each iteration is independently testable
  2. Mock the kernel — use CMock-generated mocks for xQueueSend, xSemaphoreTake, vTaskDelayUntil, and timer APIs
  3. Test behavior, not implementation — verify that correct kernel APIs are called with correct arguments, not internal task state
  4. Separate concerns — keep business logic (what to do) separate from scheduling logic (when to do it)

This approach lets you catch task synchronization bugs, queue overflow conditions, and semaphore misuse before ever flashing to hardware—dramatically reducing debug cycles on the target.

Read more