nwp500.mqtt package¶
Submodules¶
nwp500.mqtt.client module¶
MQTT Client for Navien Smart Control.
This module provides an MQTT client for real-time communication with Navien devices using AWS IoT Core. It handles connection, subscriptions, and message publishing for device control and monitoring.
The client uses WebSocket connections with AWS credentials obtained from the authentication flow.
Bases:
EventEmitterAsync MQTT client for Navien device communication over AWS IoT.
This client establishes WebSocket connections to AWS IoT Core using temporary AWS credentials from the authentication API. It handles: - Connection management with automatic reconnection and exponential backoff - Topic subscriptions for device events and responses - Command publishing for device control - Message routing and callbacks - Command queuing when disconnected (sends when reconnected) - Event-driven architecture with state change detection
The client extends EventEmitter to provide an event-driven architecture: - Multiple listeners per event - State change detection (temperature_changed, mode_changed, etc.) - Async handler support - Priority-based execution
The client automatically reconnects when the connection is interrupted, using exponential backoff (default: 1s, 2s, 4s, 8s, … up to 120s). Reconnection behavior can be customized via MqttConnectionConfig.
When enabled, the command queue stores commands sent while disconnected and automatically sends them when the connection is restored. This ensures commands are not lost during temporary network interruptions.
Example (Traditional Callbacks):
>>> async with NavienAuthClient(email, password) as auth_client: ... mqtt_client = NavienMqttClient(auth_client) ... await mqtt_client.connect() ... ... # Traditional callback style ... await mqtt_client.subscribe_device_status(device, on_status)
Example (Event Emitter):
>>> from nwp500.mqtt_events import MqttClientEvents >>> mqtt_client = NavienMqttClient(auth_client) ... ... # Type-safe event listeners with IDE autocomplete ... mqtt_client.on( ... MqttClientEvents.TEMPERATURE_CHANGED, ... lambda event: log_temperature(event.new_temperature), ... ) ... mqtt_client.on(MqttClientEvents.TEMPERATURE_CHANGED, update_ui) ... mqtt_client.on( ... MqttClientEvents.MODE_CHANGED, handle_mode_change ... ) ... ... # One-time listener ... mqtt_client.once(MqttClientEvents.STATUS_RECEIVED, initialize) ... ... await mqtt_client.connect()
- Events Emitted:
See
nwp500.mqtt_events.MqttClientEventsfor a complete, type-safe registry of all events with full documentation.Key events include: - status_received: Raw status update - feature_received: Device feature/capability information - temperature_changed: DHW temperature changed - mode_changed: Operation mode changed - power_changed: Power consumption changed - heating_started: Device started heating - heating_stopped: Device stopped heating - error_detected: Device error occurred - error_cleared: Device error resolved - connection_interrupted: Connection lost - connection_resumed: Connection restored
Check for available over-the-air firmware updates.
Clear all queued commands. …
Get client ID.
Commit a previously downloaded firmware update.
Configure the recirculation pump timed schedule.
Enable/configure water program reservation mode.
Configure the Time-of-Use rate schedule.
Establish connection to AWS IoT Core.
Ensures tokens are valid before connecting and refreshes if necessary.
- Returns:
True if connection successful
- Raises:
Exception – If connection fails
Deprecated access to device controller.
Get the diagnostics collector instance.
Disable the Anti-Legionella disinfection cycle.
Disable utility demand response participation.
Disable intelligent/adaptive heating mode.
Disconnect from AWS IoT Core and stop all periodic tasks.
Enable Anti-Legionella disinfection.
Enable utility demand response participation.
Enable intelligent/adaptive heating mode.
Ensure device info is cached, requesting if necessary.
Called by control commands and CLI to ensure device capabilities are available before execution.
- Parameters:
device – Device to ensure info for
timeout – Maximum time to wait for response (default: 30 seconds)
- Returns:
True if device info was successfully cached, False on timeout
- Raises:
MqttNotConnectedError – If not connected
Check if client is connected.
Check if client is currently attempting to reconnect.
Publish a message to an MQTT topic.
If not connected and command queue is enabled, the command will be queued and sent automatically when the connection is restored.
- Parameters:
topic – MQTT topic to publish to
payload – Message payload (will be JSON-encoded)
qos – Quality of Service level
- Returns:
Publish packet ID (or 0 if queued)
- Raises:
RuntimeError – If not connected and command queue is disabled
Get the number of commands currently queued.
Get the number of reconnection attempts made.
Trigger a WiFi reconnection on the device.
Recover from authentication-related connection failures.
This method is useful when MQTT connection fails due to stale/expired authentication tokens. It refreshes the tokens and attempts to reconnect the MQTT client.
- Returns:
True if recovery was successful and MQTT is reconnected, False otherwise
- Raises:
TokenRefreshError – If token refresh fails
AuthenticationError – If re-authentication fails
Example
>>> mqtt_client = NavienMqttClient(auth_client) >>> try: ... await mqtt_client.connect() ... except MqttConnectionError: ... # Connection may have failed due to stale tokens ... if await mqtt_client.recover_connection(): ... print("Successfully recovered connection") ... else: ... print("Recovery failed, check logs")
Request device information (features, firmware, etc.).
Request general device status.
Request daily energy usage data for specified month(s).
Request the current reservation program from the device.
Request the current TOU settings from the device.
Reset air filter maintenance timer.
Reset reconnection state and trigger a new reconnection attempt. …
Reset WiFi settings to factory defaults.
Trigger the smart diagnostic routine on the device.
Get session ID.
Set DHW operation mode.
Set DHW target temperature in the user’s preferred unit.
Set the freeze protection activation temperature.
Turn device on or off.
Set recirculation pump operation mode (1-4).
Enable or disable Time-of-Use optimization.
Set vacation/away mode duration (1-30 days).
Signal that the app has connected.
Start sending periodic requests for device information or status. …
Stop all periodic request tasks. …
Stop sending periodic requests for a device. …
Subscribe to an MQTT topic.
- Parameters:
topic – MQTT topic to subscribe to (can include wildcards)
callback – Function to call when messages arrive (topic, message)
qos – Quality of Service level
- Returns:
Subscription packet ID
- Raises:
Exception – If subscription fails
Subscribe to all messages from a specific device.
- Parameters:
device – Device object
callback – Message handler
- Returns:
Subscription packet ID
Subscribe to device feature/info messages with automatic parsing.
Subscribe to device status messages with automatic parsing.
Subscribe to energy usage query responses with automatic parsing.
Subscribe to recirculation schedule read responses.
Subscribe to reservation read responses with automatic parsing.
Subscribe to Time-of-Use schedule read responses with automatic parsing.
Subscribes to the
tou/rdresponse topic for the given device. The callback receives a fully-parsedTOUReservationSchedulewhenever the device responds to a TOU read or configure request (triggered byrequest_tou_settings()orconfigure_tou_schedule()).- Parameters:
device – Device whose TOU responses to receive.
callback – Called with the parsed schedule on each response.
- Returns:
Publish packet ID from the MQTT subscribe call.
Subscribe to weekly reservation read responses.
Manually trigger the recirculation pump hot button.
Unsubscribe from an MQTT topic.
- Parameters:
topic – MQTT topic to unsubscribe from
- Returns:
Unsubscribe packet ID
- Raises:
Exception – If unsubscribe fails
Unsubscribe a specific device feature callback.
Unsubscribe a specific device status callback.
Unsubscribe a specific energy usage callback.
Unsubscribe a specific recirculation schedule callback.
Unsubscribe a specific reservation response callback.
Unsubscribe a specific TOU response callback.
Unsubscribe a specific weekly reservation callback.
Update programmed reservations.
Configure the weekly temperature reservation schedule.
nwp500.mqtt.command_queue module¶
MQTT command queue management for Navien Smart Control.
This module handles queueing of commands when the MQTT connection is lost, and automatically sends them when the connection is restored.
- class nwp500.mqtt.command_queue.MqttCommandQueue(config: MqttConnectionConfig)[source]¶
Bases:
objectManages command queueing when MQTT connection is interrupted.
Commands sent while disconnected are queued and automatically sent when the connection is restored. This ensures commands are not lost during temporary network interruptions.
The queue uses asyncio.Queue with a fixed maximum size. When the queue is full, the oldest command is automatically dropped to make room for new commands (FIFO with overflow dropping).
- enqueue(topic: str, payload: dict[str, Any], qos: QoS) None[source]¶
Add a command to the queue.
If the queue is full, the oldest command is dropped to make room for the new one (FIFO with overflow dropping).
- Parameters:
topic – MQTT topic
payload – Command payload
qos – Quality of Service level
- async send_all(publish_func: Callable[[...], Any], is_connected_func: Callable[[], bool]) tuple[int, int][source]¶
Send all queued commands.
This is called automatically when connection is restored.
- Parameters:
publish_func – Async function to publish messages (topic, payload,
qos)
is_connected_func – Function to check if currently connected
- Returns:
Tuple of (sent_count, failed_count)
nwp500.mqtt.connection module¶
MQTT connection management for Navien Smart Control.
This module handles establishing and maintaining the MQTT connection to AWS IoT Core, including credential management and connection state tracking.
- class nwp500.mqtt.connection.MqttConnection(config: MqttConnectionConfig, auth_client: NavienAuthClient, on_connection_interrupted: Callable[[mqtt.Connection, AwsCrtError], None] | None = None, on_connection_resumed: Callable[[mqtt.Connection, Any, Any | None], None] | None = None)[source]¶
Bases:
objectManages MQTT connection lifecycle to AWS IoT Core.
Handles: - Connection establishment with AWS credentials - Disconnection with cleanup - Connection state tracking - AWS credentials provider creation
- async close() None[source]¶
Unconditionally close the underlying SDK connection.
Unlike
disconnect(), this method closes the connection regardless of the_connectedflag. After a connection interruption,_connectedisFalsebut the SDK connection object is still alive and its built-in auto-reconnect can still fire. Callingclose()ensures the SDK connection is fully torn down so its callbacks and auto-reconnect cannot interfere with a replacement connection.This method is safe to call multiple times or on already-closed connections.
- async connect() bool[source]¶
Establish connection to AWS IoT Core.
Ensures tokens are valid before connecting and refreshes if necessary.
- Returns:
True if connection successful
- Raises:
Exception – If connection fails
- property connection: Connection | None¶
Get the underlying MQTT connection.
- Returns:
The MQTT connection object, or None if not connected
Note
This property is provided for advanced usage. Most operations should use the higher-level methods provided by this class.
- async disconnect() None[source]¶
Disconnect from AWS IoT Core.
- Raises:
Exception – If disconnect fails
- async publish(topic: str, payload: str | dict[str, Any], qos: QoS = QoS.AT_LEAST_ONCE) int[source]¶
Publish a message to an MQTT topic.
- Parameters:
topic – MQTT topic to publish to
payload – Message payload (dict, JSON string, or bytes)
qos – Quality of Service level
- Returns:
Publish packet ID
- Raises:
RuntimeError – If not connected
asyncio.CancelledError – If operation cancelled during disconnect
- async subscribe(topic: str, qos: QoS, callback: Callable[[...], None] | None = None) tuple[Any, int][source]¶
Subscribe to an MQTT topic.
- Parameters:
topic – Topic pattern to subscribe to (supports wildcards)
qos – Quality of Service level
callback – Optional callback for received messages
- Returns:
Tuple of (subscribe_future, packet_id)
- Raises:
RuntimeError – If not connected
- async unsubscribe(topic: str) int[source]¶
Unsubscribe from an MQTT topic.
- Parameters:
topic – Topic to unsubscribe from
- Returns:
Packet ID
- Raises:
RuntimeError – If not connected
nwp500.mqtt.control module¶
MQTT Device Control Commands for Navien devices.
This module handles all device control operations including: - Status and info requests - Power control - Mode changes (DHW operation modes) - Temperature control - Anti-Legionella configuration - Reservation scheduling - Time-of-Use (TOU) configuration - Energy usage queries - App connection signaling - Demand response control - Air filter maintenance - Vacation mode configuration - Recirculation pump control and scheduling
- class nwp500.mqtt.control.MqttDeviceController(client_id: str, session_id: str, publish_func: Callable[[...], Awaitable[int]], device_info_cache: MqttDeviceInfoCache | None = None)[source]¶
Bases:
objectManages device control commands for Navien devices.
Handles all device control operations including status requests, mode changes, temperature control, scheduling, and energy queries.
This controller integrates with MqttDeviceCapabilityChecker to validate device capabilities before executing commands. Use check_support() or assert_support() methods to verify feature availability based on device capabilities before attempting to execute commands:
Example
>>> controller.assert_support("recirculation_mode", device_features) >>> # Will raise DeviceCapabilityError if not supported >>> msg_id = await controller.set_recirculation_mode(device, mode)
- assert_support(feature: str, device_features: DeviceFeature) None[source]¶
Assert that device supports a controllable feature.
- Parameters:
feature – Name of the controllable feature
device_features – Device feature information
- Raises:
DeviceCapabilityError – If feature is not supported
ValueError – If feature is not recognized
- async check_firmware_update(device: Device) int[source]¶
Check for available over-the-air firmware updates.
Sends the OTA_CHECK command (33554443) to query whether a firmware update is available. The device responds on the control ack topic.
- Parameters:
device – Device to check for updates
- Returns:
Publish packet ID
- check_support(feature: str, device_features: DeviceFeature) bool[source]¶
Check if device supports a controllable feature.
- Parameters:
feature – Name of the controllable feature
device_features – Device feature information
- Returns:
True if feature is supported, False otherwise
- Raises:
ValueError – If feature is not recognized
- async commit_firmware_update(device: Device, payload: OtaCommitPayload) int[source]¶
Commit a previously downloaded firmware update.
Sends the OTA_COMMIT command (33554442) with a special
commitOtastructure (not the standard mode/param format).- Parameters:
device – Device to update
payload – OTA commit payload specifying which firmware component and version to commit.
- Returns:
Publish packet ID
- async configure_recirculation_schedule(device: Device, schedule: RecirculationSchedule) int[source]¶
Configure the recirculation pump timed schedule.
- Parameters:
device – Device to configure
schedule – Recirculation schedule with one or more time window entries
- Returns:
Publish packet ID
- async configure_reservation_water_program(device: Device) int[source]¶
Enable/configure water program reservation mode.
- async configure_tou_schedule(device: Device, controller_serial_number: str, periods: Sequence[dict[str, Any]], *, enabled: bool = True) int[source]¶
Configure Time-of-Use pricing schedule via MQTT.
- Parameters:
device – Device object
controller_serial_number – Controller serial number
periods – List of TOU period definitions
enabled – Whether TOU is enabled (default: True)
- Returns:
Publish packet ID
- Raises:
ValueError – If controller_serial_number is empty or periods is empty
- property device_info_cache: MqttDeviceInfoCache¶
Get the device info cache.
- async disable_anti_legionella(device: Device) int[source]¶
Disable the Anti-Legionella disinfection cycle.
- async disable_demand_response(device: Device) int[source]¶
Disable utility demand response participation.
- async disable_intelligent_scheduling(device: Device) int[source]¶
Disable intelligent/adaptive heating mode.
Sends the RESERVATION_INTELLIGENT_OFF command (33554467).
- Parameters:
device – Device to configure
- Returns:
Publish packet ID
- async enable_anti_legionella(device: Device, period_days: int) int[source]¶
Enable Anti-Legionella disinfection.
- async enable_demand_response(device: Device) int[source]¶
Enable utility demand response participation.
- async enable_intelligent_scheduling(device: Device) int[source]¶
Enable intelligent/adaptive heating mode.
Sends the RESERVATION_INTELLIGENT_ON command (33554468). In this mode the device learns usage patterns and pre-heats water proactively to reduce energy consumption.
- Parameters:
device – Device to configure
- Returns:
Publish packet ID
- async reconnect_wifi(device: Device) int[source]¶
Trigger a WiFi reconnection on the device.
Sends the WIFI_RECONNECT command (33554446). Useful when the device has lost its WiFi connection and needs to re-associate.
- Parameters:
device – Device to reconnect
- Returns:
Publish packet ID
- async request_device_info(device: Device) int[source]¶
Request device information (features, firmware, etc.).
- Parameters:
device – Device object
- Returns:
Publish packet ID
- async request_device_status(device: Device) int[source]¶
Request general device status.
- Parameters:
device – Device object
- Returns:
Publish packet ID
- async request_energy_usage(device: Device, year: int, months: list[int]) int[source]¶
Request daily energy usage data for specified month(s).
This retrieves historical energy usage data showing heat pump and electric heating element consumption broken down by day. The response includes both energy usage (Wh) and operating time (hours) for each component.
- Parameters:
device – Device object
year – Year to query (e.g., 2025)
months – List of months to query (1-12). Can request multiple months.
- Returns:
Publish packet ID
Example:
# Request energy usage for September 2025 await controller.request_energy_usage( device, year=2025, months=[9] ) # Request multiple months await controller.request_energy_usage( device, year=2025, months=[7, 8, 9] )
- async request_reservations(device: Device) int[source]¶
Request the current reservation program from the device.
- Parameters:
device – Device object
- Returns:
Publish packet ID
- async request_tou_settings(device: Device, controller_serial_number: str) int[source]¶
Request current Time-of-Use schedule from the device.
- Parameters:
device – Device object
controller_serial_number – Controller serial number
- Returns:
Publish packet ID
- Raises:
ValueError – If controller_serial_number is empty
- async reset_wifi(device: Device) int[source]¶
Reset WiFi settings to factory defaults.
Sends the WIFI_RESET command (33554447). This will clear stored WiFi credentials and require re-provisioning the device.
Warning
This operation clears all stored WiFi credentials. The device will need to be re-provisioned to reconnect to the network.
- Parameters:
device – Device to reset
- Returns:
Publish packet ID
- async run_smart_diagnostic(device: Device) int[source]¶
Trigger the smart diagnostic routine on the device.
Sends the SMART_DIAGNOSTIC command (33554455). The diagnostic result is reflected in the
smart_diagnosticfield of the nextDeviceStatusupdate.- Parameters:
device – Device to diagnose
- Returns:
Publish packet ID
- async set_dhw_mode(device: Device, mode_id: int, vacation_days: int | None = None) int[source]¶
Set DHW operation mode.
- async set_dhw_temperature(device: Device, temperature: float) int[source]¶
Set DHW target temperature.
Temperature is in the user’s preferred unit (Celsius or Fahrenheit) based on the global unit system context.
- set_ensure_device_info_callback(callback: Callable[[Device], Awaitable[bool]] | None) None[source]¶
Set the callback for ensuring device info is cached.
- async set_freeze_protection_temperature(device: Device, temperature: float) int[source]¶
Set the freeze protection activation temperature.
Sends the FREZ_TEMP command (33554451). The device activates freeze protection heating when the ambient temperature drops below this threshold.
- Parameters:
device – Device to configure
temperature – Activation temperature in the user’s preferred unit (°C if unit system is metric, °F otherwise). Valid range: 35–45°F (1.7–7.2°C).
- Returns:
Publish packet ID
- async set_recirculation_mode(device: Device, mode: int) int[source]¶
Set recirculation pump operation mode (1-4).
- async set_vacation_days(device: Device, days: int) int[source]¶
Set vacation/away mode duration (1-30 days).
- async trigger_recirculation_hot_button(device: Device) int[source]¶
Manually trigger the recirculation pump hot button.
- async update_reservations(device: Device, reservations: Sequence[dict[str, Any]], *, enabled: bool = True) int[source]¶
Update programmed reservations for temperature/mode changes.
- Parameters:
device – Device object
reservations – List of reservation entries
enabled – Whether reservations are enabled (default: True)
- Returns:
Publish packet ID
- async update_weekly_reservation(device: Device, schedule: WeeklyReservationSchedule) int[source]¶
Configure the weekly temperature reservation schedule.
Sends the complete weekly schedule to the device using command code RESERVATION_WEEKLY (33554438).
- Parameters:
device – Device to configure
schedule – Weekly reservation schedule with entries for each time slot
- Returns:
Publish packet ID
nwp500.mqtt.diagnostics module¶
MQTT diagnostics and telemetry collection.
This module provides detailed diagnostics and metrics collection for MQTT connection stability analysis, helping to identify whether connection drops are caused by: - Network/environmental issues (intermittent connectivity, NAT timeouts) - AWS server-side limits (connection lifetime, message rate limits) - Client-side configuration issues (insufficient keep-alive, poor backoff)
- class nwp500.mqtt.diagnostics.ConnectionDropEvent(timestamp: str, error_name: str | None = None, error_message: str | None = None, error_code: int | None = None, reconnect_attempt: int = 0, duration_connected_seconds: float | None = None, active_subscriptions: int = 0, queued_commands: int = 0)[source]¶
Bases:
objectRecord of a single connection drop event.
- class nwp500.mqtt.diagnostics.ConnectionEvent(timestamp: str, event_type: str, session_present: bool = False, return_code: int | None = None, attempt_number: int = 0, time_to_reconnect_seconds: float | None = None)[source]¶
Bases:
objectRecord of a connection success/resumption event.
- class nwp500.mqtt.diagnostics.MqttDiagnosticsCollector(max_events_retained: int = 1000, enable_verbose_logging: bool = False)[source]¶
Bases:
objectCollects detailed diagnostics and metrics for MQTT connection analysis.
This collector tracks: - Connection drop events with error details - Connection recovery timeline - Error frequency and patterns - Session duration statistics - Network topology and timing information
For debugging: - Export logs to JSON for correlation with AWS CloudWatch - Enables continuous monitoring with configurable retention
- export_json() str[source]¶
Export all collected diagnostics as JSON.
- Returns:
JSON string suitable for storing or sending to monitoring systems
- get_metrics() MqttMetrics[source]¶
Get current aggregate metrics.
- get_recent_connections(limit: int = 10) list[ConnectionEvent][source]¶
Get the N most recent connection events.
- get_recent_drops(limit: int = 10) list[ConnectionDropEvent][source]¶
Get the N most recent connection drop events.
- on_connection_drop(callback: Callable[[ConnectionDropEvent], None]) None[source]¶
Register a callback to be invoked on each connection drop event.
- Parameters:
callback – Function that receives ConnectionDropEvent
- async record_connection_drop(error: Exception | None = None, reconnect_attempt: int = 0, active_subscriptions: int = 0, queued_commands: int = 0) None[source]¶
Record a connection drop event.
- Parameters:
error – The exception that caused the drop
reconnect_attempt – Which reconnection attempt this is (0 = initial)
active_subscriptions – Number of active subscriptions at time of drop
queued_commands – Number of commands in the queue
- async record_connection_success(event_type: str = 'connected', session_present: bool = False, return_code: int | None = None, attempt_number: int = 0) None[source]¶
Record a successful connection or reconnection event.
- Parameters:
event_type – “connected”, “resumed”, or “deep_reconnected”
session_present – Whether MQTT session was present
return_code – MQTT return code
attempt_number – Reconnection attempt number (0 = initial connect)
- class nwp500.mqtt.diagnostics.MqttMetrics(total_connections: int = 0, total_disconnects: int = 0, total_connection_drops: int = 0, total_reconnect_attempts: int = 0, longest_session_seconds: float = 0.0, shortest_session_seconds: float = inf, average_session_seconds: float = 0.0, current_session_uptime_seconds: float = 0.0, connection_drops_by_error: dict[str, int]=<factory>, reconnection_attempts_distribution: dict[str, int]=<factory>, last_drop_timestamp: str | None = None, last_successful_connect_timestamp: str | None = None, connection_recovered: int = 0, messages_published: int = 0, messages_queued: int = 0)[source]¶
Bases:
objectAggregate metrics for MQTT connection stability.
nwp500.mqtt.periodic module¶
MQTT Periodic Request Manager for Navien devices.
This module handles periodic/scheduled requests to keep device information and status up-to-date. Features include: - Configurable request intervals - Automatic skip when disconnected - Graceful task cancellation - Per-device, per-type task management
- class nwp500.mqtt.periodic.MqttPeriodicRequestManager(is_connected_func: Callable[[], bool], request_device_info_func: Callable[[...], Any], request_device_status_func: Callable[[...], Any])[source]¶
Bases:
objectManages periodic requests for device information and status.
Features: - Independent tasks per device and request type - Automatic skip when disconnected (with throttled logging) - Graceful cancellation on disconnect - Error recovery and retry
- async start_periodic_device_info_requests(device: Device, period_seconds: float = 300.0) None[source]¶
Start sending periodic device info requests.
This is a convenience wrapper around start_periodic_requests().
- Parameters:
device – Device object
period_seconds – Time between requests in seconds (default: 300 = 5
minutes)
- async start_periodic_device_status_requests(device: Device, period_seconds: float = 300.0) None[source]¶
Start sending periodic device status requests.
This is a convenience wrapper around start_periodic_requests().
- Parameters:
device – Device object
period_seconds – Time between requests in seconds (default: 300 = 5
minutes)
- async start_periodic_requests(device: Device, request_type: PeriodicRequestType = PeriodicRequestType.DEVICE_STATUS, period_seconds: float = 300.0) None[source]¶
Start sending periodic requests for device information or status.
This optional helper continuously sends requests at a specified interval. It can be used to keep device information or status up-to-date.
- Parameters:
device – Device object
request_type – Type of request (DEVICE_INFO or DEVICE_STATUS)
period_seconds – Time between requests in seconds (default: 300 = 5
minutes)
Example
>>> # Start periodic status requests (default) >>> await manager.start_periodic_requests(device) >>> >>> # Start periodic device info requests >>> await manager.start_periodic_requests( ... device, ... request_type=PeriodicRequestType.DEVICE_INFO ... ) >>> >>> # Custom period: request every 60 seconds >>> await manager.start_periodic_requests( ... device, ... period_seconds=60 ... )
Note
Only one periodic task per request type per device
Call stop_periodic_requests() to stop a task
All tasks automatically stop when client disconnects
- async stop_all_periodic_tasks(reason: str | None = None) None[source]¶
Stop all periodic request tasks.
This is automatically called when disconnecting.
- Parameters:
reason – Optional reason for logging context (e.g., “connection
failure")
Example
>>> await manager.stop_all_periodic_tasks() >>> await manager.stop_all_periodic_tasks(reason="disconnect")
- async stop_periodic_device_info_requests(device: Device) None[source]¶
Stop sending periodic device info requests for a device.
This is a convenience wrapper around stop_periodic_requests().
- Parameters:
device – Device object
- async stop_periodic_device_status_requests(device: Device) None[source]¶
Stop sending periodic device status requests for a device.
This is a convenience wrapper around stop_periodic_requests().
- Parameters:
device – Device object
- async stop_periodic_requests(device: Device, request_type: PeriodicRequestType | None = None) None[source]¶
Stop sending periodic requests for a device.
- Parameters:
device – Device object
request_type – Type of request to stop. If None, stops all types for this device.
Example
>>> # Stop specific request type >>> await manager.stop_periodic_requests( ... device, ... PeriodicRequestType.DEVICE_STATUS ... ) >>> >>> # Stop all periodic requests for device >>> await manager.stop_periodic_requests(device)
nwp500.mqtt.reconnection module¶
MQTT reconnection handler for Navien Smart Control.
This module handles automatic reconnection with exponential backoff when the MQTT connection is interrupted.
- class nwp500.mqtt.reconnection.MqttReconnectionHandler(config: MqttConnectionConfig, is_connected_func: Callable[[], bool], schedule_coroutine_func: Callable[[Any], None], reconnect_func: Callable[[], Awaitable[None]], deep_reconnect_func: Callable[[], Awaitable[None]] | None = None, emit_event_func: Callable[..., Awaitable[Any]] | None = None)[source]¶
Bases:
objectHandles automatic reconnection logic with exponential backoff.
This class manages reconnection attempts when the MQTT connection is interrupted, implementing exponential backoff and configurable retry limits.
- on_connection_interrupted(error: Exception) None[source]¶
Handle connection interruption.
- Parameters:
error – Error that caused the interruption
nwp500.mqtt.state_tracker module¶
Per-device state change detection for Navien MQTT clients.
Compares successive DeviceStatus snapshots for each device and emits
granular events when individual fields change (temperature, mode, power,
errors).
- class nwp500.mqtt.state_tracker.DeviceStateTracker(event_emitter: EventEmitter)[source]¶
Bases:
objectTracks previous device states and emits change events.
Each device (identified by MAC address) gets its own slot in
_previous_status. On every new status update, this class compares it against the stored snapshot and emits events for changed fields, then stores the new snapshot.- async process(device_mac: str, status: DeviceStatus) None[source]¶
Compare status with the previous snapshot for device_mac.
Emits the following events when values change:
temperature_changed(TemperatureChangedEvent(...))mode_changed(ModeChangedEvent(...))power_changed(PowerChangedEvent(...))heating_started(HeatingStartedEvent(...))heating_stopped(HeatingStoppedEvent(...))error_detected(ErrorDetectedEvent(...))error_cleared(ErrorClearedEvent(...))
- Parameters:
device_mac – MAC address used as the per-device key.
status – Freshly received
DeviceStatus.
nwp500.mqtt.subscriptions module¶
MQTT Subscription Management for Navien devices.
This module handles all subscription-related operations including: - Low-level subscribe/unsubscribe operations - Topic pattern matching with MQTT wildcards - Message routing and handler management - Typed subscriptions (status, feature, energy) - State change detection and event emission
- class nwp500.mqtt.subscriptions.MqttSubscriptionManager(connection: Any, client_id: str, event_emitter: EventEmitter, schedule_coroutine: Callable[[Any], None], device_info_cache: MqttDeviceInfoCache | None = None)[source]¶
Bases:
objectManages MQTT subscriptions, topic matching, and message routing.
Handles: - Subscribe/unsubscribe to MQTT topics - Topic pattern matching with wildcards (+ and #) - Message handler registration and invocation - Typed subscriptions with automatic parsing - State change detection and event emission
- async resubscribe_all() None[source]¶
Re-establish all subscriptions after a connection rebuild.
This method is called after a deep reconnection to restore all active subscriptions. It uses the stored subscription information to re-subscribe to all topics with their original QoS settings and handlers.
Note
This is typically called automatically during deep reconnection and should not need to be called manually.
- Raises:
RuntimeError – If not connected to MQTT broker
Exception – If any subscription fails
- async subscribe(topic: str, callback: Callable[[str, dict[str, Any]], None], qos: QoS = QoS.AT_LEAST_ONCE) int[source]¶
Subscribe to an MQTT topic.
- Parameters:
topic – MQTT topic to subscribe to (can include wildcards)
callback – Function to call when messages arrive (topic, message)
qos – Quality of Service level
- Returns:
Subscription packet ID
- Raises:
RuntimeError – If not connected to MQTT broker
Exception – If subscription fails
- async subscribe_device(device: Device, callback: Callable[[str, dict[str, Any]], None]) int[source]¶
Subscribe to all messages from a specific device.
- Parameters:
device – Device object
callback – Message handler
- Returns:
Subscription packet ID
- async subscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) int[source]¶
Subscribe to device feature/info messages with automatic parsing.
- async subscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) int[source]¶
Subscribe to device status messages with automatic parsing.
- async subscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) int[source]¶
Subscribe to energy usage responses with automatic parsing.
- async subscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) int[source]¶
Subscribe to recirculation schedule read responses.
Subscribes to the
recirc-rsv/rdresponse topic for the given device. The callback receives aRecirculationSchedulewhenever the device responds to a recirculation schedule read request.- Parameters:
device – Device whose recirculation schedule responses to receive.
callback – Called with the parsed schedule on each response.
- Returns:
Publish packet ID from the MQTT subscribe call.
- async subscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) int[source]¶
Subscribe to reservation read responses with automatic parsing.
Subscribes to the
rsv/rdresponse topic for the given device. The callback receives a fully-parsedReservationSchedulewhenever the device responds to a reservation read request.- Parameters:
device – Device whose reservation responses to receive.
callback – Called with the parsed schedule on each response.
- Returns:
Publish packet ID from the MQTT subscribe call.
- async subscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) int[source]¶
Subscribe to Time-of-Use schedule read responses with automatic parsing.
Subscribes to the
tou/rdresponse topic for the given device. The callback receives a fully-parsedTOUReservationSchedulewhenever the device responds to a TOU read or configure request (triggered byrequest_tou_settings()orconfigure_tou_schedule()).- Parameters:
device – Device whose TOU responses to receive.
callback – Called with the parsed schedule on each response.
- Returns:
Publish packet ID from the MQTT subscribe call.
- async subscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) int[source]¶
Subscribe to weekly reservation read responses.
Subscribes to the
rsv-weekly/rdresponse topic for the given device. The callback receives aWeeklyReservationSchedulewhenever the device responds to a weekly reservation read request.- Parameters:
device – Device whose weekly reservation responses to receive.
callback – Called with the parsed schedule on each response.
- Returns:
Publish packet ID from the MQTT subscribe call.
- async unsubscribe(topic: str, callback: Callable[[str, dict[str, Any]], None] | None = None) int[source]¶
Unsubscribe from an MQTT topic.
If a callback is provided, only that specific handler is removed. The underlying MQTT unsubscribe from the broker is only performed if no handlers remain for the topic.
If no callback is provided, all handlers are removed and the broker is unsubscribed immediately.
- Parameters:
topic – MQTT topic to unsubscribe from
callback – Optional specific handler to remove
- Returns:
Unsubscribe packet ID (or 0 if no broker call was made)
- Raises:
RuntimeError – If not connected to MQTT broker
Exception – If unsubscribe fails
- async unsubscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) None[source]¶
Unsubscribe a specific device feature callback.
- async unsubscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) None[source]¶
Unsubscribe a specific device status callback.
- async unsubscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) None[source]¶
Unsubscribe a specific energy usage callback.
- async unsubscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) None[source]¶
Unsubscribe a specific recirculation schedule callback.
- async unsubscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) None[source]¶
Unsubscribe a specific reservation response callback.
- async unsubscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) None[source]¶
Unsubscribe a specific TOU response callback.
- async unsubscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) None[source]¶
Unsubscribe a specific weekly reservation callback.
- update_connection(connection: Any) None[source]¶
Update the MQTT connection reference.
This is used when the connection is recreated (e.g., after reconnection) to update the internal reference while preserving subscriptions.
- Parameters:
connection – New MQTT connection object
Note
This does not re-establish subscriptions. Call the appropriate subscribe methods to re-register subscriptions with the new connection if needed.
nwp500.mqtt.utils module¶
MQTT utility functions and data structures for Navien Smart Control.
This module provides utility functions for redacting sensitive information, configuration classes, and common data structures used across MQTT modules.
- class nwp500.mqtt.utils.MqttConnectionConfig(endpoint: str = 'a1t30mldyslmuq-ats.iot.us-east-1.amazonaws.com', region: str = 'us-east-1', client_id: str | None = None, clean_session: bool = True, keep_alive_secs: int = 1200, auto_reconnect: bool = True, max_reconnect_attempts: int = -1, initial_reconnect_delay: float = 1.0, max_reconnect_delay: float = 120.0, reconnect_backoff_multiplier: float = 2.0, deep_reconnect_threshold: int = 10, enable_command_queue: bool = True, max_queued_commands: int = 100)[source]¶
Bases:
objectConfiguration for MQTT connection.
- class nwp500.mqtt.utils.PeriodicRequestType(*values)[source]¶
Bases:
EnumTypes of periodic requests that can be sent.
- DEVICE_INFO¶
Request device information periodically
- DEVICE_STATUS¶
Request device status periodically
- DEVICE_INFO = 'device_info'¶
- DEVICE_STATUS = 'device_status'¶
- class nwp500.mqtt.utils.QueuedCommand(topic: str, payload: dict[str, Any], qos: QoS, timestamp: datetime)[source]¶
Bases:
objectRepresents a command that is queued for sending when reconnected.
- qos¶
Quality of Service level
- Type:
awscrt.mqtt.QoS
- timestamp¶
Time when command was queued
- Type:
- qos: QoS¶
- nwp500.mqtt.utils.get_response_data(message: dict[str, Any], key: str | None) Any[source]¶
Extract data from an MQTT message, supporting key variants.
Checks both the nested
responsedict and the top-level message, using both the primary key and its alternate short-form name (e.g."status"/"st","feature"/"did"). Lookup order preserves a strict nested-first precedence:response[key]response[alt_key]message[key]message[alt_key]
Key presence is checked explicitly (not by truthiness), so falsy values like
0,False, or{}are returned correctly and do not fall through to a lower-precedence candidate.- Parameters:
message – Raw MQTT message dict.
key – Primary key to look up. When
None, the nestedresponsedict is returned directly.
- Returns:
The value of the first present key in priority order, or
Noneif no candidate key is found.
- nwp500.mqtt.utils.redact(obj: Any, keys_to_redact: set[str] | None = None) Any[source]¶
Return a redacted copy of obj with sensitive keys masked.
This is a lightweight sanitizer for log messages to avoid emitting secrets such as access keys, session tokens, passwords, emails, clientIDs and sessionIDs.
- Parameters:
obj – Object to redact (dict, list, tuple, or primitive)
keys_to_redact – Set of key names to redact (uses defaults if None)
- Returns:
Redacted copy of the object
- nwp500.mqtt.utils.redact_mac(mac: str | None) str[source]¶
Mask a MAC address or device ID for safe logging.
- Parameters:
mac – The MAC address or device ID to redact (e.g., ‘navilink-0123456789ab’)
- Returns:
A redacted string like ‘navilink-01…89ab’ or ‘<REDACTED>’
- nwp500.mqtt.utils.redact_serial(serial: str | None) str[source]¶
Mask a serial number for safe logging.
- Parameters:
serial – Serial number to redact
- Returns:
Redacted serial like ‘AB…1234’
- nwp500.mqtt.utils.redact_topic(topic: str) str[source]¶
Redact sensitive information from MQTT topic strings.
Topics often contain MAC addresses or device unique identifiers, e.g.: - cmd/52/navilink-04786332fca0/st/did - cmd/52/navilink-04786332fca0/ctrl - cmd/52/04786332fca0/ctrl - or with colons/hyphens (04:78:63:32:fc:a0 or 04-78-63-32-fc-a0)
- Parameters:
topic – MQTT topic string
- Returns:
Topic with MAC addresses redacted
Note
Uses pre-compiled regex patterns for better performance.
- nwp500.mqtt.utils.topic_matches_pattern(topic: str, pattern: str) bool[source]¶
Check if a topic matches a subscription pattern with wildcards.
Supports MQTT wildcards: - ‘+’ matches a single level - ‘#’ matches multiple levels (must be at end)
- Parameters:
topic – Actual topic (e.g., “cmd/52/navilink-ABC/status”)
pattern – Pattern with wildcards (e.g., “cmd/52/+/#”)
- Returns:
True if topic matches pattern
Examples
>>> topic_matches_pattern("cmd/52/device1/status", "cmd/52/+/status") True >>> topic_matches_pattern( ... "cmd/52/device1/status/extra", "cmd/52/device1/#" ... ) True
Module contents¶
MQTT package for Navien device communication.
This package provides MQTT client functionality for real-time communication with Navien devices using AWS IoT Core.
Main exports: - NavienMqttClient: Main MQTT client class - MqttConnectionConfig: Configuration for MQTT connections - PeriodicRequestType: Enum for periodic request types - MqttDiagnosticsCollector: Metrics and diagnostics collector - MqttMetrics, ConnectionDropEvent, ConnectionEvent: Diagnostic types
- class nwp500.mqtt.ConnectionDropEvent(timestamp: str, error_name: str | None = None, error_message: str | None = None, error_code: int | None = None, reconnect_attempt: int = 0, duration_connected_seconds: float | None = None, active_subscriptions: int = 0, queued_commands: int = 0)[source]¶
Bases:
objectRecord of a single connection drop event.
- class nwp500.mqtt.ConnectionEvent(timestamp: str, event_type: str, session_present: bool = False, return_code: int | None = None, attempt_number: int = 0, time_to_reconnect_seconds: float | None = None)[source]¶
Bases:
objectRecord of a connection success/resumption event.
- class nwp500.mqtt.MqttConnectionConfig(endpoint: str = 'a1t30mldyslmuq-ats.iot.us-east-1.amazonaws.com', region: str = 'us-east-1', client_id: str | None = None, clean_session: bool = True, keep_alive_secs: int = 1200, auto_reconnect: bool = True, max_reconnect_attempts: int = -1, initial_reconnect_delay: float = 1.0, max_reconnect_delay: float = 120.0, reconnect_backoff_multiplier: float = 2.0, deep_reconnect_threshold: int = 10, enable_command_queue: bool = True, max_queued_commands: int = 100)[source]¶
Bases:
objectConfiguration for MQTT connection.
- class nwp500.mqtt.MqttDiagnosticsCollector(max_events_retained: int = 1000, enable_verbose_logging: bool = False)[source]¶
Bases:
objectCollects detailed diagnostics and metrics for MQTT connection analysis.
This collector tracks: - Connection drop events with error details - Connection recovery timeline - Error frequency and patterns - Session duration statistics - Network topology and timing information
For debugging: - Export logs to JSON for correlation with AWS CloudWatch - Enables continuous monitoring with configurable retention
- export_json() str[source]¶
Export all collected diagnostics as JSON.
- Returns:
JSON string suitable for storing or sending to monitoring systems
- get_metrics() MqttMetrics[source]¶
Get current aggregate metrics.
- get_recent_connections(limit: int = 10) list[ConnectionEvent][source]¶
Get the N most recent connection events.
- get_recent_drops(limit: int = 10) list[ConnectionDropEvent][source]¶
Get the N most recent connection drop events.
- on_connection_drop(callback: Callable[[ConnectionDropEvent], None]) None[source]¶
Register a callback to be invoked on each connection drop event.
- Parameters:
callback – Function that receives ConnectionDropEvent
- async record_connection_drop(error: Exception | None = None, reconnect_attempt: int = 0, active_subscriptions: int = 0, queued_commands: int = 0) None[source]¶
Record a connection drop event.
- Parameters:
error – The exception that caused the drop
reconnect_attempt – Which reconnection attempt this is (0 = initial)
active_subscriptions – Number of active subscriptions at time of drop
queued_commands – Number of commands in the queue
- async record_connection_success(event_type: str = 'connected', session_present: bool = False, return_code: int | None = None, attempt_number: int = 0) None[source]¶
Record a successful connection or reconnection event.
- Parameters:
event_type – “connected”, “resumed”, or “deep_reconnected”
session_present – Whether MQTT session was present
return_code – MQTT return code
attempt_number – Reconnection attempt number (0 = initial connect)
- class nwp500.mqtt.MqttMetrics(total_connections: int = 0, total_disconnects: int = 0, total_connection_drops: int = 0, total_reconnect_attempts: int = 0, longest_session_seconds: float = 0.0, shortest_session_seconds: float = inf, average_session_seconds: float = 0.0, current_session_uptime_seconds: float = 0.0, connection_drops_by_error: dict[str, int]=<factory>, reconnection_attempts_distribution: dict[str, int]=<factory>, last_drop_timestamp: str | None = None, last_successful_connect_timestamp: str | None = None, connection_recovered: int = 0, messages_published: int = 0, messages_queued: int = 0)[source]¶
Bases:
objectAggregate metrics for MQTT connection stability.
Bases:
EventEmitterAsync MQTT client for Navien device communication over AWS IoT.
This client establishes WebSocket connections to AWS IoT Core using temporary AWS credentials from the authentication API. It handles: - Connection management with automatic reconnection and exponential backoff - Topic subscriptions for device events and responses - Command publishing for device control - Message routing and callbacks - Command queuing when disconnected (sends when reconnected) - Event-driven architecture with state change detection
The client extends EventEmitter to provide an event-driven architecture: - Multiple listeners per event - State change detection (temperature_changed, mode_changed, etc.) - Async handler support - Priority-based execution
The client automatically reconnects when the connection is interrupted, using exponential backoff (default: 1s, 2s, 4s, 8s, … up to 120s). Reconnection behavior can be customized via MqttConnectionConfig.
When enabled, the command queue stores commands sent while disconnected and automatically sends them when the connection is restored. This ensures commands are not lost during temporary network interruptions.
Example (Traditional Callbacks):
>>> async with NavienAuthClient(email, password) as auth_client: ... mqtt_client = NavienMqttClient(auth_client) ... await mqtt_client.connect() ... ... # Traditional callback style ... await mqtt_client.subscribe_device_status(device, on_status)
Example (Event Emitter):
>>> from nwp500.mqtt_events import MqttClientEvents >>> mqtt_client = NavienMqttClient(auth_client) ... ... # Type-safe event listeners with IDE autocomplete ... mqtt_client.on( ... MqttClientEvents.TEMPERATURE_CHANGED, ... lambda event: log_temperature(event.new_temperature), ... ) ... mqtt_client.on(MqttClientEvents.TEMPERATURE_CHANGED, update_ui) ... mqtt_client.on( ... MqttClientEvents.MODE_CHANGED, handle_mode_change ... ) ... ... # One-time listener ... mqtt_client.once(MqttClientEvents.STATUS_RECEIVED, initialize) ... ... await mqtt_client.connect()
- Events Emitted:
See
nwp500.mqtt_events.MqttClientEventsfor a complete, type-safe registry of all events with full documentation.Key events include: - status_received: Raw status update - feature_received: Device feature/capability information - temperature_changed: DHW temperature changed - mode_changed: Operation mode changed - power_changed: Power consumption changed - heating_started: Device started heating - heating_stopped: Device stopped heating - error_detected: Device error occurred - error_cleared: Device error resolved - connection_interrupted: Connection lost - connection_resumed: Connection restored
Check for available over-the-air firmware updates.
Clear all queued commands. …
Get client ID.
Commit a previously downloaded firmware update.
Configure the recirculation pump timed schedule.
Enable/configure water program reservation mode.
Configure the Time-of-Use rate schedule.
Establish connection to AWS IoT Core.
Ensures tokens are valid before connecting and refreshes if necessary.
- Returns:
True if connection successful
- Raises:
Exception – If connection fails
Deprecated access to device controller.
Get the diagnostics collector instance.
Disable the Anti-Legionella disinfection cycle.
Disable utility demand response participation.
Disable intelligent/adaptive heating mode.
Disconnect from AWS IoT Core and stop all periodic tasks.
Enable Anti-Legionella disinfection.
Enable utility demand response participation.
Enable intelligent/adaptive heating mode.
Ensure device info is cached, requesting if necessary.
Called by control commands and CLI to ensure device capabilities are available before execution.
- Parameters:
device – Device to ensure info for
timeout – Maximum time to wait for response (default: 30 seconds)
- Returns:
True if device info was successfully cached, False on timeout
- Raises:
MqttNotConnectedError – If not connected
Check if client is connected.
Check if client is currently attempting to reconnect.
Publish a message to an MQTT topic.
If not connected and command queue is enabled, the command will be queued and sent automatically when the connection is restored.
- Parameters:
topic – MQTT topic to publish to
payload – Message payload (will be JSON-encoded)
qos – Quality of Service level
- Returns:
Publish packet ID (or 0 if queued)
- Raises:
RuntimeError – If not connected and command queue is disabled
Get the number of commands currently queued.
Get the number of reconnection attempts made.
Trigger a WiFi reconnection on the device.
Recover from authentication-related connection failures.
This method is useful when MQTT connection fails due to stale/expired authentication tokens. It refreshes the tokens and attempts to reconnect the MQTT client.
- Returns:
True if recovery was successful and MQTT is reconnected, False otherwise
- Raises:
TokenRefreshError – If token refresh fails
AuthenticationError – If re-authentication fails
Example
>>> mqtt_client = NavienMqttClient(auth_client) >>> try: ... await mqtt_client.connect() ... except MqttConnectionError: ... # Connection may have failed due to stale tokens ... if await mqtt_client.recover_connection(): ... print("Successfully recovered connection") ... else: ... print("Recovery failed, check logs")
Request device information (features, firmware, etc.).
Request general device status.
Request daily energy usage data for specified month(s).
Request the current reservation program from the device.
Request the current TOU settings from the device.
Reset air filter maintenance timer.
Reset reconnection state and trigger a new reconnection attempt. …
Reset WiFi settings to factory defaults.
Trigger the smart diagnostic routine on the device.
Get session ID.
Set DHW operation mode.
Set DHW target temperature in the user’s preferred unit.
Set the freeze protection activation temperature.
Turn device on or off.
Set recirculation pump operation mode (1-4).
Enable or disable Time-of-Use optimization.
Set vacation/away mode duration (1-30 days).
Signal that the app has connected.
Start sending periodic requests for device information or status. …
Stop all periodic request tasks. …
Stop sending periodic requests for a device. …
Subscribe to an MQTT topic.
- Parameters:
topic – MQTT topic to subscribe to (can include wildcards)
callback – Function to call when messages arrive (topic, message)
qos – Quality of Service level
- Returns:
Subscription packet ID
- Raises:
Exception – If subscription fails
Subscribe to all messages from a specific device.
- Parameters:
device – Device object
callback – Message handler
- Returns:
Subscription packet ID
Subscribe to device feature/info messages with automatic parsing.
Subscribe to device status messages with automatic parsing.
Subscribe to energy usage query responses with automatic parsing.
Subscribe to recirculation schedule read responses.
Subscribe to reservation read responses with automatic parsing.
Subscribe to Time-of-Use schedule read responses with automatic parsing.
Subscribes to the
tou/rdresponse topic for the given device. The callback receives a fully-parsedTOUReservationSchedulewhenever the device responds to a TOU read or configure request (triggered byrequest_tou_settings()orconfigure_tou_schedule()).- Parameters:
device – Device whose TOU responses to receive.
callback – Called with the parsed schedule on each response.
- Returns:
Publish packet ID from the MQTT subscribe call.
Subscribe to weekly reservation read responses.
Manually trigger the recirculation pump hot button.
Unsubscribe from an MQTT topic.
- Parameters:
topic – MQTT topic to unsubscribe from
- Returns:
Unsubscribe packet ID
- Raises:
Exception – If unsubscribe fails
Unsubscribe a specific device feature callback.
Unsubscribe a specific device status callback.
Unsubscribe a specific energy usage callback.
Unsubscribe a specific recirculation schedule callback.
Unsubscribe a specific reservation response callback.
Unsubscribe a specific TOU response callback.
Unsubscribe a specific weekly reservation callback.
Update programmed reservations.
Configure the weekly temperature reservation schedule.