Hardware-in-the-Loop Testing Patterns: From Automotive to IoT
Hardware-in-the-Loop (HIL) testing puts real firmware under test by simulating the physical environment it would operate in. While basic HIL setups test simple I/O, production-grade HIL rigs implement sophisticated patterns for automotive ECUs, industrial controllers, and IoT devices. This guide covers the architectural patterns that make HIL testing scalable and reliable.
HIL Architecture Layers
A production HIL rig has four distinct layers:
┌─────────────────────────────────────────────┐
│ Test Orchestration Layer │
│ (Python/Robot Framework test scripts) │
├─────────────────────────────────────────────┤
│ Signal Management Layer │
│ (stimulus generation, capture, analysis) │
├─────────────────────────────────────────────┤
│ Hardware Interface Layer │
│ (DAQ cards, CAN interfaces, serial, GPIO) │
├─────────────────────────────────────────────┤
│ Device Under Test (DUT) │
│ (your ECU, microcontroller, or IoT device) │
└─────────────────────────────────────────────┘

Each layer has a well-defined interface. This separation lets you swap hardware interfaces without rewriting test scripts.
Pattern 1: Stimulus-Response Testing
The most common HIL pattern: generate a stimulus, observe the DUT's response, validate against spec.
# hil/patterns/stimulus_response.py
from dataclasses import dataclass
from typing import Callable, Any
import time
@dataclass
class Stimulus:
    """A named waveform destined for a single channel."""

    # Label identifying this stimulus.
    name: str
    # Channel the waveform is programmed onto.
    channel: str
    # Waveform points, written as [(time_s, value), ...].
    waveform: list
@dataclass
class ExpectedResponse:
    """A condition that a channel reading must satisfy before a timeout."""

    # Channel that is read while waiting for the response.
    channel: str
    # Predicate applied to each reading; a truthy result means "response seen".
    condition: Callable[[float], bool]
    # How long to keep checking, in seconds.
    timeout_s: float
    # Free-text explanation reported alongside the pass/fail result.
    description: str
class StimulusResponseTest:
    """Play stimuli through a DAQ and poll for the expected DUT responses."""

    def __init__(self, daq, dut):
        self.daq = daq
        self.dut = dut

    def run(self, stimuli: list[Stimulus],
            responses: list[ExpectedResponse]) -> dict:
        """Program the stimuli, start capture/playback and check each response.

        Responses are checked one after another, in list order. Each
        response's timeout starts when its own polling begins; for the first
        response that is immediately after playback starts.

        Args:
            stimuli: Waveforms to program onto their channels before playback.
            responses: Conditions to wait for.

        Returns:
            A dict keyed by response channel with ``'passed'`` and
            ``'description'`` entries. Two responses on the same channel share
            a key, so the later one overwrites the earlier one.
        """
        results = {}

        # Apply all stimuli
        for stim in stimuli:
            self.daq.program_waveform(stim.channel, stim.waveform)

        # Start capture
        self.daq.start_capture()
        try:
            # Start stimulus playback
            self.daq.start_playback()

            # Check each expected response
            for resp in responses:
                results[resp.channel] = {
                    'passed': self._wait_for(resp),
                    'description': resp.description,
                }
        finally:
            # Always stop the DAQ, even if a read or a condition raised,
            # so a failing test does not leave capture/playback running.
            self.daq.stop()

        return results

    def _wait_for(self, resp) -> bool:
        """Poll ``resp.channel`` until ``resp.condition`` holds or time runs out."""
        # Monotonic clock: the deadline must not move if wall-clock time is adjusted.
        deadline = time.monotonic() + resp.timeout_s
        while time.monotonic() < deadline:
            if resp.condition(self.daq.read(resp.channel)):
                return True
            time.sleep(0.001)  # 1ms polling
        return False


# Example: test fan turns on when temperature exceeds threshold
test = StimulusResponseTest(daq=my_daq, dut=my_ecu)
results = test.run(
    stimuli=[
        Stimulus(
            name="temperature_ramp",
            channel="analog_out_0",  # connected to temp sensor input
            waveform=[
                (0.0, 20.0),   # 20°C at t=0
                (5.0, 20.0),   # hold at 20°C for 5s
                (5.1, 80.0),   # step to 80°C (above threshold)
                (15.0, 80.0),  # hold
            ]
        )
    ],
    responses=[
        ExpectedResponse(
            channel="digital_in_2",       # fan control output
            condition=lambda v: v > 3.3,  # logic high
            # run() starts this timeout when playback starts, not when the
            # temperature steps. The step happens at t=5.1s, so the deadline
            # is 5.1s of waveform plus the 1s the fan is allowed to react.
            timeout_s=6.1,
            description="Fan enables within 1s of temp exceeding 75°C"
        )
    ]
)
assert results["digital_in_2"]["passed"], "Fan did not activate on overheat"

# Pattern 2: Fault Injection
Systematically inject hardware faults to validate DUT error handling:
# hil/patterns/fault_injection.py
from enum import Enum
class FaultType(Enum):
    """Hardware fault modes that FaultInjector can apply to a channel."""

    OPEN_CIRCUIT = "open"          # Disconnect signal
    SHORT_TO_GND = "short_gnd"     # Pull signal to 0V
    SHORT_TO_VCC = "short_vcc"     # Pull signal to VCC
    OUT_OF_RANGE = "oor"           # Apply value outside valid range
    INTERMITTENT = "intermittent"  # Toggle rapidly


class FaultInjector:
    """Applies faults to DUT channels through a relay board and a DAQ."""

    # Time spent in each half (open, then restored) of an intermittent toggle.
    INTERMITTENT_HALF_PERIOD_S = 0.05

    def __init__(self, relay_board, daq, out_of_range_voltage: float = 6.0):
        """
        Args:
            relay_board: Object providing open/short_to_gnd/short_to_vcc/restore.
            daq: Object providing set_voltage/restore.
            out_of_range_voltage: Level driven for OUT_OF_RANGE faults.
                The default of 6.0 is 6V on a 5V system.
        """
        self.relay = relay_board
        self.daq = daq
        self.out_of_range_voltage = out_of_range_voltage

    def inject(self, channel: str, fault: FaultType,
               duration_s: float = None):
        """Inject fault and optionally auto-restore.

        Args:
            channel: Channel to apply the fault to.
            fault: Which fault to apply.
            duration_s: For INTERMITTENT, how long to keep toggling (required).
                For every other fault, a truthy value holds the fault that
                long and then restores the channel; None or 0 leaves the
                fault in place until restore() is called.

        Raises:
            ValueError: If ``fault`` is not a supported FaultType, or if
                INTERMITTENT is requested without a duration.
        """
        if fault == FaultType.INTERMITTENT:
            if duration_s is None:
                # Without this check the failure is an opaque
                # "float + NoneType" TypeError from deep inside the toggle loop.
                raise ValueError("INTERMITTENT fault requires duration_s")
            self._intermittent_fault(channel, duration_s)
            return

        if fault == FaultType.OPEN_CIRCUIT:
            self.relay.open(channel)
        elif fault == FaultType.SHORT_TO_GND:
            self.relay.short_to_gnd(channel)
        elif fault == FaultType.SHORT_TO_VCC:
            self.relay.short_to_vcc(channel)
        elif fault == FaultType.OUT_OF_RANGE:
            self.daq.set_voltage(channel, self.out_of_range_voltage)
        else:
            # Fail loudly instead of silently injecting nothing.
            raise ValueError(f"Unsupported fault type: {fault!r}")

        if duration_s:
            try:
                time.sleep(duration_s)
            finally:
                # Restore even if the hold is interrupted (e.g. Ctrl-C), so
                # the rig is not left with the fault applied.
                self.restore(channel)

    def restore(self, channel: str):
        """Undo any fault on ``channel`` on both the relay board and the DAQ."""
        self.relay.restore(channel)
        self.daq.restore(channel)

    def _intermittent_fault(self, channel: str, duration_s: float):
        """Toggle ``channel`` between open and restored for ``duration_s`` seconds."""
        half_period = self.INTERMITTENT_HALF_PERIOD_S
        # Monotonic clock: immune to wall-clock adjustments during the toggle.
        deadline = time.monotonic() + duration_s
        while time.monotonic() < deadline:
            self.relay.open(channel)
            time.sleep(half_period)
            self.relay.restore(channel)
            time.sleep(half_period)


# Test: DUT handles sensor open-circuit fault gracefully
# NOTE(review): pytest's default class pattern is "Test*"; confirm this class
# is collected (e.g. via a python_classes setting) or rename it.
class SensorFaultTest:
    """Checks that the DUT reports and then clears a sensor open-circuit fault."""

    def test_open_circuit_sets_fault_code(self, hil_fixture):
        """Open the temperature sensor line and verify DTC P0118 is set, then cleared."""
        dut = hil_fixture.dut
        injector = hil_fixture.fault_injector
        can_reader = hil_fixture.can_reader

        # Baseline: sensor reading valid
        assert dut.read_diagnostic() == "NO_FAULT"

        # Inject open circuit on temperature sensor
        injector.inject("temp_sensor", FaultType.OPEN_CIRCUIT)
        try:
            # DUT should detect and report fault within 500ms
            time.sleep(0.5)

            # Read fault code via CAN diagnostic message
            dtcs = can_reader.read_dtcs()
            assert "P0118" in dtcs, "Temperature circuit high fault not set"
        finally:
            # Remove the fault even when the assertion above fails, so the
            # rig is not left open-circuited for the tests that follow.
            injector.restore("temp_sensor")

        # With the fault removed, verify the DUT clears the code
        time.sleep(1.0)
        dtcs = can_reader.read_dtcs()
        assert "P0118" not in dtcs, "DTC not cleared after fault removal"


# Pattern 3: Timing Validation
Real-time systems must respond within defined time windows:
# hil/patterns/timing_validator.py
import numpy as np
from dataclasses import dataclass
@dataclass
class TimingSpec:
    """Allowed delay window between a stimulus edge and a response edge."""

    # Channel carrying the stimulus signal.
    stimulus_channel: str
    # Channel carrying the DUT's response signal.
    response_channel: str
    # Smallest acceptable stimulus-to-response delay, in milliseconds.
    min_delay_ms: float
    # Largest acceptable stimulus-to-response delay, in milliseconds.
    max_delay_ms: float
    # Level the stimulus must cross to count as a rising edge.
    stimulus_threshold: float
    # Level the response must cross to count as a rising edge.
    response_threshold: float
class TimingValidator:
    """Measures stimulus-to-response delay from synchronized DAQ captures."""

    def __init__(self, high_speed_daq):
        # Requires high sample rate: >= 100kHz for ms-level timing
        self.daq = high_speed_daq

    def measure_response_time(self, spec: TimingSpec,
                              num_samples: int = 100) -> dict:
        """Pulse the stimulus ``num_samples`` times and summarise the delays.

        A capture in which either rising edge cannot be found is counted as a
        violation rather than dropped, so a DUT that fails to respond cannot
        pass the spec on the strength of its remaining samples.

        Args:
            spec: Channels, thresholds and the allowed delay window.
            num_samples: Number of pulse/capture repetitions.

        Returns:
            Dict with ``mean_ms``, ``min_ms``, ``max_ms``, ``std_ms`` and
            ``p99_ms`` (all NaN when no delay could be measured),
            ``all_within_spec``, ``violations`` (out-of-window delays plus
            missed edges), ``missed_edges`` and ``num_measured``.
        """
        delays = []
        missed_edges = 0

        for _ in range(num_samples):
            # Capture both channels at high rate
            self.daq.start_synchronized_capture(
                channels=[spec.stimulus_channel, spec.response_channel],
                sample_rate_hz=100_000,
                duration_s=0.1,
            )

            # Apply stimulus
            self.daq.pulse(spec.stimulus_channel, width_ms=50)

            # Wait for capture
            data = self.daq.get_capture()
            time_axis = data['time']

            stim_edge = self._find_rising_edge(
                data[spec.stimulus_channel], time_axis, spec.stimulus_threshold)
            resp_edge = self._find_rising_edge(
                data[spec.response_channel], time_axis, spec.response_threshold)

            # Compare against None explicitly: an edge at t == 0.0 is valid
            # but falsy.
            if stim_edge is None or resp_edge is None:
                missed_edges += 1
                continue

            delays.append((resp_edge - stim_edge) * 1000)

        delays = np.asarray(delays, dtype=float)
        out_of_window = int(np.sum(
            (delays < spec.min_delay_ms) | (delays > spec.max_delay_ms)
        ))
        violations = out_of_window + missed_edges

        if delays.size:
            stats = {
                'mean_ms': float(np.mean(delays)),
                'min_ms': float(np.min(delays)),
                'max_ms': float(np.max(delays)),
                'std_ms': float(np.std(delays)),
                'p99_ms': float(np.percentile(delays, 99)),
            }
        else:
            # np.min/np.percentile raise on an empty array; report NaN instead.
            nan = float('nan')
            stats = {'mean_ms': nan, 'min_ms': nan, 'max_ms': nan,
                     'std_ms': nan, 'p99_ms': nan}

        return {
            **stats,
            # Requires at least one real measurement; no data is not a pass.
            'all_within_spec': bool(delays.size) and violations == 0,
            'violations': violations,
            'missed_edges': missed_edges,
            'num_measured': int(delays.size),
        }

    def _find_rising_edge(self, signal, time_axis, threshold):
        """Return the time of the first sample at/above ``threshold`` whose
        predecessor was below it, or None if there is no such crossing."""
        samples = np.asarray(signal)
        if samples.size < 2:
            return None
        crossings = np.flatnonzero(
            (samples[:-1] < threshold) & (samples[1:] >= threshold)
        )
        if crossings.size == 0:
            return None
        return time_axis[int(crossings[0]) + 1]


# Test relay driver meets timing spec
def test_relay_activation_timing():
    """Check that the relay contact closes 5-15 ms after the coil command."""
    # TimingValidator's constructor parameter is named high_speed_daq;
    # passing daq=... raises TypeError, so pass it positionally.
    timing = TimingValidator(high_speed_daq)
    num_samples = 50

    spec = TimingSpec(
        stimulus_channel="relay_coil_cmd",
        response_channel="relay_contact",
        min_delay_ms=5.0,   # coil needs at least 5ms to energize
        max_delay_ms=15.0,  # must close within 15ms
        stimulus_threshold=2.5,
        response_threshold=2.5
    )

    result = timing.measure_response_time(spec, num_samples=num_samples)

    print(f"Relay timing: mean={result['mean_ms']:.2f}ms, "
          f"p99={result['p99_ms']:.2f}ms")

    assert result['all_within_spec'], (
        f"Relay timing violations: {result['violations']}/{num_samples} samples "
        f"outside [{spec.min_delay_ms}, {spec.max_delay_ms}]ms. "
        f"Actual range: [{result['min_ms']:.2f}, {result['max_ms']:.2f}]ms"
    )


# Pattern 4: State Machine Validation
Verify DUT progresses through correct state sequences:
# hil/patterns/state_machine_validator.py
from typing import Optional
import re
class StateMachineValidator:
    """Validates DUT state transitions via CAN messages or serial output."""

    def __init__(self, state_reader):
        self.reader = state_reader
        # Observed states with consecutive duplicates collapsed.
        self.history = []

    def expect_sequence(self, expected: list[str],
                        timeout_s: float = 10.0) -> bool:
        """Wait for DUT to pass through all expected states in order.

        The reader is polled continuously. Each change of state is appended
        to ``self.history``; other states may appear between the expected
        ones without failing the check.

        Args:
            expected: States that must be observed, in this order.
            timeout_s: Overall time budget in seconds.

        Returns:
            True if every expected state was observed before the timeout.
        """
        expected_idx = 0
        # Monotonic clock: the deadline must not move if wall-clock time is adjusted.
        deadline = time.monotonic() + timeout_s

        while time.monotonic() < deadline and expected_idx < len(expected):
            state = self.reader.get_current_state()

            # Compare with the last recorded state itself. A slice
            # (history[-1:]) is a list and never equals a state value, which
            # made every poll look like a transition.
            if self.history and state == self.history[-1]:
                continue

            self.history.append(state)
            if state == expected[expected_idx]:
                expected_idx += 1

        return expected_idx == len(expected)

    def forbid_state(self, forbidden: str, duration_s: float) -> bool:
        """Verify DUT never enters a forbidden state during a period.

        Polls every 10 ms for ``duration_s`` seconds and returns False as soon
        as ``forbidden`` is read; returns True otherwise.
        """
        deadline = time.time() + duration_s
        while time.time() < deadline:
            state = self.reader.get_current_state()
            if state == forbidden:
                return False
            time.sleep(0.01)
        return True


def test_motor_startup_sequence():
    """Ignition on must drive the DUT through INIT, SELF_TEST, IDLE, RUN without FAULT."""
    validator = StateMachineValidator(can_state_reader)

    # Trigger startup
    hil.set_ignition(True)

    # Must follow: INIT → SELF_TEST → IDLE → RUN
    assert validator.expect_sequence(
        ["INIT", "SELF_TEST", "IDLE", "RUN"],
        timeout_s=5.0
    ), f"Invalid startup sequence. Got: {validator.history}"

    # Must never enter FAULT during normal startup. The startup window is
    # already over when expect_sequence returns, so check what was recorded
    # while it ran.
    assert "FAULT" not in validator.history, \
        "Motor entered FAULT state during normal startup"

    # ...and must stay out of FAULT for the following 5 seconds.
    assert validator.forbid_state("FAULT", duration_s=5.0), \
        "Motor entered FAULT state during normal startup"


# Pattern 5: Power Cycling and Reset Validation
# hil/patterns/power_cycle.py
class PowerCycleValidator:
    """Repeatedly power-cycles the DUT and records how each reboot went.

    NOTE(review): stress_test() calls self._wait_for_ready() and
    self._verify_nvram(), which are not defined in this snippet. They are
    presumably provided elsewhere (e.g. by a subclass); confirm before use.
    """

    def __init__(self, power_supply, dut_monitor):
        self.psu = power_supply
        self.monitor = dut_monitor

    def stress_test(self, cycles: int, on_time_s: float,
                    off_time_s: float) -> dict:
        """Run ``cycles`` on/off/on power cycles and summarise the results.

        Args:
            cycles: Number of power cycles to run.
            on_time_s: Time to stay powered before cutting power.
            off_time_s: Time to stay unpowered before powering back on.

        Returns:
            Dict with ``total_cycles``, ``all_boots_succeeded`` (no boot time
            was None), ``max_boot_time_s`` (0 when no cycle ran or none
            booted), ``nvram_integrity`` and the per-cycle ``results`` list.
        """
        results = []

        for i in range(cycles):
            # Power on
            self.psu.set_output(True)
            time.sleep(on_time_s)

            # Capture state before power off
            state_before = self.monitor.get_state()

            # Power off
            self.psu.set_output(False)
            time.sleep(off_time_s)

            # Power on again
            self.psu.set_output(True)

            # Wait for DUT to boot
            boot_time = self._wait_for_ready(timeout_s=5.0)

            # Capture state after reboot
            state_after = self.monitor.get_state()

            results.append({
                'cycle': i,
                'boot_time_s': boot_time,
                # Both raw states are recorded for diagnosis; state_before
                # was previously read and then discarded.
                'state_before': state_before,
                'state_after': state_after,
                'state_consistent': state_after == 'IDLE',
                'data_intact': self._verify_nvram(),
            })

        return {
            'total_cycles': cycles,
            'all_boots_succeeded': all(r['boot_time_s'] is not None
                                       for r in results),
            # default=0 keeps cycles == 0 from raising on an empty sequence.
            'max_boot_time_s': max((r['boot_time_s'] or 0 for r in results),
                                   default=0),
            'nvram_integrity': all(r['data_intact'] for r in results),
            'results': results,
        }


# Integrating HIL into CI
# .github/workflows/hil-tests.yml
name: HIL Tests

# Runs on every push to main, or manually with a selectable test suite.
on:
  push:
    branches:
      - main
  workflow_dispatch:
    inputs:
      test_suite:
        description: 'Test suite to run'
        default: 'smoke'
        type: choice
        options:
          - smoke
          - regression
          - stress

jobs:
  # Host-side tests run first on a hosted runner; the HIL job depends on them.
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run host-side unit tests
        run: pytest tests/unit/ -v

  # Hardware tests run on the self-hosted runner.
  hil-tests:
    runs-on: self-hosted
    needs: unit-tests
    environment: hil-rig
    steps:
      - uses: actions/checkout@v4

      - name: Reserve HIL rig
        run: ./scripts/reserve_rig.sh --rig stm32-rig-01 --timeout 30m

      - name: Flash firmware
        run: |
          cmake -B build -DTARGET=stm32f4
          cmake --build build
          ./scripts/flash.sh build/firmware.elf

      # On push events inputs.test_suite is empty, so the expression falls
      # back to 'smoke'.
      - name: Run HIL test suite
        run: |
          pytest tests/hil/ \
          -v \
          --hil-config=config/rig_stm32.yaml \
          --test-suite=${{ inputs.test_suite || 'smoke' }} \
          --junit-xml=hil_results.xml

      # always() runs this step even when an earlier step failed.
      - name: Release HIL rig
        if: always()
        run: ./scripts/release_rig.sh --rig stm32-rig-01

      - name: Upload results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: hil-test-results
          path: hil_results.xml

# Signal Conditioning and Calibration
Real HIL rigs require calibration before tests are reliable:
# hil/calibration.py
class RigCalibrator:
    """Compares DAQ analog-input readings against a reference meter."""

    # Voltages applied to every channel when no custom points are given.
    DEFAULT_CAL_POINTS = (0.0, 1.0, 2.5, 4.0, 5.0)

    def __init__(self, daq, reference_meter):
        self.daq = daq
        self.meter = reference_meter

    def calibrate_analog_inputs(self, cal_points=None,
                                settle_time_s: float = 0.1) -> dict:
        """Measure DAQ vs reference readings at each calibration point.

        Args:
            cal_points: Voltages to apply; defaults to DEFAULT_CAL_POINTS.
            settle_time_s: Delay between applying a voltage and reading it.

        Returns:
            Dict mapping each channel in ``daq.analog_input_channels`` to a
            list with one entry per calibration point. Each entry has keys
            ``applied``, ``daq``, ``reference``, ``error_abs`` and
            ``error_pct``. ``error_pct`` is None when the reference reading
            is zero, because a relative error is undefined there.
        """
        points = list(self.DEFAULT_CAL_POINTS if cal_points is None
                      else cal_points)
        corrections = {}

        for channel in self.daq.analog_input_channels:
            entries = []
            for voltage in points:
                # Apply known voltage from calibrated source
                self.daq.apply_cal_voltage(channel, voltage)
                time.sleep(settle_time_s)

                # Read from the DAQ and from the reference meter
                daq_reading = self.daq.read(channel)
                ref_reading = self.meter.read(channel)

                error_abs = abs(daq_reading - ref_reading)
                # Guard the division: the 0.0 V point gives a zero
                # reference reading.
                if ref_reading:
                    error_pct = error_abs / abs(ref_reading) * 100
                else:
                    error_pct = None

                entries.append({
                    'applied': voltage,
                    'daq': daq_reading,
                    'reference': ref_reading,
                    'error_abs': error_abs,
                    'error_pct': error_pct,
                })
            corrections[channel] = entries

        return corrections

    def save_cal_file(self, corrections: dict, path: str):
        """Write ``corrections`` and the current timestamp to ``path`` as JSON."""
        import json

        # Explicit encoding so the file does not depend on the platform default.
        with open(path, 'w', encoding='utf-8') as f:
            json.dump({
                'timestamp': time.time(),
                'corrections': corrections
            }, f, indent=2)


# Summary
Production HIL testing patterns:
| Pattern | Use Case | Key Implementation |
|---|---|---|
| Stimulus-Response | Basic I/O validation | Waveform playback + channel polling |
| Fault Injection | Error handling validation | Relay boards + DAQ out-of-range |
| Timing Validation | Real-time spec compliance | High-speed synchronized capture |
| State Machine | Protocol and sequencing | CAN/serial state readback |
| Power Cycling | Reset and NVRAM integrity | Programmable PSU + boot monitoring |
The overarching principle: every physical interaction with the DUT is mediated by a hardware interface layer that has a software representation. This lets you write test logic in Python or Robot Framework, swap rigs without rewriting tests, and integrate HIL execution into the same CI pipelines that run unit and integration tests.