Source code for nwp500.encoding

"""
Encoding and decoding utilities for Navien API data structures.

This module provides functions for encoding and decoding bitfields,
prices, and building payload structures for reservations and TOU schedules.
These utilities are used by both the API client and MQTT client.
"""

from __future__ import annotations

import logging
from collections.abc import Iterable
from numbers import Real

from .exceptions import ParameterValidationError, RangeValidationError

_logger = logging.getLogger(__name__)

# MGPP Week Bitfield Encoding (from NaviLink APK KDEnum.MgppReservationWeek).
# Uses a single byte where bits 1-7 represent days; bit 0 is unused.
#
#   Bit 7 (128) = Sunday
#   Bit 6 (64)  = Monday
#   Bit 5 (32)  = Tuesday
#   Bit 4 (16)  = Wednesday
#   Bit 3 (8)   = Thursday
#   Bit 2 (4)   = Friday
#   Bit 1 (2)   = Saturday
#   Bit 0 (1)   = Unused
WEEKDAY_ORDER = [
    "Sunday",
    "Monday",
    "Tuesday",
    "Wednesday",
    "Thursday",
    "Friday",
    "Saturday",
]

# Explicit bit values per the MGPP protocol (bit 0 unused).
_WEEKDAY_BIT_VALUES: dict[str, int] = {
    "Sunday": 128,
    "Monday": 64,
    "Tuesday": 32,
    "Wednesday": 16,
    "Thursday": 8,
    "Friday": 4,
    "Saturday": 2,
}

# Pre-computed lookup tables for performance.
WEEKDAY_NAME_TO_BIT: dict[str, int] = {
    name.lower(): bit for name, bit in _WEEKDAY_BIT_VALUES.items()
}
# Add standard 2-letter abbreviations (MO, TU, WE, TH, FR, SA, SU)
_WEEKDAY_ABBREVIATIONS = {
    "mo": "monday",
    "tu": "tuesday",
    "we": "wednesday",
    "th": "thursday",
    "fr": "friday",
    "sa": "saturday",
    "su": "sunday",
}
WEEKDAY_NAME_TO_BIT.update(
    {
        abbr: WEEKDAY_NAME_TO_BIT[full]
        for abbr, full in _WEEKDAY_ABBREVIATIONS.items()
    }
)
MONTH_TO_BIT = {month: 1 << (month - 1) for month in range(1, 13)}


# ============================================================================
# Week Bitfield Encoding/Decoding
# ============================================================================


[docs] def encode_week_bitfield(days: Iterable[str | int]) -> int: """ Convert a collection of day names or indices into a reservation bitfield. Args: days: Collection of weekday names (full or 2-letter abbreviations, case-insensitive) or 0-based indices (Monday=0, Sunday=6) Returns: Integer bitfield (MGPP encoding: Sun=bit7, Mon=bit6, ..., Sat=bit1) Raises: ParameterValidationError: If day name is unknown/invalid RangeValidationError: If day index is out of range (not 0-6) TypeError: If day value is neither string nor integer Examples: >>> encode_week_bitfield(["Monday", "Wednesday", "Friday"]) 84 # 64 + 16 + 4 >>> encode_week_bitfield(["MO", "WE", "FR"]) # 2-letter abbreviations 84 >>> encode_week_bitfield([0, 2, 4]) # 0-indexed: Mon, Wed, Fri 84 >>> encode_week_bitfield([5, 6]) # Saturday and Sunday 130 # 2 + 128 """ # Lookup for integer indices (Monday=0 .. Sunday=6) → bit value _INDEX_TO_BIT = [64, 32, 16, 8, 4, 2, 128] # Mon..Sun bitfield = 0 for value in days: if isinstance(value, str): key = value.strip().lower() if key not in WEEKDAY_NAME_TO_BIT: raise ParameterValidationError( f"Unknown weekday: {value}", parameter="weekday", value=value, ) bitfield |= WEEKDAY_NAME_TO_BIT[key] else: if 0 <= value <= 6: bitfield |= _INDEX_TO_BIT[value] else: raise RangeValidationError( "Day index must be between 0-6 (Monday=0, Sunday=6)", field="day_index", value=value, min_value=0, max_value=6, ) return bitfield
[docs] def decode_week_bitfield(bitfield: int) -> list[str]: """ Decode a reservation bitfield back into a list of weekday names. Args: bitfield: Integer bitfield where each bit represents a day Returns: List of weekday names in order (Monday through Sunday) Examples: >>> decode_week_bitfield(84) ['Monday', 'Wednesday', 'Friday'] >>> decode_week_bitfield(254) # All days ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] >>> decode_week_bitfield(130) ['Saturday', 'Sunday'] """ # Return days in Mon-Sun display order _DECODE_ORDER = [ ("Monday", 64), ("Tuesday", 32), ("Wednesday", 16), ("Thursday", 8), ("Friday", 4), ("Saturday", 2), ("Sunday", 128), ] days: list[str] = [] for name, bit in _DECODE_ORDER: if bitfield & bit: days.append(name) return days
# ============================================================================ # Season Bitfield Encoding/Decoding (TOU) # ============================================================================
[docs] def encode_season_bitfield(months: Iterable[int]) -> int: """ Encode a collection of month numbers (1-12) into a TOU season bitfield. Args: months: Collection of month numbers (1=January, 12=December) Returns: Integer bitfield where each bit represents a month (January=bit 0, etc.) Raises: ValueError: If month number is not in range 1-12 Examples: >>> encode_season_bitfield([6, 7, 8]) # Summer: June, July, August 448 # 0b111000000 >>> encode_season_bitfield([12, 1, 2]) # Winter: Dec, Jan, Feb 4099 # 0b1000000000011 """ bitfield = 0 for month in months: if month not in MONTH_TO_BIT: raise RangeValidationError( "Month values must be in the range 1-12", field="month", value=month, min_value=1, max_value=12, ) bitfield |= MONTH_TO_BIT[month] return bitfield
[docs] def decode_season_bitfield(bitfield: int) -> list[int]: """ Decode a TOU season bitfield into the corresponding month numbers. Args: bitfield: Integer bitfield where each bit represents a month Returns: Sorted list of month numbers (1-12) Examples: >>> decode_season_bitfield(448) [6, 7, 8] >>> decode_season_bitfield(4095) # All months [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] """ months: list[int] = [] for month, mask in MONTH_TO_BIT.items(): if bitfield & mask: months.append(month) return sorted(months)
# ============================================================================ # Price Encoding/Decoding # ============================================================================
[docs] def encode_price(value: Real, decimal_point: int) -> int: """ Encode a price into the integer representation expected by the device. The device stores prices as integers with a separate decimal point indicator. For example, $12.34 with decimal_point=2 is stored as 1234. Args: value: Price value (float or Decimal) decimal_point: Number of decimal places (0-10, typically 2-5) Returns: Integer representation of the price Raises: RangeValidationError: If decimal_point is not in range 0-10 Examples: >>> encode_price(12.34, 2) 1234 >>> encode_price(0.5, 3) 500 >>> encode_price(100, 0) 100 """ if not 0 <= decimal_point <= 10: raise RangeValidationError( "decimal_point must be between 0 and 10", field="decimal_point", value=decimal_point, min_value=0, max_value=10, ) scale = 10**decimal_point return int(round(float(value) * scale))
[docs] def decode_price(value: int, decimal_point: int) -> float: """ Decode an integer price value using the provided decimal point. Args: value: Integer price value from device decimal_point: Number of decimal places (0-10, typically 2-5) Returns: Floating-point price value Raises: RangeValidationError: If decimal_point is not in range 0-10 Examples: >>> decode_price(1234, 2) 12.34 >>> decode_price(500, 3) 0.5 >>> decode_price(100, 0) 100.0 """ if not 0 <= decimal_point <= 10: raise RangeValidationError( "decimal_point must be between 0 and 10", field="decimal_point", value=decimal_point, min_value=0, max_value=10, ) scale = 10**decimal_point return value / scale if scale else float(value)
# ============================================================================ # Payload Builders # ============================================================================
[docs] def decode_reservation_hex(hex_string: str) -> list[dict[str, int]]: """ Decode a hex-encoded reservation string into structured reservation entries. The reservation data is encoded as 6 bytes per entry: - Byte 0: enable (1=enabled, 2=disabled) - Byte 1: week bitfield (days of week) - Byte 2: hour (0-23) - Byte 3: minute (0-59) - Byte 4: mode (operation mode ID) - Byte 5: param (temperature offset by 20°F) Args: hex_string: Hexadecimal string representing reservation data Returns: List of reservation entry dictionaries Examples: >>> decode_reservation_hex("013e061e0478") [{'enable': 1, 'week': 62, 'hour': 6, 'minute': 30, 'mode': 4, 'param': 120}] """ data = bytes.fromhex(hex_string) reservations = [] if len(data) % 6 != 0: _logger.warning( "Reservation hex data length %d is not a multiple of 6; " "trailing %d bytes will be ignored", len(data), len(data) % 6, ) # Process 6 bytes at a time for i in range(0, len(data) - (len(data) % 6), 6): chunk = data[i : i + 6] # Skip empty entries (all zeros) if all(b == 0 for b in chunk): continue reservations.append( { "enable": chunk[0], "week": chunk[1], "hour": chunk[2], "min": chunk[3], "mode": chunk[4], "param": chunk[5], } ) return reservations
[docs] def build_reservation_entry( *, enabled: bool | int, days: Iterable[str | int], hour: int, minute: int, mode_id: int, temperature: float, temperature_min: float | None = None, temperature_max: float | None = None, ) -> dict[str, int]: """ Build a reservation payload entry matching the documented MQTT format. Args: enabled: Enable flag (True/False or 2=enabled/1=disabled per device boolean convention) days: Collection of weekday names or indices hour: Hour (0-23) minute: Minute (0-59) mode_id: DHW operation mode ID (1-6, see DhwOperationSetting) temperature: Target temperature in the user's preferred unit (Celsius or Fahrenheit based on global context). Automatically converted to half-degrees Celsius for the device. temperature_min: Minimum allowed temperature. If not provided, defaults are used: 95°F or ~35°C. temperature_max: Maximum allowed temperature. If not provided, defaults are used: 150°F or ~65°C. Returns: Dictionary with reservation entry fields Raises: RangeValidationError: If hour, minute, mode_id, or temperature is out of range ParameterValidationError: If enabled type is invalid Examples: >>> build_reservation_entry( ... enabled=True, ... days=["Monday", "Wednesday", "Friday"], ... hour=6, ... minute=30, ... mode_id=3, ... temperature=140.0 ... ) { 'enable': 2, 'week': 158, 'hour': 6, 'min': 30, 'mode': 3, 'param': 120, } """ # Import here to avoid circular import from .models import preferred_to_half_celsius from .unit_system import get_unit_system # Read unit system once to keep min/max bounds consistent unit_system = get_unit_system() # Use device-provided limits if available, otherwise use defaults # in the user's preferred unit system. if temperature_min is not None: min_temp = temperature_min elif unit_system == "metric": min_temp = 35.0 # ~35°C else: min_temp = 95.0 # 95°F if temperature_max is not None: max_temp = temperature_max elif unit_system == "metric": max_temp = 65.0 # ~65°C else: max_temp = 150.0 # 150°F if not 0 <= hour <= 23: raise RangeValidationError( "hour must be between 0 and 23", field="hour", value=hour, min_value=0, max_value=23, ) if not 0 <= minute <= 59: raise RangeValidationError( "minute must be between 0 and 59", field="minute", value=minute, min_value=0, max_value=59, ) if not 1 <= mode_id <= 6: raise RangeValidationError( "mode_id must be between 1 and 6 (see DhwOperationSetting)", field="mode_id", value=mode_id, min_value=1, max_value=6, ) if not min_temp <= temperature <= max_temp: raise RangeValidationError( f"temperature must be between {min_temp} and {max_temp}", field="temperature", value=temperature, min_value=min_temp, max_value=max_temp, ) if isinstance(enabled, bool): enable_flag = 2 if enabled else 1 elif enabled in (1, 2): enable_flag = int(enabled) else: raise ParameterValidationError( "enabled must be True/False or 1/2", parameter="enabled", value=enabled, ) week_bitfield = encode_week_bitfield(days) param = preferred_to_half_celsius(temperature) return { "enable": enable_flag, "week": week_bitfield, "hour": hour, "min": minute, "mode": mode_id, "param": param, }
[docs] def build_tou_period( *, season_months: Iterable[int], week_days: Iterable[str | int], start_hour: int, start_minute: int, end_hour: int, end_minute: int, price_min: int | Real, price_max: int | Real, decimal_point: int, ) -> dict[str, int]: """Build a TOU (Time of Use) period entry. Consistent with MQTT command requirements. Args: season_months: Collection of month numbers (1-12) for this period week_days: Collection of weekday names or indices start_hour: Starting hour (0-23) start_minute: Starting minute (0-59) end_hour: Ending hour (0-23) end_minute: Ending minute (0-59) price_min: Minimum price (float or pre-encoded int) price_max: Maximum price (float or pre-encoded int) decimal_point: Number of decimal places for prices Returns: Dictionary with TOU period fields Raises: ValueError: If any parameter is out of valid range Examples: >>> build_tou_period( ... season_months=[6, 7, 8], ... week_days=["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"], ... start_hour=9, ... start_minute=0, ... end_hour=17, ... end_minute=0, ... price_min=0.10, ... price_max=0.25, ... decimal_point=2 ... ) {'season': 448, 'week': 62, 'startHour': 9, 'startMinute': 0, ...} """ # Validate time parameters for label, value, upper in ( ("start_hour", start_hour, 23), ("end_hour", end_hour, 23), ): if not 0 <= value <= upper: raise RangeValidationError( f"{label} must be between 0 and {upper}", field=label, value=value, min_value=0, max_value=upper, ) for label, value in ( ("start_minute", start_minute), ("end_minute", end_minute), ): if not 0 <= value <= 59: raise RangeValidationError( f"{label} must be between 0 and 59", field=label, value=value, min_value=0, max_value=59, ) # Encode bitfields week_bitfield = encode_week_bitfield(week_days) season_bitfield = encode_season_bitfield(season_months) encoded_min: int encoded_max: int # Encode prices if they're Real numbers (not already encoded integers) if not isinstance(price_min, int): encoded_min = encode_price(price_min, decimal_point) else: encoded_min = price_min if not isinstance(price_max, int): encoded_max = encode_price(price_max, decimal_point) else: encoded_max = price_max return { "season": season_bitfield, "week": week_bitfield, "startHour": start_hour, "startMinute": start_minute, "endHour": end_hour, "endMinute": end_minute, "priceMin": encoded_min, "priceMax": encoded_max, "decimalPoint": decimal_point, }