From da6e62edf804d7887e0b77496b7a1c6efa5ad708 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Fri, 3 Apr 2026 12:41:55 -0700 Subject: [PATCH 1/4] Build: Add KEK validation script and authvar verify command Add signature verification support to auth_var_tool.py with PKCS7 parsing, certificate extraction, and signer verification. Add validate_kek.py to validate one file or a folder of KEK updates against expected variable metadata and payload hash, and emit JSON results for automation. Signed-off-by: Doug Flick --- scripts/auth_var_tool.py | 393 ++++++++++++++++++++++++++++++++++++- scripts/validate_kek.py | 408 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 795 insertions(+), 6 deletions(-) create mode 100644 scripts/validate_kek.py diff --git a/scripts/auth_var_tool.py b/scripts/auth_var_tool.py index b3a6c2d..63585f3 100644 --- a/scripts/auth_var_tool.py +++ b/scripts/auth_var_tool.py @@ -5,11 +5,12 @@ ## """UEFI Authenticated Variable Tool for signing and formatting variables. -This tool provides three main commands: +This tool provides four main commands: 1. format: Generates signable data and receipt files for external signing workflows 2. sign: Signs variables using PFX files or attaches pre-generated signatures -3. describe: Parses and describes existing signed variables +3. verify: Verifies cryptographic signatures of authenticated variables +4. describe: Parses and describes existing signed variables The tool supports both direct signing (using PFX files) and external signing workflows (where signatures are generated outside this tool and then attached). @@ -31,25 +32,37 @@ # Attach external signature using receipt python auth_var_tool.py sign --receipt-file MyVar.receipt.json --signature-file MyVar.bin.p7 + # Verify a signed authenticated variable + python auth_var_tool.py verify MyVar.authvar.bin MyVar 8be4df61-93ca-11d2-aa0d-00e098032b8c "NV,BS,RT,AT,AP" -v + # Describe an existing signed variable python auth_var_tool.py describe signed_variable.bin """ import argparse import datetime +import hashlib import io import json import logging import os +import re import sys import uuid from getpass import getpass +from cryptography import x509 +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa from cryptography.hazmat.primitives.serialization import pkcs12 from edk2toollib.uefi.authenticated_variables_structure_support import ( EfiVariableAuthentication2, EfiVariableAuthentication2Builder, ) +from pyasn1.codec.der import decoder, encoder +from pyasn1_modules import rfc2315 # Puts the script into debug mode, may be enabled via argparse ENABLE_DEBUG = False @@ -92,6 +105,253 @@ def _parse_timestamp(timestamp_str: str = None) -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) +def _get_hash_algorithm_from_oid(oid: str) -> hashes.HashAlgorithm | None: + """Map OID to cryptography hash algorithm.""" + oid_map = { + '2.16.840.1.101.3.4.2.1': hashes.SHA256(), + '2.16.840.1.101.3.4.2.2': hashes.SHA384(), + '2.16.840.1.101.3.4.2.3': hashes.SHA512(), + '1.3.14.3.2.26': hashes.SHA1(), + } + return oid_map.get(oid) + + +def _extract_certificates_from_pkcs7(pkcs7_data: bytes) -> list: + """Extract X.509 certificates from PKCS7 data.""" + certificates = [] + try: + # Try to decode as ContentInfo first + try: + content_info, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.ContentInfo()) + signed_data, _ = decoder.decode( + bytes(content_info['content']), + asn1Spec=rfc2315.SignedData() + ) + except Exception: + # If that fails, try decoding directly as SignedData + signed_data, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.SignedData()) + + # Extract certificates if present + if signed_data['certificates'].hasValue(): + for cert_choice in signed_data['certificates']: + cert_der = encoder.encode(cert_choice['certificate']) + cert = x509.load_der_x509_certificate(cert_der, default_backend()) + certificates.append(cert) + + except Exception as e: + logger.debug(f"Failed to extract certificates: {e}") + + return certificates + + +def _verify_pkcs7_signature(pkcs7_data: bytes, certificates: list, external_data: bytes) -> dict: + """Verify PKCS7 detached signature against external data. + + Returns: + dict: Verification results with 'verified' boolean and list of 'signers' + """ + results = { + 'verified': False, + 'signers': [], + 'errors': [] + } + + try: + # Decode PKCS7 structure + try: + content_info, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.ContentInfo()) + signed_data, _ = decoder.decode( + bytes(content_info['content']), + asn1Spec=rfc2315.SignedData() + ) + except Exception: + signed_data, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.SignedData()) + + # Verify each signer + for signer_idx, signer_info in enumerate(signed_data['signerInfos']): + signer_result = { + 'index': signer_idx, + 'verified': False, + 'error': None + } + + try: + # Get digest algorithm + digest_alg_oid = str(signer_info['digestAlgorithm']['algorithm']) + hash_algorithm = _get_hash_algorithm_from_oid(digest_alg_oid) + + if not hash_algorithm: + signer_result['error'] = f"Unsupported digest algorithm: {digest_alg_oid}" + results['errors'].append(signer_result['error']) + results['signers'].append(signer_result) + continue + + # Get the encrypted digest (signature) + encrypted_digest = bytes(signer_info['encryptedDigest']) + + # Find the signer's certificate + serial_number = int(signer_info['issuerAndSerialNumber']['serialNumber']) + signer_cert = None + for cert in certificates: + if cert.serial_number == serial_number: + signer_cert = cert + break + + if not signer_cert: + signer_result['error'] = f"Signer certificate not found (serial: {serial_number})" + results['errors'].append(signer_result['error']) + results['signers'].append(signer_result) + continue + + # Determine what data to verify + if signer_info['authenticatedAttributes'].hasValue(): + # With authenticated attributes, sign the attributes + authenticated_attrs = signer_info['authenticatedAttributes'] + attrs_der = encoder.encode(authenticated_attrs) + # Replace IMPLICIT tag [0] (0xA0) with SET OF tag (0x31) + if attrs_der[0:1] == b'\xa0': + attrs_der = b'\x31' + attrs_der[1:] + data_to_verify = attrs_der + else: + # No authenticated attributes - verify external data directly + data_to_verify = external_data + + # Verify signature + public_key = signer_cert.public_key() + + if isinstance(public_key, rsa.RSAPublicKey): + public_key.verify( + encrypted_digest, + data_to_verify, + padding.PKCS1v15(), + hash_algorithm + ) + signer_result['verified'] = True + elif isinstance(public_key, ec.EllipticCurvePublicKey): + public_key.verify( + encrypted_digest, + data_to_verify, + ec.ECDSA(hash_algorithm) + ) + signer_result['verified'] = True + else: + signer_result['error'] = f"Unsupported key type: {type(public_key)}" + results['errors'].append(signer_result['error']) + + except InvalidSignature: + signer_result['error'] = "Signature verification failed - invalid signature" + results['errors'].append(signer_result['error']) + except Exception as e: + signer_result['error'] = f"Verification error: {str(e)}" + results['errors'].append(signer_result['error']) + + results['signers'].append(signer_result) + + # Overall verification passes if all signers verified + results['verified'] = all(s['verified'] for s in results['signers']) and len(results['signers']) > 0 + + except Exception as e: + results['errors'].append(f"PKCS7 parsing error: {str(e)}") + + return results + + +def verify_variable(args: argparse.Namespace) -> int: + """Verifies the cryptographic signature of an authenticated variable. + + This command validates that: + 1. The PKCS7 signature structure is valid + 2. The signature cryptographically verifies against the signable data + 3. The signing certificate is present in the signature + + Parameters + ---------- + args : argparse.Namespace + Command-line arguments including: + - authvar_file: Path to the signed authenticated variable file + - var_name: Variable name used during signing + - var_guid: Variable GUID used during signing + - attributes: Variable attributes used during signing + - verbose: Enable detailed output + + Returns: + ------- + int + 0 if verification succeeds, 1 if verification fails + """ + try: + # Parse the authenticated variable + logger.info(f"Verifying authenticated variable: {args.authvar_file}") + + with open(args.authvar_file, 'rb') as f: + auth_var = EfiVariableAuthentication2(decodefs=f) + + # Reconstruct the signable data using the builder + signing_time = auth_var.time.get_datetime() + builder = EfiVariableAuthentication2Builder( + name=args.var_name, + guid=uuid.UUID(args.var_guid), + attributes=args.attributes, + payload=auth_var.payload, + efi_time=signing_time + ) + signable_data = builder.get_digest() + + if args.verbose: + logger.info(f"Variable Name: {args.var_name}") + logger.info(f"Variable GUID: {args.var_guid}") + logger.info(f"Attributes: {args.attributes}") + logger.info(f"Signing Time: {signing_time}") + logger.info(f"Payload Size: {len(auth_var.payload)} bytes") + logger.info(f"Signable Data SHA256: {hashlib.sha256(signable_data).hexdigest()}") + + # Extract PKCS7 signature (cert_data from edk2toollib is the PKCS7 data) + pkcs7_data = auth_var.auth_info.cert_data + + # Extract certificates + certificates = _extract_certificates_from_pkcs7(pkcs7_data) + + if args.verbose: + logger.info(f"\nCertificates found: {len(certificates)}") + for i, cert in enumerate(certificates, 1): + logger.info(f" Certificate {i}:") + logger.info(f" Subject: {cert.subject.rfc4514_string()}") + logger.info(f" Issuer: {cert.issuer.rfc4514_string()}") + logger.info(f" Valid: {cert.not_valid_before_utc} to {cert.not_valid_after_utc}") + + # Verify the signature + verification_result = _verify_pkcs7_signature(pkcs7_data, certificates, signable_data) + + # Display results + if args.verbose: + logger.info("") + logger.info("Signature Verification Results:") + for signer in verification_result['signers']: + logger.info(f" Signer {signer['index'] + 1}:") + if signer['verified']: + logger.info(" Status: VERIFIED") + else: + logger.info(" Status: FAILED") + if signer['error']: + logger.info(f" Error: {signer['error']}") + + if verification_result['verified']: + logger.info("[+] Authenticated variable signature is VALID") + return 0 + else: + logger.error("[-] Authenticated variable signature verification FAILED") + for error in verification_result['errors']: + logger.error(f" - {error}") + return 1 + + except Exception as e: + logger.error(f"Failed to verify authenticated variable: {e}") + if args.verbose: + import traceback + traceback.print_exc() + return 1 + + def format_variable(args: argparse.Namespace) -> int: """Formats a variable for signing by generating signable data and a receipt file. @@ -389,6 +649,72 @@ def _attach_signature_from_receipt(args: argparse.Namespace) -> int: return 0 +def _convert_hex_strings_to_readable(content: str) -> str: + """Convert hex-encoded strings in describe output to human-readable format. + + This function searches for patterns like "value=0x131a..." in the content + and attempts to decode them as UTF-8 strings. Common encodings in certificates + include PrintableString (0x13), UTF8String (0x0c), and IA5String (0x16). + + Parameters + ---------- + content : str + The original text content from auth_var.print() + + Returns: + ------- + str + The content with hex strings converted to readable format where possible + """ + # Pattern to match hex value lines like " value=0x131a444f204e4f54..." + pattern = r'([ ]*value=)(0x[0-9a-fA-F]+)' + + def decode_hex_value(match: 're.Match[str]') -> str: + indent = match.group(1) + hex_string = match.group(2) + + try: + # Remove "0x" prefix and convert to bytes + hex_bytes = bytes.fromhex(hex_string[2:]) + + # Check if this looks like an ASN.1 encoded string + # Common prefixes: 0x13 (PrintableString), 0x0c (UTF8String), 0x16 (IA5String) + if len(hex_bytes) >= 2 and hex_bytes[0] in [0x13, 0x0c, 0x16]: + # Second byte is the length + length = hex_bytes[1] + if len(hex_bytes) >= 2 + length: + # Extract the string content + string_data = hex_bytes[2:2+length] + try: + # Attempt to decode as UTF-8 + decoded = string_data.decode('utf-8') + # Only replace if it looks like printable text + if decoded.isprintable() or all(c in '\t\n\r' or c.isprintable() for c in decoded): + return f'{indent}{hex_string} ("{decoded}")' + except (UnicodeDecodeError, AttributeError): + # Decoding failed; ignore and try the next decoding strategy. + pass + + # If it's not ASN.1 encoded, try direct UTF-8 decode + # (in case it's just raw string data) + try: + decoded = hex_bytes.decode('utf-8') + if decoded.isprintable() or all(c in '\t\n\r' or c.isprintable() for c in decoded): + return f'{indent}{hex_string} ("{decoded}")' + except UnicodeDecodeError: + # Ignore decode errors: not all byte sequences are valid UTF-8, so skip these cases. + pass + + except (ValueError, IndexError) as e: + # Failed to decode hex string as ASN.1 or UTF-8; this is expected for some values. + logging.debug(f"Failed to decode hex string '{hex_string}': {e}") + + # If decoding fails, return original + return match.group(0) + + return re.sub(pattern, decode_hex_value, content) + + def describe_variable(args: argparse.Namespace) -> int: """Parses and describes an authenticated variable structure. @@ -410,9 +736,20 @@ def describe_variable(args: argparse.Namespace) -> int: name = os.path.basename(args.signed_payload) output_file = os.path.join(args.output_dir, f"{name}.authvar.txt") + # First write to a buffer to capture the output + buffer = io.StringIO() + auth_var.print(outfs=buffer) + + # Get the content and convert hex strings to readable format + content = buffer.getvalue() + readable_content = _convert_hex_strings_to_readable(content) + + # Write the converted content to the output file with open(output_file, "w") as f: - auth_var.print(outfs=f) + f.write(readable_content) + payload_hash = hashlib.sha256(auth_var.payload).hexdigest() + logger.info(f"Payload SHA256: {payload_hash}") logger.info(f"Output: {output_file}") return 0 @@ -567,6 +904,49 @@ def setup_describe_parser(subparsers: argparse._SubParsersAction) -> argparse._S return subparsers +def setup_verify_parser(subparsers: argparse._SubParsersAction) -> argparse._SubParsersAction: + """Sets up the verify parser. + + :param subparsers: - sub parser from argparse to add options to + + :returns: subparser + """ + verify_parser = subparsers.add_parser( + "verify", + help="Verifies the cryptographic signature of an authenticated variable" + ) + verify_parser.set_defaults(function=verify_variable) + + verify_parser.add_argument( + "authvar_file", + type=typecheck_file_exists, + help="Path to the signed authenticated variable file (.authvar.bin)" + ) + + verify_parser.add_argument( + "var_name", + help="Variable name that was used during signing (e.g., 'KEK', 'db', 'PK')" + ) + + verify_parser.add_argument( + "var_guid", + help="Variable GUID that was used during signing (e.g., '8be4df61-93ca-11d2-aa0d-00e098032b8c')" + ) + + verify_parser.add_argument( + "attributes", + help="Comma-separated list of attributes used during signing (e.g., 'NV,BS,RT,AT,AP')" + ) + + verify_parser.add_argument( + "-v", "--verbose", + action="store_true", + help="Enable verbose output with detailed verification information" + ) + + return subparsers + + def parse_args() -> argparse.Namespace: """Parses arguments from the command line.""" parser = argparse.ArgumentParser( @@ -582,9 +962,10 @@ def parse_args() -> argparse.Namespace: "--debug", action="store_true", default=False, help="enables debug printing for deep inspection" ) - subparsers = setup_format_parser(subparsers) - subparsers = setup_sign_parser(subparsers) - subparsers = setup_describe_parser(subparsers) + setup_format_parser(subparsers) + setup_sign_parser(subparsers) + setup_describe_parser(subparsers) + setup_verify_parser(subparsers) args = parser.parse_args() # Create output directory if it doesn't exist (after parsing args) diff --git a/scripts/validate_kek.py b/scripts/validate_kek.py new file mode 100644 index 0000000..36a2fb6 --- /dev/null +++ b/scripts/validate_kek.py @@ -0,0 +1,408 @@ +"""Validate KEK update file(s) and generate a JSON report. + +This script validates authenticated variable files - either a single file or all +files in a specified folder - and generates a JSON report with validation results. +""" + +import argparse +import hashlib +import json +import logging +import sys +from datetime import datetime, timezone +from pathlib import Path + +# Import validation functions from auth_var_tool +sys.path.insert(0, str(Path(__file__).parent)) +# Import the verify function from auth_var_tool +from auth_var_tool import verify_variable +from edk2toollib.uefi.authenticated_variables_structure_support import EfiVariableAuthentication2 + +# Standard KEK parameters +KEK_NAME = "KEK" +KEK_GUID = "8be4df61-93ca-11d2-aa0d-00e098032b8c" +KEK_ATTRIBUTES = "NV,BS,RT,AT,AP" + +# Expected payload hash for Microsoft 2023 KEK (EFI Signature List with x.509) +EXPECTED_PAYLOAD_HASH = "5b85333c009d7ea55cbb6f11a5c2ff45ee1091a968504c929aed25c84674962f" + + +def validate_single_kek( + kek_file: Path, + quiet: bool = False +) -> dict: + """Validate a single KEK update file. + + Args: + kek_file: Path to KEK update file + quiet: If True, suppress validation output from the verification process + + Returns: + dict: Validation result for the file + """ + logging.info(f"Validating: {kek_file.name}") + + file_result = { + "filename": kek_file.name, + "path": str(kek_file), + "valid": False, + "payload_hash_valid": False, + "error": None, + "warnings": [], + "details": {} + } + + try: + # First, parse the authenticated variable to check payload hash + with open(kek_file, 'rb') as f: + auth_var = EfiVariableAuthentication2(decodefs=f) + payload = auth_var.payload + payload_hash = hashlib.sha256(payload).hexdigest() + + file_result["payload_hash"] = payload_hash + file_result["payload_size"] = len(payload) + file_result["payload_hash_valid"] = (payload_hash.lower() == EXPECTED_PAYLOAD_HASH.lower()) + + if not file_result["payload_hash_valid"]: + warning_msg = f"Payload hash mismatch: expected {EXPECTED_PAYLOAD_HASH}, got {payload_hash}" + file_result["warnings"].append(warning_msg) + logging.warning(" [!] Payload hash mismatch!") + logging.warning(f" Expected: {EXPECTED_PAYLOAD_HASH}") + logging.warning(f" Got: {payload_hash}") + + # Validate the file using auth_var_tool.verify_variable + # Create a namespace object with the required arguments + verify_args = argparse.Namespace( + authvar_file=str(kek_file), + var_name=KEK_NAME, + var_guid=KEK_GUID, + attributes=KEK_ATTRIBUTES, + verbose=False + ) + + # Capture logger output if in quiet mode + if quiet: + # Temporarily increase logger level to suppress INFO messages + original_level = logging.root.level + logging.root.setLevel(logging.ERROR) + + try: + # verify_variable returns 0 for success, 1 for failure + exit_code = verify_variable(verify_args) + file_result["valid"] = (exit_code == 0) + + if not file_result["valid"]: + file_result["warnings"].append("Signature verification failed") + finally: + if quiet: + # Restore original logger level + logging.root.setLevel(original_level) + + # Store basic details + file_result["details"] = { + "verified": file_result["valid"] + } + + # Display results + sig_status = "VALID" if file_result["valid"] else "INVALID" + payload_status = "True" if file_result["payload_hash_valid"] else "False" + + logging.info(f" Cryptographic Signature: {sig_status}") + logging.info(f" Expected Payload: {payload_status}\n") + + except Exception as e: + file_result["error"] = str(e) + logging.error(f" [X] ERROR: {e}\n") + + return file_result + + +def validate_kek_folder( + folder_path: Path, + output_file: Path = None, + quiet: bool = False, + recursive: bool = False +) -> dict: + """Validate all .bin files in the specified folder. + + Args: + folder_path: Path to folder containing KEK update files + output_file: Optional path to output JSON file + quiet: If True, suppress validation output from the verification process + recursive: If True, process subdirectories recursively + + Returns: + dict: Validation results + """ + results = { + "validation_date": datetime.now(timezone.utc).isoformat(), + "folder": str(folder_path), + "parameters": { + "var_name": KEK_NAME, + "var_guid": KEK_GUID, + "attributes": KEK_ATTRIBUTES + }, + "files": {}, + "by_manufacturer": {} + } + + # Find all .bin files (recursively if requested) + if recursive: + bin_files = sorted(folder_path.rglob("*.bin")) + else: + bin_files = sorted(folder_path.glob("*.bin")) + + if not bin_files: + logging.warning(f"No .bin files found in {folder_path}") + # Initialize empty summary for consistency + results["summary"] = { + "total": 0, + "valid": 0, + "invalid": 0, + "manufacturers": 0 + } + return results + + logging.info(f"Found {len(bin_files)} files to validate\n") + + # Validate each file + for bin_file in bin_files: + # Determine manufacturer (relative path from base folder) + relative_path = bin_file.relative_to(folder_path) + if len(relative_path.parts) > 1: + manufacturer = relative_path.parts[0] + else: + manufacturer = "root" + + logging.info(f"Validating: {relative_path}") + + file_result = { + "filename": bin_file.name, + "relative_path": str(relative_path), + "manufacturer": manufacturer, + "path": str(bin_file), + "valid": False, + "payload_hash_valid": False, + "error": None, + "warnings": [], + "details": {} + } + + try: + # First, parse the authenticated variable to check payload hash + with open(bin_file, 'rb') as f: + auth_var = EfiVariableAuthentication2(decodefs=f) + payload = auth_var.payload + payload_hash = hashlib.sha256(payload).hexdigest() + + file_result["payload_hash"] = payload_hash + file_result["payload_size"] = len(payload) + file_result["payload_hash_valid"] = (payload_hash.lower() == EXPECTED_PAYLOAD_HASH.lower()) + + if not file_result["payload_hash_valid"]: + warning_msg = f"Payload hash mismatch: expected {EXPECTED_PAYLOAD_HASH}, got {payload_hash}" + file_result["warnings"].append(warning_msg) + logging.warning(" [!] Payload hash mismatch!") + logging.warning(f" Expected: {EXPECTED_PAYLOAD_HASH}") + logging.warning(f" Got: {payload_hash}") + + # Validate the file using auth_var_tool.verify_variable + # Create a namespace object with the required arguments + verify_args = argparse.Namespace( + authvar_file=str(bin_file), + var_name=KEK_NAME, + var_guid=KEK_GUID, + attributes=KEK_ATTRIBUTES, + verbose=False + ) + + # Capture logger output if in quiet mode + if quiet: + # Temporarily increase logger level to suppress INFO messages + original_level = logging.root.level + logging.root.setLevel(logging.ERROR) + + try: + # verify_variable returns 0 for success, 1 for failure + exit_code = verify_variable(verify_args) + file_result["valid"] = (exit_code == 0) + + if not file_result["valid"]: + file_result["warnings"].append("Signature verification failed") + finally: + if quiet: + # Restore original logger level + logging.root.setLevel(original_level) + + # Store basic details + file_result["details"] = { + "verified": file_result["valid"] + } + + # Display results + sig_status = "VALID" if file_result["valid"] else "INVALID" + payload_status = "True" if file_result["payload_hash_valid"] else "False" + + logging.info(f" Cryptographic Signature: {sig_status}") + logging.info(f" Expected Payload: {payload_status}\n") + + except Exception as e: + file_result["error"] = str(e) + logging.error(f" [X] ERROR: {e}") + + results["files"][str(relative_path)] = file_result + + # Add to manufacturer grouping + if manufacturer not in results["by_manufacturer"]: + results["by_manufacturer"][manufacturer] = { + "files": [], + "valid": 0, + "invalid": 0 + } + results["by_manufacturer"][manufacturer]["files"].append(str(relative_path)) + if file_result["valid"]: + results["by_manufacturer"][manufacturer]["valid"] += 1 + else: + results["by_manufacturer"][manufacturer]["invalid"] += 1 + + # Generate summary + valid_count = sum(1 for r in results["files"].values() if r["valid"]) + invalid_count = len(results["files"]) - valid_count + + results["summary"] = { + "total": len(results["files"]), + "valid": valid_count, + "invalid": invalid_count, + "manufacturers": len(results["by_manufacturer"]) + } + + logging.info(f"\n{'='*60}") + logging.info("SUMMARY:") + logging.info(f" Total Files: {results['summary']['total']}") + logging.info(f" Valid: {results['summary']['valid']}") + logging.info(f" Invalid: {results['summary']['invalid']}") + if recursive: + logging.info(f" Manufacturers: {results['summary']['manufacturers']}") + logging.info("") + logging.info("By Manufacturer:") + for mfr, data in sorted(results["by_manufacturer"].items()): + logging.info( + f" {mfr:30s} Total: {len(data['files']):3d} Valid: {data['valid']:3d} Invalid: {data['invalid']:3d}" + ) + logging.info(f"{'='*60}") + + # Save to file if requested + if output_file: + with open(output_file, 'w') as f: + json.dump(results, f, indent=2) + logging.info(f"\nResults saved to: {output_file}") + + return results + + +def main() -> int: + """Main entry point for validating KEK update file(s).""" + parser = argparse.ArgumentParser( + description="Validate KEK update file(s) - single file or folder" + ) + parser.add_argument( + "path", + type=Path, + help="Path to a KEK update file (.bin) or folder containing KEK update files" + ) + parser.add_argument( + "-o", "--output", + type=Path, + default=None, + help="Path to output JSON file (default: _validation_results.json, always generated)" + ) + parser.add_argument( + "-r", "--recursive", + action="store_true", + help="Process subdirectories recursively (only applicable for folders)" + ) + parser.add_argument( + "-v", "--verbose", + action="store_true", + help="Enable verbose logging" + ) + parser.add_argument( + "-q", "--quiet", + action="store_true", + help="Suppress validation output (show only summary)" + ) + + args = parser.parse_args() + + # Setup logging + log_level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig( + level=log_level, + format='%(message)s' + ) + + # Validate path exists + if not args.path.exists(): + logging.error(f"Path not found: {args.path}") + return 1 + + # Determine if path is a file or directory + if args.path.is_file(): + # Validate single file + if not args.path.suffix == '.bin': + logging.error(f"File must have .bin extension: {args.path}") + return 1 + + # Determine output file + if args.output is None: + output_file = args.path.parent / f"{args.path.stem}_validation_results.json" + else: + output_file = args.output + + # Validate the single file + file_result = validate_single_kek(args.path, quiet=args.quiet) + + # Create results structure + results = { + "validation_date": datetime.now(timezone.utc).isoformat(), + "file": str(args.path), + "parameters": { + "var_name": KEK_NAME, + "var_guid": KEK_GUID, + "attributes": KEK_ATTRIBUTES + }, + "result": file_result + } + + # Save to file + with open(output_file, 'w') as f: + json.dump(results, f, indent=2) + logging.info(f"Results saved to: {output_file}") + + # Return exit code based on validation + return 0 if file_result["valid"] else 1 + + elif args.path.is_dir(): + # Validate folder + # Determine output file + if args.output is None: + output_file = args.path.parent / f"{args.path.name}_validation_results.json" + else: + output_file = args.output + + # Run validation + results = validate_kek_folder(args.path, output_file, quiet=args.quiet, recursive=args.recursive) + + # Return exit code based on results + if results["summary"]["invalid"] > 0: + return 1 + + return 0 + + else: + logging.error(f"Invalid path type: {args.path}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) From e6489cbad82bab4b3949db9ace7957ce5bd37b00 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Fri, 3 Apr 2026 12:56:06 -0700 Subject: [PATCH 2/4] Build: Add CI workflow for KEK validation Add a pull request workflow to run KEK validation checks in CI and surface failures early during review. Signed-off-by: Doug Flick dougflick@microsoft.com --- .github/workflows/validate-kek-updates.yml | 223 +++++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 .github/workflows/validate-kek-updates.yml diff --git a/.github/workflows/validate-kek-updates.yml b/.github/workflows/validate-kek-updates.yml new file mode 100644 index 0000000..42d7554 --- /dev/null +++ b/.github/workflows/validate-kek-updates.yml @@ -0,0 +1,223 @@ +# This workflow validates KEK update files in pull requests to ensure they have +# valid cryptographic signatures and expected payloads before merging. +# +# Copyright (c) Microsoft Corporation. +# SPDX-License-Identifier: BSD-2-Clause-Patent +name: Validate KEK Updates + +on: + pull_request: + branches: [ "main" ] + paths: + - 'PostSignedObjects/KEK/**/*.bin' + - 'PreSignedObjects/KEK/**/*.bin' + +permissions: + contents: read + issues: write + pull-requests: write + +jobs: + validate-kek: + name: Validate KEK Update Files + runs-on: ubuntu-latest + + steps: + - name: Checkout PR + uses: actions/checkout@v4 + with: + fetch-depth: 0 # Need full history to compare with base branch + + - name: Set up Python + uses: actions/setup-python@v6 + with: + python-version: 3.12 + cache: 'pip' + cache-dependency-path: pip-requirements.txt + + - name: Install Pip Dependencies + run: | + python -m pip install --upgrade pip + pip install -r pip-requirements.txt + + - name: Get Changed KEK Files + id: changed-files + run: | + # Get list of changed .bin files in KEK directories + git fetch origin ${{ github.base_ref }} + CHANGED_FILES=$(git diff --name-only --diff-filter=AM origin/${{ github.base_ref }}...HEAD | grep -E '(PostSignedObjects|PreSignedObjects)/KEK/.*\.bin$' || echo "") + + if [ -z "$CHANGED_FILES" ]; then + echo "No KEK files changed" + echo "has_changes=false" >> $GITHUB_OUTPUT + else + echo "Changed KEK files:" + echo "$CHANGED_FILES" + echo "has_changes=true" >> $GITHUB_OUTPUT + # Save changed files to a temporary file + echo "$CHANGED_FILES" > changed_kek_files.txt + fi + + - name: Validate Changed KEK Files + id: validate + if: steps.changed-files.outputs.has_changes == 'true' + run: | + VALIDATION_FAILED=0 + VALIDATION_RESULTS_DIR="kek_validation_results" + mkdir -p "$VALIDATION_RESULTS_DIR" + + # Accumulators for PR comment data + ALL_OUTPUT="" + ALL_JSON="" + ALL_HASHES="" + + echo "## KEK Validation Results" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + + while IFS= read -r file; do + if [ -f "$file" ]; then + echo "Validating: $file" + BASENAME=$(basename "$file" .bin) + OUTPUT_JSON="$VALIDATION_RESULTS_DIR/${BASENAME}_validation.json" + + # Compute SHA-256 of the binary + FILE_HASH=$(sha256sum "$file" | awk '{print $1}') + ALL_HASHES="${ALL_HASHES}${file}: ${FILE_HASH}\n" + + # Run validation and capture both exit code and stdout/stderr + CMD="python scripts/validate_kek.py \"$file\" -o \"$OUTPUT_JSON\" -q" + CMD_OUTPUT=$(python scripts/validate_kek.py "$file" -o "$OUTPUT_JSON" -q 2>&1) || true + CMD_EXIT=$? + ALL_OUTPUT="${ALL_OUTPUT}\$ ${CMD}\n${CMD_OUTPUT}\n\n" + + if [ -f "$OUTPUT_JSON" ]; then + # Parse JSON to check both signature and payload + SIGNATURE_VALID=$(jq -r '.result.valid' "$OUTPUT_JSON") + PAYLOAD_VALID=$(jq -r '.result.payload_hash_valid' "$OUTPUT_JSON") + JSON_CONTENT=$(cat "$OUTPUT_JSON") + ALL_JSON="${ALL_JSON}### ${file}\n\`\`\`json\n${JSON_CONTENT}\n\`\`\`\n\n" + + if [ "$SIGNATURE_VALID" = "true" ] && [ "$PAYLOAD_VALID" = "true" ]; then + echo "✅ **PASS**: \`$file\`" >> $GITHUB_STEP_SUMMARY + echo " - Cryptographic Signature: ✅ VALID" >> $GITHUB_STEP_SUMMARY + echo " - Expected Payload: ✅ True" >> $GITHUB_STEP_SUMMARY + elif [ "$SIGNATURE_VALID" = "true" ] && [ "$PAYLOAD_VALID" = "false" ]; then + echo "⚠️ **WARNING**: \`$file\`" >> $GITHUB_STEP_SUMMARY + echo " - Cryptographic Signature: ✅ VALID" >> $GITHUB_STEP_SUMMARY + echo " - Expected Payload: ⚠️ False (non-standard payload)" >> $GITHUB_STEP_SUMMARY + PAYLOAD_HASH=$(jq -r '.result.payload_hash' "$OUTPUT_JSON") + echo " - Payload Hash: \`$PAYLOAD_HASH\`" >> $GITHUB_STEP_SUMMARY + # Don't fail on payload mismatch, just warn + else + echo "❌ **FAIL**: \`$file\`" >> $GITHUB_STEP_SUMMARY + echo " - Cryptographic Signature: ❌ INVALID" >> $GITHUB_STEP_SUMMARY + echo " - Expected Payload: $([ "$PAYLOAD_VALID" = "true" ] && echo "✅ True" || echo "⚠️ False")" >> $GITHUB_STEP_SUMMARY + VALIDATION_FAILED=1 + fi + else + echo "❌ **FAIL**: \`$file\` - Validation script failed" >> $GITHUB_STEP_SUMMARY + ALL_JSON="${ALL_JSON}### ${file}\nNo JSON output produced.\n\n" + VALIDATION_FAILED=1 + fi + echo "" >> $GITHUB_STEP_SUMMARY + fi + done < changed_kek_files.txt + + # Save comment body to a file for the comment step + COMMENT_FILE="kek_validation_comment.md" + echo '' > "$COMMENT_FILE" + echo '❌ **KEK Validation Failed**' >> "$COMMENT_FILE" + echo '' >> "$COMMENT_FILE" + echo '### File Hashes (SHA-256)' >> "$COMMENT_FILE" + echo '```' >> "$COMMENT_FILE" + printf '%b' "$ALL_HASHES" >> "$COMMENT_FILE" + echo '```' >> "$COMMENT_FILE" + echo '' >> "$COMMENT_FILE" + echo '### Command Output' >> "$COMMENT_FILE" + echo '```' >> "$COMMENT_FILE" + printf '%b' "$ALL_OUTPUT" >> "$COMMENT_FILE" + echo '```' >> "$COMMENT_FILE" + echo '' >> "$COMMENT_FILE" + echo '### Validation Results' >> "$COMMENT_FILE" + printf '%b' "$ALL_JSON" >> "$COMMENT_FILE" + echo '' >> "$COMMENT_FILE" + echo '### Reproduce Locally' >> "$COMMENT_FILE" + echo '```' >> "$COMMENT_FILE" + echo 'pip install -r pip-requirements.txt' >> "$COMMENT_FILE" + echo 'python scripts/validate_kek.py -v' >> "$COMMENT_FILE" + echo '```' >> "$COMMENT_FILE" + + # Append detailed results to step summary + echo "" >> $GITHUB_STEP_SUMMARY + echo "### File Hashes (SHA-256)" >> $GITHUB_STEP_SUMMARY + echo '```' >> $GITHUB_STEP_SUMMARY + printf '%b' "$ALL_HASHES" >> $GITHUB_STEP_SUMMARY + echo '```' >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "### Command Output" >> $GITHUB_STEP_SUMMARY + echo '```' >> $GITHUB_STEP_SUMMARY + printf '%b' "$ALL_OUTPUT" >> $GITHUB_STEP_SUMMARY + echo '```' >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + printf '%b' "$ALL_JSON" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "### Reproduce Locally" >> $GITHUB_STEP_SUMMARY + echo '```' >> $GITHUB_STEP_SUMMARY + echo "pip install -r pip-requirements.txt" >> $GITHUB_STEP_SUMMARY + echo "python scripts/validate_kek.py -v" >> $GITHUB_STEP_SUMMARY + echo '```' >> $GITHUB_STEP_SUMMARY + + # Upload validation results as artifact + if [ -d "$VALIDATION_RESULTS_DIR" ] && [ "$(ls -A $VALIDATION_RESULTS_DIR)" ]; then + echo "Uploading validation results..." + fi + + # Exit with error if any validation failed + if [ $VALIDATION_FAILED -eq 1 ]; then + echo "::error::One or more KEK files have invalid cryptographic signatures" + exit 1 + fi + + - name: Upload Validation Results + if: steps.changed-files.outputs.has_changes == 'true' && always() + uses: actions/upload-artifact@v4 + with: + name: kek-validation-results + path: kek_validation_results/ + retention-days: 30 + + - name: Comment on PR + if: steps.changed-files.outputs.has_changes == 'true' && failure() + continue-on-error: true + uses: actions/github-script@v7 + with: + script: | + const fs = require('fs'); + const marker = ''; + const body = fs.readFileSync('kek_validation_comment.md', 'utf8') + + '\n\n_Updated: ' + new Date().toISOString() + '_'; + try { + const { data: comments } = await github.rest.issues.listComments({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number + }); + const existing = comments.find(c => c.body.includes(marker)); + if (existing) { + await github.rest.issues.updateComment({ + owner: context.repo.owner, + repo: context.repo.repo, + comment_id: existing.id, + body + }); + } else { + await github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number, + body + }); + } + } catch (error) { + core.warning(`Unable to post PR comment: ${error.message}`) + } From 358e77f199a6d5e4cd2a57385e28bb850d3c48d4 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Fri, 3 Apr 2026 15:42:11 -0700 Subject: [PATCH 3/4] Test --- .../KEK/Test/KEKUpdate_Microsoft_PK2.bin | Bin 0 -> 3637 bytes .../KEKUpdate_Microsoft_PK2.bin:Zone.Identifier | Bin 0 -> 25 bytes 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 PostSignedObjects/KEK/Test/KEKUpdate_Microsoft_PK2.bin create mode 100644 PostSignedObjects/KEK/Test/KEKUpdate_Microsoft_PK2.bin:Zone.Identifier diff --git a/PostSignedObjects/KEK/Test/KEKUpdate_Microsoft_PK2.bin b/PostSignedObjects/KEK/Test/KEKUpdate_Microsoft_PK2.bin new file mode 100644 index 0000000000000000000000000000000000000000..2e462d6f82794394e423d83e5be4744ab1d5ca6e GIT binary patch literal 3637 zcmd6p3pCW*9>?cDFAQdAJbGcoAmg!T7>^3cBN93W$t&{6gc%JX;bihib45tVGxA7M zQ51UMq;x1!Y4oB(q0q|_$N5jMfXE7UF*(TGyDJh?fLEB{_p+W`}5-qp|C_r zX~egWnK1l>oW$q%)NFVX@Y?!7rEZMAvtK#j3acU^h)MwBSP`#8E65LpMnV__5XFk9 zCt4+b6o-VMA}&@3aIxBeiDS1~6IM!`LjU1i%ZNLPQ}4Ki^j3 zhrZ?i8O-f0KMvqRD;BOI zybmZYL_olP9Fhw`h@*}L$&31*o^G2iF6YauVk|b4rk~tlqIa>;#P(OHpw4B6OANTHT&DwDY*ZT7))&FOBY<>|Y^Sr4NhwMI`?;}^v~r+3LN z>bg~UQE6T;IK&aBamwPIDXX#k-m(^~p9zP)Zm~^l&w*um+Za<4&0_I~F@fHfhs-}Q z@-OfWAFA<|z8h#+o{)Wx^iy~;=Jo^N9F5-hU6}BylZXf9%Lo|2!`d;KJHTL9E%^i$2?^8((;!i z^ZE^ATeX01abm59xa2G z%JJ5#HLh~pN-?|F2+~YP2>09bfxe(y5)E%osRmI0CO}c5&{q*rWq_1m+b~3NnvOnI zUx%U#bbuZWCy#t0U15K0yvRCZR;RCkd4+`i2;17I@wiS6L zdIp4rg{;!nj);iR3jQW$EgyEUc1R$T9TKYT!wJ&LHHwGgFLFf>`0zuzwfK2rIVQxQ8Sm=MBJq%uT_;ra8G>=9C z#6sD^habg7LI^c$oASw@=GngF@`n{oUgGO-97JfHmrkp5lGEN0)}|@{&}7<%VP~jC zU=y91KeqO#PbrppqWLph4|HF8oj_((KJ<)Ne3g%P}Gp+hq4soD^75Bg%yvk9l4}93I*S zwfvG0g(UWmZAZrU2}=~`p7Rwp z(rA+1-??}^@7-nB5m|bOm@=jG*7-T~hyEMK-+DtLzLICN?E+5vDXy42mpnR+QTjL= zE3?dL-|LbQRKQ4st(rWoQHVqFc~}2i^;pLtFNG*NBYs+;>qysA19PE*dGhMVRyT7p zSOwiL9b&pyN*A;4E_s}1doUUwSN%$IFF(H*;`LPj%Fs^s)t0gTIqm0bTPblIPlABm0wpWQe8}F$fw4=X3`H|N( zzU0r7_}XzQ7rF@_)OMIJAg=KLhX?8)otL)O&h=dPmhP7Ws~=}}&d>`JR&S3rLuwS% zX6~pyT zA5{Squu^bIqlhcM-=8E0hAGM1FP!P)7fNzqv*EHqz<(N5z<-(!e2LQpmJS`}zmCBd zQ%7I|_x~)nCZK%Dt^8;11r-X(#=?d~`&xa#d4B};XP*0cp&G5YwwLd_;xs1-lPYxm zZfnNQGj16UH3QO59`g*FYc~4rV!YtvXxZ92D(5MeGII*Y_$>!9y6HrM&*iXVoQjpP zg*3&L!du!R+SJoGCY?v=kLC`x-l(%uZ1v83CHvEvVW~l7AFaDn*N#se^HgA7`$@cf zlUwmy9_=I&-`ZuJ* zqJ)wCRhICRK$w4moJRc&a?Uq!!yt!?2e9p%4TIeIhZ+9~$D|EtpDYo~U9wx1>vRzr7m0W+f zCAG@3)uVQ9{CL6p@OnzZe0FhR;PRu!A7A5TOXl>n2RhZVm#@uH=|H8F(O>o8%6T%U z6d{DXDxn^Kb{*AOYiRo5D>2;(yC)%liSE92C(qZC{i|Z<%{LfYqi|VYp@o5mT~A(m z^2^ZYQNnb2`?8jcfyVU;j=I(}bx}9@MG9NuDC!^ftDK$*ni{>@cgl2tJ$sor!jc*u zZb$Lf=13S|gF~93>yJ~M_UR*4G zYvN6nu)O{B?6o4u61}~mf}PvlM2wHU>BSeM y-%-f*ldx!rg|U9|kE2lO&2hYr%l?^^`s{W^5)`H*b+qdx$!piAli literal 0 HcmV?d00001 diff --git a/PostSignedObjects/KEK/Test/KEKUpdate_Microsoft_PK2.bin:Zone.Identifier b/PostSignedObjects/KEK/Test/KEKUpdate_Microsoft_PK2.bin:Zone.Identifier new file mode 100644 index 0000000000000000000000000000000000000000..d6c1ec682968c796b9f5e9e080cc6f674b57c766 GIT binary patch literal 25 dcma!!%Fjy;DN4*MPD?F{<>dl#JyUFr831@K2x Date: Fri, 3 Apr 2026 15:48:07 -0700 Subject: [PATCH 4/4] fixup kek-updates --- .github/workflows/validate-kek-updates.yml | 43 +++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/.github/workflows/validate-kek-updates.yml b/.github/workflows/validate-kek-updates.yml index 42d7554..937271a 100644 --- a/.github/workflows/validate-kek-updates.yml +++ b/.github/workflows/validate-kek-updates.yml @@ -186,11 +186,22 @@ jobs: path: kek_validation_results/ retention-days: 30 + - name: Generate Token + id: app-token + if: steps.changed-files.outputs.has_changes == 'true' && always() + continue-on-error: true + uses: actions/create-github-app-token@v3 + with: + app-id: ${{ vars.MU_ACCESS_APP_ID }} + private-key: ${{ secrets.MU_ACCESS_APP_PRIVATE_KEY }} + owner: ${{ github.repository_owner }} + - name: Comment on PR - if: steps.changed-files.outputs.has_changes == 'true' && failure() + if: steps.changed-files.outputs.has_changes == 'true' && failure() && steps.app-token.outputs.token continue-on-error: true uses: actions/github-script@v7 with: + github-token: ${{ steps.app-token.outputs.token }} script: | const fs = require('fs'); const marker = ''; @@ -221,3 +232,33 @@ jobs: } catch (error) { core.warning(`Unable to post PR comment: ${error.message}`) } + + - name: Update PR Comment on Success + if: steps.changed-files.outputs.has_changes == 'true' && success() && steps.app-token.outputs.token + continue-on-error: true + uses: actions/github-script@v7 + with: + github-token: ${{ steps.app-token.outputs.token }} + script: | + const marker = ''; + try { + const { data: comments } = await github.rest.issues.listComments({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number + }); + const existing = comments.find(c => c.body.includes(marker)); + if (existing) { + const body = marker + '\n\u2705 **KEK Validation Passed**\n\n' + + 'All KEK update files have valid cryptographic signatures.\n\n' + + '_Updated: ' + new Date().toISOString() + '_'; + await github.rest.issues.updateComment({ + owner: context.repo.owner, + repo: context.repo.repo, + comment_id: existing.id, + body + }); + } + } catch (error) { + core.warning(`Unable to update PR comment: ${error.message}`) + }