NIST Cybersecurity Framework Testing Implementation

The NIST Cybersecurity Framework (CSF) provides a flexible structure for managing cybersecurity risk. Unlike FIPS 140 or SOX, the CSF is not prescriptive — it describes outcomes, not specific controls. This makes it adaptable but also harder to test against. This guide maps CSF functions and categories to concrete automated tests.

CSF Structure

The NIST CSF 2.0 (released 2024) organizes cybersecurity into six functions:

Function | Focus | Example Categories
Govern | Oversight and strategy | Risk management strategy, supply chain risk
Identify | Asset and risk management | Asset inventory, vulnerability management
Protect | Safeguards | Access control, data security, awareness training
Detect | Anomaly detection | Monitoring, detection processes
Respond | Incident response | Analysis, containment, recovery
Recover | Restoration | Recovery planning, improvements

Developer testing primarily covers Identify, Protect, and Detect. Govern, Respond, and Recover depend mostly on processes, policies, and organizational activities, which automated tests cover less directly.

Identify Function

ID.AM: Asset Management

The CSF calls for maintained inventories of software, data, and system assets. Because the framework describes outcomes rather than mandating controls, the tests below check that your own inventory mechanisms work:

# test/nist_csf/test_identify.py
"""NIST CSF 2.0 — Identify Function"""

class TestIDAssetManagement:
    """ID.AM: Asset Management"""
    
    def test_software_component_inventory_complete(self):
        """ID.AM-02: Software assets inventoried, all dependencies tracked"""
        # Verify SBOM (Software Bill of Materials) is generated
        import json
        from pathlib import Path

        sbom_path = Path('sbom.json')

        assert sbom_path.exists(), \
            "NIST CSF ID.AM-02: SBOM not generated, cannot inventory software assets"

        with open(sbom_path) as f:
            sbom = json.load(f)

        # 'packages' is the SPDX JSON key; CycloneDX lists entries under 'components'
        assert len(sbom.get('packages', [])) > 0, \
            "NIST CSF ID.AM-02: SBOM has no packages listed"
    
    def test_data_classification_applied(self, db):
        """ID.AM-07: Data is inventoried and categorized according to classification policy"""
        # Verify all tables with PII have data classification metadata.
        # DISTINCT avoids checking the same table once per PII column.
        tables_with_pii = db.query("""
            SELECT DISTINCT table_name
            FROM information_schema.columns
            WHERE column_name IN ('email', 'ssn', 'phone', 'address', 'date_of_birth')
            AND table_schema = 'public'
        """)

        for table in tables_with_pii:
            classification = db.query("""
                SELECT classification_level
                FROM data_classifications
                WHERE table_name = %s
            """, table['table_name'])

            assert len(classification) > 0, \
                f"NIST CSF ID.AM-07: Table '{table['table_name']}' contains PII but has no data classification"
    
    def test_vulnerability_management_scanning_active(self):
        """ID.RA-01: Vulnerability scanning is operational"""
        from src.compliance import get_last_vulnerability_scan
        from datetime import datetime, timezone

        last_scan = get_last_vulnerability_scan()
        assert last_scan is not None, \
            "NIST CSF ID.RA-01: No vulnerability scan has been recorded"

        # Assumes get_last_vulnerability_scan() returns a timezone-aware UTC datetime
        days_since = (datetime.now(timezone.utc) - last_scan).days

        assert days_since <= 30, \
            f"NIST CSF ID.RA-01: Last vulnerability scan was {days_since} days ago (monthly required)"
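
The SBOM check above reads a top-level `packages` key, which matches SPDX JSON; CycloneDX JSON lists its entries under `components` instead. A minimal sketch of a format-tolerant count, assuming the SBOM is in one of those two JSON formats (`count_sbom_components` is a hypothetical helper, not part of any SBOM library):

```python
def count_sbom_components(sbom: dict) -> int:
    """Count inventoried entries in an SPDX or CycloneDX JSON document.

    Hypothetical helper for illustration: SPDX JSON lists entries under
    "packages", CycloneDX JSON under "components".
    """
    total = 0
    for key in ("packages", "components"):
        entries = sbom.get(key)
        # Ignore the key if it is missing or not a list
        if isinstance(entries, list):
            total += len(entries)
    return total
```

The test could then assert `count_sbom_components(sbom) > 0` so that it does not depend on which SBOM generator the pipeline uses.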

Protect Function

PR.AA: Identity Management and Access Control

# test/nist_csf/test_protect.py
"""NIST CSF 2.0 — Protect Function"""

class TestPRAAAccessControl:
    """PR.AA: Identity Management and Access Control"""
    
    def test_identities_are_authenticated(self, client):
        """PR.AA-03: Users are authenticated before access is granted"""
        response = client.get('/api/resources')
        assert response.status_code == 401, \
            "NIST CSF PR.AA-03: Unauthenticated access to resources must be denied"
    
    def test_least_privilege_enforced(self, client, db):
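        """PR.AA-05: Least privilege, users access only what they need"""
        # create_token is assumed to be a project helper; adjust the import to its real location
        from src.auth import create_token

        # A read-only user should not be able to write
        read_only_token = create_token(role='read_only')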
        
        response = client.post('/api/resources', 
                               json={'name': 'test'},
                               headers={'Authorization': f'Bearer {read_only_token}'})
        
        assert response.status_code == 403, \
            "NIST CSF PR.AA-05: Read-only user should not be able to create resources"
    
    def test_privileged_access_is_revocable(self, client):
        """PR.AA-05: Privileged accounts can be disabled immediately"""
        from src.auth import grant_admin_access, revoke_admin_access, check_access
        
        user_id = 'test-user-nist-csf'
        grant_admin_access(user_id)
        assert check_access(user_id, 'admin') == True
        
        revoke_admin_access(user_id)
        assert check_access(user_id, 'admin') == False, \
            "NIST CSF PR.AA-05: Admin access revocation did not take effect immediately"

class TestPRDSDataSecurity:
    """PR.DS: Data Security"""
    
    def test_data_encrypted_at_rest(self, db):
        """PR.DS-01: Data at rest is protected"""
        # Storage-level encryption (for example the RDS storage_encrypted setting)
        # is an infrastructure concern and is better verified by infrastructure scanning.
        # This test covers the application layer: sensitive fields must be stored encrypted.
        import pytest
        from src.models import User

        # Get a test record and verify SSN is stored encrypted
        user = User.get_test_user()
        if user is None or not user.ssn:
            # Skip visibly instead of passing silently when there is nothing to check
            pytest.skip("No test user with an SSN available")

        assert user.ssn.startswith('enc:'), \
            "NIST CSF PR.DS-01: SSN field not encrypted at application layer"
    
    def test_data_encrypted_in_transit(self):
        """PR.DS-02: Data in transit is protected"""
        import urllib.error
        import urllib.request

        # Verify HTTPS is enforced (HTTP redirects to HTTPS)
        try:
            response = urllib.request.urlopen('http://api.example.com/health',
                                              timeout=5)
        except urllib.error.URLError:
            # Connection refused is acceptable (HTTP not listening).
            # HTTPError is a URLError subclass, so HTTP error statuses also land here.
            return

        # The assertion sits outside the try block so a failure is not swallowed
        assert response.url.startswith('https://'), \
            "NIST CSF PR.DS-02: HTTP not redirected to HTTPS"
    
    def test_backup_integrity_verified(self):
        """PR.DS-11: Backups are verified"""
        from src.backup import get_last_backup_verification
        from datetime import datetime, timezone

        last_verified = get_last_backup_verification()
        assert last_verified is not None, \
            "NIST CSF PR.DS-11: Backup verification has never run"
        
        days_since = (datetime.now(timezone.utc) - last_verified).days
        assert days_since <= 7, \
            f"NIST CSF PR.DS-11: Last backup verification was {days_since} days ago"

class TestPRIRInfrastructureResilience:
    """PR.IR: Technology Infrastructure Resilience"""

    def test_network_segmentation_enforced(self):
        """PR.IR-01: Networks and environments segmented"""
        # Database should not be reachable from public internet
        # Test that a direct connection attempt fails from outside the VPC
        # This is typically tested via infrastructure scanning, not unit tests
        # Here we test the application-layer boundary
        
        from src.config import get_database_config
        db_config = get_database_config()
        
        assert not db_config.get('public_access', False), \
            "NIST CSF PR.IR-01: Database is configured with public access"
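
The vulnerability-scan and backup-verification checks above both compare a timestamp against a maximum age. A sketch of a shared helper for that pattern, assuming timestamps are UTC; `is_recent` is a hypothetical name, and treating naive datetimes as UTC is an assumption of this sketch:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_recent(last_run: Optional[datetime], max_age_days: int,
              now: Optional[datetime] = None) -> bool:
    """Return True when last_run is no older than max_age_days.

    A missing timestamp counts as stale. Naive datetimes are assumed
    to be UTC (an assumption of this sketch).
    """
    if last_run is None:
        return False
    if last_run.tzinfo is None:
        last_run = last_run.replace(tzinfo=timezone.utc)
    # Allow the caller to inject "now" so the check is testable
    now = now or datetime.now(timezone.utc)
    return now - last_run <= timedelta(days=max_age_days)
```

Unlike the `.days` comparison in the tests, this helper compares full durations, so a timestamp 7 days and 1 hour old fails a 7-day limit.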

Detect Function

DE.CM: Continuous Monitoring

# test/nist_csf/test_detect.py
"""NIST CSF 2.0 — Detect Function"""

class TestDECMContinuousMonitoring:
    """DE.CM: Continuous Monitoring"""
    
    def test_failed_authentication_detection(self, client, security_events):
        """DE.CM-01: Networks monitored to detect attacks — brute force"""
        # Simulate brute force attempts
        for i in range(10):
            client.post('/auth/login', json={
                'username': 'target@example.com',
                'password': f'wrong-password-{i}'
            })
        
        # Verify a security event was fired
        brute_force_events = security_events.get(event_type='brute_force_attempt')
        assert len(brute_force_events) > 0, \
            "NIST CSF DE.CM-01: Brute force attack not detected and logged"
    
    def test_anomalous_access_patterns_detected(self, security_events):
        """DE.CM-03: Personnel activity monitored — anomaly detection"""
        from src.security import check_access_anomaly
        
        # Normal: user accesses 10 records/hour
        # Anomalous: user accesses 10,000 records/hour
        
        result = check_access_anomaly(
            user_id='user-123',
            records_accessed=10000,
            time_window_minutes=60
        )
        
        assert result.get('anomaly_detected') == True, \
            "NIST CSF DE.CM-03: Mass data access not flagged as anomalous"
    
    def test_unauthorized_software_execution_detected(self):
        """DE.CM-09: Computing hardware and software monitored"""
        # Verify process monitoring is active (check for expected monitoring agent)
        import subprocess
        
        result = subprocess.run(['pgrep', '-x', 'auditd'], 
                                capture_output=True, text=True)
        
        if result.returncode != 0:
            # auditd not running — check for alternative
            result = subprocess.run(['systemctl', 'is-active', 'auditd'],
                                    capture_output=True, text=True)
            assert result.returncode == 0, \
                "NIST CSF DE.CM-09: auditd not running — process monitoring inactive"
    
    def test_security_logs_shipped_to_siem(self):
        """DE.CM: Adverse events detected and analyzed"""
        from src.logging import get_log_forwarding_config
        
        config = get_log_forwarding_config()
        assert config.get('siem_endpoint') is not None, \
            "NIST CSF DE.CM: SIEM log forwarding not configured"
        
        # Verify the SIEM endpoint is reachable
        import socket
        host = config['siem_endpoint']
        port = config.get('siem_port', 514)

        try:
            # create_connection applies the timeout to this socket only,
            # and the context manager closes it afterwards
            with socket.create_connection((host, port), timeout=3):
                pass
        except OSError:
            raise AssertionError(
                f"NIST CSF DE.CM: SIEM endpoint {host}:{port} not reachable"
            )
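
The brute-force test above expects a `brute_force_attempt` event after repeated failed logins, but the guide does not define the `security_events` fixture or the detection logic. A minimal in-memory sketch of both, assuming a sliding-window threshold; the class names, the threshold of 5, and the 300-second window are all hypothetical choices, not taken from the application under test:

```python
from collections import defaultdict, deque


class SecurityEventRecorder:
    """In-memory stand-in for the `security_events` fixture (hypothetical)."""

    def __init__(self):
        self._events = []

    def record(self, event_type, **details):
        self._events.append({"event_type": event_type, **details})

    def get(self, event_type):
        return [e for e in self._events if e["event_type"] == event_type]


class BruteForceDetector:
    """Record `brute_force_attempt` when failures in a window reach a threshold."""

    def __init__(self, recorder, threshold=5, window_seconds=300):
        self.recorder = recorder
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._failures = defaultdict(deque)  # username -> failure timestamps

    def login_failed(self, username, now):
        failures = self._failures[username]
        failures.append(now)
        # Drop failures that have fallen out of the sliding window
        while failures and now - failures[0] > self.window_seconds:
            failures.popleft()
        if len(failures) >= self.threshold:
            self.recorder.record("brute_force_attempt",
                                 username=username, failures=len(failures))
```

Passing the timestamp in as `now` keeps the detector deterministic in tests; in an application it would typically come from `time.time()`.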

Automated CSF Assessment

Map test results to CSF subcategories for audit reporting:

# scripts/nist-csf-assessment.py
"""
Runs all NIST CSF tests and generates a profile report.
"""
import subprocess
import json
from datetime import datetime, timezone

CSF_MAPPING = {
    'TestIDAssetManagement': {
        'function': 'Identify',
        'category': 'ID.AM',
        'description': 'Asset Management'
    },
    'TestPRAAAccessControl': {
        'function': 'Protect',
        'category': 'PR.AA',
        'description': 'Identity Management and Access Control'
    },
    'TestPRDSDataSecurity': {
        'function': 'Protect',
        'category': 'PR.DS',
        'description': 'Data Security'
    },
    'TestDECMContinuousMonitoring': {
        'function': 'Detect',
        'category': 'DE.CM',
        'description': 'Continuous Monitoring'
    },
}

def run_assessment():
    # Run tests with JSON output (the --json-report flags come from the pytest-json-report plugin)
    result = subprocess.run([
        'python3', '-m', 'pytest',
        'test/nist_csf/',
        '-v',
        '--json-report',
        '--json-report-file=nist-csf-results.json'
    ], capture_output=True, text=True)
    
    with open('nist-csf-results.json') as f:
        test_results = json.load(f)
    
    # Build profile
    profile = {
        'assessment_date': datetime.now(timezone.utc).isoformat(),
        'framework': 'NIST CSF 2.0',
        'functions': {}
    }
    
    for test in test_results.get('tests', []):
        class_name = test.get('nodeid', '').split('::')[1] if '::' in test.get('nodeid', '') else ''
        mapping = CSF_MAPPING.get(class_name)
        
        if mapping:
            function = mapping['function']
            if function not in profile['functions']:
                profile['functions'][function] = {'categories': {}}
            
            category = mapping['category']
            if category not in profile['functions'][function]['categories']:
                profile['functions'][function]['categories'][category] = {
                    'description': mapping['description'],
                    'tests': [],
                    'status': 'PASS'
                }
            
            test_info = {
                'name': test.get('nodeid'),
                'outcome': test.get('outcome'),
                'duration': test.get('duration')
            }
            profile['functions'][function]['categories'][category]['tests'].append(test_info)
            
            if test.get('outcome') != 'passed':
                profile['functions'][function]['categories'][category]['status'] = 'FAIL'
    
    # Use UTC for the filename so it matches assessment_date
    output_path = f"nist-csf-profile-{datetime.now(timezone.utc).strftime('%Y-%m-%d')}.json"
    with open(output_path, 'w') as f:
        json.dump(profile, f, indent=2)
    
    # Print summary
    print("\nNIST CSF Assessment Summary")
    print("=" * 40)
    for function, data in profile['functions'].items():
        print(f"\n{function}:")
        for category, cat_data in data['categories'].items():
            status = cat_data['status']
            icon = '✅' if status == 'PASS' else '❌'
            print(f"  {icon} {category}: {cat_data['description']} [{status}]")

if __name__ == '__main__':
    run_assessment()
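
For use in CI, the profile can be reduced to an exit code so that a failing category fails the pipeline. A minimal sketch, assuming the profile dictionary has the shape built by `run_assessment()` above; `failing_categories` and `exit_code_for` are hypothetical helpers:

```python
def failing_categories(profile: dict) -> list:
    """Return the CSF category IDs whose status is not 'PASS', sorted."""
    failing = []
    for function_data in profile.get("functions", {}).values():
        for category, cat_data in function_data.get("categories", {}).items():
            if cat_data.get("status") != "PASS":
                failing.append(category)
    return sorted(failing)


def exit_code_for(profile: dict) -> int:
    """0 when every assessed category passed, 1 otherwise."""
    return 1 if failing_categories(profile) else 0
```

One way to wire this in is to have `run_assessment()` return `profile` and end the script with `sys.exit(exit_code_for(profile))`. An empty profile yields 0 in this sketch, so a run that collected no tests would need a separate check.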

Summary

NIST CSF testing maps framework categories to automated checks:

  • Identify: Asset inventory (SBOM), data classification, vulnerability scan recency
  • Protect: Authentication enforcement, least privilege, encryption at rest/transit, backup verification
  • Detect: Brute force detection, anomaly detection, SIEM log forwarding, process monitoring

The CSF's outcomes-based approach means tests verify that security objectives are met — not that specific controls are configured. This allows flexibility in implementation while still producing auditable evidence. Running these tests in CI and generating CSF profile reports gives you continuous assessment rather than point-in-time snapshots.
