Event System

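The MQTT client exposes two complementary callback patterns: typed device subscriptions, which deliver parsed model objects, and client event subscriptions, which deliver a single event object.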

Overview

Use the event system when you want to react to connection changes, status transitions, or derived state changes such as temperature deltas and error conditions.

Two Subscription Patterns

Typed device subscriptions

These methods deliver parsed model objects directly to the callback.

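# The callback receives a parsed status object.
def on_status(status):
    print(status.dhw_temperature)
    print(status.current_inst_power)

# Run inside a coroutine; mqtt and device are assumed to exist already.
await mqtt.subscribe_device_status(device, on_status)
await mqtt.request_device_status(device)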

Examples include subscribe_device_status, shown above.

Client event subscriptions

Event emitter callbacks receive a single event object.

from nwp500 import MqttClientEvents

def on_status_event(event):
    print(event.status.dhw_temperature)

def on_resumed(event):
    print(event.return_code)
    print(event.session_present)

mqtt.on(MqttClientEvents.STATUS_RECEIVED, on_status_event)
mqtt.on(MqttClientEvents.CONNECTION_RESUMED, on_resumed)

EventEmitter API

class EventEmitter

Base class for event-driven components.

on(event, callback)

Register a callback for an event name.

off(event, callback=None)

Remove one callback or all callbacks for an event.

wait_for(event, timeout=None)

Wait for the next event emission and return the positional event arguments as a tuple.

args = await mqtt.wait_for(MqttClientEvents.CONNECTION_RESUMED, timeout=30)
resumed = args[0]
print(resumed.session_present)
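
The semantics described above (register, remove one or all callbacks, wait for the next emission and get the positional arguments back as a tuple) can be illustrated with a small stand-in. This is a sketch only: SketchEmitter, its emit method, and the "ping" event name are hypothetical and are not the nwp500 implementation, whose internals and timeout behavior may differ.

```python
import asyncio
from collections import defaultdict


class SketchEmitter:
    """Hypothetical stand-in for EventEmitter; not the nwp500 implementation."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, callback):
        self._handlers[event].append(callback)

    def off(self, event, callback=None):
        if callback is None:
            # No callback given: drop every handler for this event.
            self._handlers.pop(event, None)
        elif callback in self._handlers.get(event, []):
            self._handlers[event].remove(callback)

    def emit(self, event, *args):
        # Iterate over a copy so handlers may unregister themselves safely.
        for callback in list(self._handlers.get(event, [])):
            callback(*args)

    async def wait_for(self, event, timeout=None):
        future = asyncio.get_running_loop().create_future()

        def _resolve(*args):
            if not future.done():
                future.set_result(args)  # positional arguments as a tuple

        self.on(event, _resolve)
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self.off(event, _resolve)


async def demo():
    emitter = SketchEmitter()
    seen = []
    emitter.on("ping", seen.append)
    # Emit shortly after we start waiting.
    asyncio.get_running_loop().call_later(0.01, emitter.emit, "ping", "payload")
    args = await emitter.wait_for("ping", timeout=1)
    emitter.off("ping")              # remove all remaining callbacks
    emitter.emit("ping", "ignored")  # nobody is listening any more
    return args, seen


result_args, result_seen = asyncio.run(demo())
```

In this sketch a timeout surfaces as the exception raised by asyncio.wait_for; which exception the real wait_for raises is not stated in this section.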

MQTT Client Events

The nwp500.mqtt_events.MqttClientEvents registry exposes every supported client event name as a constant, so IDEs can autocomplete and check them.

from nwp500 import MqttClientEvents

for event_name in MqttClientEvents.get_all_events():
    print(event_name)
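
One use for iterating the registry is attaching a single logging callback to every event. The sketch below shows the pattern with stand-ins: StubClient and the EVENT_NAMES strings are hypothetical placeholders for the real client and for whatever get_all_events() returns, which is assumed here to be an iterable of event names.

```python
from functools import partial

# Hypothetical names; in real code these would come from
# MqttClientEvents.get_all_events().
EVENT_NAMES = ["connection_interrupted", "connection_resumed", "status_received"]


class StubClient:
    """Minimal object with an on() method, used only to make the sketch runnable."""

    def __init__(self):
        self.handlers = {}

    def on(self, event, callback):
        self.handlers.setdefault(event, []).append(callback)


def log_event(log, event_name, event):
    # Record which event fired together with its payload.
    log.append((event_name, event))


def register_logger(client, event_names, log):
    for name in event_names:
        # partial() binds the current name; a lambda referencing the loop
        # variable would see only its last value when called later.
        client.on(name, partial(log_event, log, name))


client = StubClient()
log = []
register_logger(client, EVENT_NAMES, log)

# Simulate one emission by calling the registered handler directly.
client.handlers["status_received"][0]({"dhw_temperature": 120})
```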

ConnectionInterruptedEvent

class nwp500.mqtt_events.ConnectionInterruptedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.CONNECTION_INTERRUPTED.

Fields:

  • error (Exception) - The exception that interrupted the MQTT connection.

Example:

def on_interrupted(event):
    print(f"Connection lost: {event.error}")

mqtt.on(MqttClientEvents.CONNECTION_INTERRUPTED, on_interrupted)

ConnectionResumedEvent

class nwp500.mqtt_events.ConnectionResumedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.CONNECTION_RESUMED.

Fields:

  • return_code (int) - MQTT return code from the resume attempt.

  • session_present (bool) - Whether broker session state was preserved.

Example:

def on_resumed(event):
    if not event.session_present:
        print("Broker session was reset")

mqtt.on(MqttClientEvents.CONNECTION_RESUMED, on_resumed)
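
The interruption and resume events are often handled together. A minimal sketch follows, assuming the handlers only need the documented fields (error, return_code, session_present). InterruptedStub, ResumedStub, and ConnectionMonitor are hypothetical names used so the example runs without a broker, and whether the client re-subscribes on its own is not covered in this section.

```python
from dataclasses import dataclass


@dataclass
class InterruptedStub:
    """Stand-in with the documented ConnectionInterruptedEvent field."""
    error: Exception


@dataclass
class ResumedStub:
    """Stand-in with the documented ConnectionResumedEvent fields."""
    return_code: int
    session_present: bool


class ConnectionMonitor:
    """Tracks link state from interruption and resume events."""

    def __init__(self):
        self.connected = True
        self.interruptions = 0
        self.needs_resubscribe = False

    def on_interrupted(self, event):
        self.connected = False
        self.interruptions += 1

    def on_resumed(self, event):
        self.connected = True
        # Without a preserved session the broker no longer holds
        # the subscriptions made before the interruption.
        self.needs_resubscribe = not event.session_present


monitor = ConnectionMonitor()
monitor.on_interrupted(InterruptedStub(error=ConnectionError("link dropped")))
monitor.on_resumed(ResumedStub(return_code=0, session_present=False))
```

With a real client, the two methods would be registered through mqtt.on for CONNECTION_INTERRUPTED and CONNECTION_RESUMED.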

StatusReceivedEvent

class nwp500.mqtt_events.StatusReceivedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.STATUS_RECEIVED.

Fields:

  • status (DeviceStatus) - Parsed device status delivered with the event, read as event.status in the client event subscription example.

TemperatureChangedEvent

class nwp500.mqtt_events.TemperatureChangedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.TEMPERATURE_CHANGED.

Fields:

  • old_temperature (float) - Previous DHW temperature in the current unit system.

  • new_temperature (float) - New DHW temperature in the current unit system.
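
Because both fields use the same unit system, a handler can compute the delta directly and ignore small fluctuations. The following is a sketch under that assumption; TemperatureChangedStub and make_threshold_handler are hypothetical helpers, and the 2.0 threshold is an arbitrary illustration.

```python
from dataclasses import dataclass


@dataclass
class TemperatureChangedStub:
    """Stand-in with the documented TemperatureChangedEvent fields."""
    old_temperature: float
    new_temperature: float


def make_threshold_handler(min_delta, sink):
    """Build a callback that records only changes of at least min_delta."""

    def handler(event):
        delta = event.new_temperature - event.old_temperature
        if abs(delta) >= min_delta:
            sink.append(delta)

    return handler


changes = []
handler = make_threshold_handler(2.0, changes)
handler(TemperatureChangedStub(120.0, 121.0))  # ignored: change of 1 degree
handler(TemperatureChangedStub(121.0, 118.0))  # recorded: change of -3 degrees
```

The returned handler takes a single event argument, so it could be passed to mqtt.on for TEMPERATURE_CHANGED unchanged.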

ModeChangedEvent

class nwp500.mqtt_events.ModeChangedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.MODE_CHANGED.

Fields:

PowerChangedEvent

class nwp500.mqtt_events.PowerChangedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.POWER_CHANGED.

Fields:

  • old_power (float) - Previous instantaneous power draw in watts.

  • new_power (float) - New instantaneous power draw in watts.
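
Since old_power is the draw that held until the change, consecutive events can be turned into a rough energy estimate. This is only an approximation sketch: it assumes power stayed at old_power between two events, and EnergyEstimator, PowerChangedStub, and the explicit now argument (seconds) are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class PowerChangedStub:
    """Stand-in with the documented PowerChangedEvent fields (watts)."""
    old_power: float
    new_power: float


class EnergyEstimator:
    """Accumulates watt-hours, assuming old_power held since the last event."""

    def __init__(self):
        self.watt_hours = 0.0
        self._last_time = None

    def on_power_changed(self, event, now):
        if self._last_time is not None:
            hours = (now - self._last_time) / 3600.0
            self.watt_hours += event.old_power * hours
        self._last_time = now


estimator = EnergyEstimator()
# Power rises to 1200 W at t=0 s and drops back at t=1800 s (half an hour).
estimator.on_power_changed(PowerChangedStub(0.0, 1200.0), now=0.0)
estimator.on_power_changed(PowerChangedStub(1200.0, 0.0), now=1800.0)
```

With a real client the timestamp could be supplied by a wrapper such as lambda event: estimator.on_power_changed(event, time.monotonic()).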

HeatingStartedEvent

class nwp500.mqtt_events.HeatingStartedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.HEATING_STARTED.

Fields:

  • status (DeviceStatus) - Status snapshot when heating started.

HeatingStoppedEvent

class nwp500.mqtt_events.HeatingStoppedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.HEATING_STOPPED.

Fields:

  • status (DeviceStatus) - Status snapshot when heating stopped.
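
Pairing the started and stopped events gives the length of each heating cycle. A minimal sketch, assuming the two events alternate for a given device; HeatingTimer and its injected clock are hypothetical, and the event payload is not inspected here.

```python
import time


class HeatingTimer:
    """Measures the time between HEATING_STARTED and HEATING_STOPPED."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._started_at = None
        self.durations = []

    def on_heating_started(self, event):
        self._started_at = self._clock()

    def on_heating_stopped(self, event):
        if self._started_at is None:
            return  # stop seen without a matching start; nothing to measure
        self.durations.append(self._clock() - self._started_at)
        self._started_at = None


# A fake clock keeps the example deterministic: start at 10 s, stop at 70 s.
fake_times = iter([10.0, 70.0])
timer = HeatingTimer(clock=lambda: next(fake_times))
timer.on_heating_started(None)
timer.on_heating_stopped(None)
timer.on_heating_stopped(None)  # unmatched stop is ignored
```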

ErrorDetectedEvent

class nwp500.mqtt_events.ErrorDetectedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.ERROR_DETECTED.

Fields:

  • error_code (ErrorCode) - Newly detected device error.

  • status (DeviceStatus) - Status snapshot that contained the error.

ErrorClearedEvent

class nwp500.mqtt_events.ErrorClearedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.ERROR_CLEARED.

Fields:

  • error_code (ErrorCode) - Error code that cleared.
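
Detected and cleared events can be combined to keep a set of currently active errors. The sketch below assumes error codes are hashable; the stub classes, ActiveErrors, and the string codes "E1" and "E2" are hypothetical placeholders for real ErrorCode values.

```python
from dataclasses import dataclass


@dataclass
class ErrorDetectedStub:
    """Stand-in with the documented ErrorDetectedEvent fields."""
    error_code: str
    status: object = None


@dataclass
class ErrorClearedStub:
    """Stand-in with the documented ErrorClearedEvent field."""
    error_code: str


class ActiveErrors:
    """Keeps the set of error codes that were detected and not yet cleared."""

    def __init__(self):
        self.codes = set()

    def on_error_detected(self, event):
        self.codes.add(event.error_code)

    def on_error_cleared(self, event):
        # discard() tolerates a clear for a code we never saw.
        self.codes.discard(event.error_code)


active = ActiveErrors()
active.on_error_detected(ErrorDetectedStub("E1"))
active.on_error_detected(ErrorDetectedStub("E2"))
active.on_error_cleared(ErrorClearedStub("E1"))
```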

FeatureReceivedEvent

class nwp500.mqtt_events.FeatureReceivedEvent

Emitted for nwp500.mqtt_events.MqttClientEvents.FEATURE_RECEIVED.

Fields:

Usage Examples

React to typed event payloads

from nwp500 import MqttClientEvents

def on_temperature_changed(event):
    print(f"{event.old_temperature} -> {event.new_temperature}")

def on_error(event):
    print(f"Error: {event.error_code}")
    print(f"Current mode: {event.status.operation_mode}")

mqtt.on(MqttClientEvents.TEMPERATURE_CHANGED, on_temperature_changed)
mqtt.on(MqttClientEvents.ERROR_DETECTED, on_error)

Wait for a connection event

args = await mqtt.wait_for(MqttClientEvents.CONNECTION_RESUMED, timeout=30)
resumed = args[0]
print(resumed.return_code)