NIST Cybersecurity Framework Testing Implementation
The NIST Cybersecurity Framework (CSF) provides a flexible structure for managing cybersecurity risk. Unlike FIPS 140 or SOX, the CSF is not prescriptive — it describes outcomes, not specific controls. This makes it adaptable but also harder to test against. This guide maps CSF functions and categories to concrete automated tests.
CSF Structure
The NIST CSF 2.0 (released 2024) organizes cybersecurity into six functions:
| Function | Focus | Example Categories |
|---|---|---|
| Govern | Oversight and strategy | Risk management strategy, supply chain risk |
| Identify | Asset and risk management | Asset inventory, vulnerability management |
| Protect | Safeguards | Access control, data security, awareness training |
| Detect | Anomaly detection | Monitoring, detection processes |
| Respond | Incident response | Analysis, containment, recovery |
| Recover | Restoration | Recovery planning, improvements |
Developer testing primarily covers Identify, Protect, and Detect. The other functions (Govern, Respond, and Recover) mostly involve processes, policies, and organizational activities that automated tests cannot verify directly.
Identify Function
ID.AM: Asset Management
The CSF calls for maintaining inventories of software, data, and system assets. Because it describes outcomes rather than mandating controls, test that your own inventory mechanisms actually work:
# test/nist_csf/test_identify.py
"""NIST CSF 2.0 — Identify Function"""
class TestIDAssetManagement:
    """ID.AM: Asset Management.

    Identify-function checks: the software inventory (SBOM) exists and is
    non-empty, every table holding PII columns has a classification record,
    and the most recent vulnerability scan is at most 30 days old.

    NOTE(review): the vulnerability-scan check is labelled ID.RA-01 (a
    different category) but lives in this ID.AM class.
    """

    def test_software_component_inventory_complete(self):
        """ID.AM-01: Software assets inventoried — all dependencies tracked.

        Reads ``sbom.json`` relative to the current working directory and
        fails when the file is missing or lists no packages.
        """
        import json
        from pathlib import Path

        # Verify SBOM (Software Bill of Materials) is generated
        sbom_path = Path('sbom.json')
        assert sbom_path.exists(), \
            "NIST CSF ID.AM-01: SBOM not generated — cannot inventory software assets"

        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's default encoding.
        with open(sbom_path, encoding='utf-8') as f:
            sbom = json.load(f)

        # Truthiness also rejects an explicit ``"packages": null``, which
        # ``len()`` would turn into a TypeError instead of a clear failure.
        assert sbom.get('packages'), \
            "NIST CSF ID.AM-01: SBOM has no packages listed"

    def test_data_classification_applied(self, db):
        """ID.AM-04: Data is categorized according to classification policy.

        Args:
            db: Database fixture exposing ``query(sql, *params)``; result
                rows are read by column name.
        """
        # Verify all tables with PII have data classification metadata
        pii_column_rows = db.query("""
            SELECT table_name
            FROM information_schema.columns
            WHERE column_name IN ('email', 'ssn', 'phone', 'address', 'date_of_birth')
            AND table_schema = 'public'
        """)

        # The query yields one row per matching column, so a table with
        # several PII columns appears several times. Deduplicate (keeping
        # first-seen order) so each table is looked up only once.
        table_names = list(dict.fromkeys(row['table_name'] for row in pii_column_rows))

        for table_name in table_names:
            classification = db.query("""
                SELECT classification_level
                FROM data_classifications
                WHERE table_name = %s
            """, table_name)
            assert len(classification) > 0, \
                f"NIST CSF ID.AM-04: Table '{table_name}' contains PII but has no data classification"

    def test_vulnerability_management_scanning_active(self):
        """ID.RA-01: Vulnerability scanning is operational.

        Fails when no scan has ever been recorded or when the latest scan is
        more than 30 days old.
        """
        from datetime import datetime, timezone

        from src.compliance import get_last_vulnerability_scan

        last_scan = get_last_vulnerability_scan()
        # Without this guard a missing timestamp surfaces as a TypeError from
        # the subtraction below rather than as a compliance failure.
        if last_scan is None:
            raise AssertionError("NIST CSF ID.RA-01: Vulnerability scan has never run")

        # NOTE(review): assumes last_scan is timezone-aware — confirm against
        # get_last_vulnerability_scan().
        days_since = (datetime.now(timezone.utc) - last_scan).days
        assert days_since <= 30, \
            f"NIST CSF ID.RA-01: Last vulnerability scan was {days_since} days ago (monthly required)"

## Protect Function
PR.AA: Identity Management and Access Control
# test/nist_csf/test_protect.py
"""NIST CSF 2.0 — Protect Function"""
class TestPRAAAccessControl:
    """PR.AA: Identity Management and Access Control.

    Checks that unauthenticated requests are rejected, that a read-only role
    cannot create resources, and that admin access stops working as soon as
    it is revoked.
    """

    def test_identities_are_authenticated(self, client):
        """PR.AA-01: Identities verified before granting access.

        Args:
            client: HTTP test client fixture; the request is sent without
                credentials.
        """
        response = client.get('/api/resources')
        assert response.status_code == 401, \
            "NIST CSF PR.AA-01: Unauthenticated access to resources must be denied"

    def test_least_privilege_enforced(self, client, db):
        """PR.AA-05: Least privilege — users access only what they need.

        Args:
            client: HTTP test client fixture.
            db: Database fixture; not used directly by this check.
        """
        # A read-only user should not be able to write.
        # NOTE(review): create_token is not defined or imported in this
        # snippet — presumably provided by the surrounding test module.
        read_only_token = create_token(role='read_only')
        response = client.post(
            '/api/resources',
            json={'name': 'test'},
            headers={'Authorization': f'Bearer {read_only_token}'},
        )
        assert response.status_code == 403, \
            "NIST CSF PR.AA-05: Read-only user should not be able to create resources"

    def test_privileged_access_is_revocable(self, client):
        """PR.AA-05: Privileged accounts can be disabled immediately.

        Args:
            client: HTTP test client fixture; not used directly by this check.
        """
        from src.auth import check_access, grant_admin_access, revoke_admin_access

        user_id = 'test-user-nist-csf'
        grant_admin_access(user_id)
        try:
            assert check_access(user_id, 'admin'), \
                "NIST CSF PR.AA-05: Admin access grant did not take effect"
        finally:
            # Always revoke, even when the grant check fails, so a failing
            # run never leaves the test account holding admin rights.
            revoke_admin_access(user_id)

        assert not check_access(user_id, 'admin'), \
            "NIST CSF PR.AA-05: Admin access revocation did not take effect immediately"
class TestPRDSDataSecurity:
    """PR.DS: Data Security.

    Covers application-layer field encryption, HTTP-to-HTTPS enforcement,
    and recency of backup verification.
    """

    def test_data_encrypted_at_rest(self, db):
        """PR.DS-01: Data at rest is protected.

        Storage-level encryption (PostgreSQL tablespace encryption, the RDS
        ``storage_encrypted`` parameter) is an infrastructure concern and is
        not checked here. This test verifies that the SSN of the test user is
        stored with the ``enc:`` prefix.

        Args:
            db: Database fixture; not used directly by this check.
        """
        from src.models import User

        # Get a test record and verify SSN is stored encrypted.
        user = User.get_test_user()

        # Fail rather than pass vacuously: with no record to inspect, the
        # check would otherwise report success without verifying anything.
        assert user and user.ssn, \
            "NIST CSF PR.DS-01: No test user with an SSN available — cannot verify field encryption"
        assert user.ssn.startswith('enc:'), \
            "NIST CSF PR.DS-01: SSN field not encrypted at application layer"

    def test_data_encrypted_in_transit(self):
        """PR.DS-02: Data in transit is protected.

        Requests the health endpoint over plain HTTP and requires that any
        successful response was reached via a redirect to HTTPS. Failing to
        get a response at all (connection refused, timeout, HTTP error
        status) is treated as acceptable, since plain HTTP is then not being
        served.
        """
        import urllib.request

        # Verify HTTPS is enforced (HTTP redirects to HTTPS)
        try:
            with urllib.request.urlopen('http://api.example.com/health',
                                        timeout=5) as response:
                final_url = response.url
        except OSError:
            # URLError, HTTPError and socket timeouts all derive from OSError.
            # Connection refused is acceptable (HTTP not listening).
            return

        # The assertion lives outside the try block on purpose: inside it,
        # a broad handler would swallow the AssertionError and the test
        # could never fail.
        assert final_url.startswith('https://'), \
            "NIST CSF PR.DS-02: HTTP not redirected to HTTPS"

    def test_backup_integrity_verified(self):
        """PR.DS-11: Backups are verified.

        Fails when backup verification has never run or last ran more than
        7 days ago.
        """
        from datetime import datetime, timezone

        from src.backup import get_last_backup_verification

        last_verified = get_last_backup_verification()
        if last_verified is None:
            raise AssertionError("NIST CSF PR.DS-11: Backup verification has never run")

        # NOTE(review): assumes last_verified is timezone-aware — confirm
        # against get_last_backup_verification().
        days_since = (datetime.now(timezone.utc) - last_verified).days
        assert days_since <= 7, \
            f"NIST CSF PR.DS-11: Last backup verification was {days_since} days ago"
class TestPRIRImprovedProcesses:
    """PR.IR: Technology Infrastructure Resilience"""

    def test_network_segmentation_enforced(self):
        """PR.IR-01: Networks and environments segmented.

        Reachability of the database from the public internet is normally
        verified by infrastructure scanning from outside the VPC, not by unit
        tests. This check covers the application-layer boundary only: the
        database configuration must not enable ``public_access``. A missing
        key is treated as "not public".
        """
        from src.config import get_database_config

        db_config = get_database_config()
        assert not db_config.get('public_access', False), \
            "NIST CSF PR.IR-01: Database is configured with public access"

## Detect Function
DE.CM: Continuous Monitoring
# test/nist_csf/test_detect.py
"""NIST CSF 2.0 — Detect Function"""
class TestDECMContinuousMonitoring:
    """DE.CM: Continuous Monitoring.

    Checks that brute-force logins and mass data access raise security
    signals, that the auditd process monitor is running, and that the SIEM
    log-forwarding endpoint is configured and reachable.
    """

    def test_failed_authentication_detection(self, client, security_events):
        """DE.CM-01: Networks monitored to detect attacks — brute force.

        Args:
            client: HTTP test client fixture.
            security_events: Fixture exposing ``get(event_type=...)`` that
                returns the recorded events of that type.
        """
        # Simulate brute force attempts
        for attempt in range(10):
            client.post('/auth/login', json={
                'username': 'target@example.com',
                'password': f'wrong-password-{attempt}'
            })

        # Verify a security event was fired
        brute_force_events = security_events.get(event_type='brute_force_attempt')
        assert len(brute_force_events) > 0, \
            "NIST CSF DE.CM-01: Brute force attack not detected and logged"

    def test_anomalous_access_patterns_detected(self, security_events):
        """DE.CM-03: Personnel activity monitored — anomaly detection.

        Args:
            security_events: Security-event fixture; not used directly by
                this check.
        """
        from src.security import check_access_anomaly

        # Normal: user accesses 10 records/hour
        # Anomalous: user accesses 10,000 records/hour
        result = check_access_anomaly(
            user_id='user-123',
            records_accessed=10000,
            time_window_minutes=60
        )
        assert result.get('anomaly_detected'), \
            "NIST CSF DE.CM-03: Mass data access not flagged as anomalous"

    def test_unauthorized_software_execution_detected(self):
        """DE.CM-09: Computing hardware and software monitored.

        Passes when either ``pgrep -x auditd`` or ``systemctl is-active
        auditd`` exits with status 0. The second command is only run when the
        first does not succeed.
        """
        import subprocess

        def _exits_zero(command):
            """Return True when *command* runs and exits with status 0."""
            try:
                completed = subprocess.run(command, capture_output=True, text=True)
            except OSError:
                # Tool not installed or not executable on this host: count it
                # as "not detected" so the result is a compliance failure
                # rather than an unrelated crash.
                return False
            return completed.returncode == 0

        # Verify process monitoring is active (check for expected monitoring agent)
        probes = (
            ['pgrep', '-x', 'auditd'],
            ['systemctl', 'is-active', 'auditd'],
        )
        # any() short-circuits, so systemctl is only consulted as a fallback.
        assert any(_exits_zero(probe) for probe in probes), \
            "NIST CSF DE.CM-09: auditd not running — process monitoring inactive"

    def test_security_logs_shipped_to_siem(self):
        """DE.CM: Adverse events detected and analyzed.

        Requires a configured ``siem_endpoint`` and a successful TCP
        connection to it on ``siem_port`` (default 514) within 3 seconds.
        """
        import socket

        from src.logging import get_log_forwarding_config

        config = get_log_forwarding_config()
        assert config.get('siem_endpoint') is not None, \
            "NIST CSF DE.CM: SIEM log forwarding not configured"

        # Verify the SIEM endpoint is reachable
        host = config['siem_endpoint']
        port = config.get('siem_port', 514)
        try:
            # create_connection applies the timeout to this socket only
            # (no process-wide default is changed), resolves IPv4 and IPv6
            # addresses, and the with-block closes the socket afterwards.
            with socket.create_connection((host, port), timeout=3):
                pass
        except OSError as err:
            raise AssertionError(
                f"NIST CSF DE.CM: SIEM endpoint {host}:{port} not reachable"
            ) from err

## Automated CSF Assessment
Map test results to CSF functions and categories for audit reporting:
# scripts/nist-csf-assessment.py
"""
Runs all NIST CSF tests and generates a profile report.
"""
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
# Maps each pytest test class name to the CSF function and category it
# provides evidence for. Tests whose class is not listed here are left out
# of the generated profile, so every test class needs an entry.
CSF_MAPPING = {
    'TestIDAssetManagement': {
        'function': 'Identify',
        'category': 'ID.AM',
        'description': 'Asset Management'
    },
    'TestPRAAAccessControl': {
        'function': 'Protect',
        'category': 'PR.AA',
        'description': 'Identity Management and Access Control'
    },
    'TestPRDSDataSecurity': {
        'function': 'Protect',
        'category': 'PR.DS',
        'description': 'Data Security'
    },
    # Previously missing: without this entry the PR.IR results were silently
    # dropped from the report.
    'TestPRIRImprovedProcesses': {
        'function': 'Protect',
        'category': 'PR.IR',
        'description': 'Technology Infrastructure Resilience'
    },
    'TestDECMContinuousMonitoring': {
        'function': 'Detect',
        'category': 'DE.CM',
        'description': 'Continuous Monitoring'
    },
}
def _build_profile(test_results, assessment_date):
    """Group pytest-json-report test entries into a CSF profile dict.

    Args:
        test_results: Parsed JSON report; its ``tests`` list is read, each
            entry providing ``nodeid``, ``outcome`` and ``duration``.
        assessment_date: Timestamp string stored as ``assessment_date``.

    Returns:
        A dict with ``assessment_date``, ``framework`` and ``functions``.
        ``functions`` maps function name -> ``categories`` -> category id ->
        ``{'description', 'tests', 'status'}``. A category is ``'FAIL'`` as
        soon as any of its tests has an outcome other than ``'passed'``.
    """
    profile = {
        'assessment_date': assessment_date,
        'framework': 'NIST CSF 2.0',
        'functions': {}
    }

    for test in test_results.get('tests', []):
        # Node ids look like "path/to/file.py::ClassName::test_name"; the
        # second segment selects the mapping entry.
        segments = test.get('nodeid', '').split('::')
        class_name = segments[1] if len(segments) > 1 else ''
        mapping = CSF_MAPPING.get(class_name)
        if not mapping:
            continue  # not a mapped CSF test class

        function_entry = profile['functions'].setdefault(
            mapping['function'], {'categories': {}})
        category_entry = function_entry['categories'].setdefault(
            mapping['category'],
            {
                'description': mapping['description'],
                'tests': [],
                'status': 'PASS'
            })

        category_entry['tests'].append({
            'name': test.get('nodeid'),
            'outcome': test.get('outcome'),
            'duration': test.get('duration')
        })
        if test.get('outcome') != 'passed':
            category_entry['status'] = 'FAIL'

    return profile


def run_assessment():
    """Run the NIST CSF test suite and write a dated profile report.

    Runs pytest on ``test/nist_csf/`` with the JSON report plugin, groups the
    results by CSF function and category, writes them to
    ``nist-csf-profile-<YYYY-MM-DD>.json`` and prints a summary.

    Returns:
        The profile dict that was written to disk.

    Raises:
        FileNotFoundError: If pytest did not produce the JSON results file
            (for example because the JSON report plugin is not installed).
    """
    results_path = 'nist-csf-results.json'

    # Remove any report left over from an earlier run so that a pytest run
    # which fails before writing cannot be mistaken for fresh results.
    try:
        os.remove(results_path)
    except FileNotFoundError:
        pass

    # Run tests with JSON output. sys.executable keeps the run on the same
    # interpreter (and virtualenv) as this script instead of whatever
    # "python3" resolves to on PATH.
    completed = subprocess.run([
        sys.executable, '-m', 'pytest',
        'test/nist_csf/',
        '-v',
        '--json-report',
        f'--json-report-file={results_path}'
    ], capture_output=True, text=True)

    try:
        with open(results_path, encoding='utf-8') as f:
            test_results = json.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError(
            f"pytest did not write {results_path} "
            f"(exit code {completed.returncode}); "
            "is the pytest-json-report plugin installed?"
        ) from err

    # Use one UTC timestamp for both the recorded assessment date and the
    # file name so the two cannot disagree around midnight.
    assessed_at = datetime.now(timezone.utc)
    profile = _build_profile(test_results, assessed_at.isoformat())

    output_path = f"nist-csf-profile-{assessed_at.strftime('%Y-%m-%d')}.json"
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(profile, f, indent=2)

    # Print summary
    print("\nNIST CSF Assessment Summary")
    print("=" * 40)
    for function, data in profile['functions'].items():
        print(f"\n{function}:")
        for category, cat_data in data['categories'].items():
            status = cat_data['status']
            icon = '✅' if status == 'PASS' else '❌'
            print(f" {icon} {category}: {cat_data['description']} — {status}")

    return profile
# Script entry point: run the assessment only when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    run_assessment()

## Summary
NIST CSF testing maps framework categories to automated checks:
- Identify: Asset inventory (SBOM), data classification, vulnerability scan recency
- Protect: Authentication enforcement, least privilege, encryption at rest/transit, backup verification
- Detect: Brute force detection, anomaly detection, SIEM log forwarding, process monitoring
The CSF's outcomes-based approach means tests verify that security objectives are met — not that specific controls are configured. This allows flexibility in implementation while still producing auditable evidence. Running these tests in CI and generating CSF profile reports gives you continuous assessment rather than point-in-time snapshots.