# Source code for nwp500.cli.output_formatters

"""Output formatting utilities for CLI (CSV, JSON)."""

from __future__ import annotations

import csv
import json
import logging
from calendar import month_name
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any

from nwp500 import DeviceFeature, DeviceStatus

from .rich_output import get_formatter

_logger = logging.getLogger(__name__)


def _format_number(value: Any) -> str:
    """Render a value as text, showing floats with a single decimal place.

    Args:
        value: Value to render. Only ``float`` instances are treated
            specially; everything else goes through ``str()``.

    Returns:
        The value formatted as ``"{value:.1f}"`` when it is a float,
        otherwise ``str(value)``.
    """
    return f"{value:.1f}" if isinstance(value, float) else str(value)


def _get_unit_suffix(
    field_name: str,
    model_class: Any = DeviceStatus,
    instance: Any = None,
) -> str:
    """Look up the display unit for a model field.

    Resolution order:

    1. If a truthy ``instance`` exposing ``get_field_unit`` is supplied, its
       result is returned unchanged (this lets the instance pick the unit
       dynamically).
    2. Otherwise the static ``unit_of_measurement`` entry in the field's
       ``json_schema_extra`` mapping on ``model_class`` is used, prefixed
       with a single space so it can be appended directly to a number.

    Args:
        field_name: Name of the field to get the unit for.
        model_class: Model class whose ``model_fields`` hold the static
            metadata (default: DeviceStatus).
        instance: Optional model instance used for dynamic unit resolution.

    Returns:
        The unit text, or an empty string when no unit can be determined.
    """
    # Dynamic path: defer entirely to the instance when it knows its units.
    if instance and hasattr(instance, "get_field_unit"):
        return instance.get_field_unit(field_name)

    # Static path: walk class -> field -> schema extras, bailing out with ""
    # as soon as any link in the chain is absent.
    missing = object()
    schema_fields = getattr(model_class, "model_fields", missing)
    if schema_fields is missing or field_name not in schema_fields:
        return ""

    metadata = getattr(schema_fields[field_name], "json_schema_extra", missing)
    if not (isinstance(metadata, dict) and "unit_of_measurement" in metadata):
        return ""

    # None and "" both mean "no unit"; anything else gets a leading space.
    raw_unit = metadata["unit_of_measurement"]
    return f" {raw_unit}" if raw_unit else ""


def _add_numeric_item(
    items: list[tuple[str, str, str]],
    device_status: Any,
    field_name: str,
    category: str,
    label: str,
) -> None:
    """Append a ``(category, label, "<value><unit>")`` row for one field.

    Nothing is appended when ``device_status`` has no attribute named
    ``field_name``.

    Args:
        items: List that receives the new row (mutated in place).
        device_status: Object the field value is read from; it is also
            passed along as the instance for unit resolution.
        field_name: Name of the attribute to display.
        category: Category section in the output.
        label: Display label for the field.
    """
    if not hasattr(device_status, field_name):
        return

    reading = getattr(device_status, field_name)
    suffix = _get_unit_suffix(field_name, instance=device_status)
    items.append((category, label, f"{_format_number(reading)}{suffix}"))


def _json_default_serializer(obj: Any) -> Any:
    """Convert an object ``json`` cannot encode natively into one it can.

    Intended as the ``default=`` hook for ``json.dumps``. Handled cases, in
    order of precedence:

    * ``datetime`` -> ISO 8601 string via ``isoformat()``
    * ``Enum`` -> the member's ``name`` (a fallback; the original note on
      this function says model output normally has enums converted already)
    * any object with a ``model_dump`` method -> the result of calling it

    Args:
        obj: Object to serialize.

    Returns:
        JSON-serializable representation of the object.

    Raises:
        TypeError: If the object matches none of the handled cases.
    """
    if isinstance(obj, datetime):
        converted = obj.isoformat()
    elif isinstance(obj, Enum):
        converted = obj.name
    elif hasattr(obj, "model_dump"):
        # Duck-typed so any model-like object works, not just one base class.
        converted = obj.model_dump()
    else:
        raise TypeError(f"Type {type(obj)} not serializable")
    return converted


def format_energy_usage(energy_response: Any) -> str:
    """Format an energy usage response as a human-readable table.

    The report is laid out 90 columns wide: a title, a total summary taken
    from ``energy_response.total`` and, when ``energy_response.usage`` is
    non-empty, one row per month with the usage summed over that month's
    ``data`` entries.

    Args:
        energy_response: EnergyUsageResponse object (anything exposing
            ``total`` and ``usage`` with the attributes read below).

    Returns:
        Formatted string with energy usage data in tabular form.
    """
    rule = "=" * 90
    divider = "-" * 90

    total = energy_response.total
    total_usage_wh = total.total_usage
    total_time_hours = total.total_time

    # Header and total summary
    lines: list[str] = [
        rule,
        "ENERGY USAGE REPORT",
        rule,
        "",
        "TOTAL SUMMARY",
        divider,
        f"Total Energy Used: {total_usage_wh:,} Wh ({total_usage_wh / 1000:.2f} kWh)",  # noqa: E501
        f" Heat Pump: {total.heat_pump_usage:,} Wh ({total.heat_pump_percentage:.1f}%)",  # noqa: E501
        f" Heat Element: {total.heat_element_usage:,} Wh ({total.heat_element_percentage:.1f}%)",  # noqa: E501
        f"Total Time Running: {total_time_hours} hours",
        f" Heat Pump: {total.heat_pump_time} hours",
        f" Heat Element: {total.heat_element_time} hours",
    ]

    # Monthly data
    if energy_response.usage:
        lines.extend(
            [
                "",
                "MONTHLY BREAKDOWN",
                divider,
                f"{'Month':<20} {'Energy (Wh)':<18} {'HP (Wh)':<15} {'HE (Wh)':<15} {'HP Time (h)':<15}",  # noqa: E501
                divider,
            ]
        )

        for month_data in energy_response.usage:
            # calendar.month_name is only valid for 1..12; fall back to the
            # raw number rather than raising on an out-of-range month.
            month_name_str = (
                f"{month_name[month_data.month]} {month_data.year}"
                if 1 <= month_data.month <= 12
                else f"Month {month_data.month} {month_data.year}"
            )
            total_wh = sum(
                d.heat_pump_usage + d.heat_element_usage
                for d in month_data.data
            )
            hp_wh = sum(d.heat_pump_usage for d in month_data.data)
            he_wh = sum(d.heat_element_usage for d in month_data.data)
            hp_time = sum(d.heat_pump_time for d in month_data.data)
            lines.append(
                f"{month_name_str:<20} {total_wh:>16,} {hp_wh:>13,} {he_wh:>13,} {hp_time:>13}"  # noqa: E501
            )

    lines.append(rule)
    return "\n".join(lines)
def format_daily_energy_usage(
    energy_response: Any, year: int, month: int
) -> str:
    """Format daily energy usage for one month as a human-readable table.

    The report is laid out 100 columns wide. When the response has no data
    for the requested month, a short "No data available" report is returned
    instead of the summary and breakdown.

    Args:
        energy_response: EnergyUsageResponse object (anything exposing
            ``total`` and ``get_month_data(year, month)``).
        year: Year to filter for (e.g., 2025).
        month: Month to filter for (1-12). Values outside that range are
            shown as ``"Month <n>"`` in the title.

    Returns:
        Formatted string with daily energy usage data in tabular form.
    """
    rule = "=" * 100
    divider = "-" * 100

    # calendar.month_name is only valid for 1..12.
    month_str = (
        f"{month_name[month]} {year}"
        if 1 <= month <= 12
        else f"Month {month} {year}"
    )
    lines: list[str] = [rule, f"DAILY ENERGY USAGE - {month_str}", rule]

    # Find the month data
    month_data = energy_response.get_month_data(year, month)
    if not month_data or not month_data.data:
        lines.extend(["No data available for this month", rule])
        return "\n".join(lines)

    # Total summary.
    # NOTE(review): this reads energy_response.total, not a sum over
    # month_data - confirm that equals the month's total when the response
    # covers more than one month.
    total = energy_response.total
    total_usage_wh = total.total_usage
    total_time_hours = total.total_time

    lines.extend(
        [
            "",
            "TOTAL SUMMARY",
            divider,
            f"Total Energy Used: {total_usage_wh:,} Wh ({total_usage_wh / 1000:.2f} kWh)",  # noqa: E501
            f" Heat Pump: {total.heat_pump_usage:,} Wh ({total.heat_pump_percentage:.1f}%)",  # noqa: E501
            f" Heat Element: {total.heat_element_usage:,} Wh ({total.heat_element_percentage:.1f}%)",  # noqa: E501
            f"Total Time Running: {total_time_hours} hours",
            f" Heat Pump: {total.heat_pump_time} hours",
            f" Heat Element: {total.heat_element_time} hours",
        ]
    )

    # Daily breakdown
    lines.extend(
        [
            "",
            "DAILY BREAKDOWN",
            divider,
            f"{'Day':<5} {'Energy (Wh)':<18} {'HP (Wh)':<15} {'HE (Wh)':<15} {'HP Time':<12} {'HE Time':<12}",  # noqa: E501
            divider,
        ]
    )

    # NOTE(review): the day number is the entry's position (1-based), which
    # presumes one entry per calendar day starting at day 1 - confirm.
    for day_num, day_data in enumerate(month_data.data, start=1):
        total_wh = day_data.total_usage
        hp_wh = day_data.heat_pump_usage
        he_wh = day_data.heat_element_usage
        hp_time = day_data.heat_pump_time
        he_time = day_data.heat_element_time
        lines.append(
            f"{day_num:<5} {total_wh:>16,} {hp_wh:>13,} {he_wh:>13,} {hp_time:>10} {he_time:>10}"  # noqa: E501
        )

    lines.append(rule)
    return "\n".join(lines)
def write_status_to_csv(file_path: str, status: DeviceStatus) -> None:
    """Append device status to a CSV file.

    One row is appended per call. The header row is written first when the
    target file does not exist yet or exists but is empty. Failures to
    access or write the file are logged and swallowed (best-effort logging
    of status data); nothing is raised to the caller for ``OSError``.

    Args:
        file_path: Path to the CSV file.
        status: DeviceStatus object to write.
    """
    # Convert status to dict (per the original note, enums are already
    # converted to names by model_dump).
    status_dict = status.model_dump()

    # Assigning a new key puts the timestamp in the LAST column. The column
    # order is kept as-is so rows stay aligned with existing files.
    status_dict["timestamp"] = datetime.now().isoformat()

    target = Path(file_path)
    try:
        # A missing or zero-length file has no header row yet.
        needs_header = not target.exists() or target.stat().st_size == 0

        # newline="" is what the csv module expects so it can control line
        # endings itself; the encoding is pinned so output does not depend
        # on the platform locale.
        with target.open("a", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=list(status_dict))
            if needs_header:
                writer.writeheader()
            writer.writerow(status_dict)
    except OSError as e:
        _logger.error("Failed to write to CSV: %s", e)
    else:
        _logger.debug("Status written to %s", file_path)
def format_json_output(data: Any, indent: int = 2) -> str:
    """Format data as a JSON string with custom serialization.

    Values the ``json`` module cannot encode on its own are passed to
    ``_json_default_serializer``.

    Args:
        data: Data to format.
        indent: Number of spaces for indentation (default: 2).

    Returns:
        JSON-formatted string.
    """
    return json.dumps(data, indent=indent, default=_json_default_serializer)