"""
Encoding and decoding utilities for Navien API data structures.
This module provides functions for encoding and decoding bitfields,
prices, and building payload structures for reservations and TOU schedules.
These utilities are used by both the API client and MQTT client.
"""
from __future__ import annotations
import logging
from collections.abc import Iterable
from numbers import Real
from .exceptions import ParameterValidationError, RangeValidationError
_logger = logging.getLogger(__name__)
# MGPP Week Bitfield Encoding (from NaviLink APK KDEnum.MgppReservationWeek).
# Uses a single byte where bits 1-7 represent days; bit 0 is unused.
#
# Bit 7 (128) = Sunday
# Bit 6 (64) = Monday
# Bit 5 (32) = Tuesday
# Bit 4 (16) = Wednesday
# Bit 3 (8) = Thursday
# Bit 2 (4) = Friday
# Bit 1 (2) = Saturday
# Bit 0 (1) = Unused
# Weekday names listed Sunday-first, i.e. from the highest protocol bit
# (Sunday, 128) down to the lowest used bit (Saturday, 2).
WEEKDAY_ORDER = [
    "Sunday",
    "Monday",
    "Tuesday",
    "Wednesday",
    "Thursday",
    "Friday",
    "Saturday",
]
# Explicit MGPP bit masks, paired positionally with WEEKDAY_ORDER.
# Bit 0 (value 1) is intentionally absent because the protocol leaves it unused.
_WEEKDAY_BIT_VALUES: dict[str, int] = dict(
    zip(WEEKDAY_ORDER, (128, 64, 32, 16, 8, 4, 2))
)
# Standard 2-letter abbreviations (mo, tu, we, th, fr, sa, su) mapped to the
# lowercase full day name; each abbreviation is the first two letters.
_WEEKDAY_ABBREVIATIONS = {
    full_name[:2]: full_name
    for full_name in (
        "monday",
        "tuesday",
        "wednesday",
        "thursday",
        "friday",
        "saturday",
        "sunday",
    )
}
# Pre-computed, case-folded lookup table: lowercase full names first,
# followed by the 2-letter abbreviations.
WEEKDAY_NAME_TO_BIT: dict[str, int] = {
    **{day.lower(): mask for day, mask in _WEEKDAY_BIT_VALUES.items()},
    **{
        short: _WEEKDAY_BIT_VALUES[full_name.capitalize()]
        for short, full_name in _WEEKDAY_ABBREVIATIONS.items()
    },
}
# Month number (1-12) -> single-bit mask, January on bit 0.
MONTH_TO_BIT = dict(zip(range(1, 13), (1 << shift for shift in range(12))))
# ============================================================================
# Week Bitfield Encoding/Decoding
# ============================================================================
[docs]
def encode_week_bitfield(days: Iterable[str | int]) -> int:
    """
    Convert a collection of day names or indices into a reservation bitfield.

    Args:
        days: Collection of weekday names (full or 2-letter abbreviations,
            case-insensitive, surrounding whitespace ignored) or 0-based
            indices (Monday=0, Sunday=6). Names and indices may be mixed.

    Returns:
        Integer bitfield (MGPP encoding: Sun=bit7, Mon=bit6, ..., Sat=bit1).
        An empty collection yields 0.

    Raises:
        ParameterValidationError: If a day name is unknown/invalid
        RangeValidationError: If a day index is out of range (not 0-6)
        TypeError: If a day value is neither a string nor an integer
            (for example a float or None)

    Examples:
        >>> encode_week_bitfield(["Monday", "Wednesday", "Friday"])  # 64 + 16 + 4
        84
        >>> encode_week_bitfield(["MO", "WE", "FR"])  # 2-letter abbreviations
        84
        >>> encode_week_bitfield([0, 2, 4])  # 0-indexed: Mon, Wed, Fri
        84
        >>> encode_week_bitfield([5, 6])  # Saturday and Sunday: 2 + 128
        130
    """
    import operator

    # Bit masks for integer indices, positioned Monday=0 .. Sunday=6.
    index_to_bit = (64, 32, 16, 8, 4, 2, 128)
    bitfield = 0
    for value in days:
        if isinstance(value, str):
            key = value.strip().lower()
            if key not in WEEKDAY_NAME_TO_BIT:
                raise ParameterValidationError(
                    f"Unknown weekday: {value}",
                    parameter="weekday",
                    value=value,
                )
            bitfield |= WEEKDAY_NAME_TO_BIT[key]
            continue
        # Normalise anything integer-like to a real int. Floats, None and
        # other non-integers are rejected here with the documented TypeError
        # instead of failing incidentally inside a comparison or list index.
        try:
            index = operator.index(value)
        except TypeError:
            raise TypeError(
                "Day must be a weekday name (str) or an index (int), "
                f"got {type(value).__name__}"
            ) from None
        if not 0 <= index <= 6:
            raise RangeValidationError(
                "Day index must be between 0-6 (Monday=0, Sunday=6)",
                field="day_index",
                value=value,
                min_value=0,
                max_value=6,
            )
        bitfield |= index_to_bit[index]
    return bitfield
[docs]
def decode_week_bitfield(bitfield: int) -> list[str]:
    """
    Decode a reservation bitfield back into a list of weekday names.

    Args:
        bitfield: Integer bitfield where each bit represents a day
            (Sun=128, Mon=64, Tue=32, Wed=16, Thu=8, Fri=4, Sat=2)

    Returns:
        List of weekday names in display order (Monday through Sunday).
        Bits that do not correspond to a day (such as bit 0) are ignored.

    Examples:
        >>> decode_week_bitfield(84)
        ['Monday', 'Wednesday', 'Friday']
        >>> decode_week_bitfield(254)  # All days
        ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
        >>> decode_week_bitfield(130)
        ['Saturday', 'Sunday']
    """
    # Masks are listed in Monday-first display order, so Sunday (the highest
    # bit) comes last even though it has the largest value.
    monday_first = (
        ("Monday", 64),
        ("Tuesday", 32),
        ("Wednesday", 16),
        ("Thursday", 8),
        ("Friday", 4),
        ("Saturday", 2),
        ("Sunday", 128),
    )
    return [day for day, mask in monday_first if bitfield & mask]
# ============================================================================
# Season Bitfield Encoding/Decoding (TOU)
# ============================================================================
[docs]
def encode_season_bitfield(months: Iterable[int]) -> int:
    """
    Encode a collection of month numbers (1-12) into a TOU season bitfield.

    Args:
        months: Collection of month numbers (1=January, 12=December)

    Returns:
        Integer bitfield where each bit represents a month
        (January=bit 0, ..., December=bit 11). An empty collection yields 0.

    Raises:
        RangeValidationError: If a month number is not in range 1-12

    Examples:
        >>> encode_season_bitfield([6, 7, 8])  # Summer: 32 + 64 + 128
        224
        >>> encode_season_bitfield([12, 1, 2])  # Winter: 2048 + 1 + 2
        2051
    """
    season = 0
    for month in months:
        mask = MONTH_TO_BIT.get(month)
        if mask is None:
            # Not one of the twelve known month numbers.
            raise RangeValidationError(
                "Month values must be in the range 1-12",
                field="month",
                value=month,
                min_value=1,
                max_value=12,
            )
        season |= mask
    return season
[docs]
def decode_season_bitfield(bitfield: int) -> list[int]:
    """
    Decode a TOU season bitfield into the corresponding month numbers.

    Args:
        bitfield: Integer bitfield where each bit represents a month
            (January=bit 0, ..., December=bit 11)

    Returns:
        Sorted list of month numbers (1-12) whose bits are set. Bits above
        bit 11 are ignored.

    Examples:
        >>> decode_season_bitfield(224)
        [6, 7, 8]
        >>> decode_season_bitfield(4095)  # All months
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    """
    return sorted(
        month for month, mask in MONTH_TO_BIT.items() if bitfield & mask
    )
# ============================================================================
# Price Encoding/Decoding
# ============================================================================
[docs]
def encode_price(value: Real, decimal_point: int) -> int:
    """
    Encode a price into the integer representation expected by the device.

    The device stores prices as integers with a separate decimal point
    indicator. For example, $12.34 with decimal_point=2 is stored as 1234.

    The value is converted to ``float``, scaled by ``10**decimal_point`` and
    rounded with the built-in ``round`` (exact halves round to the nearest
    even integer).

    Args:
        value: Price value (float or Decimal)
        decimal_point: Number of decimal places (0-10, typically 2-5)

    Returns:
        Integer representation of the price

    Raises:
        RangeValidationError: If decimal_point is not in range 0-10

    Examples:
        >>> encode_price(12.34, 2)
        1234
        >>> encode_price(0.5, 3)
        500
        >>> encode_price(100, 0)
        100
    """
    precision_ok = 0 <= decimal_point <= 10
    if not precision_ok:
        raise RangeValidationError(
            "decimal_point must be between 0 and 10",
            field="decimal_point",
            value=decimal_point,
            min_value=0,
            max_value=10,
        )
    scaled = float(value) * 10**decimal_point
    # round() on a float without ndigits already returns an int.
    return round(scaled)
[docs]
def decode_price(value: int, decimal_point: int) -> float:
    """
    Decode an integer price value using the provided decimal point.

    Args:
        value: Integer price value from device
        decimal_point: Number of decimal places (0-10, typically 2-5)

    Returns:
        Floating-point price value (``value / 10**decimal_point``)

    Raises:
        RangeValidationError: If decimal_point is not in range 0-10

    Examples:
        >>> decode_price(1234, 2)
        12.34
        >>> decode_price(500, 3)
        0.5
        >>> decode_price(100, 0)
        100.0
    """
    if not 0 <= decimal_point <= 10:
        raise RangeValidationError(
            "decimal_point must be between 0 and 10",
            field="decimal_point",
            value=decimal_point,
            min_value=0,
            max_value=10,
        )
    # The exponent is validated to 0..10, so the divisor is at least 1 and can
    # never be zero; no zero-divisor fallback is needed. True division
    # always produces a float, including for decimal_point=0.
    return value / 10**decimal_point
# ============================================================================
# Payload Builders
# ============================================================================
[docs]
def decode_reservation_hex(hex_string: str) -> list[dict[str, int]]:
    """
    Decode a hex-encoded reservation string into structured reservation entries.

    The reservation data is encoded as 6 bytes per entry:
    - Byte 0: enable flag (build_reservation_entry in this module writes
      2 for enabled and 1 for disabled)
    - Byte 1: week bitfield (days of week)
    - Byte 2: hour (0-23)
    - Byte 3: minute (0-59), returned under the key ``min``
    - Byte 4: mode (operation mode ID)
    - Byte 5: param (build_reservation_entry stores the target temperature
      here, converted to half-degrees Celsius)

    Entries whose six bytes are all zero are treated as empty slots and
    skipped. If the data length is not a multiple of 6, a warning is logged
    and the trailing bytes are ignored.

    Args:
        hex_string: Hexadecimal string representing reservation data

    Returns:
        List of reservation entry dictionaries with keys
        ``enable``, ``week``, ``hour``, ``min``, ``mode`` and ``param``

    Raises:
        ValueError: If hex_string is not valid hexadecimal
            (raised by ``bytes.fromhex``)

    Examples:
        >>> decode_reservation_hex("013e061e0478")
        [{'enable': 1, 'week': 62, 'hour': 6, 'min': 30, 'mode': 4, 'param': 120}]
    """
    # One key per byte, in wire order.
    field_names = ("enable", "week", "hour", "min", "mode", "param")
    raw = bytes.fromhex(hex_string)
    leftover = len(raw) % 6
    if leftover != 0:
        _logger.warning(
            "Reservation hex data length %d is not a multiple of 6; "
            "trailing %d bytes will be ignored",
            len(raw),
            leftover,
        )
    # Walk only the complete 6-byte records.
    records = (
        raw[offset : offset + 6] for offset in range(0, len(raw) - leftover, 6)
    )
    # any(record) is False exactly when every byte is zero (an empty slot).
    return [dict(zip(field_names, record)) for record in records if any(record)]
[docs]
def build_reservation_entry(
    *,
    enabled: bool | int,
    days: Iterable[str | int],
    hour: int,
    minute: int,
    mode_id: int,
    temperature: float,
    temperature_min: float | None = None,
    temperature_max: float | None = None,
) -> dict[str, int]:
    """
    Build a reservation payload entry matching the documented MQTT format.

    Args:
        enabled: Enable flag (True/False, or the raw device values
            2=enabled / 1=disabled)
        days: Collection of weekday names or indices
            (see encode_week_bitfield)
        hour: Hour (0-23)
        minute: Minute (0-59)
        mode_id: DHW operation mode ID (1-6, see DhwOperationSetting)
        temperature: Target temperature in the user's preferred unit
            (Celsius or Fahrenheit based on global context). It is passed
            through preferred_to_half_celsius to produce the device value.
        temperature_min: Minimum allowed temperature. If not provided,
            the default is 35.0 when the unit system is "metric" and
            95.0 otherwise.
        temperature_max: Maximum allowed temperature. If not provided,
            the default is 65.0 when the unit system is "metric" and
            150.0 otherwise.

    Returns:
        Dictionary with the keys ``enable``, ``week``, ``hour``, ``min``,
        ``mode`` and ``param``

    Raises:
        RangeValidationError: If hour, minute, mode_id, or temperature is out
            of range (checked in that order)
        ParameterValidationError: If enabled is not True/False or 1/2, or if
            a weekday name is unknown

    Examples:
        >>> build_reservation_entry(
        ...     enabled=True,
        ...     days=["Monday", "Wednesday", "Friday"],
        ...     hour=6,
        ...     minute=30,
        ...     mode_id=3,
        ...     temperature=140.0,
        ... )  # 'param' shown assumes Fahrenheit input (140°F = 60°C)
        {'enable': 2, 'week': 84, 'hour': 6, 'min': 30, 'mode': 3, 'param': 120}
    """
    # Deferred imports: importing these at module load would be circular.
    from .models import preferred_to_half_celsius
    from .unit_system import get_unit_system

    # Query the unit system a single time so both bounds use the same unit.
    unit_system = get_unit_system()
    # Caller-supplied (device) limits win; otherwise fall back to defaults
    # expressed in the user's preferred unit.
    lower_bound = (
        temperature_min
        if temperature_min is not None
        else (35.0 if unit_system == "metric" else 95.0)
    )
    upper_bound = (
        temperature_max
        if temperature_max is not None
        else (65.0 if unit_system == "metric" else 150.0)
    )

    # Fixed integer ranges, validated in the order hour -> minute -> mode_id.
    fixed_ranges = (
        ("hour", hour, 0, 23, "hour must be between 0 and 23"),
        ("minute", minute, 0, 59, "minute must be between 0 and 59"),
        (
            "mode_id",
            mode_id,
            1,
            6,
            "mode_id must be between 1 and 6 (see DhwOperationSetting)",
        ),
    )
    for field_name, actual, low, high, message in fixed_ranges:
        if not low <= actual <= high:
            raise RangeValidationError(
                message,
                field=field_name,
                value=actual,
                min_value=low,
                max_value=high,
            )

    if not lower_bound <= temperature <= upper_bound:
        raise RangeValidationError(
            f"temperature must be between {lower_bound} and {upper_bound}",
            field="temperature",
            value=temperature,
            min_value=lower_bound,
            max_value=upper_bound,
        )

    # Booleans map to the device convention (2=on, 1=off); raw 1/2 pass through.
    is_bool = isinstance(enabled, bool)
    if not is_bool and enabled not in (1, 2):
        raise ParameterValidationError(
            "enabled must be True/False or 1/2",
            parameter="enabled",
            value=enabled,
        )
    enable_flag = (2 if enabled else 1) if is_bool else int(enabled)

    # Dict values are evaluated top to bottom, so the weekdays are validated
    # before the temperature is converted.
    return {
        "enable": enable_flag,
        "week": encode_week_bitfield(days),
        "hour": hour,
        "min": minute,
        "mode": mode_id,
        "param": preferred_to_half_celsius(temperature),
    }
[docs]
def build_tou_period(
    *,
    season_months: Iterable[int],
    week_days: Iterable[str | int],
    start_hour: int,
    start_minute: int,
    end_hour: int,
    end_minute: int,
    price_min: int | Real,
    price_max: int | Real,
    decimal_point: int,
) -> dict[str, int]:
    """Build a TOU (Time of Use) period entry.

    Consistent with MQTT command requirements.

    Args:
        season_months: Collection of month numbers (1-12) for this period
        week_days: Collection of weekday names or indices
        start_hour: Starting hour (0-23)
        start_minute: Starting minute (0-59)
        end_hour: Ending hour (0-23)
        end_minute: Ending minute (0-59)
        price_min: Minimum price. An ``int`` is treated as already encoded
            and passed through unchanged; any other number is encoded with
            encode_price.
        price_max: Maximum price, handled the same way as price_min
        decimal_point: Number of decimal places for prices (0-10)

    Returns:
        Dictionary with the keys ``season``, ``week``, ``startHour``,
        ``startMinute``, ``endHour``, ``endMinute``, ``priceMin``,
        ``priceMax`` and ``decimalPoint``

    Raises:
        RangeValidationError: If an hour, minute, month, day index or
            decimal_point is out of its valid range
        ParameterValidationError: If a weekday name is unknown

    Examples:
        >>> build_tou_period(
        ...     season_months=[6, 7, 8],
        ...     week_days=["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"],
        ...     start_hour=9,
        ...     start_minute=0,
        ...     end_hour=17,
        ...     end_minute=0,
        ...     price_min=0.10,
        ...     price_max=0.25,
        ...     decimal_point=2,
        ... )
        {'season': 224, 'week': 124, 'startHour': 9, 'startMinute': 0, ...}
    """
    # Validate time parameters: both hours first, then both minutes.
    for label, value, upper in (
        ("start_hour", start_hour, 23),
        ("end_hour", end_hour, 23),
        ("start_minute", start_minute, 59),
        ("end_minute", end_minute, 59),
    ):
        if not 0 <= value <= upper:
            raise RangeValidationError(
                f"{label} must be between 0 and {upper}",
                field=label,
                value=value,
                min_value=0,
                max_value=upper,
            )

    # Encode bitfields (each encoder validates its own input).
    week_bitfield = encode_week_bitfield(week_days)
    season_bitfield = encode_season_bitfield(season_months)

    # Validate decimal_point unconditionally. encode_price performs the same
    # check, but it is skipped when both prices are pre-encoded ints, which
    # would otherwise let an out-of-range decimalPoint into the payload.
    if not 0 <= decimal_point <= 10:
        raise RangeValidationError(
            "decimal_point must be between 0 and 10",
            field="decimal_point",
            value=decimal_point,
            min_value=0,
            max_value=10,
        )

    # Ints are taken as already-encoded device values; everything else is a
    # real-valued price that still needs scaling.
    encoded_min: int = (
        price_min
        if isinstance(price_min, int)
        else encode_price(price_min, decimal_point)
    )
    encoded_max: int = (
        price_max
        if isinstance(price_max, int)
        else encode_price(price_max, decimal_point)
    )

    return {
        "season": season_bitfield,
        "week": week_bitfield,
        "startHour": start_hour,
        "startMinute": start_minute,
        "endHour": end_hour,
        "endMinute": end_minute,
        "priceMin": encoded_min,
        "priceMax": encoded_max,
        "decimalPoint": decimal_point,
    }