# Source code for nwp500.mqtt.utils

"""
MQTT utility functions and data structures for Navien Smart Control.

This module provides utility functions for redacting sensitive information,
configuration classes, and common data structures used across MQTT modules.
"""

from __future__ import annotations

import re
import uuid
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any

from awscrt import mqtt

from ..config import AWS_IOT_ENDPOINT, AWS_REGION

__author__ = "Emmanuel Levijarvi"
__copyright__ = "Emmanuel Levijarvi"
__license__ = "MIT"

# Pre-compiled regex patterns for MAC-like device identifiers. They are built
# once at import time and applied, in list order, by redact_topic().
_MAC_PATTERNS = [
    # "navilink-" followed by 12 hex digits, e.g. navilink-04786332fca0
    re.compile(r"(navilink-)[0-9a-fA-F]{12}"),
    # Exactly 12 hex digits delimited by word boundaries, e.g. 04786332fca0
    re.compile(r"\b[0-9a-fA-F]{12}\b"),
    # Six colon-separated hex pairs, e.g. 04:78:63:32:fc:a0
    re.compile(r"\b([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\b"),
    # Six hyphen-separated hex pairs, e.g. 04-78-63-32-fc-a0
    re.compile(r"\b([0-9a-fA-F]{2}-){5}[0-9a-fA-F]{2}\b"),
]


[docs] def redact(obj: Any, keys_to_redact: set[str] | None = None) -> Any: """Return a redacted copy of obj with sensitive keys masked. This is a lightweight sanitizer for log messages to avoid emitting secrets such as access keys, session tokens, passwords, emails, clientIDs and sessionIDs. Args: obj: Object to redact (dict, list, tuple, or primitive) keys_to_redact: Set of key names to redact (uses defaults if None) Returns: Redacted copy of the object """ if keys_to_redact is None: keys_to_redact = { "access_key_id", "secret_access_key", "secret_key", "session_token", "sessionToken", "sessionID", "clientID", "clientId", "client_id", "password", "pushToken", "push_token", "token", "auth", "macAddress", "mac_address", "email", } # Primitive types: return as-is if obj is None or isinstance(obj, (bool, int, float)): return obj if isinstance(obj, str): # avoid printing long secret-like strings fully if len(obj) > 256: return obj[:64] + "...<redacted>..." + obj[-64:] return obj # dicts: redact sensitive keys recursively if isinstance(obj, dict): redacted: dict[Any, Any] = {} for k, v in obj.items(): if str(k) in keys_to_redact: redacted[k] = "<REDACTED>" else: redacted[k] = redact(v, keys_to_redact) return redacted # lists / tuples: redact elements if isinstance(obj, (list, tuple)): # Explicitly annotate generator expression to avoid unknown types return type(obj)(redact(v, keys_to_redact) for v in obj) # fallback: represent object as string but avoid huge dumps try: s = str(obj) if len(s) > 512: return s[:256] + "...<redacted>..." return s except Exception: return "<UNREPRESENTABLE>"
def redact_topic(topic: str) -> str:
    """
    Strip MAC addresses and device identifiers out of an MQTT topic.

    Topics often embed the device MAC, for example:

    - cmd/52/navilink-04786332fca0/st/did
    - cmd/52/navilink-04786332fca0/ctrl
    - cmd/52/04786332fca0/ctrl
    - or delimited forms (04:78:63:32:fc:a0 or 04-78-63-32-fc-a0)

    Redaction runs in two ordered passes. The first pass applies the
    module-level pre-compiled ``_MAC_PATTERNS`` and replaces every match
    (including a leading ``navilink-``) with ``REDACTED``. The second pass
    applies a fixed sequence of broader, defensive substitutions to whatever
    is left.

    Args:
        topic: MQTT topic string

    Returns:
        Topic with MAC addresses and long hex identifiers replaced by
        ``REDACTED``
    """
    sanitized = topic

    # Pass 1: pre-compiled MAC patterns, each match replaced wholesale.
    for compiled in _MAC_PATTERNS:
        sanitized = compiled.sub("REDACTED", sanitized)

    # Pass 2: defensive clean-up rules. Order matters, because each rule
    # operates on the output of the previous one.
    fallback_rules = (
        # 01:23:45:67:89:ab (colon or hyphen delimiters, no word boundary)
        (r"([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})", "REDACTED"),
        # 01-23-45-67-89-ab
        (r"([0-9A-Fa-f]{2}-){5}[0-9A-Fa-f]{2}", "REDACTED"),
        # 0123456789ab
        (r"([0-9A-Fa-f]{12})", "REDACTED"),
        # navilink-xxxxxxx (prefix is kept here)
        (r"(navilink-)[0-9A-Fa-f]{8,}", r"\1REDACTED"),
        # Anything that looks like a device ID (optional device prefix,
        # 8+ hex characters)
        (r"(device[-_]?)?[0-9A-Fa-f]{8,}", "REDACTED"),
        # Final fallback: any remaining run of 8+ hex characters
        (r"[0-9A-Fa-f]{8,}", "REDACTED"),
    )
    for expression, replacement in fallback_rules:
        sanitized = re.sub(expression, replacement, sanitized)

    return sanitized
[docs] def redact_mac(mac: str | None) -> str: """Mask a MAC address or device ID for safe logging. Args: mac: The MAC address or device ID to redact (e.g., 'navilink-0123456789ab') Returns: A redacted string like 'navilink-01...89ab' or '<REDACTED>' """ if not mac: return "<REDACTED>" # Handle navilink- prefix prefix = "" if mac.startswith("navilink-"): prefix = "navilink-" mac = mac[len("navilink-") :] if len(mac) <= 4: return f"{prefix}<REDACTED>" # Mask central part, keeping first 2 and last 4 return f"{prefix}{mac[:2]}...{mac[-4:]}"
[docs] def redact_serial(serial: str | None) -> str: """Mask a serial number for safe logging. Args: serial: Serial number to redact Returns: Redacted serial like 'AB...1234' """ if not serial: return "<REDACTED>" if len(serial) <= 6: return "<REDACTED>" # Mask central part, keeping first 2 and last 4 return f"{serial[:2]}...{serial[-4:]}"
@dataclass
class MqttConnectionConfig:
    """Settings that control how the MQTT connection is established.

    Attributes:
        endpoint: AWS IoT endpoint URL
        region: AWS region
        client_id: MQTT client ID; a random ``navien-client-<8 hex>`` value
            is generated when left empty
        clean_session: Whether to start with a clean session
        keep_alive_secs: Keep-alive interval in seconds
        auto_reconnect: Enable automatic reconnection
        max_reconnect_attempts: Maximum reconnection attempts
            (-1 for unlimited)
        initial_reconnect_delay: Initial delay between reconnect attempts,
            in seconds
        max_reconnect_delay: Maximum delay between reconnect attempts,
            in seconds
        reconnect_backoff_multiplier: Exponential backoff multiplier
        deep_reconnect_threshold: Attempt count that triggers a full
            connection rebuild; values below 1 are raised to 1
        enable_command_queue: Enable command queueing when disconnected
        max_queued_commands: Maximum number of queued commands
    """

    # --- Connection identity -------------------------------------------
    endpoint: str = AWS_IOT_ENDPOINT
    region: str = AWS_REGION
    client_id: str | None = None
    clean_session: bool = True
    keep_alive_secs: int = 1200

    # --- Reconnection behaviour ----------------------------------------
    auto_reconnect: bool = True
    max_reconnect_attempts: int = -1  # -1 = unlimited retries
    initial_reconnect_delay: float = 1.0  # seconds
    max_reconnect_delay: float = 120.0  # seconds
    reconnect_backoff_multiplier: float = 2.0
    deep_reconnect_threshold: int = 10  # Switch to full rebuild after N attempts

    # --- Offline command queue -----------------------------------------
    enable_command_queue: bool = True
    max_queued_commands: int = 100

    def __post_init__(self) -> None:
        """Generate a client ID when none was given and clamp the threshold."""
        client_id_missing = not self.client_id
        if client_id_missing:
            random_suffix = uuid.uuid4().hex[:8]
            object.__setattr__(
                self, "client_id", f"navien-client-{random_suffix}"
            )

        threshold_too_low = self.deep_reconnect_threshold < 1
        if threshold_too_low:
            object.__setattr__(self, "deep_reconnect_threshold", 1)
@dataclass
class QueuedCommand:
    """A command held back for publishing once the connection is restored.

    Attributes:
        topic: MQTT topic to publish to
        payload: Command payload dictionary
        qos: Quality of Service level
        timestamp: Time when command was queued
    """

    # Destination topic for the publish
    topic: str
    # Message body, keyed by string
    payload: dict[str, Any]
    # awscrt QoS level to publish with
    qos: mqtt.QoS
    # Moment the command was placed in the queue
    timestamp: datetime
class PeriodicRequestType(Enum):
    """Kinds of request that can be issued on a periodic schedule.

    Attributes:
        DEVICE_INFO: Request device information periodically
        DEVICE_STATUS: Request device status periodically
    """

    # Member values are the string identifiers for each request kind.
    DEVICE_INFO = "device_info"
    DEVICE_STATUS = "device_status"
_ALT_KEYS: dict[str, str] = { "status": "st", "feature": "did", } _SENTINEL = object()
[docs] def get_response_data(message: dict[str, Any], key: str | None) -> Any: """Extract data from an MQTT message, supporting key variants. Checks both the nested ``response`` dict and the top-level message, using both the primary key and its alternate short-form name (e.g. ``"status"`` / ``"st"``, ``"feature"`` / ``"did"``). Lookup order preserves a strict *nested-first* precedence: 1. ``response[key]`` 2. ``response[alt_key]`` 3. ``message[key]`` 4. ``message[alt_key]`` Key presence is checked explicitly (not by truthiness), so falsy values like ``0``, ``False``, or ``{}`` are returned correctly and do not fall through to a lower-precedence candidate. Args: message: Raw MQTT message dict. key: Primary key to look up. When ``None``, the nested ``response`` dict is returned directly. Returns: The value of the first *present* key in priority order, or ``None`` if no candidate key is found. """ res: dict[str, Any] = message.get("response", {}) if key is None: return res alt_key = _ALT_KEYS.get(key) for source, k in ( (res, key), (res, alt_key), (message, key), (message, alt_key), ): if k is not None: value = source.get(k, _SENTINEL) if value is not _SENTINEL: return value return None
def topic_matches_pattern(topic: str, pattern: str) -> bool:
    """
    Decide whether a concrete topic satisfies an MQTT subscription pattern.

    Supported wildcards:

    - '+' stands for exactly one topic level
    - '#' stands for any number of trailing levels (only valid as the last
      level of the pattern)

    Args:
        topic: Actual topic (e.g., "cmd/52/navilink-ABC/status")
        pattern: Pattern with wildcards (e.g., "cmd/52/+/#")

    Returns:
        True if topic matches pattern

    Examples:
        >>> topic_matches_pattern("cmd/52/device1/status", "cmd/52/+/status")
        True
        >>> topic_matches_pattern(
        ...     "cmd/52/device1/status/extra", "cmd/52/device1/#"
        ... )
        True
    """
    # Identical strings always match, wildcards or not.
    if topic == pattern:
        return True

    levels = topic.split("/")
    filters = pattern.split("/")

    def _levels_agree(actual: list[str], wanted: list[str]) -> bool:
        # A filter level matches when it is '+' or equals the topic level.
        return all(w == "+" or a == w for a, w in zip(actual, wanted))

    if "#" in filters:
        prefix_len = filters.index("#")
        # '#' must be the final level, and the topic needs at least as many
        # levels as precede it.
        if prefix_len != len(filters) - 1 or len(levels) < prefix_len:
            return False
        return _levels_agree(levels[:prefix_len], filters[:prefix_len])

    # Without '#', both sides must have the same number of levels.
    return len(levels) == len(filters) and _levels_agree(levels, filters)