Source code for nwp500.cli.monitoring
"""Monitoring and periodic status polling."""
from __future__ import annotations
import asyncio
import logging
from nwp500 import Device, DeviceStatus, NavienMqttClient
from nwp500.unit_system import get_unit_system
from .output_formatters import write_status_to_csv
_logger = logging.getLogger(__name__)
[docs]
async def handle_monitoring(
mqtt: NavienMqttClient, device: Device, output_file: str
) -> None:
"""
Start periodic monitoring and write status to CSV.
Args:
mqtt: MQTT client instance
device: Device to monitor
output_file: Path to output CSV file
This function runs indefinitely, polling the device every 30 seconds
and writing status updates to a CSV file.
"""
_logger.info(
f"Starting periodic monitoring. Writing updates to {output_file}"
)
_logger.info("Press Ctrl+C to stop.")
def on_status_update(status: DeviceStatus) -> None:
unit_suffix = "°C" if get_unit_system() == "metric" else "°F"
_logger.info(
f"Received status update: Temp={status.dhw_temperature}"
f"{unit_suffix}, "
f"Power={'ON' if status.dhw_use else 'OFF'}"
)
write_status_to_csv(output_file, status)
await mqtt.subscribe_device_status(device, on_status_update)
await mqtt.start_periodic_requests(device, period_seconds=30)
await mqtt.request_device_status(device) # Get an initial status right away
# Keep the script running indefinitely
await asyncio.Event().wait()