# Source code for nwp500.mqtt.diagnostics

"""MQTT diagnostics and telemetry collection.

This module provides detailed diagnostics and metrics collection for MQTT
connection stability analysis, helping to identify whether connection drops
are caused by:
- Network/environmental issues (intermittent connectivity, NAT timeouts)
- AWS server-side limits (connection lifetime, message rate limits)
- Client-side configuration issues (insufficient keep-alive, poor backoff)
"""

from __future__ import annotations

import json
import logging
import time
from collections import defaultdict
from collections.abc import Callable
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
from typing import Any

from awscrt.exceptions import AwsCrtError

# Package metadata dunders.
__author__ = "Emmanuel Levijarvi"
__copyright__ = "Emmanuel Levijarvi"
__license__ = "MIT"

# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)


@dataclass
class ConnectionDropEvent:
    """Snapshot of one connection drop.

    Only ``timestamp`` is required; every other field has a default so a
    drop can be recorded even when no error details are available.
    """

    # When the drop was recorded (ISO 8601 string).
    timestamp: str
    # Error details; all None when no error information was supplied.
    error_name: str | None = None
    error_message: str | None = None
    error_code: int | None = None
    # Reconnection attempt number associated with this drop.
    reconnect_attempt: int = 0
    # Length of the session that just ended, if one was being tracked.
    duration_connected_seconds: float | None = None
    # Client state captured at the moment of the drop.
    active_subscriptions: int = 0
    queued_commands: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Return the event's fields as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping
@dataclass
class ConnectionEvent:
    """Snapshot of one successful connection or resumption.

    ``timestamp`` and ``event_type`` are required; the remaining fields
    default to "nothing known".
    """

    # When the success was recorded (ISO 8601 string).
    timestamp: str
    # One of "connected", "resumed", "deep_reconnected".
    event_type: str
    # Whether the MQTT session was present on (re)connect.
    session_present: bool = False
    # MQTT return code, when one was provided.
    return_code: int | None = None
    # Reconnection attempt number for this success.
    attempt_number: int = 0
    # Seconds between the last recorded drop and this success, if known.
    time_to_reconnect_seconds: float | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the event's fields as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping
@dataclass
class MqttMetrics:
    """Running totals describing MQTT connection stability."""

    # --- Connection lifecycle counters ---
    total_connections: int = 0
    total_disconnects: int = 0
    total_connection_drops: int = 0
    total_reconnect_attempts: int = 0

    # --- Session timing ---
    longest_session_seconds: float = 0.0
    # Starts at +inf so the first recorded session always becomes the minimum.
    shortest_session_seconds: float = float("inf")
    average_session_seconds: float = 0.0
    current_session_uptime_seconds: float = 0.0

    # --- Failure analysis ---
    # Drop count keyed by error name.
    connection_drops_by_error: dict[str, int] = field(default_factory=dict)
    # Drop count keyed by attempt-count bucket label.
    reconnection_attempts_distribution: dict[str, int] = field(
        default_factory=dict
    )

    # --- Recent activity ---
    last_drop_timestamp: str | None = None
    last_successful_connect_timestamp: str | None = None
    # Number of successful reconnections.
    connection_recovered: int = 0

    # --- QoS tracking ---
    messages_published: int = 0
    messages_queued: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Return the metrics as a JSON-friendly dictionary.

        The "no session recorded yet" sentinel (+inf) for
        ``shortest_session_seconds`` is emitted as ``None``, because
        infinity has no standard JSON representation.
        """
        payload = asdict(self)
        if payload["shortest_session_seconds"] == float("inf"):
            payload["shortest_session_seconds"] = None
        return payload
[docs] class MqttDiagnosticsCollector: """ Collects detailed diagnostics and metrics for MQTT connection analysis. This collector tracks: - Connection drop events with error details - Connection recovery timeline - Error frequency and patterns - Session duration statistics - Network topology and timing information For debugging: - Export logs to JSON for correlation with AWS CloudWatch - Enables continuous monitoring with configurable retention """ def __init__( self, max_events_retained: int = 1000, enable_verbose_logging: bool = False, ): """ Initialize diagnostics collector. Args: max_events_retained: Maximum number of events to keep in memory (older events are discarded, but logged) enable_verbose_logging: If True, log every event to logger """ self.max_events_retained = max_events_retained self.enable_verbose_logging = enable_verbose_logging # Event history (limited size) self._drop_events: list[ConnectionDropEvent] = [] self._connection_events: list[ConnectionEvent] = [] # Aggregate metrics self._metrics = MqttMetrics() # Session tracking self._session_start_time: float | None = None self._session_duration_history: list[float] = [] self._last_connection_timestamp: str | None = None self._last_drop_timestamp: float | None = None # Error categorization self._aws_error_name_counts: dict[str, int] = defaultdict(int) # Callbacks self._on_drop_listeners: list[ Callable[[ConnectionDropEvent], None] ] = []
[docs] def on_connection_drop( self, callback: Callable[[ConnectionDropEvent], None], ) -> None: """ Register a callback to be invoked on each connection drop event. Args: callback: Function that receives ConnectionDropEvent """ self._on_drop_listeners.append(callback)
[docs] async def record_connection_drop( self, error: Exception | None = None, reconnect_attempt: int = 0, active_subscriptions: int = 0, queued_commands: int = 0, ) -> None: """ Record a connection drop event. Args: error: The exception that caused the drop reconnect_attempt: Which reconnection attempt this is (0 = initial) active_subscriptions: Number of active subscriptions at time of drop queued_commands: Number of commands in the queue """ now = datetime.now(UTC).isoformat() duration = None if self._session_start_time is not None: duration = time.time() - self._session_start_time # Extract error details error_name = None error_message = None error_code = None if error is not None: error_message = str(error) if isinstance(error, AwsCrtError): error_name = getattr(error, "name", None) error_code = getattr(error, "code", None) # Track AWS error frequency if error_name: self._aws_error_name_counts[error_name] += 1 # Create event event = ConnectionDropEvent( timestamp=now, error_name=error_name, error_message=error_message, error_code=error_code, reconnect_attempt=reconnect_attempt, duration_connected_seconds=duration, active_subscriptions=active_subscriptions, queued_commands=queued_commands, ) # Update metrics self._metrics.total_connection_drops += 1 self._metrics.total_reconnect_attempts += 1 if error_name: self._metrics.connection_drops_by_error[error_name] = ( self._metrics.connection_drops_by_error.get(error_name, 0) + 1 ) self._metrics.last_drop_timestamp = now self._last_drop_timestamp = time.time() # Track session duration if duration is not None: self._session_duration_history.append(duration) if duration > self._metrics.longest_session_seconds: self._metrics.longest_session_seconds = duration if duration < self._metrics.shortest_session_seconds: self._metrics.shortest_session_seconds = duration # Recalculate average if self._session_duration_history: self._metrics.average_session_seconds = sum( self._session_duration_history ) / 
len(self._session_duration_history) # Store event (with size limit) self._drop_events.append(event) if len(self._drop_events) > self.max_events_retained: self._drop_events.pop(0) # Log if verbose mode enabled if self.enable_verbose_logging: _logger.warning( f"Connection drop recorded: error={error_name}, " f"duration={duration}s, attempt={reconnect_attempt}, " f"subs={active_subscriptions}, queued={queued_commands}" ) # Call registered callbacks for callback in self._on_drop_listeners: try: callback(event) except Exception as e: _logger.error(f"Error in drop listener: {e}")
[docs] async def record_connection_success( self, event_type: str = "connected", session_present: bool = False, return_code: int | None = None, attempt_number: int = 0, ) -> None: """ Record a successful connection or reconnection event. Args: event_type: "connected", "resumed", or "deep_reconnected" session_present: Whether MQTT session was present return_code: MQTT return code attempt_number: Reconnection attempt number (0 = initial connect) """ now = datetime.now(UTC).isoformat() time_to_reconnect = None # Update metrics if event_type == "connected": self._metrics.total_connections += 1 else: self._metrics.connection_recovered += 1 # Calculate time to reconnect if self._last_drop_timestamp is not None: time_to_reconnect = time.time() - self._last_drop_timestamp # Start new session self._session_start_time = time.time() self._last_connection_timestamp = now # Create event event = ConnectionEvent( timestamp=now, event_type=event_type, session_present=session_present, return_code=return_code, attempt_number=attempt_number, time_to_reconnect_seconds=time_to_reconnect, ) # Update current uptime self._metrics.current_session_uptime_seconds = 0.0 self._metrics.last_successful_connect_timestamp = now # Store event self._connection_events.append(event) if len(self._connection_events) > self.max_events_retained: self._connection_events.pop(0) # Log if verbose if self.enable_verbose_logging: _logger.info( f"Connection success recorded: type={event_type}, " f"session_present={session_present}, " f"time_to_reconnect={time_to_reconnect}s, " f"attempt={attempt_number}" )
[docs] def record_publish(self, queued: bool = False) -> None: """Record a publish/queue operation.""" if queued: self._metrics.messages_queued += 1 else: self._metrics.messages_published += 1
[docs] async def update_metrics(self) -> None: """Update current metrics (e.g., current session uptime).""" if self._session_start_time is not None: self._metrics.current_session_uptime_seconds = ( time.time() - self._session_start_time )
[docs] def get_metrics(self) -> MqttMetrics: """Get current aggregate metrics.""" # Update current session uptime before returning if self._session_start_time is not None: self._metrics.current_session_uptime_seconds = ( time.time() - self._session_start_time ) # Rebuild reconnection attempt distribution from drop events attempt_buckets: dict[str, int] = defaultdict(int) for event in self._drop_events: # Bucket attempts: 1, 2-5, 6-10, 11+ if event.reconnect_attempt <= 1: bucket = "1" elif event.reconnect_attempt <= 5: bucket = "2-5" elif event.reconnect_attempt <= 10: bucket = "6-10" else: bucket = "11+" attempt_buckets[bucket] += 1 self._metrics.reconnection_attempts_distribution = dict(attempt_buckets) return self._metrics
[docs] def get_recent_drops(self, limit: int = 10) -> list[ConnectionDropEvent]: """Get the N most recent connection drop events.""" return self._drop_events[-limit:]
[docs] def get_recent_connections(self, limit: int = 10) -> list[ConnectionEvent]: """Get the N most recent connection events.""" return self._connection_events[-limit:]
[docs] def export_json(self) -> str: """ Export all collected diagnostics as JSON. Returns: JSON string suitable for storing or sending to monitoring systems """ export_data = { "timestamp": datetime.now(UTC).isoformat(), "metrics": self.get_metrics().to_dict(), "recent_drops": [ event.to_dict() for event in self.get_recent_drops(50) ], "recent_connections": [ event.to_dict() for event in self.get_recent_connections(50) ], "aws_error_counts": dict(self._aws_error_name_counts), "session_history_summary": { "total_sessions": len(self._session_duration_history), "sample_durations": self._session_duration_history[-20:], }, } return json.dumps(export_data, indent=2, default=str)
[docs] def print_summary(self) -> None: """Print a human-readable summary of diagnostics.""" metrics = self.get_metrics() _logger.info("=" * 70) _logger.info("MQTT CONNECTION DIAGNOSTICS SUMMARY") _logger.info("=" * 70) _logger.info(f"Total Connections: {metrics.total_connections}") _logger.info( f"Total Connection Drops: {metrics.total_connection_drops}" ) _logger.info( f"Successful Reconnections: {metrics.connection_recovered}" ) _logger.info( f"Total Reconnection Attempts: {metrics.total_reconnect_attempts}" ) _logger.info("-" * 70) _logger.info("SESSION DURATION STATISTICS") _logger.info("-" * 70) if self._session_duration_history: _logger.info( f"Longest Session: {metrics.longest_session_seconds:.1f}s" ) _logger.info( f"Shortest Session: {metrics.shortest_session_seconds:.1f}s" ) _logger.info( f"Average Session: {metrics.average_session_seconds:.1f}s" ) _logger.info( f"Current Session Uptime: " f"{metrics.current_session_uptime_seconds:.1f}s" ) if metrics.connection_drops_by_error: _logger.info("-" * 70) _logger.info("CONNECTION DROPS BY ERROR TYPE") _logger.info("-" * 70) for error, count in sorted( metrics.connection_drops_by_error.items(), key=lambda x: x[1], reverse=True, ): _logger.info(f" {error}: {count}") if metrics.reconnection_attempts_distribution: _logger.info("-" * 70) _logger.info("RECONNECTION ATTEMPTS DISTRIBUTION") _logger.info("-" * 70) for bucket, count in sorted( metrics.reconnection_attempts_distribution.items() ): _logger.info(f" Attempts {bucket}: {count}") _logger.info("-" * 70) _logger.info(f"Messages Published: {metrics.messages_published}") _logger.info(f"Messages Queued: {metrics.messages_queued}") _logger.info("=" * 70)