Source code for nwp500.events

"""
Event Emitter for Navien device state changes.

This module provides an event-driven architecture for handling device state
changes, allowing multiple listeners per event and automatic state change
detection.
"""

from __future__ import annotations

import asyncio
import inspect
import logging
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

__author__ = "Emmanuel Levijarvi"
__copyright__ = "Emmanuel Levijarvi"
__license__ = "MIT"

_logger = logging.getLogger(__name__)


[docs] @dataclass(frozen=True) class EventListener: """Represents a registered event listener.""" callback: Callable[..., Any] once: bool = False priority: int = 50 # Default priority
[docs] class EventEmitter: """ Event emitter with support for multiple listeners per event. Provides an event-driven architecture for device state changes with: - Multiple listeners per event - Async handler support - One-time listeners (once) - Priority-based execution order - Automatic state change detection Example:: emitter = EventEmitter() # Register listeners emitter.on('temperature_changed', log_temperature) emitter.on('temperature_changed', update_ui) # Emit events await emitter.emit('temperature_changed', temperature_event) # One-time listener emitter.once('device_ready', initialize) # Remove listener emitter.off('temperature_changed', log_temperature) """ def __init__(self) -> None: """Initialize the event emitter.""" self._listeners: dict[str, list[EventListener]] = defaultdict(list) self._event_counts: dict[str, int] = defaultdict(int) self._once_callbacks: set[tuple[str, Callable[..., Any]]] = ( set() ) # Track (event, callback) for once listeners
[docs] def on( self, event: str, callback: Callable[..., Any], priority: int = 50, ) -> None: """ Register an event listener. Args: event: Event name to listen for callback: Function to call when event is emitted (can be async) priority: Execution priority (higher = earlier, default: 50) Example:: from nwp500.unit_system import get_unit_system def on_temp_change(event): unit = "°C" if get_unit_system() == "metric" else "°F" print( f"Temperature: {event.old_temperature}{unit} → " f"{event.new_temperature}{unit}" ) emitter.on('temperature_changed', on_temp_change) # Async handler async def save_to_db(event): await db.save(event.new_temperature) emitter.on('temperature_changed', save_to_db, priority=100) """ listener = EventListener( callback=callback, once=False, priority=priority ) self._listeners[event].append(listener) # Sort by priority (highest first) self._listeners[event].sort( key=lambda listener: listener.priority, reverse=True ) _logger.debug( f"Registered listener for '{event}' event (priority: {priority})" )
[docs] def once( self, event: str, callback: Callable[..., Any], priority: int = 50, ) -> None: """ Register a one-time event listener. The listener will be automatically removed after first execution. Args: event: Event name to listen for callback: Function to call when event is emitted priority: Execution priority (higher = earlier, default: 50) Example:: emitter.once('device_ready', initialize_device) # Will only be called once, then auto-removed """ listener = EventListener( callback=callback, once=True, priority=priority ) self._listeners[event].append(listener) self._once_callbacks.add( (event, callback) ) # Track (event, callback) for O(1) lookup # Sort by priority (highest first) self._listeners[event].sort( key=lambda listener: listener.priority, reverse=True ) _logger.debug( f"Registered one-time listener for '{event}' event " f"(priority: {priority})" )
[docs] def off( self, event: str, callback: Callable[..., Any | None] | None = None ) -> int: """ Remove event listener(s). Args: event: Event name callback: Specific callback to remove, or None to remove all for event Returns: Number of listeners removed Example:: # Remove specific listener emitter.off('temperature_changed', log_temperature) # Remove all listeners for event emitter.off('temperature_changed') """ if event not in self._listeners: return 0 if callback is None: # Remove all listeners for this event count = len(self._listeners[event]) # Clean up from once callbacks set for listener in self._listeners[event]: self._once_callbacks.discard((event, listener.callback)) del self._listeners[event] _logger.debug( f"Removed all {count} listener(s) for '{event}' event" ) return count # Remove specific callback original_count = len(self._listeners[event]) self._listeners[event] = [ listener for listener in self._listeners[event] if listener.callback != callback ] removed_count = original_count - len(self._listeners[event]) # Clean up from once callbacks set if removed_count > 0: self._once_callbacks.discard((event, callback)) # Clean up if no listeners left if not self._listeners[event]: del self._listeners[event] if removed_count > 0: _logger.debug( f"Removed {removed_count} listener(s) for '{event}' event" ) return removed_count
[docs] async def emit(self, event: str, *args: Any, **kwargs: Any) -> int: """ Emit an event to all registered listeners. Executes listeners in priority order (highest first). One-time listeners are automatically removed after execution. Args: event: Event name to emit *args: Positional arguments to pass to listeners **kwargs: Keyword arguments to pass to listeners Returns: Number of listeners that were called Example:: # Emit with an event object await emitter.emit('temperature_changed', temperature_event) # Emit with keyword arguments await emitter.emit('status_updated', status=device_status) """ if event not in self._listeners: return 0 listeners: list[EventListener] = self._listeners[event].copy() # Copy to allow modification during iteration called_count = 0 listeners_to_remove: list[EventListener] = [] for listener in listeners: try: # Call handler and await if it returned an awaitable. result = listener.callback(*args, **kwargs) if inspect.isawaitable(result): await result called_count += 1 # Check if this is a once listener if listener.once: listeners_to_remove.append(listener) self._once_callbacks.discard((event, listener.callback)) except Exception as e: # Catch all exceptions from user callbacks to ensure # resilience. We intentionally catch Exception here because: # 1. User callbacks can raise any exception type # 2. One bad callback shouldn't break other callbacks # 3. This is an event emitter pattern where resilience is key _logger.error( f"Error in '{event}' event handler: {e}", exc_info=True, ) # Remove one-time listeners after iteration for listener in listeners_to_remove: if listener in self._listeners[event]: self._listeners[event].remove(listener) # Clean up if no listeners left if not self._listeners[event]: del self._listeners[event] # Track event count self._event_counts[event] += 1 _logger.debug(f"Emitted '{event}' event to {called_count} listener(s)") return called_count
[docs] def listener_count(self, event: str) -> int: """ Get the number of listeners for an event. Args: event: Event name Returns: Number of registered listeners Example:: count = emitter.listener_count('temperature_changed') print(f"{count} listeners registered") """ return len(self._listeners.get(event, []))
[docs] def event_count(self, event: str) -> int: """ Get the number of times an event has been emitted. Args: event: Event name Returns: Number of times event was emitted Example:: count = emitter.event_count('temperature_changed') print(f"Event emitted {count} times") """ return self._event_counts.get(event, 0)
[docs] def event_names(self) -> list[str]: """ Get list of all registered event names. Returns: List of event names with active listeners Example:: events = emitter.event_names() print(f"Active events: {', '.join(events)}") """ return list(self._listeners.keys())
[docs] def remove_all_listeners(self, event: str | None = None) -> int: """ Remove all listeners for an event, or all listeners for all events. Args: event: Event name, or None to remove all listeners Returns: Number of listeners removed Example:: # Remove all listeners for specific event emitter.remove_all_listeners('temperature_changed') # Remove all listeners for all events emitter.remove_all_listeners() """ if event is None: # Remove all listeners for all events count = sum( len(listeners) for listeners in self._listeners.values() ) self._listeners.clear() self._once_callbacks.clear() _logger.debug(f"Removed all {count} listener(s) for all events") return count # Remove all listeners for specific event return self.off(event)
[docs] async def wait_for( self, event: str, timeout: float | None = None, ) -> tuple[Any, ...]: """ Wait for an event to be emitted. Args: event: Event name to wait for timeout: Maximum time to wait in seconds (None = wait forever) Returns: Tuple of arguments passed to the event Raises: asyncio.TimeoutError: If timeout is reached Example:: # Wait for device to be ready await emitter.wait_for('device_ready', timeout=30) # Wait for specific condition args, _ = await emitter.wait_for('temperature_changed') temperature_event = args[0] current_temp = temperature_event.new_temperature """ future: asyncio.Future[tuple[tuple[Any, ...], dict[str, Any]]] = ( asyncio.get_running_loop().create_future() ) def handler(*args: Any, **kwargs: Any) -> None: if not future.done(): # Store both args and kwargs future.set_result((args, kwargs)) # Register one-time listener self.once(event, handler) try: if timeout is not None: args_tuple, _ = await asyncio.wait_for(future, timeout=timeout) else: args_tuple, _ = await future # Return just args for simplicity (most common case) return args_tuple except TimeoutError: # Remove the listener on timeout self.off(event, handler) raise