nwp500.mqtt package

Submodules

nwp500.mqtt.client module

MQTT Client for Navien Smart Control.

This module provides an MQTT client for real-time communication with Navien devices using AWS IoT Core. It handles connection, subscriptions, and message publishing for device control and monitoring.

The client uses WebSocket connections with AWS credentials obtained from the authentication flow.

class nwp500.mqtt.client.NavienMqttClient(auth_client: NavienAuthClient, config: MqttConnectionConfig | None = None, unit_system: Literal['metric', 'us_customary'] | None = None)[source]

Bases: EventEmitter

Async MQTT client for Navien device communication over AWS IoT.

This client establishes WebSocket connections to AWS IoT Core using temporary AWS credentials from the authentication API. It handles:
  • Connection management with automatic reconnection and exponential backoff
  • Topic subscriptions for device events and responses
  • Command publishing for device control
  • Message routing and callbacks
  • Command queuing when disconnected (sends when reconnected)
  • Event-driven architecture with state change detection

The client extends EventEmitter to provide an event-driven architecture:
  • Multiple listeners per event
  • State change detection (temperature_changed, mode_changed, etc.)
  • Async handler support
  • Priority-based execution

The client automatically reconnects when the connection is interrupted, using exponential backoff (default: 1s, 2s, 4s, 8s, … up to 120s). Reconnection behavior can be customized via MqttConnectionConfig.
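The default delay sequence described above can be reproduced with a few lines of arithmetic. This is a minimal sketch, not the library's implementation: the function name `backoff_delay` and the parameter names `initial`, `multiplier`, and `max_delay` are hypothetical and are not taken from MqttConnectionConfig.

```python
def backoff_delay(
    attempt: int,
    initial: float = 1.0,
    multiplier: float = 2.0,
    max_delay: float = 120.0,
) -> float:
    """Return the delay in seconds before reconnection attempt `attempt` (0-based).

    Hypothetical helper: the parameter names are illustrative only.
    """
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    try:
        delay = initial * multiplier ** attempt
    except OverflowError:
        # Very large attempt numbers overflow a float; the cap applies anyway.
        delay = max_delay
    return min(delay, max_delay)


# Delays for the first nine attempts: doubling from 1s, then capped at 120s.
delays = [backoff_delay(n) for n in range(9)]
print(delays)
```

The cap matters more than the growth rate: without it, a long outage would push the wait between attempts into hours.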

When enabled, the command queue stores commands sent while disconnected and automatically sends them when the connection is restored. This ensures commands are not lost during temporary network interruptions.

Example (Traditional Callbacks):

>>> async with NavienAuthClient(email, password) as auth_client:
...     mqtt_client = NavienMqttClient(auth_client)
...     await mqtt_client.connect()
...
...     # Traditional callback style
...     await mqtt_client.subscribe_device_status(device, on_status)

Example (Event Emitter):

>>> from nwp500.mqtt_events import MqttClientEvents
>>> mqtt_client = NavienMqttClient(auth_client)
>>>
>>> # Type-safe event listeners with IDE autocomplete
>>> mqtt_client.on(
...     MqttClientEvents.TEMPERATURE_CHANGED,
...     lambda event: log_temperature(event.new_temperature),
... )
>>> mqtt_client.on(MqttClientEvents.TEMPERATURE_CHANGED, update_ui)
>>> mqtt_client.on(
...     MqttClientEvents.MODE_CHANGED, handle_mode_change
... )
>>>
>>> # One-time listener
>>> mqtt_client.once(MqttClientEvents.STATUS_RECEIVED, initialize)
>>>
>>> await mqtt_client.connect()

Events Emitted:

See nwp500.mqtt_events.MqttClientEvents for a complete, type-safe registry of all events with full documentation.

Key events include:
  • status_received: Raw status update
  • feature_received: Device feature/capability information
  • temperature_changed: DHW temperature changed
  • mode_changed: Operation mode changed
  • power_changed: Power consumption changed
  • heating_started: Device started heating
  • heating_stopped: Device stopped heating
  • error_detected: Device error occurred
  • error_cleared: Device error resolved
  • connection_interrupted: Connection lost
  • connection_resumed: Connection restored
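To make the relationship between raw status events and derived change events concrete, here is a self-contained sketch. It assumes nothing about the real EventEmitter: `TinyEmitter`, `TemperatureChanged`, `old_temperature`, and `publish_status` are hypothetical names invented for illustration, and only the event names and the `new_temperature` attribute come from the documentation above.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class TemperatureChanged:
    """Hypothetical event payload; only new_temperature appears in the docs."""
    old_temperature: float
    new_temperature: float


class TinyEmitter:
    """Hypothetical stand-in for an event emitter with on/once/emit."""

    def __init__(self) -> None:
        self._listeners = defaultdict(list)

    def on(self, event: str, handler: Callable[[Any], None]) -> None:
        self._listeners[event].append((handler, False))

    def once(self, event: str, handler: Callable[[Any], None]) -> None:
        self._listeners[event].append((handler, True))

    def emit(self, event: str, payload: Any) -> None:
        # Iterate over a copy so one-shot listeners can be removed safely.
        for entry in list(self._listeners[event]):
            handler, one_shot = entry
            if one_shot:
                self._listeners[event].remove(entry)
            handler(payload)


def publish_status(emitter: TinyEmitter, previous: Optional[dict], current: dict) -> None:
    """Emit the raw status, then a change event if the temperature differs."""
    emitter.emit("status_received", current)
    if previous is not None and previous["temperature"] != current["temperature"]:
        emitter.emit(
            "temperature_changed",
            TemperatureChanged(previous["temperature"], current["temperature"]),
        )


emitter = TinyEmitter()
seen = []
emitter.on("temperature_changed", lambda e: seen.append(("log", e.new_temperature)))
emitter.on("temperature_changed", lambda e: seen.append(("ui", e.new_temperature)))
emitter.once("status_received", lambda s: seen.append(("init", s["temperature"])))

previous = None
for status in ({"temperature": 120.0}, {"temperature": 120.0}, {"temperature": 125.0}):
    publish_status(emitter, previous, status)
    previous = status

print(seen)
```

The one-time listener fires only for the first status, and the two temperature listeners fire only when the value actually changes.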

async check_firmware_update(device: Device) int[source]

Check for available over-the-air firmware updates.

clear_command_queue() int[source]

Clear all queued commands. …

property client_id: str

Get client ID.

async commit_firmware_update(device: Device, payload: OtaCommitPayload) int[source]

Commit a previously downloaded firmware update.

async configure_recirculation_schedule(device: Device, schedule: RecirculationSchedule) int[source]

Configure the recirculation pump timed schedule.

async configure_reservation_water_program(device: Device) int[source]

Enable/configure water program reservation mode.

async configure_tou_schedule(device: Device, controller_serial_number: str, periods: Sequence[dict[str, Any]], *, enabled: bool = True) int[source]

Configure the Time-of-Use rate schedule.

async connect() bool[source]

Establish connection to AWS IoT Core.

Ensures tokens are valid before connecting and refreshes if necessary.

Returns:

True if connection successful

Raises:

Exception – If connection fails

property control: MqttDeviceController

Deprecated access to device controller.

property diagnostics: MqttDiagnosticsCollector

Get the diagnostics collector instance.

async disable_anti_legionella(device: Device) int[source]

Disable the Anti-Legionella disinfection cycle.

async disable_demand_response(device: Device) int[source]

Disable utility demand response participation.

async disable_intelligent_scheduling(device: Device) int[source]

Disable intelligent/adaptive heating mode.

async disconnect() None[source]

Disconnect from AWS IoT Core and stop all periodic tasks.

async enable_anti_legionella(device: Device, period_days: int) int[source]

Enable Anti-Legionella disinfection.

async enable_demand_response(device: Device) int[source]

Enable utility demand response participation.

async enable_intelligent_scheduling(device: Device) int[source]

Enable intelligent/adaptive heating mode.

async ensure_device_info_cached(device: Device, timeout: float = 30.0) bool[source]

Ensure device info is cached, requesting if necessary.

Called by control commands and CLI to ensure device capabilities are available before execution.

Parameters:
  • device – Device to ensure info for

  • timeout – Maximum time to wait for response (default: 30 seconds)

Returns:

True if device info was successfully cached, False on timeout

Raises:

MqttNotConnectedError – If not connected

property is_connected: bool

Check if client is connected.

property is_reconnecting: bool

Check if client is currently attempting to reconnect.

async publish(topic: str, payload: dict[str, Any], qos: QoS = QoS.AT_LEAST_ONCE) int[source]

Publish a message to an MQTT topic.

If not connected and command queue is enabled, the command will be queued and sent automatically when the connection is restored.

Parameters:
  • topic – MQTT topic to publish to

  • payload – Message payload (will be JSON-encoded)

  • qos – Quality of Service level

Returns:

Publish packet ID (or 0 if queued)

Raises:

RuntimeError – If not connected and command queue is disabled

property queued_commands_count: int

Get the number of commands currently queued.

property reconnect_attempts: int

Get the number of reconnection attempts made.

async reconnect_wifi(device: Device) int[source]

Trigger a WiFi reconnection on the device.

async recover_connection() bool[source]

Recover from authentication-related connection failures.

This method is useful when MQTT connection fails due to stale/expired authentication tokens. It refreshes the tokens and attempts to reconnect the MQTT client.

Returns:

True if recovery was successful and MQTT is reconnected, False otherwise

Raises:

Example

>>> mqtt_client = NavienMqttClient(auth_client)
>>> try:
...     await mqtt_client.connect()
... except MqttConnectionError:
...     # Connection may have failed due to stale tokens
...     if await mqtt_client.recover_connection():
...         print("Successfully recovered connection")
...     else:
...         print("Recovery failed, check logs")

async request_device_info(device: Device) int[source]

Request device information (features, firmware, etc.).

async request_device_status(device: Device) int[source]

Request general device status.

async request_energy_usage(device: Device, year: int, months: list[int]) int[source]

Request daily energy usage data for specified month(s).

async request_reservations(device: Device) int[source]

Request the current reservation program from the device.

async request_tou_settings(device: Device, controller_serial_number: str) int[source]

Request the current TOU settings from the device.

async reset_air_filter(device: Device) int[source]

Reset air filter maintenance timer.

async reset_reconnect() None[source]

Reset reconnection state and trigger a new reconnection attempt. …

async reset_wifi(device: Device) int[source]

Reset WiFi settings to factory defaults.

async run_smart_diagnostic(device: Device) int[source]

Trigger the smart diagnostic routine on the device.

property session_id: str

Get session ID.

async set_dhw_mode(device: Device, mode_id: int, vacation_days: int | None = None) int[source]

Set DHW operation mode.

async set_dhw_temperature(device: Device, temperature: float) int[source]

Set DHW target temperature in the user’s preferred unit.

async set_freeze_protection_temperature(device: Device, temperature: float) int[source]

Set the freeze protection activation temperature.

async set_power(device: Device, power_on: bool) int[source]

Turn device on or off.

async set_recirculation_mode(device: Device, mode: int) int[source]

Set recirculation pump operation mode (1-4).

async set_tou_enabled(device: Device, enabled: bool) int[source]

Enable or disable Time-of-Use optimization.

async set_vacation_days(device: Device, days: int) int[source]

Set vacation/away mode duration (1-30 days).

async signal_app_connection(device: Device) int[source]

Signal that the app has connected.

async start_periodic_requests(device: Device, request_type: PeriodicRequestType = PeriodicRequestType.DEVICE_STATUS, period_seconds: float = 300.0) None[source]

Start sending periodic requests for device information or status. …

async stop_all_periodic_tasks(_reason: str | None = None) None[source]

Stop all periodic request tasks. …

async stop_periodic_requests(device: Device, request_type: PeriodicRequestType | None = None) None[source]

Stop sending periodic requests for a device. …

async subscribe(topic: str, callback: Callable[[str, dict[str, Any]], None], qos: QoS = QoS.AT_LEAST_ONCE) int[source]

Subscribe to an MQTT topic.

Parameters:
  • topic – MQTT topic to subscribe to (can include wildcards)

  • callback – Function to call when messages arrive (topic, message)

  • qos – Quality of Service level

Returns:

Subscription packet ID

Raises:

Exception – If subscription fails

async subscribe_device(device: Device, callback: Callable[[str, dict[str, Any]], None]) int[source]

Subscribe to all messages from a specific device.

Parameters:
  • device – Device object

  • callback – Message handler

Returns:

Subscription packet ID

async subscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) int[source]

Subscribe to device feature/info messages with automatic parsing.

async subscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) int[source]

Subscribe to device status messages with automatic parsing.

async subscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) int[source]

Subscribe to energy usage query responses with automatic parsing.

async subscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) int[source]

Subscribe to recirculation schedule read responses.

async subscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) int[source]

Subscribe to reservation read responses with automatic parsing.

async subscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) int[source]

Subscribe to Time-of-Use schedule read responses with automatic parsing.

Subscribes to the tou/rd response topic for the given device. The callback receives a fully-parsed TOUReservationSchedule whenever the device responds to a TOU read or configure request (triggered by request_tou_settings() or configure_tou_schedule()).

Parameters:
  • device – Device whose TOU responses to receive.

  • callback – Called with the parsed schedule on each response.

Returns:

Subscription packet ID from the MQTT subscribe call.

async subscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) int[source]

Subscribe to weekly reservation read responses.

async trigger_recirculation_hot_button(device: Device) int[source]

Manually trigger the recirculation pump hot button.

async unsubscribe(topic: str) int[source]

Unsubscribe from an MQTT topic.

Parameters:

topic – MQTT topic to unsubscribe from

Returns:

Unsubscribe packet ID

Raises:

Exception – If unsubscribe fails

async unsubscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) None[source]

Unsubscribe a specific device feature callback.

async unsubscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) None[source]

Unsubscribe a specific device status callback.

async unsubscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) None[source]

Unsubscribe a specific energy usage callback.

async unsubscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) None[source]

Unsubscribe a specific recirculation schedule callback.

async unsubscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) None[source]

Unsubscribe a specific reservation response callback.

async unsubscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) None[source]

Unsubscribe a specific TOU response callback.

async unsubscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) None[source]

Unsubscribe a specific weekly reservation callback.

async update_reservations(device: Device, reservations: Sequence[dict[str, Any]], *, enabled: bool = True) int[source]

Update programmed reservations.

async update_weekly_reservation(device: Device, schedule: WeeklyReservationSchedule) int[source]

Configure the weekly temperature reservation schedule.

nwp500.mqtt.command_queue module

MQTT command queue management for Navien Smart Control.

This module handles queueing of commands when the MQTT connection is lost, and automatically sends them when the connection is restored.

class nwp500.mqtt.command_queue.MqttCommandQueue(config: MqttConnectionConfig)[source]

Bases: object

Manages command queueing when MQTT connection is interrupted.

Commands sent while disconnected are queued and automatically sent when the connection is restored. This ensures commands are not lost during temporary network interruptions.

The queue uses asyncio.Queue with a fixed maximum size. When the queue is full, the oldest command is automatically dropped to make room for new commands (FIFO with overflow dropping).
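The overflow policy can be sketched with a bounded `asyncio.Queue`. This is an illustration under assumptions, not the library's code: the helper name `enqueue_drop_oldest` and the sample command strings are hypothetical.

```python
import asyncio
from typing import Any


def enqueue_drop_oldest(queue: asyncio.Queue, item: Any) -> bool:
    """Add item to a bounded queue, discarding the oldest entry when full.

    Returns True if an older entry had to be dropped. Hypothetical helper.
    """
    dropped = False
    if queue.full():
        queue.get_nowait()  # discard the oldest command
        dropped = True
    queue.put_nowait(item)
    return dropped


queue: asyncio.Queue = asyncio.Queue(maxsize=3)

# Four commands into a queue of size three: the first one is dropped.
dropped = [enqueue_drop_oldest(queue, f"cmd-{n}") for n in range(1, 5)]
remaining = [queue.get_nowait() for _ in range(queue.qsize())]
print(dropped, remaining)
```

Dropping the oldest entry favors recent user intent: after a long outage, the newest commands are the ones most likely to still be relevant.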

clear() int[source]

Clear all queued commands.

Returns:

Number of commands cleared

property count: int

Get the number of queued commands.

enqueue(topic: str, payload: dict[str, Any], qos: QoS) None[source]

Add a command to the queue.

If the queue is full, the oldest command is dropped to make room for the new one (FIFO with overflow dropping).

Parameters:
  • topic – MQTT topic

  • payload – Command payload

  • qos – Quality of Service level

property is_empty: bool

Check if the queue is empty.

property is_full: bool

Check if the queue is full.

async send_all(publish_func: Callable[[...], Any], is_connected_func: Callable[[], bool]) tuple[int, int][source]

Send all queued commands.

This is called automatically when connection is restored.

Parameters:
  • publish_func – Async function to publish messages (topic, payload, qos)

  • is_connected_func – Function to check if currently connected

Returns:

Tuple of (sent_count, failed_count)
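A drain loop with this shape of return value might look like the sketch below. It is written under stated assumptions and is not the library's send_all(): the name `drain_queue` is hypothetical, and the choices to stop draining when the connection drops and to count (rather than re-queue) failed publishes are assumptions that the documentation does not confirm.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def drain_queue(
    queue: asyncio.Queue,
    publish_func: Callable[..., Awaitable[Any]],
    is_connected_func: Callable[[], bool],
) -> tuple[int, int]:
    """Publish queued (topic, payload, qos) entries; return (sent, failed)."""
    sent = failed = 0
    while not queue.empty():
        if not is_connected_func():
            break  # assumption: leave the rest queued for the next reconnect
        topic, payload, qos = queue.get_nowait()
        try:
            await publish_func(topic, payload, qos)
            sent += 1
        except Exception:
            failed += 1  # assumption: count the failure, do not re-queue
    return sent, failed


async def demo() -> tuple[int, int]:
    queue: asyncio.Queue = asyncio.Queue()
    for n in range(3):
        queue.put_nowait((f"cmd/{n}", {"n": n}, 1))

    async def fake_publish(topic: str, payload: dict, qos: int) -> int:
        # Simulate one failing publish out of three.
        if payload["n"] == 1:
            raise RuntimeError("simulated publish failure")
        return 1

    return await drain_queue(queue, fake_publish, lambda: True)


result = asyncio.run(demo())
print(result)
```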

nwp500.mqtt.connection module

MQTT connection management for Navien Smart Control.

This module handles establishing and maintaining the MQTT connection to AWS IoT Core, including credential management and connection state tracking.

class nwp500.mqtt.connection.MqttConnection(config: MqttConnectionConfig, auth_client: NavienAuthClient, on_connection_interrupted: Callable[[mqtt.Connection, AwsCrtError], None] | None = None, on_connection_resumed: Callable[[mqtt.Connection, Any, Any | None], None] | None = None)[source]

Bases: object

Manages MQTT connection lifecycle to AWS IoT Core.

Handles:
  • Connection establishment with AWS credentials
  • Disconnection with cleanup
  • Connection state tracking
  • AWS credentials provider creation

async close() None[source]

Unconditionally close the underlying SDK connection.

Unlike disconnect(), this method closes the connection regardless of the _connected flag. After a connection interruption, _connected is False but the SDK connection object is still alive and its built-in auto-reconnect can still fire. Calling close() ensures the SDK connection is fully torn down so its callbacks and auto-reconnect cannot interfere with a replacement connection.

This method is safe to call multiple times or on already-closed connections.

async connect() bool[source]

Establish connection to AWS IoT Core.

Ensures tokens are valid before connecting and refreshes if necessary.

Returns:

True if connection successful

Raises:

Exception – If connection fails

property connection: Connection | None

Get the underlying MQTT connection.

Returns:

The MQTT connection object, or None if not connected

Note

This property is provided for advanced usage. Most operations should use the higher-level methods provided by this class.

async disconnect() None[source]

Disconnect from AWS IoT Core.

Raises:

Exception – If disconnect fails

property is_connected: bool

Check if currently connected.

async publish(topic: str, payload: str | dict[str, Any], qos: QoS = QoS.AT_LEAST_ONCE) int[source]

Publish a message to an MQTT topic.

Parameters:
  • topic – MQTT topic to publish to

  • payload – Message payload (dict, JSON string, or bytes)

  • qos – Quality of Service level

Returns:

Publish packet ID

Raises:

async subscribe(topic: str, qos: QoS, callback: Callable[[...], None] | None = None) tuple[Any, int][source]

Subscribe to an MQTT topic.

Parameters:
  • topic – Topic pattern to subscribe to (supports wildcards)

  • qos – Quality of Service level

  • callback – Optional callback for received messages

Returns:

Tuple of (subscribe_future, packet_id)

Raises:

RuntimeError – If not connected

async unsubscribe(topic: str) int[source]

Unsubscribe from an MQTT topic.

Parameters:

topic – Topic to unsubscribe from

Returns:

Packet ID

Raises:

RuntimeError – If not connected

nwp500.mqtt.control module

MQTT Device Control Commands for Navien devices.

This module handles all device control operations including:
  • Status and info requests
  • Power control
  • Mode changes (DHW operation modes)
  • Temperature control
  • Anti-Legionella configuration
  • Reservation scheduling
  • Time-of-Use (TOU) configuration
  • Energy usage queries
  • App connection signaling
  • Demand response control
  • Air filter maintenance
  • Vacation mode configuration
  • Recirculation pump control and scheduling

class nwp500.mqtt.control.MqttDeviceController(client_id: str, session_id: str, publish_func: Callable[[...], Awaitable[int]], device_info_cache: MqttDeviceInfoCache | None = None)[source]

Bases: object

Manages device control commands for Navien devices.

Handles all device control operations including status requests, mode changes, temperature control, scheduling, and energy queries.

This controller integrates with MqttDeviceCapabilityChecker to validate device capabilities before executing commands. Use check_support() or assert_support() methods to verify feature availability based on device capabilities before attempting to execute commands:

Example

>>> controller.assert_support("recirculation_mode", device_features)
>>> # Will raise DeviceCapabilityError if not supported
>>> msg_id = await controller.set_recirculation_mode(device, mode)

assert_support(feature: str, device_features: DeviceFeature) None[source]

Assert that device supports a controllable feature.

Parameters:
  • feature – Name of the controllable feature

  • device_features – Device feature information

Raises:

DeviceCapabilityError – If the feature is not supported

async check_firmware_update(device: Device) int[source]

Check for available over-the-air firmware updates.

Sends the OTA_CHECK command (33554443) to query whether a firmware update is available. The device responds on the control ack topic.

Parameters:

device – Device to check for updates

Returns:

Publish packet ID

check_support(feature: str, device_features: DeviceFeature) bool[source]

Check if device supports a controllable feature.

Parameters:
  • feature – Name of the controllable feature

  • device_features – Device feature information

Returns:

True if feature is supported, False otherwise

Raises:

ValueError – If feature is not recognized

async commit_firmware_update(device: Device, payload: OtaCommitPayload) int[source]

Commit a previously downloaded firmware update.

Sends the OTA_COMMIT command (33554442) with a special commitOta structure (not the standard mode/param format).

Parameters:
  • device – Device to update

  • payload – OTA commit payload specifying which firmware component and version to commit.

Returns:

Publish packet ID

async configure_recirculation_schedule(device: Device, schedule: RecirculationSchedule) int[source]

Configure the recirculation pump timed schedule.

Parameters:
  • device – Device to configure

  • schedule – Recirculation schedule with one or more time window entries

Returns:

Publish packet ID

async configure_reservation_water_program(device: Device) int[source]

Enable/configure water program reservation mode.

async configure_tou_schedule(device: Device, controller_serial_number: str, periods: Sequence[dict[str, Any]], *, enabled: bool = True) int[source]

Configure Time-of-Use pricing schedule via MQTT.

Parameters:
  • device – Device object

  • controller_serial_number – Controller serial number

  • periods – List of TOU period definitions

  • enabled – Whether TOU is enabled (default: True)

Returns:

Publish packet ID

Raises:

ValueError – If controller_serial_number is empty or periods is empty

property device_info_cache: MqttDeviceInfoCache

Get the device info cache.

async disable_anti_legionella(device: Device) int[source]

Disable the Anti-Legionella disinfection cycle.

async disable_demand_response(device: Device) int[source]

Disable utility demand response participation.

async disable_intelligent_scheduling(device: Device) int[source]

Disable intelligent/adaptive heating mode.

Sends the RESERVATION_INTELLIGENT_OFF command (33554467).

Parameters:

device – Device to configure

Returns:

Publish packet ID

async enable_anti_legionella(device: Device, period_days: int) int[source]

Enable Anti-Legionella disinfection.

async enable_demand_response(device: Device) int[source]

Enable utility demand response participation.

async enable_intelligent_scheduling(device: Device) int[source]

Enable intelligent/adaptive heating mode.

Sends the RESERVATION_INTELLIGENT_ON command (33554468). In this mode the device learns usage patterns and pre-heats water proactively to reduce energy consumption.

Parameters:

device – Device to configure

Returns:

Publish packet ID

async reconnect_wifi(device: Device) int[source]

Trigger a WiFi reconnection on the device.

Sends the WIFI_RECONNECT command (33554446). Useful when the device has lost its WiFi connection and needs to re-associate.

Parameters:

device – Device to reconnect

Returns:

Publish packet ID

async request_device_info(device: Device) int[source]

Request device information (features, firmware, etc.).

Parameters:

device – Device object

Returns:

Publish packet ID

async request_device_status(device: Device) int[source]

Request general device status.

Parameters:

device – Device object

Returns:

Publish packet ID

async request_energy_usage(device: Device, year: int, months: list[int]) int[source]

Request daily energy usage data for specified month(s).

This retrieves historical energy usage data showing heat pump and electric heating element consumption broken down by day. The response includes both energy usage (Wh) and operating time (hours) for each component.

Parameters:
  • device – Device object

  • year – Year to query (e.g., 2025)

  • months – List of months to query (1-12). Can request multiple months.

Returns:

Publish packet ID

Example:

# Request energy usage for September 2025
await controller.request_energy_usage(
    device,
    year=2025,
    months=[9]
)

# Request multiple months
await controller.request_energy_usage(
    device,
    year=2025,
    months=[7, 8, 9]
)

async request_reservations(device: Device) int[source]

Request the current reservation program from the device.

Parameters:

device – Device object

Returns:

Publish packet ID

async request_tou_settings(device: Device, controller_serial_number: str) int[source]

Request current Time-of-Use schedule from the device.

Parameters:
  • device – Device object

  • controller_serial_number – Controller serial number

Returns:

Publish packet ID

Raises:

ValueError – If controller_serial_number is empty

async reset_air_filter(device: Device) int[source]

Reset air filter maintenance timer.

async reset_wifi(device: Device) int[source]

Reset WiFi settings to factory defaults.

Sends the WIFI_RESET command (33554447). This will clear stored WiFi credentials and require re-provisioning the device.

Warning

This operation clears all stored WiFi credentials. The device will need to be re-provisioned to reconnect to the network.

Parameters:

device – Device to reset

Returns:

Publish packet ID

async run_smart_diagnostic(device: Device) int[source]

Trigger the smart diagnostic routine on the device.

Sends the SMART_DIAGNOSTIC command (33554455). The diagnostic result is reflected in the smart_diagnostic field of the next DeviceStatus update.

Parameters:

device – Device to diagnose

Returns:

Publish packet ID

async set_dhw_mode(device: Device, mode_id: int, vacation_days: int | None = None) int[source]

Set DHW operation mode.

async set_dhw_temperature(device: Device, temperature: float) int[source]

Set DHW target temperature.

Temperature is in the user’s preferred unit (Celsius or Fahrenheit) based on the global unit system context.

set_ensure_device_info_callback(callback: Callable[[Device], Awaitable[bool]] | None) None[source]

Set the callback for ensuring device info is cached.

async set_freeze_protection_temperature(device: Device, temperature: float) int[source]

Set the freeze protection activation temperature.

Sends the FREZ_TEMP command (33554451). The device activates freeze protection heating when the ambient temperature drops below this threshold.

Parameters:
  • device – Device to configure

  • temperature – Activation temperature in the user’s preferred unit (°C if unit system is metric, °F otherwise). Valid range: 35–45°F (1.7–7.2°C).

Returns:

Publish packet ID
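The two forms of the valid range (35–45°F and 1.7–7.2°C) are the same interval in different units. The following sketch shows a range check that accepts either unit system; it is illustrative only. The helper names are hypothetical, and whether the library validates the value on the client side or raises ValueError is an assumption. The unit system strings are the ones accepted by the NavienMqttClient constructor.

```python
FREEZE_MIN_F = 35.0
FREEZE_MAX_F = 45.0


def fahrenheit_to_celsius(temp_f: float) -> float:
    return (temp_f - 32.0) * 5.0 / 9.0


def celsius_to_fahrenheit(temp_c: float) -> float:
    return temp_c * 9.0 / 5.0 + 32.0


def check_freeze_protection_temperature(temperature: float, unit_system: str) -> float:
    """Return the temperature in °F if it lies in the valid range.

    Hypothetical helper: raises ValueError for out-of-range values or an
    unknown unit system.
    """
    if unit_system == "metric":
        temp_f = celsius_to_fahrenheit(temperature)
    elif unit_system == "us_customary":
        temp_f = temperature
    else:
        raise ValueError(f"unknown unit system: {unit_system!r}")
    if not FREEZE_MIN_F <= temp_f <= FREEZE_MAX_F:
        raise ValueError(
            f"{temp_f:.1f}°F is outside {FREEZE_MIN_F:.0f}-{FREEZE_MAX_F:.0f}°F"
        )
    return temp_f


# The Celsius bounds quoted in the docs are the Fahrenheit bounds, rounded.
celsius_bounds = (
    round(fahrenheit_to_celsius(FREEZE_MIN_F), 1),
    round(fahrenheit_to_celsius(FREEZE_MAX_F), 1),
)
print(celsius_bounds)
print(check_freeze_protection_temperature(5.0, "metric"))
```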

async set_power(device: Device, power_on: bool) int[source]

Turn device on or off.

async set_recirculation_mode(device: Device, mode: int) int[source]

Set recirculation pump operation mode (1-4).

async set_tou_enabled(device: Device, enabled: bool) int[source]

Toggle Time-of-Use functionality.

async set_vacation_days(device: Device, days: int) int[source]

Set vacation/away mode duration (1-30 days).

async signal_app_connection(device: Device) int[source]

Signal that the app has connected. …

async trigger_recirculation_hot_button(device: Device) int[source]

Manually trigger the recirculation pump hot button.

async update_reservations(device: Device, reservations: Sequence[dict[str, Any]], *, enabled: bool = True) int[source]

Update programmed reservations for temperature/mode changes.

Parameters:
  • device – Device object

  • reservations – List of reservation entries

  • enabled – Whether reservations are enabled (default: True)

Returns:

Publish packet ID

async update_weekly_reservation(device: Device, schedule: WeeklyReservationSchedule) int[source]

Configure the weekly temperature reservation schedule.

Sends the complete weekly schedule to the device using command code RESERVATION_WEEKLY (33554438).

Parameters:
  • device – Device to configure

  • schedule – Weekly reservation schedule with entries for each time slot

Returns:

Publish packet ID

nwp500.mqtt.diagnostics module

MQTT diagnostics and telemetry collection.

This module provides detailed diagnostics and metrics collection for MQTT connection stability analysis, helping to identify whether connection drops are caused by:
  • Network/environmental issues (intermittent connectivity, NAT timeouts)
  • AWS server-side limits (connection lifetime, message rate limits)
  • Client-side configuration issues (insufficient keep-alive, poor backoff)

class nwp500.mqtt.diagnostics.ConnectionDropEvent(timestamp: str, error_name: str | None = None, error_message: str | None = None, error_code: int | None = None, reconnect_attempt: int = 0, duration_connected_seconds: float | None = None, active_subscriptions: int = 0, queued_commands: int = 0)[source]

Bases: object

Record of a single connection drop event.

active_subscriptions: int = 0
duration_connected_seconds: float | None = None
error_code: int | None = None
error_message: str | None = None
error_name: str | None = None
queued_commands: int = 0
reconnect_attempt: int = 0
timestamp: str
to_dict() dict[str, Any][source]

Convert to dictionary.
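The field list and defaults in the signature suggest a plain record type. As a rough sketch of how such a record converts to a dictionary and then to JSON, the stand-in below mirrors the documented fields. `DropRecord` is a hypothetical class, the use of `dataclasses.asdict` is an assumption about how to_dict() could work, and the sample values are invented.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class DropRecord:
    """Hypothetical stand-in mirroring the ConnectionDropEvent fields."""
    timestamp: str
    error_name: Optional[str] = None
    error_message: Optional[str] = None
    error_code: Optional[int] = None
    reconnect_attempt: int = 0
    duration_connected_seconds: Optional[float] = None
    active_subscriptions: int = 0
    queued_commands: int = 0

    def to_dict(self) -> dict[str, Any]:
        # Assumption: a flat field-to-value mapping is all that is needed.
        return asdict(self)


# Sample values are invented for illustration.
record = DropRecord(
    timestamp="2025-01-01T00:00:00+00:00",
    error_name="SampleError",
    reconnect_attempt=1,
    duration_connected_seconds=3600.0,
)
as_json = json.dumps(record.to_dict(), sort_keys=True)
print(as_json)
```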

class nwp500.mqtt.diagnostics.ConnectionEvent(timestamp: str, event_type: str, session_present: bool = False, return_code: int | None = None, attempt_number: int = 0, time_to_reconnect_seconds: float | None = None)[source]

Bases: object

Record of a connection success/resumption event.

attempt_number: int = 0
event_type: str
return_code: int | None = None
session_present: bool = False
time_to_reconnect_seconds: float | None = None
timestamp: str
to_dict() dict[str, Any][source]

Convert to dictionary.

class nwp500.mqtt.diagnostics.MqttDiagnosticsCollector(max_events_retained: int = 1000, enable_verbose_logging: bool = False)[source]

Bases: object

Collects detailed diagnostics and metrics for MQTT connection analysis.

This collector tracks:
  • Connection drop events with error details
  • Connection recovery timeline
  • Error frequency and patterns
  • Session duration statistics
  • Network topology and timing information

For debugging:
  • Export logs to JSON for correlation with AWS CloudWatch
  • Enables continuous monitoring with configurable retention

export_json() str[source]

Export all collected diagnostics as JSON.

Returns:

JSON string suitable for storing or sending to monitoring systems

get_metrics() MqttMetrics[source]

Get current aggregate metrics.

get_recent_connections(limit: int = 10) list[ConnectionEvent][source]

Get the N most recent connection events.

get_recent_drops(limit: int = 10) list[ConnectionDropEvent][source]

Get the N most recent connection drop events.
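Bounded retention combined with a "most recent N" query is commonly built on `collections.deque` with `maxlen`. The sketch below shows that technique; it is not the collector's implementation. `RecentEvents`, `record`, and `recent` are hypothetical names, and returning results oldest-first is an assumption.

```python
from collections import deque
from typing import Any


class RecentEvents:
    """Hypothetical bounded event history (not MqttDiagnosticsCollector)."""

    def __init__(self, max_events_retained: int = 1000) -> None:
        # A deque with maxlen silently discards the oldest entry when full.
        self._events: deque = deque(maxlen=max_events_retained)

    def record(self, event: Any) -> None:
        self._events.append(event)

    def recent(self, limit: int = 10) -> list:
        """Return up to `limit` of the newest events, oldest first."""
        if limit <= 0:
            return []
        return list(self._events)[-limit:]


history = RecentEvents(max_events_retained=5)
for n in range(8):
    history.record(f"drop-{n}")

latest = history.recent(limit=3)
everything = history.recent(limit=100)
print(latest, everything)
```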

on_connection_drop(callback: Callable[[ConnectionDropEvent], None]) None[source]

Register a callback to be invoked on each connection drop event.

Parameters:

callback – Function that receives ConnectionDropEvent

print_summary() None[source]

Print a human-readable summary of diagnostics.

async record_connection_drop(error: Exception | None = None, reconnect_attempt: int = 0, active_subscriptions: int = 0, queued_commands: int = 0) None[source]

Record a connection drop event.

Parameters:
  • error – The exception that caused the drop

  • reconnect_attempt – Which reconnection attempt this is (0 = initial)

  • active_subscriptions – Number of active subscriptions at time of drop

  • queued_commands – Number of commands in the queue

async record_connection_success(event_type: str = 'connected', session_present: bool = False, return_code: int | None = None, attempt_number: int = 0) None[source]

Record a successful connection or reconnection event.

Parameters:
  • event_type – “connected”, “resumed”, or “deep_reconnected”

  • session_present – Whether MQTT session was present

  • return_code – MQTT return code

  • attempt_number – Reconnection attempt number (0 = initial connect)

record_publish(queued: bool = False) None[source]

Record a publish/queue operation.

async update_metrics() None[source]

Update current metrics (e.g., current session uptime).

class nwp500.mqtt.diagnostics.MqttMetrics(total_connections: int = 0, total_disconnects: int = 0, total_connection_drops: int = 0, total_reconnect_attempts: int = 0, longest_session_seconds: float = 0.0, shortest_session_seconds: float = inf, average_session_seconds: float = 0.0, current_session_uptime_seconds: float = 0.0, connection_drops_by_error: dict[str, int]=<factory>, reconnection_attempts_distribution: dict[str, int]=<factory>, last_drop_timestamp: str | None = None, last_successful_connect_timestamp: str | None = None, connection_recovered: int = 0, messages_published: int = 0, messages_queued: int = 0)[source]

Bases: object

Aggregate metrics for MQTT connection stability.

average_session_seconds: float = 0.0
connection_drops_by_error: dict[str, int]
connection_recovered: int = 0
current_session_uptime_seconds: float = 0.0
last_drop_timestamp: str | None = None
last_successful_connect_timestamp: str | None = None
longest_session_seconds: float = 0.0
messages_published: int = 0
messages_queued: int = 0
reconnection_attempts_distribution: dict[str, int]
shortest_session_seconds: float = inf
to_dict() dict[str, Any][source]

Convert to dictionary for JSON serialization.

total_connection_drops: int = 0
total_connections: int = 0
total_disconnects: int = 0
total_reconnect_attempts: int = 0

nwp500.mqtt.periodic module

MQTT Periodic Request Manager for Navien devices.

This module handles periodic/scheduled requests to keep device information and status up-to-date. Features include:

  • Configurable request intervals

  • Automatic skip when disconnected

  • Graceful task cancellation

  • Per-device, per-type task management

class nwp500.mqtt.periodic.MqttPeriodicRequestManager(is_connected_func: Callable[[], bool], request_device_info_func: Callable[[...], Any], request_device_status_func: Callable[[...], Any])[source]

Bases: object

Manages periodic requests for device information and status.

Features:

  • Independent tasks per device and request type

  • Automatic skip when disconnected (with throttled logging)

  • Graceful cancellation on disconnect

  • Error recovery and retry

property active_task_count: int

Get number of active periodic tasks.

async start_periodic_device_info_requests(device: Device, period_seconds: float = 300.0) None[source]

Start sending periodic device info requests.

This is a convenience wrapper around start_periodic_requests().

Parameters:
  • device – Device object

  • period_seconds – Time between requests in seconds (default: 300 = 5 minutes)

async start_periodic_device_status_requests(device: Device, period_seconds: float = 300.0) None[source]

Start sending periodic device status requests.

This is a convenience wrapper around start_periodic_requests().

Parameters:
  • device – Device object

  • period_seconds – Time between requests in seconds (default: 300 = 5 minutes)

async start_periodic_requests(device: Device, request_type: PeriodicRequestType = PeriodicRequestType.DEVICE_STATUS, period_seconds: float = 300.0) None[source]

Start sending periodic requests for device information or status.

This optional helper continuously sends requests at a specified interval. It can be used to keep device information or status up-to-date.

Parameters:
  • device – Device object

  • request_type – Type of request (DEVICE_INFO or DEVICE_STATUS)

  • period_seconds – Time between requests in seconds (default: 300 = 5 minutes)

Example

>>> # Start periodic status requests (default)
>>> await manager.start_periodic_requests(device)
>>>
>>> # Start periodic device info requests
>>> await manager.start_periodic_requests(
...     device,
...     request_type=PeriodicRequestType.DEVICE_INFO
... )
>>>
>>> # Custom period: request every 60 seconds
>>> await manager.start_periodic_requests(
...     device,
...     period_seconds=60
... )

Note

  • Only one periodic task per request type per device

  • Call stop_periodic_requests() to stop a task

  • All tasks automatically stop when client disconnects
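The "one task per request type per device" rule can be illustrated with a small asyncio sketch. This is not the library's implementation: `PeriodicSketch`, its `start`/`stop` methods, and the string device and request-type keys are hypothetical stand-ins, and sending before sleeping (rather than after) is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple


class PeriodicSketch:
    """Hypothetical stand-in that keeps one task per (device, request type)."""

    def __init__(self, is_connected: Callable[[], bool]) -> None:
        self._is_connected = is_connected
        self._tasks: Dict[Tuple[str, str], "asyncio.Task[None]"] = {}

    @property
    def active_task_count(self) -> int:
        return len(self._tasks)

    async def start(
        self,
        device_id: str,
        request_type: str,
        send: Callable[[], Awaitable[None]],
        period_seconds: float = 300.0,
    ) -> None:
        # Replace any existing task for the same key so only one remains.
        await self.stop(device_id, request_type)

        async def loop() -> None:
            while True:
                if self._is_connected():  # skip the request while disconnected
                    await send()
                await asyncio.sleep(period_seconds)

        self._tasks[(device_id, request_type)] = asyncio.create_task(loop())

    async def stop(self, device_id: str, request_type: str) -> None:
        task = self._tasks.pop((device_id, request_type), None)
        if task is not None:
            task.cancel()
            # Wait for the cancellation to finish without re-raising it.
            await asyncio.gather(task, return_exceptions=True)


async def demo() -> Tuple[int, int, int]:
    sent = []

    async def send() -> None:
        sent.append("device_status")

    manager = PeriodicSketch(lambda: True)
    await manager.start("dev-1", "device_status", send, period_seconds=0.01)
    await manager.start("dev-1", "device_status", send, period_seconds=0.01)
    running = manager.active_task_count
    await asyncio.sleep(0.05)
    await manager.stop("dev-1", "device_status")
    return running, manager.active_task_count, len(sent)
```

Starting the same device and request type twice leaves a single active task, and stopping it brings the count back to zero.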

async stop_all_periodic_tasks(reason: str | None = None) None[source]

Stop all periodic request tasks.

This is automatically called when disconnecting.

Parameters:
  • reason – Optional reason for logging context (e.g., “connection failure”)

Example

>>> await manager.stop_all_periodic_tasks()
>>> await manager.stop_all_periodic_tasks(reason="disconnect")

async stop_periodic_device_info_requests(device: Device) None[source]

Stop sending periodic device info requests for a device.

This is a convenience wrapper around stop_periodic_requests().

Parameters:

device – Device object

async stop_periodic_device_status_requests(device: Device) None[source]

Stop sending periodic device status requests for a device.

This is a convenience wrapper around stop_periodic_requests().

Parameters:

device – Device object

async stop_periodic_requests(device: Device, request_type: PeriodicRequestType | None = None) None[source]

Stop sending periodic requests for a device.

Parameters:
  • device – Device object

  • request_type – Type of request to stop. If None, stops all types for this device.

Example

>>> # Stop specific request type
>>> await manager.stop_periodic_requests(
...     device,
...     PeriodicRequestType.DEVICE_STATUS
... )
>>>
>>> # Stop all periodic requests for device
>>> await manager.stop_periodic_requests(device)

nwp500.mqtt.reconnection module

MQTT reconnection handler for Navien Smart Control.

This module handles automatic reconnection with exponential backoff when the MQTT connection is interrupted.

class nwp500.mqtt.reconnection.MqttReconnectionHandler(config: MqttConnectionConfig, is_connected_func: Callable[[], bool], schedule_coroutine_func: Callable[[Any], None], reconnect_func: Callable[[], Awaitable[None]], deep_reconnect_func: Callable[[], Awaitable[None]] | None = None, emit_event_func: Callable[..., Awaitable[Any]] | None = None)[source]

Bases: object

Handles automatic reconnection logic with exponential backoff.

This class manages reconnection attempts when the MQTT connection is interrupted, implementing exponential backoff and configurable retry limits.
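As a rough illustration of the backoff schedule, the sketch below derives the delay sequence from the documented MqttConnectionConfig defaults (initial delay 1.0, multiplier 2.0, maximum 120.0). The `backoff_delays` generator is a hypothetical helper, not part of the package, and the handler's actual timing logic may differ.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(
    initial_delay: float = 1.0,
    multiplier: float = 2.0,
    max_delay: float = 120.0,
) -> Iterator[float]:
    """Yield successive reconnect delays, capped at max_delay."""
    delay = initial_delay
    while True:
        yield min(delay, max_delay)
        delay = min(delay * multiplier, max_delay)


# First nine delays with the documented defaults.
print(list(islice(backoff_delays(), 9)))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 120.0, 120.0]
```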

property attempt_count: int

Get the number of reconnection attempts made.

async cancel() None[source]

Cancel any pending reconnection task.

disable() None[source]

Disable automatic reconnection (e.g., for manual disconnect).

enable() None[source]

Enable automatic reconnection.

property is_reconnecting: bool

Check if currently attempting to reconnect.

on_connection_interrupted(error: Exception) None[source]

Handle connection interruption.

Parameters:

error – Error that caused the interruption

on_connection_resumed(return_code: Any, session_present: Any) None[source]

Handle connection resumption.

Parameters:
  • return_code – MQTT return code

  • session_present – Whether session was present

reset() None[source]

Reset reconnection state and enable reconnection.

reset_attempts() None[source]

Reset the reconnection attempt counter.

nwp500.mqtt.state_tracker module

Per-device state change detection for Navien MQTT clients.

Compares successive DeviceStatus snapshots for each device and emits granular events when individual fields change (temperature, mode, power, errors).

class nwp500.mqtt.state_tracker.DeviceStateTracker(event_emitter: EventEmitter)[source]

Bases: object

Tracks previous device states and emits change events.

Each device (identified by MAC address) gets its own slot in _previous_status. On every new status update, this class compares it against the stored snapshot and emits events for changed fields, then stores the new snapshot.

clear() None[source]

Drop all stored snapshots (call on disconnect).

async process(device_mac: str, status: DeviceStatus) None[source]

Compare status with the previous snapshot for device_mac.

Emits the following events when values change:

  • temperature_changed(TemperatureChangedEvent(...))

  • mode_changed(ModeChangedEvent(...))

  • power_changed(PowerChangedEvent(...))

  • heating_started(HeatingStartedEvent(...))

  • heating_stopped(HeatingStoppedEvent(...))

  • error_detected(ErrorDetectedEvent(...))

  • error_cleared(ErrorClearedEvent(...))

Parameters:
  • device_mac – MAC address used as the per-device key.

  • status – Freshly received DeviceStatus.
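The compare-then-store pattern can be sketched as follows. `Snapshot` and `TrackerSketch` are hypothetical stand-ins for DeviceStatus and DeviceStateTracker, only two fields are compared, and emitting nothing for a device's first snapshot is an assumption rather than documented behavior.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical, reduced stand-in for DeviceStatus."""

    temperature: float
    mode: str


class TrackerSketch:
    def __init__(self, emit: Callable[[str, Any, Any], Awaitable[None]]) -> None:
        self._emit = emit
        self._previous: Dict[str, Snapshot] = {}  # keyed by device MAC

    def clear(self) -> None:
        self._previous.clear()

    async def process(self, device_mac: str, status: Snapshot) -> None:
        previous = self._previous.get(device_mac)
        if previous is not None:  # assumed: no events for the first snapshot
            if previous.temperature != status.temperature:
                await self._emit(
                    "temperature_changed", previous.temperature, status.temperature
                )
            if previous.mode != status.mode:
                await self._emit("mode_changed", previous.mode, status.mode)
        self._previous[device_mac] = status  # store the new snapshot


async def demo() -> List[Tuple[str, Any, Any]]:
    events: List[Tuple[str, Any, Any]] = []

    async def emit(name: str, old: Any, new: Any) -> None:
        events.append((name, old, new))

    tracker = TrackerSketch(emit)
    await tracker.process("aa", Snapshot(120.0, "heat_pump"))
    await tracker.process("aa", Snapshot(125.0, "heat_pump"))
    await tracker.process("bb", Snapshot(125.0, "electric"))  # separate slot
    return events
```

Because each MAC address has its own slot, the first status from device "bb" is not compared against the last status from device "aa".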

nwp500.mqtt.subscriptions module

MQTT Subscription Management for Navien devices.

This module handles all subscription-related operations including:

  • Low-level subscribe/unsubscribe operations

  • Topic pattern matching with MQTT wildcards

  • Message routing and handler management

  • Typed subscriptions (status, feature, energy)

  • State change detection and event emission

class nwp500.mqtt.subscriptions.MqttSubscriptionManager(connection: Any, client_id: str, event_emitter: EventEmitter, schedule_coroutine: Callable[[Any], None], device_info_cache: MqttDeviceInfoCache | None = None)[source]

Bases: object

Manages MQTT subscriptions, topic matching, and message routing.

Handles:

  • Subscribe/unsubscribe to MQTT topics

  • Topic pattern matching with wildcards (+ and #)

  • Message handler registration and invocation

  • Typed subscriptions with automatic parsing

  • State change detection and event emission

clear_subscriptions() None[source]

Clear all subscription tracking (called on disconnect).

async resubscribe_all() None[source]

Re-establish all subscriptions after a connection rebuild.

This method is called after a deep reconnection to restore all active subscriptions. It uses the stored subscription information to re-subscribe to all topics with their original QoS settings and handlers.

Note

This is typically called automatically during deep reconnection and should not need to be called manually.

Raises:

async subscribe(topic: str, callback: Callable[[str, dict[str, Any]], None], qos: QoS = QoS.AT_LEAST_ONCE) int[source]

Subscribe to an MQTT topic.

Parameters:
  • topic – MQTT topic to subscribe to (can include wildcards)

  • callback – Function to call when messages arrive (topic, message)

  • qos – Quality of Service level

Returns:

Subscription packet ID

Raises:

async subscribe_device(device: Device, callback: Callable[[str, dict[str, Any]], None]) int[source]

Subscribe to all messages from a specific device.

Parameters:
  • device – Device object

  • callback – Message handler

Returns:

Subscription packet ID

async subscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) int[source]

Subscribe to device feature/info messages with automatic parsing.

async subscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) int[source]

Subscribe to device status messages with automatic parsing.

async subscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) int[source]

Subscribe to energy usage responses with automatic parsing.

async subscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) int[source]

Subscribe to recirculation schedule read responses.

Subscribes to the recirc-rsv/rd response topic for the given device. The callback receives a RecirculationSchedule whenever the device responds to a recirculation schedule read request.

Parameters:
  • device – Device whose recirculation schedule responses to receive.

  • callback – Called with the parsed schedule on each response.

Returns:

Subscription packet ID from the MQTT subscribe call.

async subscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) int[source]

Subscribe to reservation read responses with automatic parsing.

Subscribes to the rsv/rd response topic for the given device. The callback receives a fully-parsed ReservationSchedule whenever the device responds to a reservation read request.

Parameters:
  • device – Device whose reservation responses to receive.

  • callback – Called with the parsed schedule on each response.

Returns:

Subscription packet ID from the MQTT subscribe call.

async subscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) int[source]

Subscribe to Time-of-Use schedule read responses with automatic parsing.

Subscribes to the tou/rd response topic for the given device. The callback receives a fully-parsed TOUReservationSchedule whenever the device responds to a TOU read or configure request (triggered by request_tou_settings() or configure_tou_schedule()).

Parameters:
  • device – Device whose TOU responses to receive.

  • callback – Called with the parsed schedule on each response.

Returns:

Subscription packet ID from the MQTT subscribe call.

async subscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) int[source]

Subscribe to weekly reservation read responses.

Subscribes to the rsv-weekly/rd response topic for the given device. The callback receives a WeeklyReservationSchedule whenever the device responds to a weekly reservation read request.

Parameters:
  • device – Device whose weekly reservation responses to receive.

  • callback – Called with the parsed schedule on each response.

Returns:

Subscription packet ID from the MQTT subscribe call.

property subscriptions: dict[str, QoS]

Get current subscriptions.

async unsubscribe(topic: str, callback: Callable[[str, dict[str, Any]], None] | None = None) int[source]

Unsubscribe from an MQTT topic.

If a callback is provided, only that specific handler is removed. The underlying MQTT unsubscribe from the broker is only performed if no handlers remain for the topic.

If no callback is provided, all handlers are removed and the broker is unsubscribed immediately.
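The handler-removal rule above can be sketched with a small registry. `HandlerRegistry` and its fake packet-ID counter are hypothetical and stand in for the real broker call; the point is only when a broker unsubscribe happens and when 0 is returned.

```python
from typing import Any, Callable, Dict, List, Optional

Handler = Callable[[str, Dict[str, Any]], None]


class HandlerRegistry:
    """Hypothetical registry; the broker call is simulated with a counter."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}
        self._next_packet_id = 1

    def add(self, topic: str, callback: Handler) -> None:
        self._handlers.setdefault(topic, []).append(callback)

    def remove(self, topic: str, callback: Optional[Handler] = None) -> int:
        handlers = self._handlers.get(topic, [])
        if callback is not None:
            if callback in handlers:
                handlers.remove(callback)
            if handlers:
                return 0  # other handlers remain, so no broker call is made
        # No callback given, or the last handler was removed.
        self._handlers.pop(topic, None)
        return self._broker_unsubscribe(topic)

    def _broker_unsubscribe(self, topic: str) -> int:
        # Stand-in for the real MQTT unsubscribe; returns a fake packet ID.
        packet_id = self._next_packet_id
        self._next_packet_id += 1
        return packet_id


def on_a(topic: str, message: Dict[str, Any]) -> None:
    pass


def on_b(topic: str, message: Dict[str, Any]) -> None:
    pass


registry = HandlerRegistry()
registry.add("cmd/52/+/status", on_a)
registry.add("cmd/52/+/status", on_b)
first = registry.remove("cmd/52/+/status", on_a)   # handler on_b remains
second = registry.remove("cmd/52/+/status", on_b)  # last handler removed
```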

Parameters:
  • topic – MQTT topic to unsubscribe from

  • callback – Optional specific handler to remove

Returns:

Unsubscribe packet ID (or 0 if no broker call was made)

Raises:

async unsubscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) None[source]

Unsubscribe a specific device feature callback.

async unsubscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) None[source]

Unsubscribe a specific device status callback.

async unsubscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) None[source]

Unsubscribe a specific energy usage callback.

async unsubscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) None[source]

Unsubscribe a specific recirculation schedule callback.

async unsubscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) None[source]

Unsubscribe a specific reservation response callback.

async unsubscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) None[source]

Unsubscribe a specific TOU response callback.

async unsubscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) None[source]

Unsubscribe a specific weekly reservation callback.

update_connection(connection: Any) None[source]

Update the MQTT connection reference.

This is used when the connection is recreated (e.g., after reconnection) to update the internal reference while preserving subscriptions.

Parameters:

connection – New MQTT connection object

Note

This does not re-establish subscriptions. Call the appropriate subscribe methods to re-register subscriptions with the new connection if needed.

nwp500.mqtt.utils module

MQTT utility functions and data structures for Navien Smart Control.

This module provides utility functions for redacting sensitive information, configuration classes, and common data structures used across MQTT modules.

class nwp500.mqtt.utils.MqttConnectionConfig(endpoint: str = 'a1t30mldyslmuq-ats.iot.us-east-1.amazonaws.com', region: str = 'us-east-1', client_id: str | None = None, clean_session: bool = True, keep_alive_secs: int = 1200, auto_reconnect: bool = True, max_reconnect_attempts: int = -1, initial_reconnect_delay: float = 1.0, max_reconnect_delay: float = 120.0, reconnect_backoff_multiplier: float = 2.0, deep_reconnect_threshold: int = 10, enable_command_queue: bool = True, max_queued_commands: int = 100)[source]

Bases: object

Configuration for MQTT connection.

endpoint

AWS IoT endpoint URL

Type:

str

region

AWS region

Type:

str

client_id

MQTT client ID (auto-generated if None)

Type:

str | None

clean_session

Whether to start with a clean session

Type:

bool

keep_alive_secs

Keep-alive interval in seconds

Type:

int

auto_reconnect

Enable automatic reconnection

Type:

bool

max_reconnect_attempts

Maximum reconnection attempts (-1 for unlimited)

Type:

int

initial_reconnect_delay

Initial delay between reconnect attempts

Type:

float

max_reconnect_delay

Maximum delay between reconnect attempts

Type:

float

reconnect_backoff_multiplier

Exponential backoff multiplier

Type:

float

deep_reconnect_threshold

Attempt count to trigger full connection rebuild

Type:

int

enable_command_queue

Enable command queueing when disconnected

Type:

bool

max_queued_commands

Maximum number of queued commands

Type:

int

auto_reconnect: bool = True
clean_session: bool = True
client_id: str | None = None
deep_reconnect_threshold: int = 10
enable_command_queue: bool = True
endpoint: str = 'a1t30mldyslmuq-ats.iot.us-east-1.amazonaws.com'
initial_reconnect_delay: float = 1.0
keep_alive_secs: int = 1200
max_queued_commands: int = 100
max_reconnect_attempts: int = -1
max_reconnect_delay: float = 120.0
reconnect_backoff_multiplier: float = 2.0
region: str = 'us-east-1'

class nwp500.mqtt.utils.PeriodicRequestType(*values)[source]

Bases: Enum

Types of periodic requests that can be sent.

DEVICE_INFO

Request device information periodically

DEVICE_STATUS

Request device status periodically

DEVICE_INFO = 'device_info'
DEVICE_STATUS = 'device_status'

class nwp500.mqtt.utils.QueuedCommand(topic: str, payload: dict[str, Any], qos: QoS, timestamp: datetime)[source]

Bases: object

Represents a command that is queued for sending when reconnected.

topic

MQTT topic to publish to

Type:

str

payload

Command payload dictionary

Type:

dict[str, Any]

qos

Quality of Service level

Type:

awscrt.mqtt.QoS

timestamp

Time when command was queued

Type:

datetime.datetime

payload: dict[str, Any]
qos: QoS
timestamp: datetime
topic: str

nwp500.mqtt.utils.get_response_data(message: dict[str, Any], key: str | None) Any[source]

Extract data from an MQTT message, supporting key variants.

Checks both the nested response dict and the top-level message, using both the primary key and its alternate short-form name (e.g. "status" / "st", "feature" / "did"). Lookup order preserves a strict nested-first precedence:

  1. response[key]

  2. response[alt_key]

  3. message[key]

  4. message[alt_key]

Key presence is checked explicitly (not by truthiness), so falsy values like 0, False, or {} are returned correctly and do not fall through to a lower-precedence candidate.

Parameters:
  • message – Raw MQTT message dict.

  • key – Primary key to look up. When None, the nested response dict is returned directly.

Returns:

The value of the first present key in priority order, or None if no candidate key is found.
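The lookup order can be sketched as below. This is an illustration, not the library's code: the `lookup` function and the `ALT_KEYS` table are hypothetical, the table holds only the two pairs named above, and the nested dict is assumed to live under a "response" key.

```python
from typing import Any, Dict, Optional

# Hypothetical alias table; only the two pairs named in the text are included.
ALT_KEYS = {"status": "st", "feature": "did"}


def lookup(message: Dict[str, Any], key: Optional[str]) -> Any:
    """Nested-first lookup that checks key presence, not truthiness."""
    response = message.get("response")  # assumed name of the nested dict
    if key is None:
        return response
    nested = response if isinstance(response, dict) else {}
    candidates = [key]
    if key in ALT_KEYS:
        candidates.append(ALT_KEYS[key])
    for source in (nested, message):  # nested dict takes precedence
        for name in candidates:       # primary key before its short form
            if name in source:
                return source[name]
    return None


# A falsy nested value wins over a truthy top-level one.
print(lookup({"response": {"status": 0}, "status": 5}, "status"))  # 0
```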

nwp500.mqtt.utils.redact(obj: Any, keys_to_redact: set[str] | None = None) Any[source]

Return a redacted copy of obj with sensitive keys masked.

This is a lightweight sanitizer for log messages to avoid emitting secrets such as access keys, session tokens, passwords, emails, clientIDs and sessionIDs.

Parameters:
  • obj – Object to redact (dict, list, tuple, or primitive)

  • keys_to_redact – Set of key names to redact (uses defaults if None)

Returns:

Redacted copy of the object
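A recursive sanitizer of this kind might look like the sketch below. The `redact_sketch` function, the default key set, and the use of '<REDACTED>' as the mask for every value are assumptions for illustration; the library's real defaults are not listed here.

```python
from typing import Any, Optional, Set

# Hypothetical default key set, chosen only for this example.
DEFAULT_SENSITIVE_KEYS = {"password", "email", "sessionToken", "secretKey"}
MASK = "<REDACTED>"


def redact_sketch(obj: Any, keys_to_redact: Optional[Set[str]] = None) -> Any:
    """Return a copy of obj with values under sensitive keys masked."""
    keys = DEFAULT_SENSITIVE_KEYS if keys_to_redact is None else keys_to_redact
    if isinstance(obj, dict):
        return {
            k: MASK if k in keys else redact_sketch(v, keys)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [redact_sketch(item, keys) for item in obj]
    if isinstance(obj, tuple):
        return tuple(redact_sketch(item, keys) for item in obj)
    return obj  # primitives pass through unchanged


payload = {"email": "user@example.com", "items": [{"password": "x", "temp": 120}]}
print(redact_sketch(payload))
# {'email': '<REDACTED>', 'items': [{'password': '<REDACTED>', 'temp': 120}]}
```

Building new containers instead of mutating in place keeps the original payload usable after logging.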

nwp500.mqtt.utils.redact_mac(mac: str | None) str[source]

Mask a MAC address or device ID for safe logging.

Parameters:

mac – The MAC address or device ID to redact (e.g., ‘navilink-0123456789ab’)

Returns:

A redacted string like ‘navilink-01…89ab’ or ‘<REDACTED>’

nwp500.mqtt.utils.redact_serial(serial: str | None) str[source]

Mask a serial number for safe logging.

Parameters:

serial – Serial number to redact

Returns:

Redacted serial like ‘AB…1234’

nwp500.mqtt.utils.redact_topic(topic: str) str[source]

Redact sensitive information from MQTT topic strings.

Topics often contain MAC addresses or device unique identifiers, e.g.:

  • cmd/52/navilink-04786332fca0/st/did

  • cmd/52/navilink-04786332fca0/ctrl

  • cmd/52/04786332fca0/ctrl

  • or with colons/hyphens (04:78:63:32:fc:a0 or 04-78-63-32-fc-a0)

Parameters:

topic – MQTT topic string

Returns:

Topic with MAC addresses redacted

Note

Uses pre-compiled regex patterns for better performance.

nwp500.mqtt.utils.topic_matches_pattern(topic: str, pattern: str) bool[source]

Check if a topic matches a subscription pattern with wildcards.

Supports MQTT wildcards:

  • ‘+’ matches a single level

  • ‘#’ matches multiple levels (must be at end)

Parameters:
  • topic – Actual topic (e.g., “cmd/52/navilink-ABC/status”)

  • pattern – Pattern with wildcards (e.g., “cmd/52/+/#”)

Returns:

True if topic matches pattern

Examples

>>> topic_matches_pattern("cmd/52/device1/status", "cmd/52/+/status")
True
>>> topic_matches_pattern(
...     "cmd/52/device1/status/extra", "cmd/52/device1/#"
... )
True
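For readers unfamiliar with MQTT wildcards, the matching rule can be sketched level by level. The `matches` function below is a hypothetical re-implementation for illustration and is not claimed to mirror the library's code. It follows the MQTT convention that '#' also matches the parent level.

```python
def matches(topic: str, pattern: str) -> bool:
    """Match a concrete topic against a pattern with '+' and '#' wildcards."""
    topic_levels = topic.split("/")
    pattern_levels = pattern.split("/")
    for i, part in enumerate(pattern_levels):
        if part == "#":
            # Multi-level wildcard: only valid as the final pattern level.
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            return False  # pattern is longer than the topic
        if part != "+" and part != topic_levels[i]:
            return False  # literal level mismatch
    # Without '#', topic and pattern must have the same number of levels.
    return len(topic_levels) == len(pattern_levels)


print(matches("cmd/52/device1/status", "cmd/52/+/status"))         # True
print(matches("cmd/52/device1/status/extra", "cmd/52/device1/#"))  # True
print(matches("cmd/52/device1/status/extra", "cmd/52/+/status"))   # False
```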

Module contents

MQTT package for Navien device communication.

This package provides MQTT client functionality for real-time communication with Navien devices using AWS IoT Core.

Main exports:

  • NavienMqttClient: Main MQTT client class

  • MqttConnectionConfig: Configuration for MQTT connections

  • PeriodicRequestType: Enum for periodic request types

  • MqttDiagnosticsCollector: Metrics and diagnostics collector

  • MqttMetrics, ConnectionDropEvent, ConnectionEvent: Diagnostic types

class nwp500.mqtt.ConnectionDropEvent(timestamp: str, error_name: str | None = None, error_message: str | None = None, error_code: int | None = None, reconnect_attempt: int = 0, duration_connected_seconds: float | None = None, active_subscriptions: int = 0, queued_commands: int = 0)[source]

Bases: object

Record of a single connection drop event.

active_subscriptions: int = 0
duration_connected_seconds: float | None = None
error_code: int | None = None
error_message: str | None = None
error_name: str | None = None
queued_commands: int = 0
reconnect_attempt: int = 0
timestamp: str
to_dict() dict[str, Any][source]

Convert to dictionary.

class nwp500.mqtt.ConnectionEvent(timestamp: str, event_type: str, session_present: bool = False, return_code: int | None = None, attempt_number: int = 0, time_to_reconnect_seconds: float | None = None)[source]

Bases: object

Record of a connection success/resumption event.

attempt_number: int = 0
event_type: str
return_code: int | None = None
session_present: bool = False
time_to_reconnect_seconds: float | None = None
timestamp: str
to_dict() dict[str, Any][source]

Convert to dictionary.

class nwp500.mqtt.MqttConnectionConfig(endpoint: str = 'a1t30mldyslmuq-ats.iot.us-east-1.amazonaws.com', region: str = 'us-east-1', client_id: str | None = None, clean_session: bool = True, keep_alive_secs: int = 1200, auto_reconnect: bool = True, max_reconnect_attempts: int = -1, initial_reconnect_delay: float = 1.0, max_reconnect_delay: float = 120.0, reconnect_backoff_multiplier: float = 2.0, deep_reconnect_threshold: int = 10, enable_command_queue: bool = True, max_queued_commands: int = 100)[source]

Bases: object

Configuration for MQTT connection.

endpoint

AWS IoT endpoint URL

Type:

str

region

AWS region

Type:

str

client_id

MQTT client ID (auto-generated if None)

Type:

str | None

clean_session

Whether to start with a clean session

Type:

bool

keep_alive_secs

Keep-alive interval in seconds

Type:

int

auto_reconnect

Enable automatic reconnection

Type:

bool

max_reconnect_attempts

Maximum reconnection attempts (-1 for unlimited)

Type:

int

initial_reconnect_delay

Initial delay between reconnect attempts

Type:

float

max_reconnect_delay

Maximum delay between reconnect attempts

Type:

float

reconnect_backoff_multiplier

Exponential backoff multiplier

Type:

float

deep_reconnect_threshold

Attempt count to trigger full connection rebuild

Type:

int

enable_command_queue

Enable command queueing when disconnected

Type:

bool

max_queued_commands

Maximum number of queued commands

Type:

int

auto_reconnect: bool = True
clean_session: bool = True
client_id: str | None = None
deep_reconnect_threshold: int = 10
enable_command_queue: bool = True
endpoint: str = 'a1t30mldyslmuq-ats.iot.us-east-1.amazonaws.com'
initial_reconnect_delay: float = 1.0
keep_alive_secs: int = 1200
max_queued_commands: int = 100
max_reconnect_attempts: int = -1
max_reconnect_delay: float = 120.0
reconnect_backoff_multiplier: float = 2.0
region: str = 'us-east-1'

class nwp500.mqtt.MqttDiagnosticsCollector(max_events_retained: int = 1000, enable_verbose_logging: bool = False)[source]

Bases: object

Collects detailed diagnostics and metrics for MQTT connection analysis.

This collector tracks:

  • Connection drop events with error details

  • Connection recovery timeline

  • Error frequency and patterns

  • Session duration statistics

  • Network topology and timing information

For debugging:

  • Export logs to JSON for correlation with AWS CloudWatch

  • Enables continuous monitoring with configurable retention

export_json() str[source]

Export all collected diagnostics as JSON.

Returns:

JSON string suitable for storing or sending to monitoring systems

get_metrics() MqttMetrics[source]

Get current aggregate metrics.

get_recent_connections(limit: int = 10) list[ConnectionEvent][source]

Get the N most recent connection events.

get_recent_drops(limit: int = 10) list[ConnectionDropEvent][source]

Get the N most recent connection drop events.

on_connection_drop(callback: Callable[[ConnectionDropEvent], None]) None[source]

Register a callback to be invoked on each connection drop event.

Parameters:

callback – Function that receives ConnectionDropEvent

print_summary() None[source]

Print a human-readable summary of diagnostics.

async record_connection_drop(error: Exception | None = None, reconnect_attempt: int = 0, active_subscriptions: int = 0, queued_commands: int = 0) None[source]

Record a connection drop event.

Parameters:
  • error – The exception that caused the drop

  • reconnect_attempt – Which reconnection attempt this is (0 = initial)

  • active_subscriptions – Number of active subscriptions at time of drop

  • queued_commands – Number of commands in the queue

async record_connection_success(event_type: str = 'connected', session_present: bool = False, return_code: int | None = None, attempt_number: int = 0) None[source]

Record a successful connection or reconnection event.

Parameters:
  • event_type – “connected”, “resumed”, or “deep_reconnected”

  • session_present – Whether MQTT session was present

  • return_code – MQTT return code

  • attempt_number – Reconnection attempt number (0 = initial connect)

record_publish(queued: bool = False) None[source]

Record a publish/queue operation.

async update_metrics() None[source]

Update current metrics (e.g., current session uptime).

class nwp500.mqtt.MqttMetrics(total_connections: int = 0, total_disconnects: int = 0, total_connection_drops: int = 0, total_reconnect_attempts: int = 0, longest_session_seconds: float = 0.0, shortest_session_seconds: float = inf, average_session_seconds: float = 0.0, current_session_uptime_seconds: float = 0.0, connection_drops_by_error: dict[str, int]=<factory>, reconnection_attempts_distribution: dict[str, int]=<factory>, last_drop_timestamp: str | None = None, last_successful_connect_timestamp: str | None = None, connection_recovered: int = 0, messages_published: int = 0, messages_queued: int = 0)[source]

Bases: object

Aggregate metrics for MQTT connection stability.

average_session_seconds: float = 0.0
connection_drops_by_error: dict[str, int]
connection_recovered: int = 0
current_session_uptime_seconds: float = 0.0
last_drop_timestamp: str | None = None
last_successful_connect_timestamp: str | None = None
longest_session_seconds: float = 0.0
messages_published: int = 0
messages_queued: int = 0
reconnection_attempts_distribution: dict[str, int]
shortest_session_seconds: float = inf
to_dict() dict[str, Any][source]

Convert to dictionary for JSON serialization.

total_connection_drops: int = 0
total_connections: int = 0
total_disconnects: int = 0
total_reconnect_attempts: int = 0

class nwp500.mqtt.NavienMqttClient(auth_client: NavienAuthClient, config: MqttConnectionConfig | None = None, unit_system: Literal['metric', 'us_customary'] | None = None)[source]

Bases: EventEmitter

Async MQTT client for Navien device communication over AWS IoT.

This client establishes WebSocket connections to AWS IoT Core using temporary AWS credentials from the authentication API. It handles:

  • Connection management with automatic reconnection and exponential backoff

  • Topic subscriptions for device events and responses

  • Command publishing for device control

  • Message routing and callbacks

  • Command queuing when disconnected (sends when reconnected)

  • Event-driven architecture with state change detection

The client extends EventEmitter to provide an event-driven architecture:

  • Multiple listeners per event

  • State change detection (temperature_changed, mode_changed, etc.)

  • Async handler support

  • Priority-based execution

The client automatically reconnects when the connection is interrupted, using exponential backoff (default: 1s, 2s, 4s, 8s, … up to 120s). Reconnection behavior can be customized via MqttConnectionConfig.

When enabled, the command queue stores commands sent while disconnected and automatically sends them when the connection is restored. This ensures commands are not lost during temporary network interruptions.

Example (Traditional Callbacks):

>>> async with NavienAuthClient(email, password) as auth_client:
...     mqtt_client = NavienMqttClient(auth_client)
...     await mqtt_client.connect()
...
...     # Traditional callback style
...     await mqtt_client.subscribe_device_status(device, on_status)

Example (Event Emitter):

>>> from nwp500.mqtt_events import MqttClientEvents
>>> mqtt_client = NavienMqttClient(auth_client)
>>>
>>> # Type-safe event listeners with IDE autocomplete
>>> mqtt_client.on(
...     MqttClientEvents.TEMPERATURE_CHANGED,
...     lambda event: log_temperature(event.new_temperature),
... )
>>> mqtt_client.on(MqttClientEvents.TEMPERATURE_CHANGED, update_ui)
>>> mqtt_client.on(
...     MqttClientEvents.MODE_CHANGED, handle_mode_change
... )
>>>
>>> # One-time listener
>>> mqtt_client.once(MqttClientEvents.STATUS_RECEIVED, initialize)
>>>
>>> await mqtt_client.connect()

Events Emitted:

See nwp500.mqtt_events.MqttClientEvents for a complete, type-safe registry of all events with full documentation.

Key events include:

  • status_received: Raw status update

  • feature_received: Device feature/capability information

  • temperature_changed: DHW temperature changed

  • mode_changed: Operation mode changed

  • power_changed: Power consumption changed

  • heating_started: Device started heating

  • heating_stopped: Device stopped heating

  • error_detected: Device error occurred

  • error_cleared: Device error resolved

  • connection_interrupted: Connection lost

  • connection_resumed: Connection restored

async check_firmware_update(device: Device) int[source]

Check for available over-the-air firmware updates.

clear_command_queue() int[source]

Clear all queued commands. …

property client_id: str

Get client ID.

async commit_firmware_update(device: Device, payload: OtaCommitPayload) -> int

Commit a previously downloaded firmware update.

async configure_recirculation_schedule(device: Device, schedule: RecirculationSchedule) -> int

Configure the recirculation pump timed schedule.

async configure_reservation_water_program(device: Device) -> int

Enable/configure water program reservation mode.

async configure_tou_schedule(device: Device, controller_serial_number: str, periods: Sequence[dict[str, Any]], *, enabled: bool = True) -> int

Configure the Time-of-Use rate schedule.

async connect() -> bool

Establish connection to AWS IoT Core.

Ensures tokens are valid before connecting and refreshes if necessary.

Returns:

True if connection successful

Raises:

Exception – If connection fails

property control: MqttDeviceController

Deprecated access to device controller.

property diagnostics: MqttDiagnosticsCollector

Get the diagnostics collector instance.

async disable_anti_legionella(device: Device) -> int

Disable the Anti-Legionella disinfection cycle.

async disable_demand_response(device: Device) -> int

Disable utility demand response participation.

async disable_intelligent_scheduling(device: Device) -> int

Disable intelligent/adaptive heating mode.

async disconnect() -> None

Disconnect from AWS IoT Core and stop all periodic tasks.

async enable_anti_legionella(device: Device, period_days: int) -> int

Enable Anti-Legionella disinfection.

async enable_demand_response(device: Device) -> int

Enable utility demand response participation.

async enable_intelligent_scheduling(device: Device) -> int

Enable intelligent/adaptive heating mode.

async ensure_device_info_cached(device: Device, timeout: float = 30.0) -> bool

Ensure device info is cached, requesting if necessary.

Called by control commands and CLI to ensure device capabilities are available before execution.

Parameters:
  • device – Device to ensure info for

  • timeout – Maximum time to wait for response (default: 30 seconds)

Returns:

True if device info was successfully cached, False on timeout

Raises:

MqttNotConnectedError – If not connected
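
The "return True when cached, False on timeout" contract can be sketched with asyncio.Event and asyncio.wait_for. The class and method names below are hypothetical, and the place where a real client would publish the info request is only marked with a comment.

```python
import asyncio
from typing import Any


class InfoCacheSketch:
    """Hypothetical cache that waits for a device-info response."""

    def __init__(self) -> None:
        self._info: dict[str, dict[str, Any]] = {}
        self._arrived: dict[str, asyncio.Event] = {}

    def on_info(self, device_id: str, info: dict[str, Any]) -> None:
        """Called by the message handler when a device-info response arrives."""
        self._info[device_id] = info
        self._arrived.setdefault(device_id, asyncio.Event()).set()

    async def ensure_cached(self, device_id: str, timeout: float = 30.0) -> bool:
        if device_id in self._info:
            return True  # already cached, nothing to request
        event = self._arrived.setdefault(device_id, asyncio.Event())
        # A real client would publish the device-info request at this point.
        try:
            await asyncio.wait_for(event.wait(), timeout)
        except asyncio.TimeoutError:
            return False
        return True
```

Returning False on timeout instead of raising lets callers decide whether missing capabilities should block a command.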

property is_connected: bool

Check if client is connected.

property is_reconnecting: bool

Check if client is currently attempting to reconnect.

async publish(topic: str, payload: dict[str, Any], qos: QoS = QoS.AT_LEAST_ONCE) -> int

Publish a message to an MQTT topic.

If not connected and command queue is enabled, the command will be queued and sent automatically when the connection is restored.

Parameters:
  • topic – MQTT topic to publish to

  • payload – Message payload (will be JSON-encoded)

  • qos – Quality of Service level

Returns:

Publish packet ID (or 0 if queued)

Raises:

RuntimeError – If not connected and command queue is disabled
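
The return-value contract above (packet ID when sent, 0 when queued, RuntimeError when neither is possible) can be summarized as a small synchronous function. This is a sketch only: publish_sketch, its keyword arguments and the use of a plain list as the queue are assumptions made for illustration.

```python
import json
from typing import Any, Callable, Optional


def publish_sketch(
    topic: str,
    payload: dict[str, Any],
    *,
    connected: bool,
    queue: Optional[list[tuple[str, bytes]]],
    send: Callable[[str, bytes], int],
) -> int:
    """Mirror the documented contract of publish() without any network I/O.

    queue=None stands for "command queue disabled" in this sketch.
    """
    body = json.dumps(payload).encode("utf-8")  # payload is JSON-encoded
    if connected:
        return send(topic, body)  # packet ID reported by the transport
    if queue is None:
        raise RuntimeError("Not connected and command queue is disabled")
    queue.append((topic, body))
    return 0  # 0 signals "queued, not yet sent"
```

Callers that need to know whether a command actually left the client can therefore treat a return value of 0 as "pending".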

property queued_commands_count: int

Get the number of commands currently queued.

property reconnect_attempts: int

Get the number of reconnection attempts made.

async reconnect_wifi(device: Device) -> int

Trigger a WiFi reconnection on the device.

async recover_connection() -> bool

Recover from authentication-related connection failures.

This method is useful when MQTT connection fails due to stale/expired authentication tokens. It refreshes the tokens and attempts to reconnect the MQTT client.

Returns:

True if recovery was successful and MQTT is reconnected, False otherwise

Raises:

Example:

>>> mqtt_client = NavienMqttClient(auth_client)
>>> try:
...     await mqtt_client.connect()
... except MqttConnectionError:
...     # Connection may have failed due to stale tokens
...     if await mqtt_client.recover_connection():
...         print("Successfully recovered connection")
...     else:
...         print("Recovery failed, check logs")

async request_device_info(device: Device) -> int

Request device information (features, firmware, etc.).

async request_device_status(device: Device) -> int

Request general device status.

async request_energy_usage(device: Device, year: int, months: list[int]) -> int

Request daily energy usage data for specified month(s).

async request_reservations(device: Device) -> int

Request the current reservation program from the device.

async request_tou_settings(device: Device, controller_serial_number: str) -> int

Request the current TOU settings from the device.

async reset_air_filter(device: Device) -> int

Reset air filter maintenance timer.

async reset_reconnect() -> None

Reset reconnection state and trigger a new reconnection attempt. …

async reset_wifi(device: Device) -> int

Reset WiFi settings to factory defaults.

async run_smart_diagnostic(device: Device) -> int

Trigger the smart diagnostic routine on the device.

property session_id: str

Get session ID.

async set_dhw_mode(device: Device, mode_id: int, vacation_days: int | None = None) -> int

Set DHW operation mode.

async set_dhw_temperature(device: Device, temperature: float) -> int

Set DHW target temperature in the user’s preferred unit.

async set_freeze_protection_temperature(device: Device, temperature: float) -> int

Set the freeze protection activation temperature.

async set_power(device: Device, power_on: bool) -> int

Turn device on or off.

async set_recirculation_mode(device: Device, mode: int) -> int

Set recirculation pump operation mode (1-4).

async set_tou_enabled(device: Device, enabled: bool) -> int

Enable or disable Time-of-Use optimization.

async set_vacation_days(device: Device, days: int) -> int

Set vacation/away mode duration (1-30 days).

async signal_app_connection(device: Device) -> int

Signal that the app has connected.

async start_periodic_requests(device: Device, request_type: PeriodicRequestType = PeriodicRequestType.DEVICE_STATUS, period_seconds: float = 300.0) -> None

Start sending periodic requests for device information or status. …

async stop_all_periodic_tasks(_reason: str | None = None) -> None

Stop all periodic request tasks. …

async stop_periodic_requests(device: Device, request_type: PeriodicRequestType | None = None) -> None

Stop sending periodic requests for a device. …
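
The start/stop methods above suggest one background task per device and request type. A minimal asyncio sketch of that idea follows. PeriodicSketch, its string keys and the choice to send the first request immediately are assumptions, not the library's actual behavior.

```python
import asyncio
from typing import Awaitable, Callable, Optional

RequestFn = Callable[[], Awaitable[None]]


class PeriodicSketch:
    """Hypothetical manager for per-device periodic request tasks."""

    def __init__(self) -> None:
        # (device_id, request_type) -> running task
        self._tasks: dict[tuple[str, str], asyncio.Task] = {}

    @staticmethod
    async def _loop(request: RequestFn, period_seconds: float) -> None:
        # Assumption: the first request is sent right away, then every period.
        while True:
            await request()
            await asyncio.sleep(period_seconds)

    def start(
        self,
        device_id: str,
        request_type: str,
        request: RequestFn,
        period_seconds: float = 300.0,
    ) -> None:
        self.stop(device_id, request_type)  # replace any existing task
        task = asyncio.create_task(self._loop(request, period_seconds))
        self._tasks[(device_id, request_type)] = task

    def stop(self, device_id: str, request_type: Optional[str] = None) -> None:
        """Stop one request type for a device, or all types when None."""
        for key in list(self._tasks):
            if key[0] == device_id and request_type in (None, key[1]):
                self._tasks.pop(key).cancel()

    def stop_all(self) -> None:
        for task in self._tasks.values():
            task.cancel()
        self._tasks.clear()
```

Keying tasks by device and request type makes it possible to stop a single stream without disturbing the others.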

async subscribe(topic: str, callback: Callable[[str, dict[str, Any]], None], qos: QoS = QoS.AT_LEAST_ONCE) -> int

Subscribe to an MQTT topic.

Parameters:
  • topic – MQTT topic to subscribe to (can include wildcards)

  • callback – Function to call when messages arrive (topic, message)

  • qos – Quality of Service level

Returns:

Subscription packet ID

Raises:

Exception – If subscription fails
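
Since the topic argument may include wildcards, it helps to recall how standard MQTT topic filters match: + matches exactly one level and # matches the remaining levels, including none. The function below is an illustrative matcher, not part of the client. The topic names in the usage lines are made up, and the special handling of topics beginning with $ is omitted.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level itself and anything below it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False  # literal level mismatch
    # Without a trailing '#', both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


# Hypothetical topic names, for illustration only.
single_level = topic_matches("devices/+/status", "devices/abc/status")
multi_level = topic_matches("devices/abc/#", "devices/abc/res/info")
```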

async subscribe_device(device: Device, callback: Callable[[str, dict[str, Any]], None]) -> int

Subscribe to all messages from a specific device.

Parameters:
  • device – Device object

  • callback – Message handler

Returns:

Subscription packet ID

async subscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) -> int

Subscribe to device feature/info messages with automatic parsing.

async subscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) -> int

Subscribe to device status messages with automatic parsing.
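
"Automatic parsing" here means the caller's callback receives a typed object rather than the raw (topic, message) pair. One way to picture this is a wrapper that adapts a typed callback to the raw handler signature. In the sketch below, StatusSketch, its field names, the JSON keys and the decision to silently skip malformed messages are all hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class StatusSketch:
    """Hypothetical stand-in for a parsed status model."""
    dhw_temperature: float
    operation_mode: int


def make_parsing_handler(
    callback: Callable[[StatusSketch], None],
) -> Callable[[str, dict[str, Any]], None]:
    """Wrap a typed callback so it can be used as a raw (topic, message) handler."""

    def handler(topic: str, message: dict[str, Any]) -> None:
        try:
            status = StatusSketch(
                dhw_temperature=float(message["dhwTemperature"]),
                operation_mode=int(message["operationMode"]),
            )
        except (KeyError, TypeError, ValueError):
            return  # assumption: malformed messages are skipped
        callback(status)

    return handler
```

The wrapper keeps parsing concerns out of user callbacks, so each callback only ever sees well-formed objects.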

async subscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) -> int

Subscribe to energy usage query responses with automatic parsing.

async subscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) -> int

Subscribe to recirculation schedule read responses.

async subscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) -> int

Subscribe to reservation read responses with automatic parsing.

async subscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) -> int

Subscribe to Time-of-Use schedule read responses with automatic parsing.

Subscribes to the tou/rd response topic for the given device. The callback receives a fully-parsed TOUReservationSchedule whenever the device responds to a TOU read or configure request (triggered by request_tou_settings() or configure_tou_schedule()).

Parameters:
  • device – Device whose TOU responses to receive.

  • callback – Called with the parsed schedule on each response.

Returns:

Subscription packet ID from the MQTT subscribe call.

async subscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) -> int

Subscribe to weekly reservation read responses.

async trigger_recirculation_hot_button(device: Device) -> int

Manually trigger the recirculation pump hot button.

async unsubscribe(topic: str) -> int

Unsubscribe from an MQTT topic.

Parameters:

topic – MQTT topic to unsubscribe from

Returns:

Unsubscribe packet ID

Raises:

Exception – If unsubscribe fails

async unsubscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) -> None

Unsubscribe a specific device feature callback.

async unsubscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) -> None

Unsubscribe a specific device status callback.

async unsubscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) -> None

Unsubscribe a specific energy usage callback.

async unsubscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) -> None

Unsubscribe a specific recirculation schedule callback.

async unsubscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) -> None

Unsubscribe a specific reservation response callback.

async unsubscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) -> None

Unsubscribe a specific TOU response callback.

async unsubscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) -> None

Unsubscribe a specific weekly reservation callback.
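
The unsubscribe_* methods above take the same callback that was passed to the matching subscribe_* call, which implies some per-topic bookkeeping of callbacks. The registry below is a sketch of that idea. The class, its methods and the rule "unsubscribe from the broker only when the last callback is removed" are assumptions, not documented behavior.

```python
from collections import defaultdict
from typing import Any, Callable

Callback = Callable[[Any], None]


class CallbackRegistrySketch:
    """Hypothetical per-topic callback registry with per-callback removal."""

    def __init__(self) -> None:
        self._by_topic: dict[str, list[Callback]] = defaultdict(list)

    def add(self, topic: str, callback: Callback) -> bool:
        """Register a callback; True means this is the topic's first callback."""
        first = not self._by_topic[topic]
        self._by_topic[topic].append(callback)
        return first

    def remove(self, topic: str, callback: Callback) -> bool:
        """Remove one callback; True means the topic has no callbacks left."""
        callbacks = self._by_topic.get(topic, [])
        if callback in callbacks:
            callbacks.remove(callback)
        if not callbacks:
            self._by_topic.pop(topic, None)
            return True
        return False

    def dispatch(self, topic: str, message: Any) -> int:
        """Call every callback registered for the topic; return the count."""
        callbacks = list(self._by_topic.get(topic, []))
        for callback in callbacks:
            callback(message)
        return len(callbacks)
```

Because removal is by identity, callers need to keep a reference to the function they subscribed with; a fresh lambda would not match.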

async update_reservations(device: Device, reservations: Sequence[dict[str, Any]], *, enabled: bool = True) -> int

Update programmed reservations.

async update_weekly_reservation(device: Device, schedule: WeeklyReservationSchedule) -> int

Configure the weekly temperature reservation schedule.

class nwp500.mqtt.PeriodicRequestType(*values)

Bases: Enum

Types of periodic requests that can be sent.

DEVICE_INFO = 'device_info'

Request device information periodically

DEVICE_STATUS = 'device_status'

Request device status periodically