FreeRTOS Advanced Unit Testing: Tasks, Queues, and Semaphores
FreeRTOS applications are notoriously hard to unit test because they depend on a real-time scheduler, hardware interrupts, and timing. With the right approach—mocking the FreeRTOS kernel API—you can test task logic, queue interactions, and semaphore behavior deterministically on your host machine.
The Problem with Testing FreeRTOS Code
A typical FreeRTOS task looks like this:
void temperature_task(void *pvParameters) {
TickType_t xLastWakeTime = xTaskGetTickCount();
for (;;) {
// Read sensor
float temp = adc_read_temperature();
// Send to queue
if (xQueueSend(temp_queue, &temp, pdMS_TO_TICKS(10)) != pdPASS) {
// Log queue full error
error_handler(ERR_QUEUE_FULL);
}
// Wait for next period
vTaskDelayUntil(&xLastWakeTime, pdMS_TO_TICKS(100));
}
}This function calls xTaskGetTickCount, xQueueSend, vTaskDelayUntil—all FreeRTOS kernel functions. Running this on a host requires either the FreeRTOS POSIX port or mocking all kernel calls.
Approach 1: CMock for FreeRTOS Kernel APIs
Generate mocks from FreeRTOS headers using CMock:
# Generate mocks from FreeRTOS headers
ruby vendor/cmock/lib/cmock.rb \
--mock_path=mocks \
FreeRTOS/include/queue.h \
FreeRTOS/include/semphr.h \
FreeRTOS/include/task.h \
FreeRTOS/include/timers.hThis generates mock_queue.h, mock_semphr.h, etc. with stub functions and record/playback capabilities.
Project Structure
firmware/
├── FreeRTOS/include/ # Real FreeRTOS headers
├── src/
│ ├── tasks/
│ │ ├── temperature_task.c
│ │ └── motor_task.c
│ └── drivers/
│ └── adc_driver.c
└── tests/
├── unity/ # Unity test framework
├── cmock/ # CMock
├── mocks/ # Generated: mock_queue.c, mock_task.c, etc.
├── test_temperature_task.c
└── test_motor_task.cWriting Tests with CMock Mocks
// tests/test_temperature_task.c
#include "unity.h"
#include "mock_queue.h" // Generated mock
#include "mock_task.h" // Generated mock
#include "mock_adc_driver.h"
#include "tasks/temperature_task.h"
// The queue handle used by the task under test
extern QueueHandle_t temp_queue;
void setUp(void) {
// Reset all CMock internal state
mock_queue_Init();
mock_task_Init();
mock_adc_driver_Init();
}
void tearDown(void) {
mock_queue_Verify();
mock_queue_Destroy();
mock_task_Verify();
mock_task_Destroy();
mock_adc_driver_Verify();
mock_adc_driver_Destroy();
}
// Test: task sends valid temperature to queue
void test_TemperatureTask_SendsReadingToQueue(void) {
float expected_temp = 23.5f;
// Expect: task gets tick count for timing
xTaskGetTickCount_ExpectAndReturn(0);
// Expect: ADC read returns our temperature
adc_read_temperature_ExpectAndReturn(expected_temp);
// Expect: temperature sent to queue
// Match any pointer to float, return pdPASS, with 10ms timeout
xQueueSend_ExpectAndReturn(temp_queue, &expected_temp,
pdMS_TO_TICKS(10), pdPASS);
xQueueSend_IgnoreArg_pvItemToQueue(); // Don't check pointer value
// Expect: task delays for 100ms
vTaskDelayUntil_Expect(NULL, pdMS_TO_TICKS(100));
vTaskDelayUntil_IgnoreArg_pxPreviousWakeTime();
// Run one iteration of the task loop
temperature_task_step(); // Refactored to expose single-step
}
// Test: queue full triggers error handler
void test_TemperatureTask_QueueFullCallsErrorHandler(void) {
float expected_temp = 50.0f;
xTaskGetTickCount_ExpectAndReturn(1000);
adc_read_temperature_ExpectAndReturn(expected_temp);
// Queue send fails (full)
xQueueSend_ExpectAndReturn(temp_queue, NULL,
pdMS_TO_TICKS(10), errQUEUE_FULL);
xQueueSend_IgnoreArg_pvItemToQueue();
// Expect error handler called
error_handler_Expect(ERR_QUEUE_FULL);
vTaskDelayUntil_Expect(NULL, pdMS_TO_TICKS(100));
vTaskDelayUntil_IgnoreArg_pxPreviousWakeTime();
temperature_task_step();
}Refactoring Tasks for Testability
The for(;;) loop is untestable. Refactor to expose a single step:
// tasks/temperature_task.c
// Internal step function—testable
void temperature_task_step(void) {
static TickType_t xLastWakeTime = 0;
if (xLastWakeTime == 0) {
xLastWakeTime = xTaskGetTickCount();
}
float temp = adc_read_temperature();
if (xQueueSend(temp_queue, &temp, pdMS_TO_TICKS(10)) != pdPASS) {
error_handler(ERR_QUEUE_FULL);
}
vTaskDelayUntil(&xLastWakeTime, pdMS_TO_TICKS(100));
}
// FreeRTOS task entry point
void temperature_task(void *pvParameters) {
for (;;) {
temperature_task_step();
}
}Testing Semaphore-Based Synchronization
// src/tasks/motor_task.c
// Motor task waits for a semaphore before processing commands
void motor_task(void *pvParameters) {
for (;;) {
if (xSemaphoreTake(motor_cmd_semaphore, portMAX_DELAY) == pdPASS) {
MotorCommand cmd;
if (xQueueReceive(motor_cmd_queue, &cmd, 0) == pdPASS) {
motor_execute_command(&cmd);
}
}
}
}// tests/test_motor_task.c
#include "unity.h"
#include "mock_semphr.h"
#include "mock_queue.h"
#include "mock_motor_driver.h"
#include "tasks/motor_task.h"
void test_MotorTask_ExecutesCommandWhenSemaphoreAvailable(void) {
MotorCommand cmd = {
.type = CMD_MOVE,
.speed = 50,
.direction = DIR_FORWARD
};
// Semaphore is available immediately
xSemaphoreTake_ExpectAndReturn(motor_cmd_semaphore,
portMAX_DELAY, pdPASS);
// Command waiting in queue
xQueueReceive_ExpectAndReturn(motor_cmd_queue, NULL, 0, pdPASS);
xQueueReceive_ReturnThruPtr_pvBuffer(&cmd);
// Motor executes the command
motor_execute_command_Expect(&cmd);
motor_task_step();
}
void test_MotorTask_SkipsExecutionWhenQueueEmpty(void) {
// Semaphore taken but queue is empty
xSemaphoreTake_ExpectAndReturn(motor_cmd_semaphore,
portMAX_DELAY, pdPASS);
xQueueReceive_ExpectAndReturn(motor_cmd_queue, NULL, 0, errQUEUE_EMPTY);
// motor_execute_command should NOT be called
// (no expectation set for it, CMock will fail if it's called)
motor_task_step();
}
void test_MotorTask_WaitsWhenSemaphoreNotAvailable(void) {
// Semaphore times out (shouldn't happen with portMAX_DELAY but validates handling)
xSemaphoreTake_ExpectAndReturn(motor_cmd_semaphore,
portMAX_DELAY, pdFALSE);
// No queue access, no motor commands when semaphore not taken
motor_task_step();
}Testing Software Timers
// src/watchdog_manager.c
// Manages a software watchdog that resets the system if not fed
static TimerHandle_t watchdog_timer;
static bool watchdog_triggered = false;
static void watchdog_callback(TimerHandle_t xTimer) {
watchdog_triggered = true;
system_reset();
}
void watchdog_init(void) {
watchdog_timer = xTimerCreate(
"Watchdog",
pdMS_TO_TICKS(5000),
pdFALSE, // one-shot
NULL,
watchdog_callback
);
xTimerStart(watchdog_timer, 0);
}
void watchdog_feed(void) {
xTimerReset(watchdog_timer, 0);
}// tests/test_watchdog_manager.c
#include "unity.h"
#include "mock_timers.h"
#include "mock_system.h"
#include "watchdog_manager.h"
static TimerHandle_t fake_timer = (TimerHandle_t)0xDEADBEEF;
void test_WatchdogInit_CreatesAndStartsTimer(void) {
// Expect timer creation with 5000ms period
xTimerCreate_ExpectAndReturn(
"Watchdog",
pdMS_TO_TICKS(5000),
pdFALSE,
NULL,
NULL, // callback—ignore for now
fake_timer
);
xTimerCreate_IgnoreArg_pxCallbackFunction();
// Expect timer started
xTimerStart_ExpectAndReturn(fake_timer, 0, pdPASS);
watchdog_init();
}
void test_WatchdogFeed_ResetsTimer(void) {
// Setup: init without checking creation details
xTimerCreate_IgnoreAndReturn(fake_timer);
xTimerStart_IgnoreAndReturn(pdPASS);
watchdog_init();
// Feed should reset the timer
xTimerReset_ExpectAndReturn(fake_timer, 0, pdPASS);
watchdog_feed();
}Testing Inter-Task Communication Patterns
// tests/test_producer_consumer.c
// Test the full producer-consumer chain between tasks
#include "unity.h"
#include "mock_queue.h"
#include "tasks/sensor_producer.h"
#include "tasks/data_consumer.h"
// Simulate passing data from producer to consumer
void test_ProducerConsumer_DataFlowsCorrectly(void) {
SensorData data = {
.temperature = 25.3f,
.humidity = 60.0f,
.timestamp = 12345
};
// Producer: reads sensor, sends to queue
sensor_read_ExpectAndReturn(data);
SensorData sent_data;
xQueueSend_Stub(queue_capture_stub); // Capture what was sent
sensor_producer_step();
// Verify captured data matches
TEST_ASSERT_FLOAT_WITHIN(0.01f, data.temperature, captured_send.temperature);
TEST_ASSERT_FLOAT_WITHIN(0.01f, data.humidity, captured_send.humidity);
TEST_ASSERT_EQUAL_UINT32(data.timestamp, captured_send.timestamp);
// Consumer: receives from queue, processes
xQueueReceive_Stub(queue_return_stub); // Return captured data
process_sensor_data_Expect(&data);
data_consumer_step();
}CMakeLists for Tests
# tests/CMakeLists.txt
cmake_minimum_required(VERSION 3.16)
project(freertos_tests C)
# Unity + CMock
add_subdirectory(unity)
add_subdirectory(cmock)
# Generated mocks
add_library(freertos_mocks STATIC
mocks/mock_queue.c
mocks/mock_semphr.c
mocks/mock_task.c
mocks/mock_timers.c
)
target_include_directories(freertos_mocks PUBLIC
mocks/
${CMAKE_SOURCE_DIR}/../FreeRTOS/include
)
target_link_libraries(freertos_mocks cmock)
# Test executable
add_executable(freertos_unit_tests
test_temperature_task.c
test_motor_task.c
test_watchdog_manager.c
test_producer_consumer.c
# Source files under test
${CMAKE_SOURCE_DIR}/../src/tasks/temperature_task.c
${CMAKE_SOURCE_DIR}/../src/tasks/motor_task.c
${CMAKE_SOURCE_DIR}/../src/watchdog_manager.c
)
target_include_directories(freertos_unit_tests PRIVATE
${CMAKE_SOURCE_DIR}/../src
${CMAKE_SOURCE_DIR}/../FreeRTOS/include
)
target_link_libraries(freertos_unit_tests
unity
freertos_mocks
)
add_test(NAME freertos_unit_tests
COMMAND freertos_unit_tests -v)CI Integration
# .github/workflows/freertos-tests.yml
name: FreeRTOS Unit Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: Install dependencies
run: |
sudo apt-get install -y cmake gcc ruby
gem install cmock
- name: Generate mocks
run: |
ruby vendor/cmock/lib/cmock.rb \
--mock_path=tests/mocks \
FreeRTOS/include/queue.h \
FreeRTOS/include/semphr.h \
FreeRTOS/include/task.h \
FreeRTOS/include/timers.h
- name: Configure
run: cmake -B build tests/
- name: Build
run: cmake --build build
- name: Test
run: ctest --test-dir build -V --output-junit test_results.xml
- name: Publish test results
uses: dorny/test-reporter@v1
if: always()
with:
name: FreeRTOS Tests
path: build/test_results.xml
reporter: java-junitKey Takeaways
Testing FreeRTOS code effectively requires:
- Refactor tasks — extract
_step()functions from infinite loops so each iteration is independently testable - Mock the kernel — use CMock-generated mocks for
xQueueSend,xSemaphoreTake,vTaskDelayUntil, and timer APIs - Test behavior, not implementation — verify that correct kernel APIs are called with correct arguments, not internal task state
- Separate concerns — keep business logic (what to do) separate from scheduling logic (when to do it)
This approach lets you catch task synchronization bugs, queue overflow conditions, and semaphore misuse before ever flashing to hardware—dramatically reducing debug cycles on the target.