MQTT Client

NavienMqttClient is the main interface for real-time communication with Navien devices — status monitoring, device control, and event callbacks.

Important

Use the REST API only for device discovery. Everything else goes through MQTT.

Overview

  • Real-Time Monitoring - Subscribe to device status updates

  • Device Control - Send commands (power, temperature, mode)

  • Event System - React to state changes with callbacks

  • Auto-Reconnection - Exponential backoff reconnection with command queueing

  • Type-Safe - Returns typed models (DeviceStatus, DeviceFeature)

  • Periodic Requests - Scheduled status polling

  • Energy Monitoring - Query historical energy usage data

Quick Start

Basic Monitoring

from nwp500 import NavienAuthClient, NavienAPIClient, NavienMqttClient
import asyncio

async def main():
    async with NavienAuthClient("email@example.com", "password") as auth:
        # Get device via API
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        # Connect MQTT
        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        # Subscribe to status updates
        def on_status(status):
            unit = status.get_field_unit('dhw_temperature')
            print(f"Water Temp: {status.dhw_temperature}{unit}")
            print(f"Target: {status.dhw_temperature_setting}{unit}")
            print(f"Power: {status.current_inst_power}W")
            print(f"Mode: {status.dhw_operation_setting.name}")

        await mqtt.subscribe_device_status(device, on_status)
        await mqtt.request_device_status(device)

        # Monitor for 60 seconds
        await asyncio.sleep(60)
        await mqtt.disconnect()

asyncio.run(main())

Device Control

Control operations are now exposed directly on NavienMqttClient; use the direct mqtt.* methods for control operations.

Control methods rely on cached device feature data for capability-aware validation. Request device info first, or call nwp500.mqtt.client.NavienMqttClient.ensure_device_info_cached() before issuing commands.

async def control_device():
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        await mqtt.subscribe_device_feature(device, lambda f: None)
        await mqtt.request_device_info(device)

        await mqtt.set_power(device, power_on=True)
        await mqtt.set_dhw_mode(device, mode_id=3)
        await mqtt.set_dhw_temperature(device, 140.0)

        await mqtt.disconnect()

API Reference

Connection Methods

connect()

connect()

Connect to AWS IoT Core MQTT broker.

Returns:

True if connection successful

Return type:

bool

Raises:

Exception – If connection fails

Example:

mqtt = NavienMqttClient(auth)

try:
    connected = await mqtt.connect()
    if connected:
        print(f"Connected! Client ID: {mqtt.client_id}")
    else:
        print("Connection failed")
except Exception as e:
    print(f"Error connecting: {e}")

disconnect()

disconnect()

Disconnect from MQTT broker and cleanup all resources.

Stops all periodic tasks, unsubscribes from topics, and closes connection.

Example:

try:
    # ... operations ...
finally:
    await mqtt.disconnect()

Monitoring Methods

subscribe_device_status()

subscribe_device_status(device, callback)

Subscribe to device status updates with automatic parsing.

The callback receives DeviceStatus objects containing temperature, power, operation mode, component states, and more.

Parameters:
  • device (Device) – Device object

  • callback (Callable[[DeviceStatus], None]) – Function receiving DeviceStatus objects

Returns:

Subscription packet ID

Return type:

int

Example:

def on_status(status):
    """Called every time device status updates."""
    print(f"Temperature: {status.dhw_temperature}°F")
    print(f"Target: {status.dhw_temperature_setting}°F")
    print(f"Mode: {status.dhw_operation_setting.name}")
    print(f"Power: {status.current_inst_power}W")
    print(f"Energy: {status.dhw_charge_per}%")

    # Check if actively heating
    if status.operation_busy:
        print("Device is heating water")
        if status.comp_use:
            print("  - Heat pump running")
        if status.heat_upper_use:
            print("  - Upper heater active")
        if status.heat_lower_use:
            print("  - Lower heater active")

    # Check water usage
    if status.dhw_use:
        print("Water is being used (short-term)")
    if status.dhw_use_sustained:
        print("Water is being used (sustained)")

    # Check for errors
    if status.error_code != 0:
        print(f"ERROR: {status.error_code}")

await mqtt.subscribe_device_status(device, on_status)
await mqtt.request_device_status(device)

request_device_status()

request_device_status(device)

Request current device status.

Parameters:

device (Device) – Device object

Returns:

Publish packet ID

Return type:

int

Example:

# Subscribe first to receive response
await mqtt.subscribe_device_status(device, on_status)

# Then request
await mqtt.request_device_status(device)

# Can request periodically
while monitoring:
    await mqtt.request_device_status(device)
    await asyncio.sleep(30)  # Every 30 seconds

subscribe_device_feature()

subscribe_device_feature(device, callback)

Subscribe to device feature/capability/info updates.

The callback receives DeviceFeature objects containing serial number, firmware version, temperature limits, and supported features.

Parameters:
  • device (Device) – Device object

  • callback (Callable[[DeviceFeature], None]) – Function receiving DeviceFeature objects

Returns:

Subscription packet ID

Return type:

int

Example:

def on_feature(feature):
    """Called when device features/info received."""
    print(f"Serial: {feature.controller_serial_number}")
    print(f"Firmware: {feature.controller_sw_version}")
    print(f"Temp Range: {feature.dhw_temperature_min}°F - "
          f"{feature.dhw_temperature_max}°F")

    # Check capabilities
    if feature.energy_usage_use:
        print("Energy monitoring: Supported")
    if feature.anti_legionella_setting_use:
        print("Anti-Legionella: Supported")
    if feature.reservation_use:
        print("Reservations: Supported")

await mqtt.subscribe_device_feature(device, on_feature)
await mqtt.request_device_info(device)

request_device_info()

request_device_info(device)

Request device features and capabilities.

Parameters:

device (Device) – Device object

Returns:

Publish packet ID

Return type:

int

Example:

await mqtt.subscribe_device_feature(device, on_feature)
await mqtt.request_device_info(device)

subscribe_device()

subscribe_device(device, callback)

Subscribe to all messages from a device (low-level).

This subscribes to both control and status topics, providing raw message access. For most use cases, use subscribe_device_status() or subscribe_device_feature() instead.

Parameters:
  • device (Device) – Device object

  • callback (Callable[[str, dict], None]) – Function receiving (topic, message) tuples

Returns:

List of subscription packet IDs

Return type:

list[int]

Example:

def on_message(topic, message):
    """Receive all messages from device."""
    print(f"Topic: {topic}")
    print(f"Message: {message}")

    if 'response' in message:
        response = message['response']
        if 'status' in response:
            # Device status update
            status_data = response['status']
        elif 'feature' in response:
            # Device feature info
            feature_data = response['feature']

await mqtt.subscribe_device(device, on_message)

Control Methods

Capability Checking

Most control commands depend on device capabilities reported by DeviceFeature. Request device info first so the client can validate support and ranges before sending commands.

await mqtt.subscribe_device_feature(device, lambda feature: print(feature))
await mqtt.request_device_info(device)

# Alternative helper: request and wait until the cache is populated
await mqtt.ensure_device_info_cached(device)

Common capability flags include power_use, dhw_use, dhw_temperature_setting_use, program_reservation_use, recirculation_use, recirc_reservation_use, freeze_protection_use, and smart_diagnostic_use.

set_power()

set_power(device, power_on)

Turn device power on or off.

Capability Required: power_use

Parameters:
  • device (Device) – Device object

  • power_on (bool) – True to power on, False to power off

Returns:

Publish packet ID

Return type:

int

set_dhw_mode()

set_dhw_mode(device, mode_id, vacation_days=None)

Set the DHW operating mode.

Capability Required: dhw_use

Parameters:
  • mode_id (int) – One of the DHW operation mode IDs

  • vacation_days (int or None) – Required for vacation mode; valid range 1-30

Raises:

set_dhw_temperature()

set_dhw_temperature(device, temperature)

Set the target water temperature in the current unit system.

Capability Required: dhw_temperature_setting_use

The valid range is checked against the device’s reported dhw_temperature_min and dhw_temperature_max values.

enable_anti_legionella()

enable_anti_legionella(device, period_days)

Enable the anti-Legionella cycle.

Capability Required: anti_legionella_setting_use

Parameters:

period_days (int) – Cycle period in days (1-30)

disable_anti_legionella()

disable_anti_legionella(device)

Disable the anti-Legionella cycle.

set_vacation_days()

set_vacation_days(device, days)

Convenience wrapper for vacation mode.

Capability Required: holiday_use

update_reservations()

update_reservations(device, reservations, *, enabled=True)

Update the standard reservation program.

Parameters:
  • reservations (Sequence[dict[str, Any]]) – Sequence of raw reservation entries using the protocol fields enable, week, hour, min, mode, and param

  • enabled (bool) – Global reservation enable flag

Example:

from nwp500 import build_reservation_entry

reservations = [
    build_reservation_entry(
        enabled=True,
        days=["MO", "TU", "WE", "TH", "FR"],
        hour=6,
        minute=0,
        mode_id=4,
        temperature=60.0,
    )
]

await mqtt.update_reservations(device, reservations, enabled=True)

request_reservations()

request_reservations(device)

Request the current programmed reservations.

subscribe_reservation_response()

subscribe_reservation_response(device, callback)

Subscribe to parsed reservation read responses.

Parameters:

callback (Callable[[ReservationSchedule], None]) – Called with ReservationSchedule

update_weekly_reservation()

update_weekly_reservation(device, schedule)

Send a typed weekly reservation schedule.

Capability Required: program_reservation_use

Parameters:

schedule (WeeklyReservationSchedule) – Weekly reservation schedule payload

subscribe_weekly_reservation_response()

subscribe_weekly_reservation_response(device, callback)

Subscribe to parsed weekly reservation responses.

Parameters:

callback (Callable[[WeeklyReservationSchedule], None]) – Called with WeeklyReservationSchedule

configure_reservation_water_program()

configure_reservation_water_program(device)

Enable the device’s reservation water-program mode.

Capability Required: program_reservation_use

configure_recirculation_schedule()

configure_recirculation_schedule(device, schedule)

Configure the timed recirculation schedule.

Capability Required: recirc_reservation_use

Parameters:

schedule (RecirculationSchedule) – Recirculation schedule payload

subscribe_recirculation_schedule_response()

subscribe_recirculation_schedule_response(device, callback)

Subscribe to parsed recirculation schedule responses.

Parameters:

callback (Callable[[RecirculationSchedule], None]) – Called with RecirculationSchedule

set_recirculation_mode()

set_recirculation_mode(device, mode)

Set the recirculation operating mode.

Capability Required: recirculation_use

Parameters:

mode (int) – Mode ID in the range 1-4

trigger_recirculation_hot_button()

trigger_recirculation_hot_button(device)

Trigger an immediate recirculation run.

Capability Required: recirculation_use

configure_tou_schedule()

configure_tou_schedule(device, controller_serial_number, periods, *, enabled=True)

Configure the Time-of-Use schedule.

Capability Required: program_reservation_use

request_tou_settings()

request_tou_settings(device, controller_serial_number)

Request the current TOU schedule.

subscribe_tou_response()

subscribe_tou_response(device, callback)

Subscribe to parsed TOU schedule responses.

The callback is invoked with a TOUReservationSchedule whenever the device responds to a request_tou_settings() read or a configure_tou_schedule() write (both use the tou/rd response topic).

Parameters:

callback (Callable[[TOUReservationSchedule], None]) – Called with the parsed TOU schedule on each response.

unsubscribe_tou_response()

unsubscribe_tou_response(device, callback)

Unsubscribe a previously registered TOU response callback.

Parameters:

callback (Callable[[TOUReservationSchedule], None]) – The same callable passed to subscribe_tou_response().

set_tou_enabled()

set_tou_enabled(device, enabled)

Enable or disable TOU optimization.

Capability Required: program_reservation_use

request_energy_usage()

request_energy_usage(device, year, months)

Request daily energy usage data for one or more months.

subscribe_energy_usage()

subscribe_energy_usage(device, callback)

Subscribe to parsed energy usage responses.

Parameters:

callback (Callable[[EnergyUsageResponse], None]) – Called with EnergyUsageResponse

check_firmware_update()

check_firmware_update(device)

Trigger an OTA firmware availability check. The response arrives asynchronously on the device’s MQTT response topic.

commit_firmware_update()

commit_firmware_update(device, payload)

Commit a previously downloaded firmware update.

Parameters:

payload (OtaCommitPayload) – OTA commit payload identifying the component and version

Warning

The device reboots when a firmware commit is applied.

reconnect_wifi()

reconnect_wifi(device)

Ask the device to reconnect to WiFi using its current configuration.

reset_wifi()

reset_wifi(device)

Clear the stored WiFi configuration.

Warning

After reset_wifi(), the device must be provisioned again.

set_freeze_protection_temperature()

set_freeze_protection_temperature(device, temperature)

Set the freeze-protection threshold in the current unit system.

Available on devices that expose freeze_protection_use.

run_smart_diagnostic()

run_smart_diagnostic(device)

Trigger the device’s smart diagnostic routine.

Available on devices that expose smart_diagnostic_use.

The result appears in the next DeviceStatus.smart_diagnostic update.

enable_intelligent_scheduling()

enable_intelligent_scheduling(device)

Enable adaptive/intelligent scheduling mode.

disable_intelligent_scheduling()

disable_intelligent_scheduling(device)

Disable adaptive/intelligent scheduling mode.

enable_demand_response()

enable_demand_response(device)

Enable utility demand-response participation.

disable_demand_response()

disable_demand_response(device)

Disable utility demand-response participation.

reset_air_filter()

reset_air_filter(device)

Reset the air-filter maintenance timer.

signal_app_connection()

signal_app_connection(device)

Publish an app-connection heartbeat event to the device.

Periodic Request Methods

start_periodic_requests()

start_periodic_requests(device, request_type=DEVICE_STATUS, period_seconds=300.0)

Start automatic periodic status or info requests.

Parameters:
  • device (Device) – Device object

  • request_type (PeriodicRequestType) – Type of request (DEVICE_STATUS or DEVICE_INFO)

  • period_seconds (float) – Interval in seconds (default: 300 = 5 minutes)

Example:

from nwp500.mqtt_utils import PeriodicRequestType

# Subscribe first
await mqtt.subscribe_device_status(device, on_status)

# Start periodic status requests every 60 seconds
await mqtt.start_periodic_requests(
    device,
    PeriodicRequestType.DEVICE_STATUS,
    period_seconds=60
)

# Monitor for extended period
await asyncio.sleep(3600)  # 1 hour

# Stop when done
await mqtt.stop_periodic_requests(
    device,
    PeriodicRequestType.DEVICE_STATUS
)

stop_periodic_requests()

stop_periodic_requests(device, request_type)

Stop periodic requests for a device.

Parameters:

stop_all_periodic_tasks()

stop_all_periodic_tasks(device)

Stop all periodic tasks for a device.

Parameters:

device (Device) – Device object

Utility Methods

signal_app_connection()

signal_app_connection(device)

Signal that an application has connected (recommended at startup).

Parameters:

device (Device) – Device object

Returns:

Publish packet ID

Return type:

int

Example:

await mqtt.connect()
await mqtt.signal_app_connection(device)

subscribe(), unsubscribe(), publish()

Low-level MQTT operations (advanced use only).

Properties

is_connected

is_connected

Check if currently connected to MQTT broker.

Type:

bool

Example:

if mqtt.is_connected:
    await mqtt.set_power(device, True)
else:
    print("Not connected")

client_id

client_id

Get MQTT client ID.

Type:

str

session_id

session_id

Get current session ID.

Type:

str

queued_commands_count

queued_commands_count

Get number of queued commands (when offline).

Type:

int

Example:

count = mqtt.queued_commands_count
if count > 0:
    print(f"{count} commands queued (will send on reconnect)")

reconnect_attempts

reconnect_attempts

Get current reconnection attempt count.

Type:

int

is_reconnecting

is_reconnecting

Check if currently attempting to reconnect.

Type:

bool

Examples

Example 1: Complete Monitoring Application

from nwp500 import NavienAuthClient, NavienAPIClient, NavienMqttClient
from datetime import datetime
import asyncio

async def monitor_device():
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        # Track state
        last_temp = None
        last_power = None

        def on_status(status):
            nonlocal last_temp, last_power
            now = datetime.now().strftime("%H:%M:%S")

            # Temperature changed
            if last_temp != status.dhw_temperature:
                print(f"[{now}] Temperature: {status.dhw_temperature}°F "
                      f"(Target: {status.dhw_temperature_setting}°F)")
                last_temp = status.dhw_temperature

            # Power changed
            if last_power != status.current_inst_power:
                print(f"[{now}] Power: {status.current_inst_power}W")
                last_power = status.current_inst_power

            # Heating state
            if status.operation_busy:
                components = []
                if status.comp_use:
                    components.append("HP")
                if status.heat_upper_use:
                    components.append("Upper")
                if status.heat_lower_use:
                    components.append("Lower")
                print(f"[{now}] Heating: {', '.join(components)}")

        await mqtt.subscribe_device_status(device, on_status)
        await mqtt.request_device_status(device)

        # Monitor indefinitely
        try:
            while True:
                await asyncio.sleep(3600)
        except KeyboardInterrupt:
            print("Stopping...")
        finally:
            await mqtt.disconnect()

asyncio.run(monitor_device())

Example 2: Automatic Temperature Control

async def auto_temperature_control():
    \"\"\"Adjust temperature based on usage patterns.\"\"\"
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        # Track water usage
        last_use_time = None

        def on_status(status):
            nonlocal last_use_time

            # Water is being used
            if status.dhw_use or status.dhw_use_sustained:
                last_use_time = datetime.now()

                # If temp dropped below 130°F, boost to high demand
                if status.dhw_temperature < 130:
                    asyncio.create_task(
                        mqtt.set_dhw_mode(device, 4)  # High Demand
                    )

            # No use for 2 hours, go to energy saver
            elif last_use_time:
                idle_time = (datetime.now() - last_use_time).seconds
                if idle_time > 7200:  # 2 hours
                    asyncio.create_task(
                        mqtt.set_dhw_mode(device, 3)  # Energy Saver
                    )

        await mqtt.subscribe_device_status(device, on_status)
        await mqtt.start_periodic_requests(device, period_seconds=60)

        # Run for extended period
        await asyncio.sleep(86400)  # 24 hours
        await mqtt.disconnect()

asyncio.run(auto_temperature_control())

Example 3: Multi-Device Monitoring

async def monitor_multiple_devices():
    \"\"\"Monitor multiple devices simultaneously.\"\"\"
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        devices = await api.list_devices()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        # Create callback for each device
        def create_callback(device_name):
            def callback(status):
                print(f"[{device_name}] {status.dhw_temperature}°F, "
                      f"{status.current_inst_power}W, "
                      f"{status.dhw_operation_setting.name}")
            return callback

        # Subscribe to all devices
        for device in devices:
            callback = create_callback(device.device_info.device_name)
            await mqtt.subscribe_device_status(device, callback)
            await mqtt.request_device_status(device)

        # Monitor
        await asyncio.sleep(3600)
        await mqtt.disconnect()

asyncio.run(monitor_multiple_devices())

Best Practices

Subscribe before requesting

The device responds on a topic you must already be listening to:

# correct
await mqtt.subscribe_device_status(device, on_status)
await mqtt.request_device_status(device)

# wrong — response arrives before subscription
await mqtt.request_device_status(device)
await mqtt.subscribe_device_status(device, on_status)

Use context managers

async with NavienAuthClient(email, password) as auth:
    mqtt = NavienMqttClient(auth)
    try:
        await mqtt.connect()
        # ... operations ...
    finally:
        await mqtt.disconnect()

Handle connection events

def on_interrupted(event):
    print(f"Connection lost: {event.error}")

def on_resumed(event):
    print(f"Connection restored (session_present={event.session_present})")

mqtt.on("connection_interrupted", on_interrupted)
mqtt.on("connection_resumed", on_resumed)

Periodic requests for long-running monitoring

await mqtt.subscribe_device_status(device, on_status)
await mqtt.start_periodic_requests(device, period_seconds=300)
await asyncio.sleep(86400)
await mqtt.stop_periodic_requests(device)