Event-Driven Programming¶
The nwp500 event system lets you react to device state changes, connection events, and derived transitions (temperature delta, mode change, etc.) without polling.
Two Callback Patterns¶
subscribe_*() methods¶
These deliver parsed model objects directly to your callback:
def on_status(status):
print(status.dhw_temperature)
await mqtt.subscribe_device_status(device, on_status)
await mqtt.request_device_status(device)
.on() event emitter¶
These deliver a single typed event dataclass. Use them for connection events and derived state transitions (temperature delta, mode change, etc.):
from nwp500 import MqttClientEvents
def on_status_event(event):
status = event.status
print(f"Temperature: {status.dhw_temperature}°F")
mqtt.on(MqttClientEvents.STATUS_RECEIVED, on_status_event)
See Event System for the full event dataclass reference.
Available Events¶
from nwp500 import MqttClientEvents
for event_name in MqttClientEvents.get_all_events():
print(f"- {event_name}")
Simple Event Handler¶
from nwp500 import NavienAuthClient, NavienAPIClient, NavienMqttClient, MqttClientEvents
import asyncio
async def main():
async with NavienAuthClient(email, password) as auth:
api = NavienAPIClient(auth)
device = await api.get_first_device()
mqtt = NavienMqttClient(auth)
await mqtt.connect()
def on_status_event(event):
status = event.status
print(f"Temperature: {status.dhw_temperature}°F")
print(f"Power: {status.current_inst_power}W")
mqtt.on(MqttClientEvents.STATUS_RECEIVED, on_status_event)
await mqtt.request_device_status(device)
await asyncio.sleep(300)
await mqtt.disconnect()
asyncio.run(main())
Raw status subscription¶
Use a typed subscription when you want the model object directly:
def on_status(status):
print(status.dhw_temperature)
await mqtt.subscribe_device_status(device, on_status)
await mqtt.request_device_status(device)
Event Registry¶
Use MqttClientEvents constants to avoid typos and get IDE autocomplete:
from nwp500 import MqttClientEvents, NavienMqttClient
mqtt_client = NavienMqttClient(auth)
def on_temp_change(event):
print(f"Temperature: {event.old_temperature} -> {event.new_temperature}")
def on_heating_start(event):
print(f"Heating started at {event.status.dhw_temperature}")
def on_error(event):
print(f"Error: {event.error_code}")
mqtt_client.on(MqttClientEvents.TEMPERATURE_CHANGED, on_temp_change)
mqtt_client.on(MqttClientEvents.HEATING_STARTED, on_heating_start)
mqtt_client.on(MqttClientEvents.ERROR_DETECTED, on_error)
for event_name in MqttClientEvents.get_all_events():
print(f" - {event_name}")
See Event System for the event dataclass reference.
Advanced Patterns¶
Tracking significant changes¶
Filter callbacks to only act when a value changes by more than a threshold:
class DeviceMonitor:
def __init__(self, device, mqtt):
self.device = device
self.mqtt = mqtt
self.last_temp = None
self.last_power = None
async def start(self):
await self.mqtt.subscribe_device_status(
self.device,
self.on_status
)
await self.mqtt.request_device_status(self.device)
def on_status(self, status):
# Temperature changed by more than 2°F
if self.last_temp is None or abs(status.dhw_temperature - self.last_temp) >= 2:
print(f"Temperature changed: {self.last_temp}°F → {status.dhw_temperature}°F")
self.last_temp = status.dhw_temperature
# Power changed by more than 100W
if self.last_power is None or abs(status.current_inst_power - self.last_power) >= 100:
print(f"Power changed: {self.last_power}W → {status.current_inst_power}W")
self.last_power = status.current_inst_power
# Usage
async def main():
async with NavienAuthClient(email, password) as auth:
api = NavienAPIClient(auth)
device = await api.get_first_device()
mqtt = NavienMqttClient(auth)
await mqtt.connect()
monitor = DeviceMonitor(device, mqtt)
await monitor.start()
await asyncio.sleep(3600) # Monitor for 1 hour
Multiple devices¶
Monitor multiple devices with individual callbacks.
class MultiDeviceMonitor:
def __init__(self, mqtt):
self.mqtt = mqtt
self.devices = {}
async def add_device(self, device):
device_id = device.device_info.mac_address
# Create device-specific callback
def callback(status):
self.on_device_status(device_id, status)
# Subscribe
await self.mqtt.subscribe_device_status(device, callback)
await self.mqtt.request_device_status(device)
self.devices[device_id] = {
'device': device,
'callback': callback,
'last_status': None
}
def on_device_status(self, device_id, status):
device_data = self.devices[device_id]
device_name = device_data['device'].device_info.device_name
print(f"[{device_name}]")
print(f" Temperature: {status.dhw_temperature}°F")
print(f" Power: {status.current_inst_power}W")
print()
device_data['last_status'] = status
# Usage
async def main():
async with NavienAuthClient(email, password) as auth:
api = NavienAPIClient(auth)
devices = await api.list_devices()
mqtt = NavienMqttClient(auth)
await mqtt.connect()
monitor = MultiDeviceMonitor(mqtt)
# Add all devices
for device in devices:
await monitor.add_device(device)
# Monitor indefinitely
while True:
await asyncio.sleep(60)
Alert rules¶
Trigger actions when the device crosses a threshold:
from datetime import datetime
from typing import Callable, List
class AlertRule:
def __init__(self, name: str, condition: Callable, action: Callable):
self.name = name
self.condition = condition
self.action = action
def check(self, status):
if self.condition(status):
self.action(status)
class AlertSystem:
def __init__(self, device, mqtt):
self.device = device
self.mqtt = mqtt
self.rules: List[AlertRule] = []
def add_rule(self, rule: AlertRule):
self.rules.append(rule)
async def start(self):
await self.mqtt.subscribe_device_status(
self.device,
self.on_status
)
await self.mqtt.start_periodic_requests(
self.device,
period_seconds=60
)
def on_status(self, status):
for rule in self.rules:
rule.check(status)
# Define alert actions
def send_email(subject, body):
print(f"EMAIL: {subject}\n{body}")
# Implement email sending
def send_sms(message):
print(f"SMS: {message}")
# Implement SMS sending
def log_alert(message):
timestamp = datetime.now().isoformat()
print(f"[{timestamp}] ALERT: {message}")
# Usage
async def main():
async with NavienAuthClient(email, password) as auth:
api = NavienAPIClient(auth)
device = await api.get_first_device()
mqtt = NavienMqttClient(auth)
await mqtt.connect()
alerts = AlertSystem(device, mqtt)
# Define alert rules
alerts.add_rule(AlertRule(
name="Low Temperature",
condition=lambda s: s.dhw_temperature < 110,
action=lambda s: send_email(
"Low Water Temperature",
f"Temperature dropped to {s.dhw_temperature}°F"
)
))
alerts.add_rule(AlertRule(
name="High Power",
condition=lambda s: s.current_inst_power > 2000,
action=lambda s: log_alert(
f"High power usage: {s.current_inst_power}W"
)
))
alerts.add_rule(AlertRule(
name="Error Detected",
condition=lambda s: s.error_code != 0,
action=lambda s: send_sms(
f"Device error: {s.error_code}"
)
))
await alerts.start()
# Monitor indefinitely
while True:
await asyncio.sleep(3600)
Data logging¶
Log device data to a database or file.
import sqlite3
from datetime import datetime
class DataLogger:
def __init__(self, device, mqtt, db_path="navien_data.db"):
self.device = device
self.mqtt = mqtt
self.db_path = db_path
self.setup_database()
def setup_database(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS status_log (
timestamp TEXT,
device_mac TEXT,
temperature REAL,
target_temp REAL,
power REAL,
mode TEXT,
operation_mode TEXT,
error_code INTEGER
)
""")
conn.commit()
conn.close()
async def start(self):
await self.mqtt.subscribe_device_status(
self.device,
self.log_status
)
await self.mqtt.start_periodic_requests(
self.device,
period_seconds=300 # Log every 5 minutes
)
def log_status(self, status):
timestamp = datetime.now().isoformat()
device_mac = self.device.device_info.mac_address
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO status_log VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
timestamp,
device_mac,
status.dhw_temperature,
status.dhw_temperature_setting,
status.current_inst_power,
status.dhw_operation_setting.name,
status.operation_mode.name,
status.error_code
))
conn.commit()
conn.close()
print(f"[{timestamp}] Logged status for {device_mac}")
# Usage
async def main():
async with NavienAuthClient(email, password) as auth:
api = NavienAPIClient(auth)
device = await api.get_first_device()
mqtt = NavienMqttClient(auth)
await mqtt.connect()
logger = DataLogger(device, mqtt)
await logger.start()
# Log indefinitely
while True:
await asyncio.sleep(3600)
Home automation bridge¶
Publish status updates to Home Assistant or similar systems:
import aiohttp
class HomeAssistantBridge:
def __init__(self, device, mqtt, ha_url, ha_token):
self.device = device
self.mqtt = mqtt
self.ha_url = ha_url
self.ha_token = ha_token
async def start(self):
await self.mqtt.subscribe_device_status(
self.device,
self.publish_to_ha
)
await self.mqtt.start_periodic_requests(
self.device,
period_seconds=30
)
async def publish_to_ha(self, status):
"""Publish device status to Home Assistant MQTT."""
device_mac = self.device.device_info.mac_address
# Prepare state data
state_data = {
'temperature': status.dhw_temperature,
'target_temperature': status.dhw_temperature_setting,
'power': status.current_inst_power,
'mode': status.dhw_operation_setting.name,
'state': status.operation_mode.name,
'error': status.error_code
}
# Publish to HA
async with aiohttp.ClientSession() as session:
headers = {
'Authorization': f'Bearer {self.ha_token}',
'Content-Type': 'application/json'
}
url = f"{self.ha_url}/api/states/sensor.navien_{device_mac}"
async with session.post(url, headers=headers, json={
'state': status.dhw_temperature,
'attributes': state_data
}) as resp:
if resp.status == 200:
print(f"Published to Home Assistant")
else:
print(f"HA publish failed: {resp.status}")
# Usage
async def main():
async with NavienAuthClient(email, password) as auth:
api = NavienAPIClient(auth)
device = await api.get_first_device()
mqtt = NavienMqttClient(auth)
await mqtt.connect()
bridge = HomeAssistantBridge(
device,
mqtt,
ha_url="http://homeassistant.local:8123",
ha_token="your_long_lived_token"
)
await bridge.start()
# Run indefinitely
while True:
await asyncio.sleep(3600)
Best Practices¶
Keep handlers lightweight¶
Offload heavy work with asyncio.create_task rather than blocking in the callback:
def on_status(status):
asyncio.create_task(process_status(status))
Wrap callbacks in try/except¶
An unhandled exception in a callback won’t crash the event loop, but it will silence subsequent events for that subscription:
def safe_handler(status):
try:
process_status(status)
except Exception as e:
print(f"Handler error: {e}")
Async callbacks¶
Callbacks can be async. The client will schedule them as tasks:
async def async_handler(status):
await save_to_database(status)
await send_notification(status)
Batch processing¶
Buffer updates and flush periodically to reduce I/O overhead:
class BatchProcessor:
def __init__(self):
self.buffer = []
def on_status(self, status):
self.buffer.append(status)
if len(self.buffer) >= 10:
self.flush()
def flush(self):
save_batch_to_db(self.buffer)
self.buffer.clear()