Source code for nwp500.mqtt.state_tracker

"""Per-device state change detection for Navien MQTT clients.

Compares successive :class:`DeviceStatus` snapshots for each device and emits
granular events when individual fields change (temperature, mode, power,
errors).
"""

from __future__ import annotations

import logging

from ..events import EventEmitter
from ..models import DeviceStatus
from ..mqtt_events import (
    ErrorClearedEvent,
    ErrorDetectedEvent,
    HeatingStartedEvent,
    HeatingStoppedEvent,
    ModeChangedEvent,
    PowerChangedEvent,
    TemperatureChangedEvent,
)
from ..unit_system import get_unit_system

_logger = logging.getLogger(__name__)


[docs] class DeviceStateTracker: """Tracks previous device states and emits change events. Each device (identified by MAC address) gets its own slot in ``_previous_status``. On every new status update, this class compares it against the stored snapshot and emits events for changed fields, then stores the new snapshot. """ def __init__(self, event_emitter: EventEmitter) -> None: self._event_emitter = event_emitter self._previous_status: dict[str, DeviceStatus] = {}
[docs] def clear(self) -> None: """Drop all stored snapshots (call on disconnect).""" self._previous_status.clear()
[docs] async def process(self, device_mac: str, status: DeviceStatus) -> None: """Compare *status* with the previous snapshot for *device_mac*. Emits the following events when values change: - ``temperature_changed(TemperatureChangedEvent(...))`` - ``mode_changed(ModeChangedEvent(...))`` - ``power_changed(PowerChangedEvent(...))`` - ``heating_started(HeatingStartedEvent(...))`` - ``heating_stopped(HeatingStoppedEvent(...))`` - ``error_detected(ErrorDetectedEvent(...))`` - ``error_cleared(ErrorClearedEvent(...))`` Args: device_mac: MAC address used as the per-device key. status: Freshly received :class:`DeviceStatus`. """ if device_mac not in self._previous_status: self._previous_status[device_mac] = status return prev = self._previous_status[device_mac] try: # Temperature change (compare raw values) if status.dhw_temperature_raw != prev.dhw_temperature_raw: await self._event_emitter.emit( "temperature_changed", TemperatureChangedEvent( device_mac=device_mac, old_temperature=prev.dhw_temperature, new_temperature=status.dhw_temperature, ), ) unit_suffix = "°C" if get_unit_system() == "metric" else "°F" _logger.debug( "Temperature changed: %s%s%s%s", prev.dhw_temperature, unit_suffix, status.dhw_temperature, unit_suffix, ) # Operation mode change (compare raw values) if status.operation_mode != prev.operation_mode: await self._event_emitter.emit( "mode_changed", ModeChangedEvent( device_mac=device_mac, old_mode=prev.operation_mode, new_mode=status.operation_mode, ), ) _logger.debug( "Mode changed: %s%s", prev.operation_mode, status.operation_mode, ) # Power consumption change (compare raw values) if status.current_inst_power != prev.current_inst_power: await self._event_emitter.emit( "power_changed", PowerChangedEvent( device_mac=device_mac, old_power=prev.current_inst_power, new_power=status.current_inst_power, ), ) _logger.debug( "Power changed: %sW → %sW", prev.current_inst_power, status.current_inst_power, ) # Heating started / stopped (compare raw values) prev_heating = prev.current_inst_power > 0 curr_heating = status.current_inst_power > 0 if curr_heating and not prev_heating: await self._event_emitter.emit( "heating_started", HeatingStartedEvent(device_mac=device_mac, status=status), ) _logger.debug("Heating started") if not curr_heating and prev_heating: await self._event_emitter.emit( "heating_stopped", HeatingStoppedEvent(device_mac=device_mac, status=status), ) _logger.debug("Heating stopped") # Error detection / clearance if status.error_code and not prev.error_code: await self._event_emitter.emit( "error_detected", ErrorDetectedEvent( device_mac=device_mac, error_code=status.error_code, status=status, ), ) _logger.info("Error detected: %s", status.error_code) if not status.error_code and prev.error_code: await self._event_emitter.emit( "error_cleared", ErrorClearedEvent( device_mac=device_mac, error_code=prev.error_code ), ) _logger.info("Error cleared: %s", prev.error_code) except (TypeError, AttributeError, RuntimeError) as e: _logger.error("Error detecting state changes: %s", e, exc_info=True) finally: self._previous_status[device_mac] = status