Source code for nwp500.mqtt.periodic

"""
MQTT Periodic Request Manager for Navien devices.

This module handles periodic/scheduled requests to keep device information
and status up-to-date. Features include:
- Configurable request intervals
- Automatic skip when disconnected
- Graceful task cancellation
- Per-device, per-type task management
"""

from __future__ import annotations

import asyncio
import contextlib
import logging
from collections.abc import Callable
from typing import Any

from awscrt.exceptions import AwsCrtError

from ..models import Device
from .utils import PeriodicRequestType

__author__ = "Emmanuel Levijarvi"

_logger = logging.getLogger(__name__)


[docs] class MqttPeriodicRequestManager: """ Manages periodic requests for device information and status. Features: - Independent tasks per device and request type - Automatic skip when disconnected (with throttled logging) - Graceful cancellation on disconnect - Error recovery and retry """ def __init__( self, is_connected_func: Callable[[], bool], request_device_info_func: Callable[..., Any], request_device_status_func: Callable[..., Any], ): """ Initialize periodic request manager. Args: is_connected_func: Function that returns connection status request_device_info_func: Async function to request device info request_device_status_func: Async function to request device status """ self._is_connected = is_connected_func self._request_device_info = request_device_info_func self._request_device_status = request_device_status_func # Track active periodic tasks self._periodic_tasks: dict[str, asyncio.Task[None]] = {} @property def active_task_count(self) -> int: """Get number of active periodic tasks.""" return len(self._periodic_tasks)
[docs] async def start_periodic_requests( self, device: Device, request_type: PeriodicRequestType = PeriodicRequestType.DEVICE_STATUS, period_seconds: float = 300.0, ) -> None: """ Start sending periodic requests for device information or status. This optional helper continuously sends requests at a specified interval. It can be used to keep device information or status up-to-date. Args: device: Device object request_type: Type of request (DEVICE_INFO or DEVICE_STATUS) period_seconds: Time between requests in seconds (default: 300 = 5 minutes) Example: >>> # Start periodic status requests (default) >>> await manager.start_periodic_requests(device) >>> >>> # Start periodic device info requests >>> await manager.start_periodic_requests( ... device, ... request_type=PeriodicRequestType.DEVICE_INFO ... ) >>> >>> # Custom period: request every 60 seconds >>> await manager.start_periodic_requests( ... device, ... period_seconds=60 ... ) Note: - Only one periodic task per request type per device - Call stop_periodic_requests() to stop a task - All tasks automatically stop when client disconnects """ device_id = device.device_info.mac_address # Do not log MAC address; use a generic placeholder to avoid leaking # sensitive information redacted_device_id = "DEVICE_ID_REDACTED" task_name = f"periodic_{request_type.value}_{device_id}" # Stop existing task for this device/type if any if task_name in self._periodic_tasks: _logger.info( f"Stopping existing periodic {request_type.value} task" ) await self.stop_periodic_requests(device, request_type) async def periodic_request() -> None: """Execute periodic requests for device information or status. This coroutine runs continuously, sending requests at the configured interval. It automatically skips requests when disconnected and provides throttled logging to reduce noise. """ _logger.info( f"Started periodic {request_type.value} requests for " f"{redacted_device_id} (every {period_seconds}s)" ) # Track consecutive skips for throttled logging consecutive_skips = 0 while True: try: if not self._is_connected(): consecutive_skips += 1 # Log warning only on first skip and then every 10th # skip to reduce noise if ( consecutive_skips == 1 or consecutive_skips % 10 == 0 ): _logger.warning( "Not connected, skipping %s request for %s " "(skipped %d time%s)", request_type.value, redacted_device_id, consecutive_skips, "s" if consecutive_skips > 1 else "", ) else: _logger.debug( "Not connected, skipping %s request for %s", request_type.value, redacted_device_id, ) else: # Reset skip counter when connected if consecutive_skips > 0: _logger.info( "Reconnected, resuming %s requests for %s " "(had skipped %d)", request_type.value, redacted_device_id, consecutive_skips, ) consecutive_skips = 0 # Send appropriate request type if request_type == PeriodicRequestType.DEVICE_INFO: await self._request_device_info(device) elif request_type == PeriodicRequestType.DEVICE_STATUS: await self._request_device_status(device) _logger.debug( "Sent periodic %s request for %s", request_type.value, redacted_device_id, ) # Wait for the specified period await asyncio.sleep(period_seconds) except asyncio.CancelledError: _logger.info( f"Periodic {request_type.value} requests cancelled " f"for {redacted_device_id}" ) break except (AwsCrtError, RuntimeError) as e: # Handle known MQTT errors gracefully error_name = ( getattr(e, "name", None) if isinstance(e, AwsCrtError) else None ) if ( error_name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION" ): _logger.debug( "Periodic %s request cancelled due to clean " "session for %s. This is expected during " "reconnection.", request_type.value, redacted_device_id, ) elif error_name == "AWS_ERROR_MQTT_CONNECTION_DESTROYED": _logger.warning( "MQTT connection destroyed during %s request " "for %s. This can occur during AWS-initiated " "disconnections (e.g., 24-hour timeout). " "Reconnection will be attempted automatically.", request_type.value, redacted_device_id, ) else: _logger.error( "Error in periodic %s request for %s: %s", request_type.value, redacted_device_id, e, exc_info=True, ) # Continue despite errors await asyncio.sleep(period_seconds) # Create and store the task task = asyncio.create_task(periodic_request()) self._periodic_tasks[task_name] = task _logger.info( f"Started periodic {request_type.value} task for " f"{redacted_device_id} with period {period_seconds}s" )
[docs] async def stop_periodic_requests( self, device: Device, request_type: PeriodicRequestType | None = None, ) -> None: """ Stop sending periodic requests for a device. Args: device: Device object request_type: Type of request to stop. If None, stops all types for this device. Example: >>> # Stop specific request type >>> await manager.stop_periodic_requests( ... device, ... PeriodicRequestType.DEVICE_STATUS ... ) >>> >>> # Stop all periodic requests for device >>> await manager.stop_periodic_requests(device) """ device_id = device.device_info.mac_address if request_type is None: # Stop all request types for this device types_to_stop = [ PeriodicRequestType.DEVICE_INFO, PeriodicRequestType.DEVICE_STATUS, ] else: types_to_stop = [request_type] stopped_count = 0 for req_type in types_to_stop: task_name = f"periodic_{req_type.value}_{device_id}" if task_name in self._periodic_tasks: task = self._periodic_tasks[task_name] task.cancel() with contextlib.suppress(asyncio.CancelledError): await task del self._periodic_tasks[task_name] stopped_count += 1 if stopped_count == 0: _logger.debug( "No periodic tasks found for device" + (f" (type={request_type.value})" if request_type else "") )
[docs] async def stop_all_periodic_tasks(self, reason: str | None = None) -> None: """ Stop all periodic request tasks. This is automatically called when disconnecting. Args: reason: Optional reason for logging context (e.g., "connection failure") Example: >>> await manager.stop_all_periodic_tasks() >>> await manager.stop_all_periodic_tasks(reason="disconnect") """ if not self._periodic_tasks: return task_count = len(self._periodic_tasks) reason_msg = f" due to {reason}" if reason else "" _logger.info(f"Stopping {task_count} periodic task(s){reason_msg}") # Cancel all tasks for task in self._periodic_tasks.values(): task.cancel() # Wait for all to complete await asyncio.gather( *self._periodic_tasks.values(), return_exceptions=True ) self._periodic_tasks.clear() _logger.info("All periodic tasks stopped")
# Convenience methods for common use cases
[docs] async def start_periodic_device_info_requests( self, device: Device, period_seconds: float = 300.0 ) -> None: """ Start sending periodic device info requests. This is a convenience wrapper around start_periodic_requests(). Args: device: Device object period_seconds: Time between requests in seconds (default: 300 = 5 minutes) """ await self.start_periodic_requests( device=device, request_type=PeriodicRequestType.DEVICE_INFO, period_seconds=period_seconds, )
[docs] async def start_periodic_device_status_requests( self, device: Device, period_seconds: float = 300.0 ) -> None: """ Start sending periodic device status requests. This is a convenience wrapper around start_periodic_requests(). Args: device: Device object period_seconds: Time between requests in seconds (default: 300 = 5 minutes) """ await self.start_periodic_requests( device=device, request_type=PeriodicRequestType.DEVICE_STATUS, period_seconds=period_seconds, )
[docs] async def stop_periodic_device_info_requests(self, device: Device) -> None: """ Stop sending periodic device info requests for a device. This is a convenience wrapper around stop_periodic_requests(). Args: device: Device object """ await self.stop_periodic_requests( device, PeriodicRequestType.DEVICE_INFO )
[docs] async def stop_periodic_device_status_requests( self, device: Device ) -> None: """ Stop sending periodic device status requests for a device. This is a convenience wrapper around stop_periodic_requests(). Args: device: Device object """ await self.stop_periodic_requests( device, PeriodicRequestType.DEVICE_STATUS )