Event System¶
The MQTT client exposes two complementary callback patterns:
- subscribe_*() methods parse device messages and call your callback with a model object such as DeviceStatus or ReservationSchedule.
- nwp500.events.EventEmitter.on() listens for higher-level client events from nwp500.mqtt_events.MqttClientEvents. These callbacks always receive one typed event dataclass.
Overview¶
Use the event system when you want to react to connection changes, status transitions, or derived state changes such as temperature deltas and error conditions.
Two Subscription Patterns¶
Typed device subscriptions¶
These methods deliver parsed model objects directly to the callback.
def on_status(status):
    print(status.dhw_temperature)
    print(status.current_inst_power)

await mqtt.subscribe_device_status(device, on_status)
await mqtt.request_device_status(device)
Examples include:
- nwp500.mqtt.client.NavienMqttClient.subscribe_device_status()
- nwp500.mqtt.client.NavienMqttClient.subscribe_device_feature()
- nwp500.mqtt.client.NavienMqttClient.subscribe_energy_usage()
- nwp500.mqtt.client.NavienMqttClient.subscribe_reservation_response()
- nwp500.mqtt.client.NavienMqttClient.subscribe_weekly_reservation_response()
- nwp500.mqtt.client.NavienMqttClient.subscribe_recirculation_schedule_response()
Client event subscriptions¶
Event emitter callbacks receive a single event object.
from nwp500 import MqttClientEvents

def on_status_event(event):
    print(event.status.dhw_temperature)

def on_resumed(event):
    print(event.return_code)
    print(event.session_present)

mqtt.on(MqttClientEvents.STATUS_RECEIVED, on_status_event)
mqtt.on(MqttClientEvents.CONNECTION_RESUMED, on_resumed)
EventEmitter API¶
- class EventEmitter¶
Base class for event-driven components.
- on(event, callback)¶
Register a callback for an event name.
- off(event, callback=None)¶
Remove one callback or all callbacks for an event.
- wait_for(event, timeout=None)¶
Wait for the next event emission and return the positional event arguments as a tuple.
args = await mqtt.wait_for(MqttClientEvents.CONNECTION_RESUMED, timeout=30)
resumed = args[0]
print(resumed.session_present)
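To make the on/off/wait_for semantics described above concrete, here is a minimal stand-in emitter. It is a sketch only, not the nwp500 implementation: the class name SketchEmitter, its emit() method, and the "ping" event name are hypothetical, and the real EventEmitter may differ in details such as async callback support or timeout behaviour.

```python
import asyncio
from collections import defaultdict


class SketchEmitter:
    """Hypothetical stand-in illustrating on/off/wait_for; not nwp500 code."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    def on(self, event, callback):
        self._callbacks[event].append(callback)

    def off(self, event, callback=None):
        if callback is None:
            self._callbacks.pop(event, None)  # remove every callback
        elif callback in self._callbacks[event]:
            self._callbacks[event].remove(callback)

    def emit(self, event, *args):
        # Iterate over a copy so callbacks may unregister themselves.
        for callback in list(self._callbacks[event]):
            callback(*args)

    async def wait_for(self, event, timeout=None):
        future = asyncio.get_running_loop().create_future()

        def resolve(*args):
            if not future.done():
                future.set_result(args)  # positional arguments as a tuple

        self.on(event, resolve)
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self.off(event, resolve)


async def demo():
    emitter = SketchEmitter()
    seen = []
    emitter.on("ping", seen.append)
    # Schedule an emission that fires while wait_for is pending.
    asyncio.get_running_loop().call_soon(emitter.emit, "ping", "payload")
    args = await emitter.wait_for("ping", timeout=1)
    emitter.off("ping")  # drop all remaining "ping" callbacks
    emitter.emit("ping", "ignored")
    return seen, args


seen, args = asyncio.run(demo())
```

The point of the sketch is the tuple return value: wait_for hands back every positional argument of the emission, which is why the examples on this page index args[0] to reach the event object.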
MQTT Client Events¶
The nwp500.mqtt_events.MqttClientEvents registry exposes all supported
client event names with IDE-friendly constants.
from nwp500 import MqttClientEvents

for event_name in MqttClientEvents.get_all_events():
    print(event_name)
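One possible use of the registry is attaching a single logging callback to every event. The sketch below runs against stand-ins, since it assumes (without confirmation from this page) that the names returned by get_all_events() can be passed straight to on(). StubEvents, StubClient, attach_logger, and the string values are all hypothetical.

```python
class StubEvents:
    """Hypothetical stand-in for MqttClientEvents with made-up string values."""

    STATUS_RECEIVED = "status_received"
    ERROR_DETECTED = "error_detected"

    @classmethod
    def get_all_events(cls):
        return [cls.STATUS_RECEIVED, cls.ERROR_DETECTED]


class StubClient:
    """Hypothetical stand-in for the MQTT client; only on() and emit()."""

    def __init__(self):
        self.callbacks = {}

    def on(self, event, callback):
        self.callbacks.setdefault(event, []).append(callback)

    def emit(self, event, payload):
        for callback in self.callbacks.get(event, []):
            callback(payload)


def attach_logger(client, events, log):
    """Register one callback per event name that records (name, payload)."""
    for name in events.get_all_events():
        # Bind name as a default argument so each callback keeps its own value.
        client.on(name, lambda payload, name=name: log.append((name, payload)))


log = []
client = StubClient()
attach_logger(client, StubEvents, log)
client.emit(StubEvents.ERROR_DETECTED, "E-example")
print(log)
```

Binding the name through a default argument avoids the late-binding pitfall where every lambda created in the loop would otherwise report the last event name.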
ConnectionInterruptedEvent¶
- class nwp500.mqtt_events.ConnectionInterruptedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.CONNECTION_INTERRUPTED.
Fields:
- error (Exception) - The exception that interrupted the MQTT connection.
Example:
def on_interrupted(event):
    print(f"Connection lost: {event.error}")

mqtt.on(MqttClientEvents.CONNECTION_INTERRUPTED, on_interrupted)
ConnectionResumedEvent¶
- class nwp500.mqtt_events.ConnectionResumedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.CONNECTION_RESUMED.
Fields:
- return_code (int) - MQTT return code from the resume attempt.
- session_present (bool) - Whether broker session state was preserved.
Example:
def on_resumed(event):
    if not event.session_present:
        print("Broker session was reset")

mqtt.on(MqttClientEvents.CONNECTION_RESUMED, on_resumed)
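The two connection events can be combined into a small state tracker. The sketch below is illustrative only: ConnectionMonitor and the Fake* dataclasses are hypothetical stand-ins that mirror the documented fields. Whether the client re-establishes subscriptions on its own after a session reset is not stated here, so the needs_resubscribe flag is an assumption about what an application might want to track.

```python
from dataclasses import dataclass


@dataclass
class FakeInterrupted:
    """Hypothetical stand-in mirroring ConnectionInterruptedEvent."""
    error: Exception


@dataclass
class FakeResumed:
    """Hypothetical stand-in mirroring ConnectionResumedEvent."""
    return_code: int
    session_present: bool


class ConnectionMonitor:
    """Tracks link state from interrupted/resumed callbacks."""

    def __init__(self):
        self.connected = True
        self.needs_resubscribe = False
        self.last_error = None

    def on_interrupted(self, event):
        self.connected = False
        self.last_error = event.error

    def on_resumed(self, event):
        self.connected = True
        # Without a preserved session, the broker holds no subscriptions
        # from before the interruption.
        self.needs_resubscribe = not event.session_present


monitor = ConnectionMonitor()
monitor.on_interrupted(FakeInterrupted(error=ConnectionError("link down")))
monitor.on_resumed(FakeResumed(return_code=0, session_present=False))
print(monitor.connected, monitor.needs_resubscribe)
```

With a real client, the two bound methods would be registered through mqtt.on() for CONNECTION_INTERRUPTED and CONNECTION_RESUMED respectively.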
StatusReceivedEvent¶
- class nwp500.mqtt_events.StatusReceivedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.STATUS_RECEIVED.
Fields:
- status (DeviceStatus) - Parsed device status.
TemperatureChangedEvent¶
- class nwp500.mqtt_events.TemperatureChangedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.TEMPERATURE_CHANGED.
Fields:
- old_temperature (float) - Previous DHW temperature in the current unit system.
- new_temperature (float) - New DHW temperature in the current unit system.
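Because the event carries both the old and the new value, a callback can ignore small fluctuations and act only on larger swings. A minimal sketch, assuming a threshold expressed in the current unit system; make_delta_filter and FakeTemperatureChanged are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass
class FakeTemperatureChanged:
    """Hypothetical stand-in mirroring TemperatureChangedEvent."""
    old_temperature: float
    new_temperature: float


def make_delta_filter(threshold, sink):
    """Build a callback that records only changes of at least threshold."""

    def on_temperature_changed(event):
        delta = event.new_temperature - event.old_temperature
        if abs(delta) >= threshold:
            sink.append(delta)

    return on_temperature_changed


large_changes = []
handler = make_delta_filter(2.0, large_changes)
handler(FakeTemperatureChanged(120.0, 120.5))  # small change, ignored
handler(FakeTemperatureChanged(120.5, 117.0))  # large drop, recorded
print(large_changes)
```

The returned function has the single-argument shape that event emitter callbacks use, so it could be passed to mqtt.on() for TEMPERATURE_CHANGED.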
ModeChangedEvent¶
- class nwp500.mqtt_events.ModeChangedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.MODE_CHANGED.
Fields:
- old_mode (CurrentOperationMode) - Previous operating mode.
- new_mode (CurrentOperationMode) - New operating mode.
PowerChangedEvent¶
- class nwp500.mqtt_events.PowerChangedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.POWER_CHANGED.
Fields:
- old_power (float) - Previous instantaneous power draw in watts.
- new_power (float) - New instantaneous power draw in watts.
HeatingStartedEvent¶
- class nwp500.mqtt_events.HeatingStartedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.HEATING_STARTED.
Fields:
- status (DeviceStatus) - Status snapshot when heating started.
HeatingStoppedEvent¶
- class nwp500.mqtt_events.HeatingStoppedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.HEATING_STOPPED.
Fields:
- status (DeviceStatus) - Status snapshot when heating stopped.
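Pairing the heating started and heating stopped events gives a simple way to measure heating cycle length. The following is a sketch under stated assumptions: HeatingCycleTimer is a hypothetical helper, it measures time on the receiving side rather than reading any timestamp from the status snapshot, and the demo passes None in place of real event objects because the handlers do not inspect them.

```python
import time


class HeatingCycleTimer:
    """Records the duration in seconds between start and stop callbacks."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock  # injectable so the demo can use fixed times
        self._started_at = None
        self.durations = []

    def on_heating_started(self, event):
        self._started_at = self._clock()

    def on_heating_stopped(self, event):
        if self._started_at is None:
            return  # stop seen without a matching start
        self.durations.append(self._clock() - self._started_at)
        self._started_at = None


# Demo with a fake clock: heating starts at t=100 s and stops at t=160 s.
ticks = iter([100.0, 160.0])
timer = HeatingCycleTimer(clock=lambda: next(ticks))
timer.on_heating_started(None)
timer.on_heating_stopped(None)
print(timer.durations)
```

A monotonic clock is used by default so that wall-clock adjustments cannot produce negative durations.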
ErrorDetectedEvent¶
- class nwp500.mqtt_events.ErrorDetectedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.ERROR_DETECTED.
Fields:
- error_code (ErrorCode) - Newly detected device error.
- status (DeviceStatus) - Status snapshot that contained the error.
ErrorClearedEvent¶
- class nwp500.mqtt_events.ErrorClearedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.ERROR_CLEARED.
Fields:
- error_code (ErrorCode) - Error code that cleared.
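The detected and cleared events together let an application keep a running set of active errors. This is a minimal sketch: ActiveErrors and the Fake* dataclasses are hypothetical, plain strings stand in for ErrorCode values, and the approach assumes ErrorCode values are hashable.

```python
from dataclasses import dataclass


@dataclass
class FakeErrorDetected:
    """Hypothetical stand-in mirroring ErrorDetectedEvent."""
    error_code: str
    status: object


@dataclass
class FakeErrorCleared:
    """Hypothetical stand-in mirroring ErrorClearedEvent."""
    error_code: str


class ActiveErrors:
    """Maintains the set of error codes that are currently raised."""

    def __init__(self):
        self.codes = set()

    def on_error_detected(self, event):
        self.codes.add(event.error_code)

    def on_error_cleared(self, event):
        # discard() tolerates a clear for a code that was never recorded.
        self.codes.discard(event.error_code)


tracker = ActiveErrors()
tracker.on_error_detected(FakeErrorDetected("E1", status=None))
tracker.on_error_detected(FakeErrorDetected("E2", status=None))
tracker.on_error_cleared(FakeErrorCleared("E1"))
print(tracker.codes)
```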
FeatureReceivedEvent¶
- class nwp500.mqtt_events.FeatureReceivedEvent¶
Emitted for nwp500.mqtt_events.MqttClientEvents.FEATURE_RECEIVED.
Fields:
- feature (DeviceFeature) - Parsed device feature payload.
Usage Examples¶
React to typed event payloads¶
from nwp500 import MqttClientEvents

def on_temperature_changed(event):
    print(f"{event.old_temperature} -> {event.new_temperature}")

def on_error(event):
    print(f"Error: {event.error_code}")
    print(f"Current mode: {event.status.operation_mode}")

mqtt.on(MqttClientEvents.TEMPERATURE_CHANGED, on_temperature_changed)
mqtt.on(MqttClientEvents.ERROR_DETECTED, on_error)
Wait for a connection event¶
args = await mqtt.wait_for(MqttClientEvents.CONNECTION_RESUMED, timeout=30)
resumed = args[0]
print(resumed.return_code)
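A wait with a timeout usually needs a fallback path for the case where the event never arrives. The sketch below assumes that wait_for raises asyncio.TimeoutError when the timeout elapses, which this page does not confirm. StubClient and wait_or_none are hypothetical, and the stub never emits, so the wait can only end by timing out.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in whose wait_for never receives an event."""

    async def wait_for(self, event, timeout=None):
        # An unset asyncio.Event blocks until asyncio.wait_for gives up.
        return await asyncio.wait_for(asyncio.Event().wait(), timeout)


async def wait_or_none(client, event, timeout):
    """Return the first event argument, or None if the wait times out."""
    try:
        args = await client.wait_for(event, timeout=timeout)
    except asyncio.TimeoutError:
        return None
    return args[0]


result = asyncio.run(wait_or_none(StubClient(), "connection_resumed", 0.01))
print(result)
```

If the real client signals a timeout differently, only the except clause needs to change.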