Source code for nwp500.cli.handlers

"""Command handlers for CLI operations."""

from __future__ import annotations

import asyncio
import json
import logging
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar, cast

from nwp500 import (
    Device,
    DeviceFeature,
    DeviceStatus,
    EnergyUsageResponse,
    NavienAPIClient,
    NavienMqttClient,
)
from nwp500.exceptions import (
    DeviceError,
    MqttError,
    Nwp500Error,
    RangeValidationError,
    ValidationError,
)
from nwp500.models import ReservationSchedule
from nwp500.mqtt.utils import get_response_data, redact_serial
from nwp500.reservations import (
    add_reservation,
    delete_reservation,
    fetch_reservations,
    update_reservation,
)
from nwp500.topic_builder import MqttTopicBuilder
from nwp500.unit_system import get_unit_system

from .output_formatters import (
    print_device_info,
    print_device_status,
    print_energy_usage,
    print_json,
)
from .rich_output import get_formatter

_logger = logging.getLogger(__name__)
_formatter = get_formatter()

T = TypeVar("T")


async def _wait_for_response(
    subscribe_func: Callable[
        [Device, Callable[[Any], None]], Coroutine[Any, Any, Any]
    ],
    device: Device,
    action_func: Callable[[], Coroutine[Any, Any, Any]],
    timeout: float = 10.0,
    action_name: str = "operation",
) -> Any:
    """Generic helper to wait for a specific MQTT response."""
    future = asyncio.get_running_loop().create_future()

    def callback(res: Any) -> None:
        if not future.done():
            future.set_result(res)

    await subscribe_func(device, callback)
    _logger.info(f"Requesting {action_name}...")
    await action_func()

    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except TimeoutError:
        _logger.error(f"Timed out waiting for {action_name} response.")
        raise


async def _handle_command_with_status_feedback(
    mqtt: NavienMqttClient,
    device: Device,
    action_func: Callable[[], Coroutine[Any, Any, Any]],
    action_name: str,
    success_msg: str,
    print_status: bool = False,
) -> DeviceStatus | None:
    """Helper for commands that wait for a DeviceStatus response."""
    try:
        status: Any = await _wait_for_response(
            mqtt.subscribe_device_status,
            device,
            action_func,
            action_name=action_name,
        )
        if print_status:
            print_json(status.model_dump())
        _logger.info(success_msg)
        _formatter.print_success(success_msg)
        return cast(DeviceStatus, status)
    except (ValidationError, RangeValidationError) as e:
        _logger.error(f"Invalid parameters: {e}")
        _formatter.print_error(str(e), title="Invalid Parameters")
    except (MqttError, DeviceError, Nwp500Error) as e:
        _logger.error(f"Error {action_name}: {e}")
        _formatter.print_error(
            str(e), title=f"Error During {action_name.title()}"
        )
    except Exception as e:
        _logger.error(f"Unexpected error {action_name}: {e}")
        _formatter.print_error(str(e), title="Unexpected Error")
    return None


[docs] async def get_controller_serial_number( mqtt: NavienMqttClient, device: Device, timeout: float = 10.0 ) -> str | None: """Retrieve controller serial number from device.""" try: feature: Any = await _wait_for_response( mqtt.subscribe_device_feature, device, lambda: mqtt.request_device_info(device), timeout=timeout, action_name="controller serial", ) serial = cast(DeviceFeature, feature).controller_serial_number _logger.info( f"Controller serial number retrieved: {redact_serial(serial)}" ) return serial except Exception: return None
[docs] async def handle_get_controller_serial_request( mqtt: NavienMqttClient, device: Device ) -> None: """Request and display just the controller serial number.""" serial = await get_controller_serial_number(mqtt, device) if serial: print(serial) else: _logger.error("Failed to retrieve controller serial number.")
async def _handle_info_request( mqtt: NavienMqttClient, device: Device, subscribe_method: Callable[ [Device, Callable[[Any], None]], Coroutine[Any, Any, Any] ], request_method: Callable[[Device], Coroutine[Any, Any, Any]], data_key: str, action_name: str, raw: bool = False, formatter: Callable[[Any], None] | None = None, ) -> None: """Generic helper for requesting and displaying device information.""" try: if not raw: res = await _wait_for_response( subscribe_method, device, lambda: request_method(device), action_name=action_name, ) if formatter: formatter(res) else: print_json(res.model_dump()) else: future = asyncio.get_running_loop().create_future() def raw_cb(topic: str, message: dict[str, Any]) -> None: if not future.done(): res = get_response_data(message, data_key) if res: print_json(res) future.set_result(None) await mqtt.subscribe_device(device, raw_cb) await request_method(device) await asyncio.wait_for(future, timeout=10) except Exception as e: _logger.error(f"Failed to get {action_name}: {e}")
[docs] async def handle_status_request( mqtt: NavienMqttClient, device: Device, raw: bool = False ) -> None: """Request device status and print it.""" await _handle_info_request( mqtt, device, mqtt.subscribe_device_status, mqtt.request_device_status, "status", "device status", raw, formatter=print_device_status if not raw else None, )
[docs] async def handle_device_info_request( mqtt: NavienMqttClient, device: Device, raw: bool = False ) -> None: """Request comprehensive device information.""" await _handle_info_request( mqtt, device, mqtt.subscribe_device_feature, mqtt.request_device_info, "feature", "device information", raw, formatter=print_device_info if not raw else None, )
[docs] async def handle_set_mode_request( mqtt: NavienMqttClient, device: Device, mode_name: str ) -> None: """Set device operation mode.""" mode_mapping = { "standby": 0, "heat-pump": 1, "electric": 2, "energy-saver": 3, "high-demand": 4, "vacation": 5, } mode_id = mode_mapping.get(mode_name.lower()) if mode_id is None: _logger.error( f"Invalid mode '{mode_name}'. Valid: {list(mode_mapping.keys())}" ) return await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.set_dhw_mode(device, mode_id), "setting mode", f"Mode changed to {mode_name}", )
[docs] async def handle_set_dhw_temp_request( mqtt: NavienMqttClient, device: Device, temperature: float ) -> None: """Set DHW target temperature.""" unit_suffix = "°C" if get_unit_system() == "metric" else "°F" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.set_dhw_temperature(device, temperature), "setting temperature", f"Temperature set to {temperature}{unit_suffix}", )
[docs] async def handle_power_request( mqtt: NavienMqttClient, device: Device, power_on: bool ) -> None: """Set device power state.""" state = "on" if power_on else "off" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.set_power(device, power_on), f"turning {state}", f"Device turned {state}", )
def _schedule_to_display_list( schedule: ReservationSchedule, ) -> list[dict[str, Any]]: """Convert a ReservationSchedule to a list of display-ready dicts.""" result: list[dict[str, Any]] = [] for i, entry in enumerate(schedule.reservation): d = entry.model_dump() d["number"] = i + 1 d["mode"] = d.pop("mode_name") result.append(d) return result
[docs] async def handle_get_reservations_request( mqtt: NavienMqttClient, device: Device, output_json: bool = False ) -> None: """Request current reservation schedule.""" schedule = await fetch_reservations(mqtt, device) if schedule is None: return if output_json: print_json(schedule.model_dump()) else: reservation_list = _schedule_to_display_list(schedule) _formatter.print_reservations_table(reservation_list, schedule.enabled)
[docs] async def handle_update_reservations_request( mqtt: NavienMqttClient, device: Device, reservations_json: str, enabled: bool, ) -> None: """Update reservation schedule.""" try: data: Any = json.loads(reservations_json) if not isinstance(data, list): raise ValueError("Must be a JSON array") reservations: list[Any] = data # type: ignore[reportUnknownVariableType] except (json.JSONDecodeError, ValueError) as e: _logger.error(f"Invalid reservations JSON: {e}") return future = asyncio.get_running_loop().create_future() def raw_callback(topic: str, message: dict[str, Any]) -> None: if not future.done() and "response" in message: print_json(message) future.set_result(None) device_type = str(device.device_info.device_type) response_topic = MqttTopicBuilder.response_topic( device_type, mqtt.client_id, "rsv/rd" ) await mqtt.subscribe(response_topic, raw_callback) await mqtt.update_reservations(device, reservations, enabled=enabled) try: await asyncio.wait_for(future, timeout=10) except TimeoutError: _logger.error("Timed out updating reservations.")
[docs] async def handle_add_reservation_request( mqtt: NavienMqttClient, device: Device, enabled: bool, days: str, hour: int, minute: int, mode: int, temperature: float, ) -> None: """Add a single reservation to the existing schedule.""" day_list = [d.strip() for d in days.split(",")] try: await add_reservation( mqtt, device, enabled=enabled, days=day_list, hour=hour, minute=minute, mode=mode, temperature=temperature, ) print("✓ Reservation added successfully") except (ValueError, TimeoutError) as e: _logger.error(str(e)) except (RangeValidationError, ValidationError) as e: _logger.error(f"Failed to add reservation: {e}")
[docs] async def handle_delete_reservation_request( mqtt: NavienMqttClient, device: Device, index: int, ) -> None: """Delete a single reservation by 1-based index.""" try: await delete_reservation(mqtt, device, index) print(f"✓ Reservation {index} deleted successfully") except (ValueError, TimeoutError) as e: _logger.error(str(e))
[docs] async def handle_update_reservation_request( mqtt: NavienMqttClient, device: Device, index: int, *, enabled: bool | None = None, days: str | None = None, hour: int | None = None, minute: int | None = None, mode: int | None = None, temperature: float | None = None, ) -> None: """Update a single reservation by 1-based index. Only the provided fields are modified; others are preserved. """ day_list: list[str] | None = ( [d.strip() for d in days.split(",")] if days is not None else None ) try: await update_reservation( mqtt, device, index, enabled=enabled, days=day_list, hour=hour, minute=minute, mode=mode, temperature=temperature, ) print(f"✓ Reservation {index} updated successfully") except (ValueError, TimeoutError) as e: _logger.error(str(e)) except (RangeValidationError, ValidationError) as e: _logger.error(f"Failed to update reservation: {e}")
[docs] async def handle_enable_anti_legionella_request( mqtt: NavienMqttClient, device: Device, period_days: int, ) -> None: """Enable Anti-Legionella disinfection cycle.""" try: await mqtt.enable_anti_legionella(device, period_days) print(f"✓ Anti-Legionella enabled (cycle every {period_days} day(s))") except (RangeValidationError, ValidationError) as e: _logger.error(f"Failed to enable Anti-Legionella: {e}") except DeviceError as e: _logger.error(f"Device error: {e}")
[docs] async def handle_set_anti_legionella_period_request( mqtt: NavienMqttClient, device: Device, period_days: int, ) -> None: """Set Anti-Legionella cycle period without changing enabled state.""" future: asyncio.Future[DeviceStatus] = ( asyncio.get_running_loop().create_future() ) def _on_status(status: DeviceStatus) -> None: if not future.done(): future.set_result(status) try: await mqtt.subscribe_device_status(device, _on_status) await mqtt.request_device_status(device) status = await asyncio.wait_for(future, timeout=10) # Get current enabled state use = getattr(status, "anti_legionella_use", None) if use: await mqtt.enable_anti_legionella(device, period_days) print(f"Anti-Legionella period set to {period_days} day(s)") else: print( "Anti-Legionella is currently disabled. " "Enable it first to set the period, or use " "'anti-legionella enable' with the desired period." ) except (RangeValidationError, ValidationError) as e: _logger.error(f"Failed to set Anti-Legionella period: {e}") except DeviceError as e: _logger.error(f"Device error: {e}") except TimeoutError: _logger.error("Timeout waiting for device status")
[docs] async def handle_disable_anti_legionella_request( mqtt: NavienMqttClient, device: Device, ) -> None: """Disable Anti-Legionella disinfection cycle.""" try: await mqtt.disable_anti_legionella(device) print("✓ Anti-Legionella disabled") except DeviceError as e: _logger.error(f"Device error: {e}")
[docs] async def handle_get_anti_legionella_status_request( mqtt: NavienMqttClient, device: Device, ) -> None: """Display Anti-Legionella status from device status.""" future: asyncio.Future[DeviceStatus] = ( asyncio.get_running_loop().create_future() ) def _on_status(status: DeviceStatus) -> None: if not future.done(): future.set_result(status) await mqtt.subscribe_device_status(device, _on_status) await mqtt.request_device_status(device) try: status = await asyncio.wait_for(future, timeout=10) period = getattr(status, "anti_legionella_period", None) use = getattr(status, "anti_legionella_use", None) busy = getattr(status, "anti_legionella_operation_busy", None) items = [ ( "ANTI-LEGIONELLA", "Status", "Enabled" if use else "Disabled", ), ( "ANTI-LEGIONELLA", "Cycle Period", f"{period} day(s)" if period else "N/A", ), ( "ANTI-LEGIONELLA", "Currently Running", "Yes" if busy else "No", ), ] _formatter.print_status_table(items) except TimeoutError: _logger.error("Timed out waiting for device status.")
[docs] async def handle_get_device_info_rest( api_client: NavienAPIClient, device: Device, raw: bool = False ) -> None: """Get device info from REST API (minimal DeviceInfo fields).""" try: device_info_obj = await api_client.get_device_info( mac_address=device.device_info.mac_address, additional_value=device.device_info.additional_value, ) if raw: print_json(device_info_obj.model_dump()) else: # Print formatted output with rich support info = device_info_obj.device_info install_type_str = info.install_type if info.install_type else "N/A" mac_display = ( redact_serial(info.mac_address) if info.mac_address else "N/A" ) # Collect items for rich formatter all_items = [ ("DEVICE INFO", "Device Name", info.device_name), ("DEVICE INFO", "MAC Address", mac_display), ("DEVICE INFO", "Device Type", str(info.device_type)), ("DEVICE INFO", "Home Seq", str(info.home_seq)), ("DEVICE INFO", "Connected", str(info.connected)), ("DEVICE INFO", "Install Type", install_type_str), ( "DEVICE INFO", "Additional Value", info.additional_value or "N/A", ), ] _formatter.print_status_table(all_items) except Exception as e: _logger.error(f"Error fetching device info: {e}")
[docs] async def handle_get_tou_request( mqtt: NavienMqttClient, device: Device, api_client: Any, *, output_json: bool = False, ) -> None: """Request Time-of-Use settings from REST API.""" from nwp500.encoding import ( decode_price, decode_season_bitfield, decode_week_bitfield, ) try: serial = await get_controller_serial_number(mqtt, device) if not serial: _logger.error("Failed to get controller serial.") return tou_info = await api_client.get_tou_info( mac_address=device.device_info.mac_address, additional_value=device.device_info.additional_value, controller_id=serial, user_type="O", ) if output_json: print_json( { "name": tou_info.name, "utility": tou_info.utility, "zipCode": tou_info.zip_code, "schedule": [ { "season": s.season, "intervals": s.intervals, } for s in tou_info.schedule ], } ) return _formatter.print_tou_schedule( name=tou_info.name, utility=tou_info.utility, zip_code=tou_info.zip_code, schedules=tou_info.schedule, decode_season=decode_season_bitfield, decode_week=decode_week_bitfield, decode_price_fn=decode_price, ) except Exception as e: _logger.error(f"Error fetching TOU: {e}")
[docs] async def handle_set_tou_enabled_request( mqtt: NavienMqttClient, device: Device, enabled: bool ) -> None: """Enable or disable Time-of-Use.""" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.set_tou_enabled(device, enabled), f"{'enabling' if enabled else 'disabling'} TOU", f"TOU {'enabled' if enabled else 'disabled'}", )
[docs] async def handle_tou_rates_request( zip_code: str, utility: str | None = None, ) -> None: """List utilities and rate plans for a zip code.""" from nwp500.openei import OpenEIClient try: async with OpenEIClient() as client: plans = await client.list_rate_plans(zip_code, utility=utility) if not plans: _formatter.print_error( f"No rate plans found for zip code {zip_code}", title="No Results", ) return # Group by utility utilities: dict[str, list[dict[str, Any]]] = {} for plan in plans: util_name = plan["utility"] utilities.setdefault(util_name, []).append(plan) output: list[dict[str, Any]] = [] for util_name, util_plans in sorted(utilities.items()): unique_names = sorted({p["name"] for p in util_plans}) output.append( { "utility": util_name, "planCount": len(unique_names), "plans": unique_names, } ) print_json(output) except ValueError as e: _formatter.print_error(str(e), title="Configuration Error") except Exception as e: _logger.error(f"Error fetching rate plans: {e}") _formatter.print_error(str(e), title="Error")
[docs] async def handle_tou_plan_request( api_client: NavienAPIClient, zip_code: str, plan_name: str, utility: str | None = None, *, output_json: bool = False, ) -> None: """View a converted rate plan's details.""" from nwp500.encoding import ( decode_price, decode_season_bitfield, decode_week_bitfield, ) from nwp500.openei import OpenEIClient try: async with OpenEIClient() as client: rate_plan = await client.get_rate_plan( zip_code, plan_name, utility=utility ) if not rate_plan: _formatter.print_error( f"Rate plan matching '{plan_name}' not found", title="Not Found", ) return # Convert via Navien backend converted = await api_client.convert_tou([rate_plan]) if not converted: _formatter.print_error( "Backend returned no converted plans", title="Conversion Error", ) return plan = converted[0] if output_json: schedules = [] for sched in plan.schedule: months = decode_season_bitfield(sched.season) intervals = [] for iv in sched.intervals: days = decode_week_bitfield(iv.get("week", 0)) dp = iv.get("decimalPoint", 5) intervals.append( { "days": days, "time": ( f"{iv.get('startHour', 0):02d}:" f"{iv.get('startMinute', 0):02d}-" f"{iv.get('endHour', 0):02d}:" f"{iv.get('endMinute', 0):02d}" ), "priceMin": ( "$" f"{decode_price(iv.get('priceMin', 0), dp):.5f}" "/kWh" ), "priceMax": ( "$" f"{decode_price(iv.get('priceMax', 0), dp):.5f}" "/kWh" ), } ) schedules.append({"months": months, "intervals": intervals}) print_json( { "utility": plan.utility, "name": plan.name, "schedules": schedules, } ) return _formatter.print_tou_schedule( name=plan.name, utility=plan.utility, zip_code=int(zip_code) if zip_code.isdigit() else 0, schedules=plan.schedule, decode_season=decode_season_bitfield, decode_week=decode_week_bitfield, decode_price_fn=decode_price, ) except ValueError as e: _formatter.print_error(str(e), title="Configuration Error") except Exception as e: _logger.error(f"Error viewing rate plan: {e}") _formatter.print_error(str(e), title="Error")
[docs] async def handle_tou_apply_request( mqtt: NavienMqttClient, device: Device, api_client: NavienAPIClient, zip_code: str, plan_name: str, utility: str | None = None, enable: bool = False, ) -> None: """Apply a TOU rate plan to the water heater.""" from nwp500.openei import OpenEIClient try: # Step 1: Find the rate plan from OpenEI async with OpenEIClient() as client: rate_plan = await client.get_rate_plan( zip_code, plan_name, utility=utility ) if not rate_plan: _formatter.print_error( f"Rate plan matching '{plan_name}' not found", title="Not Found", ) return # Step 2: Convert via Navien backend converted = await api_client.convert_tou([rate_plan]) if not converted: _formatter.print_error( "Backend returned no converted plans", title="Conversion Error", ) return plan = converted[0] # Step 3: Get device register path from current TOU info serial = await get_controller_serial_number(mqtt, device) if not serial: _logger.error("Failed to get controller serial.") return current_tou = await api_client.get_tou_info( mac_address=device.device_info.mac_address, additional_value=device.device_info.additional_value, controller_id=serial, ) register_path = current_tou.register_path or "wifi" # Step 4: Apply via PUT /device/tou tou_info_dict = { "name": plan.name, "schedule": [ {"season": s.season, "interval": s.intervals} for s in plan.schedule ], "utility": plan.utility, "zipCode": zip_code, } result = await api_client.update_tou( mac_address=device.device_info.mac_address, additional_value=device.device_info.additional_value, tou_info=tou_info_dict, source_data=rate_plan, zip_code=zip_code, register_path=register_path, ) _formatter.print_success( f"Applied rate plan: {result.name} ({result.utility})" ) # Step 5: Optionally enable TOU if enable: await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.set_tou_enabled(device, True), "enabling TOU", "TOU enabled", ) except ValueError as e: _formatter.print_error(str(e), title="Configuration Error") except Exception as e: _logger.error(f"Error applying rate plan: {e}") _formatter.print_error(str(e), title="Error")
[docs] async def handle_get_energy_request( mqtt: NavienMqttClient, device: Device, year: int, months: list[int] ) -> None: """Request energy usage data. If a single month is provided, shows daily breakdown. If multiple months are provided, shows monthly summary. """ try: res: Any = await _wait_for_response( mqtt.subscribe_energy_usage, device, lambda: mqtt.request_energy_usage(device, year, months), action_name="energy usage", timeout=15, ) # If single month requested, show daily breakdown if len(months) == 1: from .output_formatters import print_daily_energy_usage print_daily_energy_usage( cast(EnergyUsageResponse, res), year, months[0] ) else: print_energy_usage(cast(EnergyUsageResponse, res)) except Exception as e: _logger.error(f"Error getting energy data: {e}")
[docs] async def handle_reset_air_filter_request( mqtt: NavienMqttClient, device: Device ) -> None: """Reset air filter timer.""" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.reset_air_filter(device), "resetting air filter", "Air filter timer reset", )
[docs] async def handle_set_vacation_days_request( mqtt: NavienMqttClient, device: Device, days: int ) -> None: """Set vacation mode duration.""" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.set_vacation_days(device, days), "setting vacation days", f"Vacation days set to {days}", )
[docs] async def handle_set_recirculation_mode_request( mqtt: NavienMqttClient, device: Device, mode: int ) -> None: """Set recirculation pump mode.""" mode_map = {1: "ALWAYS", 2: "BUTTON", 3: "SCHEDULE", 4: "TEMPERATURE"} mode_name = mode_map.get(mode, str(mode)) status = await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.set_recirculation_mode(device, mode), "setting recirculation mode", f"Recirculation mode set to {mode_name}", ) if status and status.recirc_operation_mode.value != mode: _logger.warning( f"Device reported mode {status.recirc_operation_mode.name} " f"instead of expected {mode_name}. External factor or " "device state may have prevented the change." )
[docs] async def handle_trigger_recirculation_hot_button_request( mqtt: NavienMqttClient, device: Device ) -> None: """Trigger hot button.""" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.trigger_recirculation_hot_button(device), "triggering hot button", "Hot button triggered", )
[docs] async def handle_enable_demand_response_request( mqtt: NavienMqttClient, device: Device ) -> None: """Enable demand response.""" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.enable_demand_response(device), "enabling DR", "Demand response enabled", )
[docs] async def handle_disable_demand_response_request( mqtt: NavienMqttClient, device: Device ) -> None: """Disable demand response.""" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.disable_demand_response(device), "disabling DR", "Demand response disabled", )
[docs] async def handle_configure_reservation_water_program_request( mqtt: NavienMqttClient, device: Device ) -> None: """Configure water program.""" await _handle_command_with_status_feedback( mqtt, device, lambda: mqtt.configure_reservation_water_program(device), "configuring water program", "Water program configured", )