Event-Driven Programming

The nwp500 event system lets you react to device state changes, connection events, and derived transitions (temperature delta, mode change, etc.) without polling.

Two Callback Patterns

subscribe_*() methods

These deliver parsed model objects directly to your callback:

def on_status(status):
    print(status.dhw_temperature)

await mqtt.subscribe_device_status(device, on_status)
await mqtt.request_device_status(device)

.on() event emitter

These deliver a single typed event dataclass. Use them for connection events and derived state transitions (temperature delta, mode change, etc.):

from nwp500 import MqttClientEvents

def on_status_event(event):
    status = event.status
    print(f"Temperature: {status.dhw_temperature}°F")

mqtt.on(MqttClientEvents.STATUS_RECEIVED, on_status_event)

See Event System for the full event dataclass reference.

Available Events

from nwp500 import MqttClientEvents

for event_name in MqttClientEvents.get_all_events():
    print(f"- {event_name}")

Simple Event Handler

from nwp500 import NavienAuthClient, NavienAPIClient, NavienMqttClient, MqttClientEvents
import asyncio

async def main():
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        def on_status_event(event):
            status = event.status
            print(f"Temperature: {status.dhw_temperature}°F")
            print(f"Power: {status.current_inst_power}W")

        mqtt.on(MqttClientEvents.STATUS_RECEIVED, on_status_event)
        await mqtt.request_device_status(device)

        await asyncio.sleep(300)
        await mqtt.disconnect()

asyncio.run(main())

Raw status subscription

Use a typed subscription when you want the model object directly:

def on_status(status):
    print(status.dhw_temperature)

await mqtt.subscribe_device_status(device, on_status)
await mqtt.request_device_status(device)

Event Registry

Use MqttClientEvents constants to avoid typos and get IDE autocomplete:

from nwp500 import MqttClientEvents, NavienMqttClient

mqtt_client = NavienMqttClient(auth)

def on_temp_change(event):
    print(f"Temperature: {event.old_temperature} -> {event.new_temperature}")

def on_heating_start(event):
    print(f"Heating started at {event.status.dhw_temperature}")

def on_error(event):
    print(f"Error: {event.error_code}")

mqtt_client.on(MqttClientEvents.TEMPERATURE_CHANGED, on_temp_change)
mqtt_client.on(MqttClientEvents.HEATING_STARTED, on_heating_start)
mqtt_client.on(MqttClientEvents.ERROR_DETECTED, on_error)

for event_name in MqttClientEvents.get_all_events():
    print(f"  - {event_name}")

See Event System for the event dataclass reference.

Advanced Patterns

Tracking significant changes

Filter callbacks to only act when a value changes by more than a threshold:

class DeviceMonitor:
    def __init__(self, device, mqtt):
        self.device = device
        self.mqtt = mqtt
        self.last_temp = None
        self.last_power = None

    async def start(self):
        await self.mqtt.subscribe_device_status(
            self.device,
            self.on_status
        )
        await self.mqtt.request_device_status(self.device)

    def on_status(self, status):
        # Temperature changed by more than 2°F
        if self.last_temp is None or abs(status.dhw_temperature - self.last_temp) >= 2:
            print(f"Temperature changed: {self.last_temp}°F → {status.dhw_temperature}°F")
            self.last_temp = status.dhw_temperature

        # Power changed by more than 100W
        if self.last_power is None or abs(status.current_inst_power - self.last_power) >= 100:
            print(f"Power changed: {self.last_power}W → {status.current_inst_power}W")
            self.last_power = status.current_inst_power

# Usage
async def main():
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        monitor = DeviceMonitor(device, mqtt)
        await monitor.start()

        await asyncio.sleep(3600)  # Monitor for 1 hour

Multiple devices

Monitor multiple devices with individual callbacks.

class MultiDeviceMonitor:
    def __init__(self, mqtt):
        self.mqtt = mqtt
        self.devices = {}

    async def add_device(self, device):
        device_id = device.device_info.mac_address

        # Create device-specific callback
        def callback(status):
            self.on_device_status(device_id, status)

        # Subscribe
        await self.mqtt.subscribe_device_status(device, callback)
        await self.mqtt.request_device_status(device)

        self.devices[device_id] = {
            'device': device,
            'callback': callback,
            'last_status': None
        }

    def on_device_status(self, device_id, status):
        device_data = self.devices[device_id]
        device_name = device_data['device'].device_info.device_name

        print(f"[{device_name}]")
        print(f"  Temperature: {status.dhw_temperature}°F")
        print(f"  Power: {status.current_inst_power}W")
        print()

        device_data['last_status'] = status

# Usage
async def main():
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        devices = await api.list_devices()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        monitor = MultiDeviceMonitor(mqtt)

        # Add all devices
        for device in devices:
            await monitor.add_device(device)

        # Monitor indefinitely
        while True:
            await asyncio.sleep(60)

Alert rules

Trigger actions when the device crosses a threshold:

from datetime import datetime
from typing import Callable, List

class AlertRule:
    def __init__(self, name: str, condition: Callable, action: Callable):
        self.name = name
        self.condition = condition
        self.action = action

    def check(self, status):
        if self.condition(status):
            self.action(status)

class AlertSystem:
    def __init__(self, device, mqtt):
        self.device = device
        self.mqtt = mqtt
        self.rules: List[AlertRule] = []

    def add_rule(self, rule: AlertRule):
        self.rules.append(rule)

    async def start(self):
        await self.mqtt.subscribe_device_status(
            self.device,
            self.on_status
        )
        await self.mqtt.start_periodic_requests(
            self.device,
            period_seconds=60
        )

    def on_status(self, status):
        for rule in self.rules:
            rule.check(status)

# Define alert actions
def send_email(subject, body):
    print(f"EMAIL: {subject}\n{body}")
    # Implement email sending

def send_sms(message):
    print(f"SMS: {message}")
    # Implement SMS sending

def log_alert(message):
    timestamp = datetime.now().isoformat()
    print(f"[{timestamp}] ALERT: {message}")

# Usage
async def main():
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        alerts = AlertSystem(device, mqtt)

        # Define alert rules
        alerts.add_rule(AlertRule(
            name="Low Temperature",
            condition=lambda s: s.dhw_temperature < 110,
            action=lambda s: send_email(
                "Low Water Temperature",
                f"Temperature dropped to {s.dhw_temperature}°F"
            )
        ))

        alerts.add_rule(AlertRule(
            name="High Power",
            condition=lambda s: s.current_inst_power > 2000,
            action=lambda s: log_alert(
                f"High power usage: {s.current_inst_power}W"
            )
        ))

        alerts.add_rule(AlertRule(
            name="Error Detected",
            condition=lambda s: s.error_code != 0,
            action=lambda s: send_sms(
                f"Device error: {s.error_code}"
            )
        ))

        await alerts.start()

        # Monitor indefinitely
        while True:
            await asyncio.sleep(3600)

Data logging

Log device data to a database or file.

import sqlite3
from datetime import datetime

class DataLogger:
    def __init__(self, device, mqtt, db_path="navien_data.db"):
        self.device = device
        self.mqtt = mqtt
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS status_log (
                timestamp TEXT,
                device_mac TEXT,
                temperature REAL,
                target_temp REAL,
                power REAL,
                mode TEXT,
                operation_mode TEXT,
                error_code INTEGER
            )
        """)
        conn.commit()
        conn.close()

    async def start(self):
        await self.mqtt.subscribe_device_status(
            self.device,
            self.log_status
        )
        await self.mqtt.start_periodic_requests(
            self.device,
            period_seconds=300  # Log every 5 minutes
        )

    def log_status(self, status):
        timestamp = datetime.now().isoformat()
        device_mac = self.device.device_info.mac_address

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO status_log VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            timestamp,
            device_mac,
            status.dhw_temperature,
            status.dhw_temperature_setting,
            status.current_inst_power,
            status.dhw_operation_setting.name,
            status.operation_mode.name,
            status.error_code
        ))
        conn.commit()
        conn.close()

        print(f"[{timestamp}] Logged status for {device_mac}")

# Usage
async def main():
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        logger = DataLogger(device, mqtt)
        await logger.start()

        # Log indefinitely
        while True:
            await asyncio.sleep(3600)

Home automation bridge

Publish status updates to Home Assistant or similar systems:

import aiohttp

class HomeAssistantBridge:
    def __init__(self, device, mqtt, ha_url, ha_token):
        self.device = device
        self.mqtt = mqtt
        self.ha_url = ha_url
        self.ha_token = ha_token

    async def start(self):
        await self.mqtt.subscribe_device_status(
            self.device,
            self.publish_to_ha
        )
        await self.mqtt.start_periodic_requests(
            self.device,
            period_seconds=30
        )

    async def publish_to_ha(self, status):
        """Publish device status to Home Assistant MQTT."""
        device_mac = self.device.device_info.mac_address

        # Prepare state data
        state_data = {
            'temperature': status.dhw_temperature,
            'target_temperature': status.dhw_temperature_setting,
            'power': status.current_inst_power,
            'mode': status.dhw_operation_setting.name,
            'state': status.operation_mode.name,
            'error': status.error_code
        }

        # Publish to HA
        async with aiohttp.ClientSession() as session:
            headers = {
                'Authorization': f'Bearer {self.ha_token}',
                'Content-Type': 'application/json'
            }

            url = f"{self.ha_url}/api/states/sensor.navien_{device_mac}"

            async with session.post(url, headers=headers, json={
                'state': status.dhw_temperature,
                'attributes': state_data
            }) as resp:
                if resp.status == 200:
                    print(f"Published to Home Assistant")
                else:
                    print(f"HA publish failed: {resp.status}")

# Usage
async def main():
    async with NavienAuthClient(email, password) as auth:
        api = NavienAPIClient(auth)
        device = await api.get_first_device()

        mqtt = NavienMqttClient(auth)
        await mqtt.connect()

        bridge = HomeAssistantBridge(
            device,
            mqtt,
            ha_url="http://homeassistant.local:8123",
            ha_token="your_long_lived_token"
        )

        await bridge.start()

        # Run indefinitely
        while True:
            await asyncio.sleep(3600)

Best Practices

Keep handlers lightweight

Offload heavy work with asyncio.create_task rather than blocking in the callback:

def on_status(status):
    asyncio.create_task(process_status(status))

Wrap callbacks in try/except

An unhandled exception in a callback won’t crash the event loop, but it will silence subsequent events for that subscription:

def safe_handler(status):
    try:
        process_status(status)
    except Exception as e:
        print(f"Handler error: {e}")

Async callbacks

Callbacks can be async. The client will schedule them as tasks:

async def async_handler(status):
    await save_to_database(status)
    await send_notification(status)

Batch processing

Buffer updates and flush periodically to reduce I/O overhead:

class BatchProcessor:
    def __init__(self):
        self.buffer = []

    def on_status(self, status):
        self.buffer.append(status)
        if len(self.buffer) >= 10:
            self.flush()

    def flush(self):
        save_batch_to_db(self.buffer)
        self.buffer.clear()