# Source code for nwp500.mqtt.command_queue
"""
MQTT command queue management for Navien Smart Control.
This module handles queueing of commands when the MQTT connection is lost,
and automatically sends them when the connection is restored.
"""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Callable
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from awscrt import mqtt
from .utils import QueuedCommand, redact_topic
if TYPE_CHECKING:
from .utils import MqttConnectionConfig
# Module metadata: author, copyright holder and license identifier.
__author__ = "Emmanuel Levijarvi"
__copyright__ = "Emmanuel Levijarvi"
__license__ = "MIT"
# Module-level logger, named after this module's import path; used by
# MqttCommandQueue for all queue-related log output.
_logger = logging.getLogger(__name__)
# [docs]
class MqttCommandQueue:
"""
Manages command queueing when MQTT connection is interrupted.
Commands sent while disconnected are queued and automatically sent
when the connection is restored. This ensures commands are not lost
during temporary network interruptions.
The queue uses asyncio.Queue with a fixed maximum size. When the queue
is full, the oldest command is automatically dropped to make room for
new commands (FIFO with overflow dropping).
"""
def __init__(self, config: MqttConnectionConfig):
"""
Initialize the command queue.
Args:
config: MQTT connection configuration with queue settings
"""
self.config = config
# Python 3.10+ handles asyncio.Queue initialization without running loop
self._queue: asyncio.Queue[QueuedCommand] = asyncio.Queue(
maxsize=config.max_queued_commands
)
[docs]
def enqueue(
self, topic: str, payload: dict[str, Any], qos: mqtt.QoS
) -> None:
"""
Add a command to the queue.
If the queue is full, the oldest command is dropped to make room
for the new one (FIFO with overflow dropping).
Args:
topic: MQTT topic
payload: Command payload
qos: Quality of Service level
"""
if not self.config.enable_command_queue:
_logger.warning(
f"Command queue disabled, dropping command to "
f"'{redact_topic(topic)}'. Enable command queue in "
f"config to queue commands when disconnected."
)
return
command = QueuedCommand(
topic=topic,
payload=payload,
qos=qos,
timestamp=datetime.now(UTC),
)
# If queue is full, drop oldest command first
if self._queue.full():
try:
# Remove oldest command (non-blocking)
dropped = self._queue.get_nowait()
_logger.warning(
f"Command queue full ({self.config.max_queued_commands}), "
f"dropped oldest command to '{redact_topic(dropped.topic)}'"
)
except asyncio.QueueEmpty:
# Race condition - queue was emptied between check and get
pass
# Add new command (should never block since we just made room if needed)
try:
self._queue.put_nowait(command)
_logger.info(f"Queued command (queue size: {self._queue.qsize()})")
except asyncio.QueueFull:
_logger.error("Failed to enqueue command - queue unexpectedly full")
raise
[docs]
async def send_all(
self,
publish_func: Callable[..., Any],
is_connected_func: Callable[[], bool],
) -> tuple[int, int]:
"""
Send all queued commands.
This is called automatically when connection is restored.
Args:
publish_func: Async function to publish messages (topic, payload,
qos)
is_connected_func: Function to check if currently connected
Returns:
Tuple of (sent_count, failed_count)
"""
if self._queue.empty():
return (0, 0)
queue_size = self._queue.qsize()
_logger.info(f"Sending {queue_size} queued command(s)...")
sent_count = 0
failed_count = 0
while not self._queue.empty() and is_connected_func():
try:
# Get command from queue (non-blocking)
command = self._queue.get_nowait()
except asyncio.QueueEmpty:
# Queue was emptied by another task
break
try:
# Publish the queued command
await publish_func(
topic=command.topic,
payload=command.payload,
qos=command.qos,
)
sent_count += 1
_logger.debug(
f"Sent queued command to '{redact_topic(command.topic)}' "
f"(queued at {command.timestamp.isoformat()})"
)
except Exception as e:
failed_count += 1
_logger.error(
f"Failed to send queued command to "
f"'{redact_topic(command.topic)}': {e}"
)
# Re-queue if there's room
if not self._queue.full():
try:
self._queue.put_nowait(command)
_logger.warning("Re-queued failed command")
except asyncio.QueueFull:
_logger.error(
"Failed to re-queue command - queue is full"
)
break # Stop processing on error to avoid cascade failures
if sent_count > 0:
_logger.info(
f"Sent {sent_count} queued command(s)"
+ (f", {failed_count} failed" if failed_count > 0 else "")
)
return (sent_count, failed_count)
[docs]
def clear(self) -> int:
"""
Clear all queued commands.
Returns:
Number of commands cleared
"""
# Drain the queue
cleared = 0
while not self._queue.empty():
try:
self._queue.get_nowait()
cleared += 1
except asyncio.QueueEmpty:
break
if cleared > 0:
_logger.info(f"Cleared {cleared} queued command(s)")
return cleared
@property
def count(self) -> int:
"""Get the number of queued commands."""
return self._queue.qsize()
@property
def is_empty(self) -> bool:
"""Check if the queue is empty."""
return self._queue.empty()
@property
def is_full(self) -> bool:
"""Check if the queue is full."""
return self._queue.full()