nwp500 package

Subpackages

Submodules

nwp500.api_client module

Client for interacting with the Navien NWP500 API.

This module provides an async HTTP client for device management and control.

class nwp500.api_client.NavienAPIClient(auth_client: NavienAuthClient, base_url: str = 'https://nlus.naviensmartcontrol.com/api/v2.1', session: ClientSession | None = None, unit_system: Literal['metric', 'us_customary'] | None = None)[source]

Bases: object

High-level client for Navien Smart Control REST API.

This client implements all endpoints from the OpenAPI specification and automatically handles authentication, token refresh, and error handling.

The client requires an authenticated NavienAuthClient to be provided.

Example

>>> async with NavienAuthClient() as auth_client:
...     await auth_client.sign_in("user@example.com", "password")
...     api_client = NavienAPIClient(auth_client=auth_client)
...     devices = await api_client.list_devices()
async convert_tou(source_data: list[dict[str, Any]], source_type: str = 'openei', source_version: int = 7) list[ConvertedTOUPlan][source]

Convert OpenEI rate plans to device TOU format.

Sends raw OpenEI rate plan data to the Navien backend for conversion into device-ready TOU schedules with season/week bitfields and scaled pricing.

Parameters:
  • source_data – List of OpenEI rate plan dictionaries

  • source_type – Data source type (default: “openei”)

  • source_version – API version (default: 7)

Returns:

List of ConvertedTOUPlan objects with device-ready schedules

Raises:
async get_device_info(mac_address: str, additional_value: str = '') Device[source]

Get detailed information about a specific device.

Parameters:
  • mac_address – Device MAC address

  • additional_value – Additional device identifier (optional)

Returns:

Device object with detailed information

Raises:
async get_firmware_info(mac_address: str, additional_value: str = '') list[FirmwareInfo][source]

Get firmware information for a specific device.

Parameters:
  • mac_address – Device MAC address

  • additional_value – Additional device identifier (optional)

Returns:

List of FirmwareInfo objects

Raises:
async get_first_device() Device | None[source]

Get the first device associated with the user.

Returns:

First Device object or None if no devices

async get_tou_info(mac_address: str, additional_value: str, controller_id: str, user_type: str = 'O') TOUInfo[source]

Get Time of Use (TOU) information for a device.

Parameters:
  • mac_address – Device MAC address

  • additional_value – Additional device identifier

  • controller_id – Controller ID

  • user_type – User type (default: “O”)

Returns:

TOUInfo object

Raises:
property is_authenticated: bool

Check if client is authenticated.

async list_devices(offset: int = 0, count: int = 20) list[Device][source]

List all devices associated with the user.

Parameters:
  • offset – Pagination offset (default: 0)

  • count – Number of devices to return (default: 20)

Returns:

List of Device objects

Raises:
async update_push_token(push_token: str, model_name: str = 'Python Client', app_version: str = '1.0.0', os: str = 'Python', os_version: str = '3.8+') bool[source]

Update push notification token.

Parameters:
  • push_token – Push notification token

  • model_name – Device model name (default: “Python Client”)

  • app_version – Application version (default: “1.0.0”)

  • os – Operating system (default: “Python”)

  • os_version – OS version (default: “3.8+”)

Returns:

True if successful

Raises:
async update_tou(mac_address: str, additional_value: str, tou_info: dict[str, Any], source_data: dict[str, Any], zip_code: str, register_path: str = 'wifi', source_type: str = 'openei', user_type: str = 'O') TOUInfo[source]

Apply a TOU rate plan to a device.

Parameters:
  • mac_address – Device MAC address

  • additional_value – Additional device identifier

  • tou_info – Converted TOU info dict (name, schedule, utility, zipCode)

  • source_data – Original OpenEI rate plan dictionary

  • zip_code – Service area zip code

  • register_path – Device connection type (default: “wifi”)

  • source_type – Data source type (default: “openei”)

  • user_type – User type (default: “O”)

Returns:

TOUInfo object with the applied configuration

Raises:
property user_email: str | None

Get current user email.

nwp500.auth module

Authentication module for Navien Smart Control API.

This module provides authentication functionality for the Navien Smart Control REST API, including sign-in, token management, and token refresh capabilities.

The API uses JWT (JSON Web Tokens) for authentication with the following flow: 1. Sign in with email and password 2. Receive idToken, accessToken, and refreshToken 3. Use accessToken as Bearer token in subsequent requests 4. Refresh tokens when accessToken expires

class nwp500.auth.AuthTokens(*, idToken: str = '', accessToken: str = '', refreshToken: str = '', authenticationExpiresIn: int = 3600, accessKeyId: str | None = None, secretKey: str | None = None, sessionToken: str | None = None, authorizationExpiresIn: int | None = None, issuedAt: datetime = <factory>)[source]

Bases: NavienBaseModel

Authentication tokens and AWS credentials returned from the API.

access_key_id: str | None
access_token: str
property are_aws_credentials_expired: bool

Check if AWS credentials have expired.

AWS credentials have a separate expiration time from JWT tokens. If AWS credentials are expired, a full re-authentication is needed since the token refresh endpoint doesn’t provide new AWS credentials.

Returns:

True if AWS credentials are expired, False if expiration time is unknown or credentials are still valid

authentication_expires_in: int
authorization_expires_in: int | None
property bearer_token: str

Get the formatted Bearer token for Authorization header.

property expires_at: datetime

Get the cached expiration timestamp.

classmethod handle_empty_aliases(data: Any) Any[source]

Handle empty camelCase aliases with snake_case fallbacks.

id_token: str
property is_expired: bool

Check if the access token has expired (cached calculation).

issued_at: datetime
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(_AuthTokens__context: Any) None[source]

Cache the expiration timestamp after initialization.

refresh_token: str
secret_key: str | None
session_token: str | None
property time_until_expiry: timedelta

Get time remaining until token expiration.

Uses cached expiration time for efficiency.

to_dict() dict[str, Any][source]

Convert tokens to a dictionary for serialization.

This includes the calculated issued_at timestamp, which is needed to maintain the correct expiration time when restoring tokens.

class nwp500.auth.AuthenticationResponse(*, userInfo: UserInfo, tokens: AuthTokens, legal: list[Any] = <factory>, code: int = 200, msg: str = 'SUCCESS')[source]

Bases: NavienBaseModel

Complete authentication response including user info and tokens.

code: int
legal: list[Any]
message: str
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

tokens: AuthTokens
user_info: UserInfo
classmethod wrap_api_response(data: Any) Any[source]

Handle nested ‘data’ wrapper in API responses.

class nwp500.auth.NavienAuthClient(user_id: str, password: str, base_url: str = 'https://nlus.naviensmartcontrol.com/api/v2.1', session: ClientSession | None = None, timeout: int = 30, stored_tokens: AuthTokens | None = None, unit_system: Literal['metric', 'us_customary'] | None = None)[source]

Bases: object

Asynchronous client for Navien Smart Control API authentication.

This client handles: - User authentication with email/password - Token management and automatic refresh - Session management via aiohttp ClientSession - AWS credentials (if provided by API)

Session and Context Manager

The auth client manages an aiohttp session that is shared with other clients (API, MQTT). The session is created when entering the context manager and closed when exiting.

Authentication is performed automatically when entering the async context manager, unless valid stored tokens are provided.

Important: All API and MQTT clients must be created and used within the context manager. Once the context manager exits, the session is closed and clients can no longer be used.

Example

>>> async with NavienAuthClient(user_id="user@example.com",
password="password") as client:
...     print(f"Welcome {client.current_user.full_name}")
...     # Token is securely stored and not printed in production
...
...     # Create other clients within the context
...     api_client = NavienAPIClient(auth_client=client)
...     mqtt_client = NavienMqttClient(auth_client=client)
...
...     # Use the clients
...     devices = await api_client.list_devices()
...     await mqtt_client.connect()

Restore session from stored tokens: >>> stored_tokens = AuthTokens.model_validate(saved_data) >>> async with NavienAuthClient( … user_id=”user@example.com”, … password=”password”, … stored_tokens=stored_tokens … ) as client: … # Authentication skipped if tokens are still valid … print(f”Welcome {client.current_user.full_name}”)

property auth_response: AuthenticationResponse | None

Get the complete authentication response.

async close() None[source]

Close the aiohttp session if we own it.

property current_tokens: AuthTokens | None

Get current authentication tokens.

property current_user: UserInfo | None

Get current authenticated user info.

async ensure_valid_token() AuthTokens | None[source]

Ensure we have a valid access token, refreshing if necessary.

This method checks both JWT token and AWS credentials expiration. If AWS credentials are expired, it triggers a full re-authentication since the token refresh endpoint doesn’t provide new AWS credentials.

Returns:

Valid AuthTokens or None if not authenticated

Raises:
get_auth_headers() dict[str, str][source]

Get headers for authenticated requests.

Returns:

Dictionary of headers to include in requests

Note

Based on HAR analysis of actual API traffic, the authorization header uses the raw token without ‘Bearer ‘ prefix (lowercase ‘authorization’). This is different from standard Bearer token authentication.

property has_stored_credentials: bool

Check if user credentials are stored for re-authentication.

Returns:

True if both user_id and password are available for re-auth

property has_valid_tokens: bool

Check if both JWT and AWS credentials are valid and not expired.

Returns True only if tokens exist AND neither JWT tokens nor AWS credentials have expired. This is useful for pre-flight checks before operations that require valid credentials (e.g., MQTT connection).

Returns:

True if tokens exist AND not expired (JWT + AWS creds), False otherwise

property is_authenticated: bool

Check if client is currently authenticated.

async re_authenticate() AuthenticationResponse[source]

Re-authenticate using stored credentials.

This is a convenience method that uses the stored user_id and password from initialization to perform a fresh sign-in. Useful for recovering from expired tokens or connection issues.

Returns:

AuthenticationResponse with fresh tokens and user info

Raises:

Example

>>> client = NavienAuthClient(email, password)
>>> await client.re_authenticate()  # Uses stored credentials
async refresh_token(refresh_token: str | None = None) AuthTokens[source]

Refresh access token using refresh token.

Parameters:

refresh_token – The refresh token obtained from sign-in. If not provided, uses the stored refresh token.

Returns:

New AuthTokens with refreshed access token

Raises:

TokenRefreshError – If token refresh fails or no token available

property session: ClientSession | None

Get the active aiohttp session.

async sign_in(user_id: str, password: str) AuthenticationResponse[source]

Authenticate user and obtain tokens.

Parameters:
  • user_id – User email address

  • password – User password

Returns:

AuthenticationResponse containing user info and tokens

Raises:
property user_email: str | None

Get the email address of the authenticated user.

class nwp500.auth.UserInfo(*, userType: str = '', userFirstName: str = '', userLastName: str = '', userStatus: str = '', userSeq: int = 0)[source]

Bases: NavienBaseModel

User information returned from authentication.

property full_name: str

Return the user’s full name.

model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

user_first_name: str
user_last_name: str
user_seq: int
user_status: str
user_type: str
async nwp500.auth.authenticate(user_id: str, password: str) AuthenticationResponse[source]

Authenticate user and obtain tokens.

This is a convenience function that creates a temporary auth client, authenticates, and returns the response.

Parameters:
  • user_id – User email address

  • password – User password

Returns:

AuthenticationResponse with user info and tokens

Example

>>> response = await authenticate("user@example.com", "password")
>>> print(f"Welcome {response.user.full_name}")
>>> # Use the bearer token for API requests
>>> # Do not print tokens in production code
async nwp500.auth.refresh_access_token(refresh_token: str) AuthTokens[source]

Refresh an access token using a refresh token.

This is a convenience function that creates a temporary session to perform the token refresh operation without requiring full authentication.

Parameters:

refresh_token – The refresh token

Returns:

New AuthTokens

Example

>>> new_tokens = await refresh_access_token(old_tokens.refresh_token)

Note

This function creates a temporary client without authentication to perform the token refresh operation.

nwp500.command_decorators module

Decorators for device command validation and capability checking.

This module provides decorators that automatically validate device capabilities before command execution, preventing unsupported commands from being sent.

nwp500.command_decorators.requires_capability(feature: str) Callable[[F], F][source]

Decorator that validates device capability before executing command.

This decorator automatically checks if a device supports a specific controllable feature before allowing the command to execute. If the device doesn’t support the feature, a DeviceCapabilityError is raised.

The decorator automatically caches device info on first call using _get_device_features(), which internally calls ensure_device_info_cached(). This means capability validation is transparent to the caller - no manual caching is required.

The decorator expects the command method to: 1. Have ‘self’ (controller instance with _device_info_cache) 2. Have ‘device’ parameter (Device object with mac_address)

Parameters:

feature – Name of the required capability (e.g., “recirculation_mode”)

Returns:

Decorator function

Raises:

Example

>>> class MyController:
...     def __init__(self, cache):
...         self._device_info_cache = cache
...
...     @requires_capability("recirculation_mode")
...     async def set_recirculation_mode(self, device, mode):
...         # Device info automatically cached on first call
...         # Capability automatically validated before execution
...         return await self._publish(...)

nwp500.config module

Configuration for the Navien API client.

nwp500.converters module

Protocol-specific converters for Navien device communication.

This module handles conversion of device-specific data formats to Python types. The Navien device uses non-standard representations for boolean and numeric values.

See docs/protocol/quick_reference.rst for comprehensive protocol details.

nwp500.converters.device_bool_from_python(value: bool) int[source]

Convert Python bool to device boolean representation.

Parameters:

value – Python boolean.

Returns:

Device value (True→2, False→1).

Example

>>> device_bool_from_python(True)
2
>>> device_bool_from_python(False)
1
nwp500.converters.device_bool_to_python(value: Any) bool[source]

Convert device boolean representation to Python bool.

Device protocol uses: 1 = OFF/False, 2 = ON/True

This design (using 1 and 2 instead of 0 and 1) is likely due to: - 0 being reserved for null/uninitialized state - 1 representing “off” in legacy firmware - 2 representing “on” state

Parameters:

value – Device value (typically 1 or 2).

Returns:

Python boolean (1→False, 2→True).

Example

>>> device_bool_to_python(2)
True
>>> device_bool_to_python(1)
False
nwp500.converters.div_10(value: Any) float[source]

Divide numeric value by 10.0.

Used for fields that need 0.1 precision conversion.

Parameters:

value – Numeric value to divide.

Returns:

Value divided by 10.0.

Example

>>> div_10(150)
15.0
>>> div_10(25.5)
2.55
nwp500.converters.enum_validator(enum_class: type[Any]) Callable[[Any], Any][source]

Create a validator for converting int/value to Enum.

Parameters:

enum_class – The Enum class to validate against.

Returns:

A validator function compatible with Pydantic BeforeValidator.

Example

>>> from enum import Enum
>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
>>> validator = enum_validator(Color)
>>> validator(1)
<Color.RED: 1>
nwp500.converters.mul_10(value: Any) float[source]

Multiply numeric value by 10.0.

Used for energy capacity fields where the device reports in 10Wh units, but we want to store standard Wh.

Parameters:

value – Numeric value to multiply.

Returns:

Value multiplied by 10.0.

Example

>>> mul_10(150)
1500.0
>>> mul_10(25.5)
255.0
nwp500.converters.str_enum_validator(enum_class: type[Any]) Callable[[Any], Any][source]

Create a validator for converting string to str-based Enum.

Parameters:

enum_class – The str Enum class to validate against.

Returns:

A validator function compatible with Pydantic BeforeValidator.

Example

>>> from enum import Enum
>>> class Status(str, Enum):
...     ACTIVE = "A"
...     INACTIVE = "I"
>>> validator = str_enum_validator(Status)
>>> validator("A")
<Status.ACTIVE: 'A'>
nwp500.converters.tou_override_to_python(value: Any) bool[source]

Convert TOU override status to Python bool.

Device representation: 1 = Override Active, 2 = Override Inactive

Parameters:

value – Device TOU override status value.

Returns:

Python boolean.

Example

>>> tou_override_to_python(1)
True
>>> tou_override_to_python(2)
False

nwp500.device_capabilities module

Device capability checking for MQTT commands.

This module provides a generalized framework for checking device capabilities before executing MQTT commands. It uses a mapping-based approach to validate that a device supports specific controllable features without requiring individual checker functions.

class nwp500.device_capabilities.MqttDeviceCapabilityChecker[source]

Bases: object

Generalized MQTT device capability checker using a capability map.

This class uses a mapping of controllable feature names to their check functions, allowing capabilities to be validated in a centralized, extensible way without requiring individual methods for each control.

classmethod assert_supported(feature: str, device_features: DeviceFeature) None[source]

Assert that device supports control of a feature.

Parameters:
  • feature – Name of the controllable feature to check

  • device_features – Device feature information

Raises:
classmethod get_available_controls(device_features: DeviceFeature) dict[str, bool][source]

Get all controllable features available on a device.

Parameters:

device_features – Device feature information

Returns:

Dictionary mapping feature names to whether they can be controlled

classmethod register_capability(name: str, check_fn: CapabilityCheckFn) None[source]

Register a custom controllable feature check.

This allows extensions or applications to define custom capability checks without modifying the core library.

Parameters:
  • name – Feature name

  • check_fn – Function that takes DeviceFeature and returns bool

classmethod supports(feature: str, device_features: DeviceFeature) bool[source]

Check if device supports control of a specific feature.

Parameters:
  • feature – Name of the controllable feature to check

  • device_features – Device feature information

Returns:

True if feature control is supported, False otherwise

Raises:

ValueError – If feature is not recognized

nwp500.device_info_cache module

Device information caching with periodic updates.

This module manages caching of device information (features, capabilities) with automatic periodic updates to keep data synchronized with the device.

class nwp500.device_info_cache.CacheInfoResult[source]

Bases: TypedDict

Result of get_cache_info() call.

device_count: ReadOnly[int]
devices: ReadOnly[list[CachedDeviceInfo]]
update_interval_minutes: ReadOnly[float]
class nwp500.device_info_cache.CachedDeviceInfo[source]

Bases: TypedDict

Cached device information metadata.

cached_at: ReadOnly[str]
expires_at: ReadOnly[str | None]
is_expired: ReadOnly[bool]
mac: ReadOnly[str]
class nwp500.device_info_cache.MqttDeviceInfoCache(update_interval_minutes: int = 30)[source]

Bases: object

Manages caching of device information with periodic updates.

This cache stores device features (capabilities, firmware info, etc.) and automatically refreshes them at regular intervals to keep data synchronized with the actual device state.

The cache is keyed by device MAC address, allowing support for multiple devices connected to the same MQTT client.

async clear() None[source]

Clear all cached device information.

async get(device_mac: str) DeviceFeature | None[source]

Get cached device features if available and not expired.

Parameters:

device_mac – Device MAC address

Returns:

Cached DeviceFeature if available, None otherwise

async get_all_cached() dict[str, DeviceFeature][source]

Get all currently cached device features.

Returns:

Dictionary mapping MAC addresses to DeviceFeature objects

async get_cache_info() CacheInfoResult[source]

Get cache statistics and metadata.

Returns:

  • device_count: Number of cached devices

  • update_interval_minutes: Cache update interval in minutes

  • devices: List of device cache metadata

Return type:

Dictionary with cache info including

async invalidate(device_mac: str) None[source]

Invalidate cache entry for a device.

Forces a refresh on next request.

Parameters:

device_mac – Device MAC address

is_expired(timestamp: datetime) bool[source]

Check if a cache entry is expired.

Parameters:

timestamp – When the cache entry was created

Returns:

True if expired, False if still fresh

async set(device_mac: str, features: DeviceFeature) None[source]

Cache device features with current timestamp.

Parameters:
  • device_mac – Device MAC address

  • features – Device feature information to cache

nwp500.encoding module

Encoding and decoding utilities for Navien API data structures.

This module provides functions for encoding and decoding bitfields, prices, and building payload structures for reservations and TOU schedules. These utilities are used by both the API client and MQTT client.

nwp500.encoding.build_reservation_entry(*, enabled: bool | int, days: Iterable[str | int], hour: int, minute: int, mode_id: int, temperature: float, temperature_min: float | None = None, temperature_max: float | None = None) dict[str, int][source]

Build a reservation payload entry matching the documented MQTT format.

Parameters:
  • enabled – Enable flag (True/False or 2=enabled/1=disabled per device boolean convention)

  • days – Collection of weekday names or indices

  • hour – Hour (0-23)

  • minute – Minute (0-59)

  • mode_id – DHW operation mode ID (1-6, see DhwOperationSetting)

  • temperature – Target temperature in the user’s preferred unit (Celsius or Fahrenheit based on global context). Automatically converted to half-degrees Celsius for the device.

  • temperature_min – Minimum allowed temperature. If not provided, defaults are used: 95°F or ~35°C.

  • temperature_max – Maximum allowed temperature. If not provided, defaults are used: 150°F or ~65°C.

Returns:

Dictionary with reservation entry fields

Raises:

Examples

>>> build_reservation_entry(
...     enabled=True,
...     days=["Monday", "Wednesday", "Friday"],
...     hour=6,
...     minute=30,
...     mode_id=3,
...     temperature=140.0
... )
{
    'enable': 2,
    'week': 158,
    'hour': 6,
    'min': 30,
    'mode': 3,
    'param': 120,
}
nwp500.encoding.build_tou_period(*, season_months: Iterable[int], week_days: Iterable[str | int], start_hour: int, start_minute: int, end_hour: int, end_minute: int, price_min: int | Real, price_max: int | Real, decimal_point: int) dict[str, int][source]

Build a TOU (Time of Use) period entry.

Consistent with MQTT command requirements.

Parameters:
  • season_months – Collection of month numbers (1-12) for this period

  • week_days – Collection of weekday names or indices

  • start_hour – Starting hour (0-23)

  • start_minute – Starting minute (0-59)

  • end_hour – Ending hour (0-23)

  • end_minute – Ending minute (0-59)

  • price_min – Minimum price (float or pre-encoded int)

  • price_max – Maximum price (float or pre-encoded int)

  • decimal_point – Number of decimal places for prices

Returns:

Dictionary with TOU period fields

Raises:

ValueError – If any parameter is out of valid range

Examples

>>> build_tou_period(
...     season_months=[6, 7, 8],
...     week_days=["Monday", "Tuesday", "Wednesday", "Thursday",
"Friday"],
...     start_hour=9,
...     start_minute=0,
...     end_hour=17,
...     end_minute=0,
...     price_min=0.10,
...     price_max=0.25,
...     decimal_point=2
... )
{'season': 448, 'week': 62, 'startHour': 9, 'startMinute': 0, ...}
nwp500.encoding.decode_price(value: int, decimal_point: int) float[source]

Decode an integer price value using the provided decimal point.

Parameters:
  • value – Integer price value from device

  • decimal_point – Number of decimal places (0-10, typically 2-5)

Returns:

Floating-point price value

Raises:

RangeValidationError – If decimal_point is not in range 0-10

Examples

>>> decode_price(1234, 2)
12.34
>>> decode_price(500, 3)
0.5
>>> decode_price(100, 0)
100.0
nwp500.encoding.decode_reservation_hex(hex_string: str) list[dict[str, int]][source]

Decode a hex-encoded reservation string into structured reservation entries.

The reservation data is encoded as 6 bytes per entry: - Byte 0: enable (1=enabled, 2=disabled) - Byte 1: week bitfield (days of week) - Byte 2: hour (0-23) - Byte 3: minute (0-59) - Byte 4: mode (operation mode ID) - Byte 5: param (temperature offset by 20°F)

Parameters:

hex_string – Hexadecimal string representing reservation data

Returns:

List of reservation entry dictionaries

Examples

>>> decode_reservation_hex("013e061e0478")
[{'enable': 1, 'week': 62, 'hour': 6, 'minute': 30, 'mode': 4, 'param':
120}]
nwp500.encoding.decode_season_bitfield(bitfield: int) list[int][source]

Decode a TOU season bitfield into the corresponding month numbers.

Parameters:

bitfield – Integer bitfield where each bit represents a month

Returns:

Sorted list of month numbers (1-12)

Examples

>>> decode_season_bitfield(448)
[6, 7, 8]
>>> decode_season_bitfield(4095)  # All months
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
nwp500.encoding.decode_week_bitfield(bitfield: int) list[str][source]

Decode a reservation bitfield back into a list of weekday names.

Parameters:

bitfield – Integer bitfield where each bit represents a day

Returns:

List of weekday names in order (Monday through Sunday)

Examples

>>> decode_week_bitfield(84)
['Monday', 'Wednesday', 'Friday']
>>> decode_week_bitfield(254)  # All days
['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday',
'Saturday', 'Sunday']
>>> decode_week_bitfield(130)
['Saturday', 'Sunday']
nwp500.encoding.encode_price(value: Real, decimal_point: int) int[source]

Encode a price into the integer representation expected by the device.

The device stores prices as integers with a separate decimal point indicator. For example, $12.34 with decimal_point=2 is stored as 1234.

Parameters:
  • value – Price value (float or Decimal)

  • decimal_point – Number of decimal places (0-10, typically 2-5)

Returns:

Integer representation of the price

Raises:

RangeValidationError – If decimal_point is not in range 0-10

Examples

>>> encode_price(12.34, 2)
1234
>>> encode_price(0.5, 3)
500
>>> encode_price(100, 0)
100
nwp500.encoding.encode_season_bitfield(months: Iterable[int]) int[source]

Encode a collection of month numbers (1-12) into a TOU season bitfield.

Parameters:

months – Collection of month numbers (1=January, 12=December)

Returns:

Integer bitfield where each bit represents a month (January=bit 0, etc.)

Raises:

ValueError – If month number is not in range 1-12

Examples

>>> encode_season_bitfield([6, 7, 8])  # Summer: June, July, August
448  # 0b111000000
>>> encode_season_bitfield([12, 1, 2])  # Winter: Dec, Jan, Feb
4099  # 0b1000000000011
nwp500.encoding.encode_week_bitfield(days: Iterable[str | int]) int[source]

Convert a collection of day names or indices into a reservation bitfield.

Parameters:

days – Collection of weekday names (full or 2-letter abbreviations, case-insensitive) or 0-based indices (Monday=0, Sunday=6)

Returns:

Sun=bit7, Mon=bit6, …, Sat=bit1)

Return type:

Integer bitfield (MGPP encoding

Raises:

Examples

>>> encode_week_bitfield(["Monday", "Wednesday", "Friday"])
84  # 64 + 16 + 4
>>> encode_week_bitfield(["MO", "WE", "FR"])  # 2-letter abbreviations
84
>>> encode_week_bitfield([0, 2, 4])  # 0-indexed: Mon, Wed, Fri
84
>>> encode_week_bitfield([5, 6])  # Saturday and Sunday
130  # 2 + 128

nwp500.enums module

Enumerations for Navien device protocol.

This module contains enumerations for the Navien device protocol. These enums define valid values for device control commands, status fields, and capabilities.

See docs/protocol/quick_reference.rst for comprehensive protocol details.

class nwp500.enums.CommandCode(*values)[source]

Bases: IntEnum

MQTT Command codes for Navien device control.

These command codes are used for MQTT communication with Navien devices. Commands are organized into two categories:

  • Query commands (16777xxx): Request device information

  • Control commands (33554xxx): Change device settings

All commands and their expected payloads are documented in docs/protocol/mqtt_protocol.rst under the “Control Messages” section.

AIR_FILTER_LIFE = 33554474
AIR_FILTER_RESET = 33554473
ANTI_LEGIONELLA_OFF = 33554471
ANTI_LEGIONELLA_ON = 33554472
DEVICE_INFO_REQUEST = 16777217
DHW_MODE = 33554437
DHW_TEMPERATURE = 33554464
DR_OFF = 33554469
DR_ON = 33554470
ENERGY_USAGE_QUERY = 16777225
FREZ_TEMP = 33554451
GOOUT_DAY = 33554466
OTA_CHECK = 33554443
OTA_COMMIT = 33554442
POWER_OFF = 33554433
POWER_ON = 33554434
RECIR_HOT_BTN = 33554444
RECIR_MODE = 33554445
RECIR_RESERVATION = 33554440
RESERVATION_INTELLIGENT_OFF = 33554467
RESERVATION_INTELLIGENT_ON = 33554468
RESERVATION_MANAGEMENT = 16777226
RESERVATION_READ = 16777222
RESERVATION_WATER_PROGRAM = 33554441
RESERVATION_WEEKLY = 33554438
SMART_DIAGNOSTIC = 33554455
STATUS_REQUEST = 16777219
TOU_OFF = 33554475
TOU_ON = 33554476
TOU_RESERVATION = 33554439
WIFI_RECONNECT = 33554446
WIFI_RESET = 33554447
class nwp500.enums.ConnectionStatus(*values)[source]

Bases: IntEnum

Device connection status to cloud/MQTT.

Represents whether the device is currently connected to the Navien cloud service and can receive commands.

CONNECTED = 2
DISCONNECTED = 1
class nwp500.enums.CurrentOperationMode(*values)[source]

Bases: IntEnum

Current operation mode (real-time operational state).

This enum represents the device’s current actual operational state - what the device is doing RIGHT NOW. These values appear in the operation_mode field and change automatically based on heating demand.

HEAT_PUMP_MODE = 32
HYBRID_BOOST_MODE = 96
HYBRID_EFFICIENCY_MODE = 64
STANDBY = 0
class nwp500.enums.DHWControlTypeFlag(*values)[source]

Bases: IntEnum

DHW temperature control precision setting.

Controls the granularity of temperature adjustments available for DHW (Domestic Hot Water) control. Different models support different precision levels.

DISABLE = 1
ENABLE_1_DEGREE = 3
ENABLE_3_STAGE = 4
ENABLE_DOT_5_DEGREE = 2
UNKNOWN = 0
class nwp500.enums.DREvent(*values)[source]

Bases: IntEnum

Demand Response event status.

Allows utilities to manage grid load by signaling water heaters to reduce consumption (shed) or pre-heat (load up) before peak periods.

CPE = 5
LOADUP = 3
LOADUP_ADV = 4
RUN_NORMAL = 1
SHED = 2
UNKNOWN = 0
class nwp500.enums.DeviceType(*values)[source]

Bases: IntEnum

Communication device type.

NPF700_MAIN = 50
NPF700_SUB = 51
NPF700_WIFI = 52
class nwp500.enums.DhwOperationSetting(*values)[source]

Bases: IntEnum

DHW operation setting modes (user-configured heating preferences).

This enum represents the user’s configured mode preference - what heating mode the device should use when it needs to heat water. These values appear in the dhw_operation_setting field and are set via user commands.

ELECTRIC = 2
ENERGY_SAVER = 3
HEAT_PUMP = 1
HIGH_DEMAND = 4
POWER_OFF = 6
VACATION = 5
class nwp500.enums.ErrorCode(*values)[source]

Bases: IntEnum

Device error codes.

Error codes indicate specific faults detected by the device’s diagnostic system. Most errors are Level 1, allowing continued operation with reduced functionality. See docs/protocol/error_codes.rst for complete troubleshooting guide.

E096_UPPER_HEATER = 96
E097_LOWER_HEATER = 97
E326_DRY_FIRE = 326
E407_DHW_TEMP_SENSOR = 407
E445_MIXING_VALVE = 445
E480_TANK_UPPER_TEMP_SENSOR = 480
E481_TANK_LOWER_TEMP_SENSOR = 481
E515_RELAY_FAULT = 515
E517_DIP_SWITCH = 517
E593_PANEL_KEY = 593
E594_EEPROM = 594
E595_POWER_METER = 595
E596_WIFI = 596
E598_RTC = 598
E615_FEEDBACK = 615
E781_CTA2045 = 781
E798_SHUTOFF_VALVE = 798
E799_WATER_LEAK = 799
E901_ECO = 901
E907_COMPRESSOR_POWER = 907
E908_COMPRESSOR = 908
E909_EVAPORATOR_FAN = 909
E910_DISCHARGE_TEMP_SENSOR = 910
E911_DISCHARGE_TEMP_HIGH = 911
E912_SUCTION_TEMP_SENSOR = 912
E913_SUCTION_TEMP_LOW = 913
E914_EVAPORATOR_TEMP_SENSOR = 914
E915_TEMP_DIFFERENCE = 915
E916_EVAPORATOR_TEMP = 916
E920_AMBIENT_TEMP_SENSOR = 920
E940_REFRIGERANT_BLOCKAGE = 940
E990_CONDENSATE_OVERFLOW = 990
NO_ERROR = 0
class nwp500.enums.FilterChange(*values)[source]

Bases: IntEnum

Air filter status for heat pump models.

NORMAL = 0
REPLACE_NEED = 1
UNKNOWN = 2
class nwp500.enums.FirmwareType(*values)[source]

Bases: IntEnum

Firmware component types.

COMMUNICATION_MODULE = 4
CONTROLLER = 1
PANEL = 2
ROOM_CON = 3
SUB_ROOM_CON = 6
UNKNOWN = 0
VALVE_CONTROL = 5
class nwp500.enums.HeatControl(*values)[source]

Bases: IntEnum

Heating control method (for combi-boilers).

OUTSIDE_CONTROL = 3
RETURN = 2
SUPPLY = 1
UNKNOWN = 0
class nwp500.enums.HeatSource(*values)[source]

Bases: IntEnum

Currently active heat source (read-only status).

This reflects what the device is currently using, not what mode it’s set to. In Hybrid mode, this field shows which source(s) are active at any given moment.

HEATELEMENT = 2
HEATPUMP = 1
HEATPUMP_HEATELEMENT = 3
UNKNOWN = 0
class nwp500.enums.InstallType(*values)[source]

Bases: StrEnum

Installation type classification.

Indicates whether the device is installed for residential or commercial use. This affects warranty terms and service requirements.

COMMERCIAL = 'C'
RESIDENTIAL = 'R'
class nwp500.enums.OnOffFlag(*values)[source]

Bases: IntEnum

Generic on/off flag used throughout status fields.

Used for: Power status, TOU status, recirculation status, vacation mode, anti-legionella, and many other boolean device settings.

OFF = 1
ON = 2
class nwp500.enums.Operation(*values)[source]

Bases: IntEnum

Device operation state.

OPERATION = 1
STOP = 2
UNKNOWN = 0
class nwp500.enums.RecirculationMode(*values)[source]

Bases: IntEnum

Recirculation pump operation mode.

ALWAYS = 1
BUTTON = 2
SCHEDULE = 3
TEMPERATURE = 4
UNKNOWN = 0
class nwp500.enums.TempFormulaType(*values)[source]

Bases: IntEnum

Temperature conversion formula type.

Different device models use slightly different rounding algorithms when converting internal Celsius values to Fahrenheit. This ensures the mobile app matches the device’s built-in display.

Type 0: Asymmetric ceiling/floor rounding based on raw value remainder Type 1: Standard rounding to nearest integer

ASYMMETRIC = 0
STANDARD = 1
class nwp500.enums.TemperatureType(*values)[source]

Bases: IntEnum

Temperature display unit preference.

CELSIUS = 1
FAHRENHEIT = 2
class nwp500.enums.TouRateType(*values)[source]

Bases: IntEnum

Electricity rate period type.

Device behavior during each rate period can be configured. Typically, devices heat aggressively during off-peak, maintain temperature during mid-peak, and avoid heating during on-peak unless necessary.

MID_PEAK = 2
OFF_PEAK = 1
ON_PEAK = 3
UNKNOWN = 0
class nwp500.enums.TouWeekType(*values)[source]

Bases: IntEnum

Day grouping for TOU schedules.

TOU schedules can be configured separately for weekdays and weekends to account for different electricity rates and usage patterns.

WEEK_DAY = 0
WEEK_END = 1
class nwp500.enums.UnitType(*values)[source]

Bases: IntEnum

Navien device/unit model types.

CAS_NFB = 7
CAS_NFB_700 = 21
CAS_NHB = 5
CAS_NHB_H = 17
CAS_NPE = 4
CAS_NPE2 = 12
CAS_NPN = 10
CAS_NVW = 15
NCB = 2
NCB_H = 13
NFB = 6
NFB_700 = 20
NFC = 8
NHB = 3
NHB_H = 16
NO_DEVICE = 0
NPE = 1
NPE2 = 11
NPF = 513
NPN = 9
NVW = 14
TWC = 257
class nwp500.enums.VolumeCode(*values)[source]

Bases: IntEnum

Tank volume capacity codes for NWP500 heat pump water heater models.

Represents the nominal tank capacity in gallons for NWP500 series devices. These correspond to the different model variants available.

VOLUME_50 = 1
VOLUME_65 = 2
VOLUME_80 = 3
class nwp500.enums.WaterLevel(*values)[source]

Bases: IntEnum

Hot water level indicator (displayed as gauge in app).

Note: IDs are non-sequential, likely represent bit positions for multi-level displays.

HIGH = 4
LOW = 2
LOW_MEDIUM = 8
MEDIUM_HIGH = 16

nwp500.events module

Event Emitter for Navien device state changes.

This module provides an event-driven architecture for handling device state changes, allowing multiple listeners per event and automatic state change detection.

class nwp500.events.EventEmitter[source]

Bases: object

Event emitter with support for multiple listeners per event.

Provides an event-driven architecture for device state changes with: - Multiple listeners per event - Async handler support - One-time listeners (once) - Priority-based execution order - Automatic state change detection

Example:

emitter = EventEmitter()

# Register listeners
emitter.on('temperature_changed', log_temperature)
emitter.on('temperature_changed', update_ui)

# Emit events
await emitter.emit('temperature_changed', temperature_event)

# One-time listener
emitter.once('device_ready', initialize)

# Remove listener
emitter.off('temperature_changed', log_temperature)
async emit(event: str, *args: Any, **kwargs: Any) int[source]

Emit an event to all registered listeners.

Executes listeners in priority order (highest first). One-time listeners are automatically removed after execution.

Parameters:
  • event – Event name to emit

  • *args – Positional arguments to pass to listeners

  • **kwargs – Keyword arguments to pass to listeners

Returns:

Number of listeners that were called

Example:

# Emit with an event object
await emitter.emit('temperature_changed', temperature_event)

# Emit with keyword arguments
await emitter.emit('status_updated', status=device_status)
event_count(event: str) int[source]

Get the number of times an event has been emitted.

Parameters:

event – Event name

Returns:

Number of times event was emitted

Example:

count = emitter.event_count('temperature_changed')
print(f"Event emitted {count} times")
event_names() list[str][source]

Get list of all registered event names.

Returns:

List of event names with active listeners

Example:

events = emitter.event_names()
print(f"Active events: {', '.join(events)}")
listener_count(event: str) int[source]

Get the number of listeners for an event.

Parameters:

event – Event name

Returns:

Number of registered listeners

Example:

count = emitter.listener_count('temperature_changed')
print(f"{count} listeners registered")
off(event: str, callback: Callable[[...], Any | None] | None = None) int[source]

Remove event listener(s).

Parameters:
  • event – Event name

  • callback – Specific callback to remove, or None to remove all for

  • event

Returns:

Number of listeners removed

Example:

# Remove specific listener
emitter.off('temperature_changed', log_temperature)

# Remove all listeners for event
emitter.off('temperature_changed')
on(event: str, callback: Callable[[...], Any], priority: int = 50) None[source]

Register an event listener.

Parameters:
  • event – Event name to listen for

  • callback – Function to call when event is emitted (can be async)

  • priority – Execution priority (higher = earlier, default: 50)

Example:

from nwp500.unit_system import get_unit_system

def on_temp_change(event):
    unit = "°C" if get_unit_system() == "metric" else "°F"
    print(
        f"Temperature: {event.old_temperature}{unit} → "
        f"{event.new_temperature}{unit}"
    )

emitter.on('temperature_changed', on_temp_change)

# Async handler
async def save_to_db(event):
    await db.save(event.new_temperature)

emitter.on('temperature_changed', save_to_db, priority=100)
once(event: str, callback: Callable[[...], Any], priority: int = 50) None[source]

Register a one-time event listener.

The listener will be automatically removed after first execution.

Parameters:
  • event – Event name to listen for

  • callback – Function to call when event is emitted

  • priority – Execution priority (higher = earlier, default: 50)

Example:

emitter.once('device_ready', initialize_device)
# Will only be called once, then auto-removed
remove_all_listeners(event: str | None = None) int[source]

Remove all listeners for an event, or all listeners for all events.

Parameters:

event – Event name, or None to remove all listeners

Returns:

Number of listeners removed

Example:

# Remove all listeners for specific event
emitter.remove_all_listeners('temperature_changed')

# Remove all listeners for all events
emitter.remove_all_listeners()
async wait_for(event: str, timeout: float | None = None) tuple[Any, ...][source]

Wait for an event to be emitted.

Parameters:
  • event – Event name to wait for

  • timeout – Maximum time to wait in seconds (None = wait forever)

Returns:

Tuple of arguments passed to the event

Raises:

asyncio.TimeoutError – If timeout is reached

Example:

# Wait for device to be ready
await emitter.wait_for('device_ready', timeout=30)

# Wait for specific condition
args, _ = await emitter.wait_for('temperature_changed')
temperature_event = args[0]
current_temp = temperature_event.new_temperature
class nwp500.events.EventListener(callback: Callable[[...], Any], once: bool = False, priority: int = 50)[source]

Bases: object

Represents a registered event listener.

callback: Callable[[...], Any]
once: bool = False
priority: int = 50

nwp500.exceptions module

Exception hierarchy for nwp500-python library.

This module defines all custom exceptions used throughout the library, providing a clear hierarchy for error handling and better developer experience.

Exception Hierarchy:

Nwp500Error (base)
├── AuthenticationError
│   ├── InvalidCredentialsError
│   ├── TokenExpiredError
│   └── TokenRefreshError
├── APIError
├── MqttError
│   ├── MqttConnectionError
│   ├── MqttNotConnectedError
│   ├── MqttPublishError
│   ├── MqttSubscriptionError
│   └── MqttCredentialsError
├── ValidationError
│   ├── ParameterValidationError
│   └── RangeValidationError
└── DeviceError
    ├── DeviceNotFoundError
    ├── DeviceOfflineError
    ├── DeviceOperationError
    └── DeviceCapabilityError

Migration from v4.x

If you were catching generic exceptions in your code, update as follows:

# Old code (v4.x)
try:
    await mqtt_client.request_device_status(device)
except RuntimeError as e:
    if "Not connected" in str(e):
        # handle connection error

# New code (v5.0+)
try:
    await mqtt_client.request_device_status(device)
except MqttNotConnectedError:
    # handle connection error
except MqttError:
    # handle other MQTT errors

# Old code (v4.x)
try:
    set_vacation_mode(days=35)
except ValueError as e:
    # handle validation error

# New code (v5.0+)
try:
    set_vacation_mode(days=35)
except RangeValidationError as e:
    print(f"Invalid {e.field}: {e.message}")
    print(f"Valid range: {e.min_value} to {e.max_value}")
except ValidationError:
    # handle other validation errors
exception nwp500.exceptions.APIError(message: str, code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: Nwp500Error

Raised when API returns an error response.

This exception is raised for various API-related failures including network errors, invalid responses, and API endpoint errors.

message

Error message describing the failure

code

HTTP or API error code

response

Complete API response dictionary (optional)

exception nwp500.exceptions.AuthenticationError(message: str, status_code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: Nwp500Error

Base exception for authentication errors.

Raised when authentication-related operations fail, including sign-in, token management, and credential validation.

message

Error message describing the failure

status_code

HTTP status code (optional)

response

Complete API response dictionary (optional)

exception nwp500.exceptions.DeviceCapabilityError(feature_name: str, message: str | None = None)[source]

Bases: DeviceError

Device does not support a requested capability.

Raised when an MQTT command requires a device capability that the device does not support. This may occur when trying to use features that are not available on specific device models or hardware revisions.

feature_name

Name of the unsupported feature

exception nwp500.exceptions.DeviceError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: Nwp500Error

Base exception for device operations.

All device-related errors inherit from this base class.

exception nwp500.exceptions.DeviceNotFoundError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: DeviceError

Requested device not found.

Raised when a device cannot be found in the user’s device list or when attempting to access a non-existent device.

exception nwp500.exceptions.DeviceOfflineError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: DeviceError

Device is offline or unreachable.

Raised when a device is offline and cannot respond to commands or status requests. The device may be powered off, disconnected from the network, or experiencing connectivity issues.

exception nwp500.exceptions.DeviceOperationError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: DeviceError

Device operation failed.

Raised when a device operation (mode change, temperature setting, etc.) fails. This may occur due to invalid commands, device restrictions, or device-side errors.

exception nwp500.exceptions.InvalidCredentialsError(message: str, status_code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: AuthenticationError

Raised when user credentials are invalid.

This typically indicates a 401 Unauthorized response from the API due to incorrect email/password combination.

exception nwp500.exceptions.MqttConnectionError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

Connection establishment or maintenance failed.

Raised when the MQTT connection to AWS IoT Core cannot be established or when an existing connection fails. This may be due to network issues, invalid credentials, or AWS service problems.

exception nwp500.exceptions.MqttCredentialsError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

AWS credentials invalid or expired.

Raised when AWS IoT credentials are missing, invalid, or expired. Re-authentication may be required to obtain fresh credentials.

exception nwp500.exceptions.MqttError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: Nwp500Error

Base exception for MQTT operations.

All MQTT-related errors inherit from this base class, allowing consumers to handle all MQTT issues with a single exception handler.

exception nwp500.exceptions.MqttNotConnectedError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

Operation requires active MQTT connection.

Raised when attempting MQTT operations (publish, subscribe, etc.) without an established connection. Call connect() before performing MQTT operations.

Example:

mqtt_client = NavienMqttClient(auth_client)
# Must connect first
await mqtt_client.connect()
await mqtt_client.request_device_status(device)
exception nwp500.exceptions.MqttPublishError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

Failed to publish message to MQTT broker.

Raised when a message cannot be published to an MQTT topic. This may occur during connection interruptions or when the broker rejects the message.

exception nwp500.exceptions.MqttSubscriptionError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

Failed to subscribe to MQTT topic.

Raised when subscription to an MQTT topic fails. This may occur if the connection is interrupted or if the client lacks permissions for the topic.

exception nwp500.exceptions.Nwp500Error(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: Exception

Base exception for all nwp500 library errors.

All custom exceptions in the nwp500 library inherit from this base class, allowing consumers to catch all library-specific errors with a single exception handler if desired.

message

Human-readable error message

error_code

Machine-readable error code (optional)

details

Additional context as a dictionary (optional)

retriable

Whether the operation can be retried (optional)

to_dict() dict[str, Any][source]

Serialize exception for logging/monitoring.

Returns:

Dictionary with error type, message, code, details, and retriability

exception nwp500.exceptions.ParameterValidationError(message: str, parameter: str | None = None, value: Any = None, **kwargs: Any)[source]

Bases: ValidationError

Invalid parameter value provided.

Raised when a parameter value is invalid for reasons other than being out of range (e.g., wrong type, invalid format).

parameter

Name of the invalid parameter

value

The invalid value provided

exception nwp500.exceptions.RangeValidationError(message: str, field: str | None = None, value: Any = None, min_value: Any = None, max_value: Any = None, **kwargs: Any)[source]

Bases: ValidationError

Value outside acceptable range.

Raised when a numeric value is outside its valid range.

field

Name of the field

value

The invalid value provided

min_value

Minimum acceptable value

max_value

Maximum acceptable value

Example:

try:
    set_temperature(200)
except RangeValidationError as e:
    print(f"Invalid {e.field}: must be {e.min_value}-{e.max_value}")
exception nwp500.exceptions.TokenExpiredError(message: str, status_code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: AuthenticationError

Raised when an authentication token has expired.

Tokens have a limited lifetime and must be refreshed periodically. This exception indicates that a token has passed its expiration time.

exception nwp500.exceptions.TokenRefreshError(message: str, status_code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: AuthenticationError

Raised when token refresh operation fails.

Token refresh can fail due to invalid refresh tokens, network issues, or API errors. When this occurs, full re-authentication may be required.

exception nwp500.exceptions.ValidationError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: Nwp500Error

Base exception for validation failures.

Raised when input parameters or data fail validation checks.

nwp500.factory module

Factory functions for convenient client creation and initialization.

This module provides helper functions to simplify the process of creating and authenticating all Navien clients with a single function call.

Use factory functions when you want: - Simplified initialization of all clients at once - Automatic error handling during authentication - Clear initialization order and dependencies - Convenience over fine-grained control

Example

>>> auth, api, mqtt = await create_navien_clients(
...     email="user@example.com",
...     password="password"
... )
>>> async with auth:
...     devices = await api.list_devices()
async nwp500.factory.create_navien_clients(email: str, password: str) tuple[NavienAuthClient, NavienAPIClient, NavienMqttClient][source]

Create and authenticate all Navien clients with one call.

This factory function handles the complete initialization sequence: 1. Creates an auth client with the provided credentials 2. Authenticates with the Navien API (via context manager) 3. Creates API and MQTT clients using the authenticated session 4. Returns all clients ready to use

Parameters:
  • email – Navien account email address

  • password – Navien account password

Returns:

Tuple of (auth_client, api_client, mqtt_client) ready to use

Raises:

Example

>>> auth, api, mqtt = await create_navien_clients(
...     email="user@example.com",
...     password="password"
... )
>>> async with auth:
...     # All clients are ready to use
...     devices = await api.list_devices()
...     await mqtt.connect()
...     # Use clients ...

Note

You must still use the auth client as a context manager to ensure the session is properly cleaned up:

>>> auth, api, mqtt = await create_navien_clients(email, password)
>>> async with auth:
...     # Use api and mqtt clients here
...     ...
>>> # Session is automatically closed when exiting the context

nwp500.field_factory module

Field factory for creating typed Pydantic fields with metadata templates.

This module provides convenience functions for creating Pydantic fields with standard metadata (device_class, unit_of_measurement, etc.) pre-configured, reducing boilerplate in models while maintaining type safety.

Each factory function creates a Pydantic Field with metadata for Home Assistant integration:

  • temperature_field: Adds unit_of_measurement, device_class=’temperature’, suggested_display_precision

  • signal_strength_field: Adds unit_of_measurement, device_class=’signal_strength’

  • energy_field: Adds unit_of_measurement, device_class=’energy’

  • power_field: Adds unit_of_measurement, device_class=’power’

Example

>>> from nwp500.field_factory import temperature_field
>>> class MyModel(BaseModel):
...     temp: float = temperature_field("DHW Temperature", unit="°F")
nwp500.field_factory.energy_field(description: str, *, unit: str = 'kWh', default: Any = None, **kwargs: Any) Any[source]

Create an energy field with standard Home Assistant metadata.

Parameters:
  • description – Field description

  • unit – Energy unit (default: kWh)

  • default – Default value or Pydantic default

  • **kwargs – Additional Pydantic Field arguments

Returns:

Pydantic Field with energy metadata

nwp500.field_factory.power_field(description: str, *, unit: str = 'W', default: Any = None, **kwargs: Any) Any[source]

Create a power field with standard Home Assistant metadata.

Parameters:
  • description – Field description

  • unit – Power unit (default: W)

  • default – Default value or Pydantic default

  • **kwargs – Additional Pydantic Field arguments

Returns:

Pydantic Field with power metadata

nwp500.field_factory.signal_strength_field(description: str, *, unit: str = 'dBm', default: Any = None, **kwargs: Any) Any[source]

Create a signal strength field with standard Home Assistant metadata.

Parameters:
  • description – Field description

  • unit – Signal unit (default: dBm)

  • default – Default value or Pydantic default

  • **kwargs – Additional Pydantic Field arguments

Returns:

Pydantic Field with signal strength metadata

nwp500.field_factory.temperature_field(description: str, *, unit: str = '°F', default: Any = None, **kwargs: Any) Any[source]

Create a temperature field with standard Home Assistant metadata.

The unit parameter is critical for tools consuming this library (e.g., Home Assistant) to correctly interpret the values. While the actual displayed unit is dynamic based on device temperature_type setting (Celsius or Fahrenheit), the unit parameter in json_schema_extra provides the default/fallback unit and schema documentation for proper integration.

Parameters:
  • description – Field description

  • unit – Temperature unit (default: °F). Used in json_schema_extra for Home Assistant and other integrations to understand value units. Displayed units are dynamic based on device temperature_type.

  • default – Default value or Pydantic default

  • **kwargs – Additional Pydantic Field arguments

Returns:

Pydantic Field with temperature metadata

nwp500.mqtt_events module

Typed event definitions for NavienMqttClient.

This module provides a centralized registry of all events emitted by the NavienMqttClient, with full type information and documentation. This enables:

  • IDE autocomplete for event names

  • Type-safe event handlers

  • Clear contracts for event data

  • Programmatic event discovery

Example:

from nwp500.mqtt_events import MqttClientEvents
from nwp500.unit_system import get_unit_system

# Type-safe event listening with autocomplete
def on_temperature_changed(event):
    unit = "°C" if get_unit_system() == "metric" else "°F"
    print(
        f"Temp: {event.old_temperature}{unit} → "
        f"{event.new_temperature}{unit}"
    )

mqtt_client.on(MqttClientEvents.TEMPERATURE_CHANGED, on_temperature_changed)

# List all available events
for event_name in MqttClientEvents.get_all_events():
    print(event_name)
class nwp500.mqtt_events.ConnectionInterruptedEvent(error: Exception)[source]

Bases: object

Emitted when MQTT connection is interrupted.

error

The error that caused the interruption

Type:

Exception

error: Exception
class nwp500.mqtt_events.ConnectionResumedEvent(return_code: int, session_present: bool)[source]

Bases: object

Emitted when MQTT connection is resumed after interruption.

return_code

MQTT return code (0 = success)

Type:

int

session_present

Whether session state was preserved

Type:

bool

return_code: int
session_present: bool
class nwp500.mqtt_events.ErrorClearedEvent(device_mac: str, error_code: ErrorCode)[source]

Bases: object

Emitted when a device error is resolved.

device_mac

MAC address of the origin device

Type:

str

error_code

The error code that was cleared

Type:

ErrorCode

device_mac: str
error_code: ErrorCode
class nwp500.mqtt_events.ErrorDetectedEvent(device_mac: str, error_code: ErrorCode, status: DeviceStatus)[source]

Bases: object

Emitted when a device error is first detected.

device_mac

MAC address of the origin device

Type:

str

error_code

The error code that occurred

Type:

ErrorCode

status

Device status when error was detected

Type:

DeviceStatus

device_mac: str
error_code: ErrorCode
status: DeviceStatus
class nwp500.mqtt_events.FeatureReceivedEvent(device_mac: str, feature: DeviceFeature)[source]

Bases: object

Emitted when device feature information is received.

device_mac

MAC address of the origin device

Type:

str

feature

The device feature information

Type:

DeviceFeature

device_mac: str
feature: DeviceFeature
class nwp500.mqtt_events.HeatingStartedEvent(device_mac: str, status: DeviceStatus)[source]

Bases: object

Emitted when device transitions from idle to heating.

device_mac

MAC address of the origin device

Type:

str

status

Device status when heating started

Type:

DeviceStatus

device_mac: str
status: DeviceStatus
class nwp500.mqtt_events.HeatingStoppedEvent(device_mac: str, status: DeviceStatus)[source]

Bases: object

Emitted when device transitions from heating to idle.

device_mac

MAC address of the origin device

Type:

str

status

Device status when heating stopped

Type:

DeviceStatus

device_mac: str
status: DeviceStatus
class nwp500.mqtt_events.ModeChangedEvent(device_mac: str, old_mode: CurrentOperationMode, new_mode: CurrentOperationMode)[source]

Bases: object

Emitted when the device operation mode changes.

device_mac

MAC address of the origin device

Type:

str

old_mode

Previous operation mode

Type:

CurrentOperationMode

new_mode

New operation mode

Type:

CurrentOperationMode

device_mac: str
new_mode: CurrentOperationMode
old_mode: CurrentOperationMode
class nwp500.mqtt_events.MqttClientEvents[source]

Bases: object

Registry of all NavienMqttClient events.

This class provides string constants for all events emitted by NavienMqttClient, with associated event data types documented in their dataclass definitions.

Usage:

mqtt_client.on(
    MqttClientEvents.TEMPERATURE_CHANGED,
    lambda event: update_display(event.new_temperature)
)

# Wait for a specific event
args, _ = await mqtt_client.wait_for(
    MqttClientEvents.CONNECTION_RESUMED
)
connection_event = args[0]

# List all available events
events = ', '.join(MqttClientEvents.get_all_events())
print(f"Available events: {events}")

See also

../guides/event_system - Comprehensive event handling guide

CONNECTION_INTERRUPTED = 'connection_interrupted'

MQTT connection interrupted with error.

Parameters:

event (ConnectionInterruptedEvent) – Event object with the error field.

See: ConnectionInterruptedEvent

Type:

Emitted

CONNECTION_RESUMED = 'connection_resumed'

MQTT connection resumed after interruption.

Parameters:

event (ConnectionResumedEvent) – Event object with return_code and session_present fields.

See: ConnectionResumedEvent

Type:

Emitted

ERROR_CLEARED = 'error_cleared'

Device error cleared.

Parameters:

event (ErrorClearedEvent) – Event object with the error_code field.

See: ErrorClearedEvent

Type:

Emitted

ERROR_DETECTED = 'error_detected'

Device error detected.

Parameters:

event (ErrorDetectedEvent) – Event object with error_code and status fields.

See: ErrorDetectedEvent

Type:

Emitted

FEATURE_RECEIVED = 'feature_received'

Device feature information received.

Parameters:

event (FeatureReceivedEvent) – Event object with the feature field.

See: FeatureReceivedEvent

Type:

Emitted

HEATING_STARTED = 'heating_started'

Device started heating.

Parameters:

event (HeatingStartedEvent) – Event object with the status field.

See: HeatingStartedEvent

Type:

Emitted

HEATING_STOPPED = 'heating_stopped'

Device stopped heating.

Parameters:

event (HeatingStoppedEvent) – Event object with the status field.

See: HeatingStoppedEvent

Type:

Emitted

MODE_CHANGED = 'mode_changed'

Device operation mode changed.

Parameters:

event (ModeChangedEvent) – Event object with old_mode and new_mode fields.

See: ModeChangedEvent

Type:

Emitted

POWER_CHANGED = 'power_changed'

Instantaneous power consumption changed.

Parameters:

event (PowerChangedEvent) – Event object with old_power and new_power fields.

See: PowerChangedEvent

Type:

Emitted

STATUS_RECEIVED = 'status_received'

Device status message received.

Parameters:

event (StatusReceivedEvent) – Event object with the status field.

See: StatusReceivedEvent

Type:

Emitted

TEMPERATURE_CHANGED = 'temperature_changed'

DHW temperature changed.

Parameters:

event (TemperatureChangedEvent) – Event object with old_temperature and new_temperature fields.

See: TemperatureChangedEvent

Type:

Emitted

classmethod get_all_events() list[str][source]

Get list of all available event names.

Returns:

List of event constant names (not including metadata strings)

Example:

for event_name in MqttClientEvents.get_all_events():
    print(f"- {event_name}")

# Output:
# - CONNECTION_INTERRUPTED
# - CONNECTION_RESUMED
# - STATUS_RECEIVED
# - TEMPERATURE_CHANGED
# - ...
classmethod get_event_value(event_name: str) str[source]

Get the string value of an event constant.

Parameters:

event_name – Event constant name (e.g., “TEMPERATURE_CHANGED”)

Returns:

Event string value (e.g., “temperature_changed”)

Raises:

AttributeError – If event_name does not exist

Example:

value = MqttClientEvents.get_event_value("TEMPERATURE_CHANGED")
print(value)  # Output: "temperature_changed"
class nwp500.mqtt_events.PowerChangedEvent(device_mac: str, old_power: float, new_power: float)[source]

Bases: object

Emitted when instantaneous power consumption changes.

device_mac

MAC address of the origin device

Type:

str

old_power

Previous power consumption in watts

Type:

float

new_power

New power consumption in watts

Type:

float

device_mac: str
new_power: float
old_power: float
class nwp500.mqtt_events.StatusReceivedEvent(device_mac: str, status: DeviceStatus)[source]

Bases: object

Emitted when a device status message is received.

device_mac

MAC address of the origin device

Type:

str

status

The current device status snapshot

Type:

DeviceStatus

device_mac: str
status: DeviceStatus
class nwp500.mqtt_events.TemperatureChangedEvent(device_mac: str, old_temperature: float, new_temperature: float)[source]

Bases: object

Emitted when the DHW temperature changes.

device_mac

MAC address of the origin device

Type:

str

old_temperature

Previous DHW temperature in user’s preferred unit (Celsius or Fahrenheit based on unit system context)

Type:

float

new_temperature

New DHW temperature in user’s preferred unit (Celsius or Fahrenheit based on unit system context)

Type:

float

device_mac: str
new_temperature: float
old_temperature: float

nwp500.openei module

OpenEI Utility Rates API client.

Provides async access to the OpenEI Utility Rates API for querying electricity rate plans by zip code. Used to populate Time-of-Use (TOU) schedules on Navien devices.

API key can be obtained for free at https://openei.org/services/api/signup/

class nwp500.openei.OpenEIClient(api_key: str | None = None, session: ClientSession | None = None)[source]

Bases: object

Async client for the OpenEI Utility Rates API.

Queries residential electricity rate plans by zip code. Requires an API key from https://openei.org/services/api/signup/

The API key is resolved in this order: 1. api_key constructor parameter 2. OPENEI_API_KEY environment variable

Example

>>> async with OpenEIClient() as client:
...     plans = await client.list_rate_plans("94903")
...     for plan in plans:
...         print(f"{plan['utility']}: {plan['name']}")
async fetch_rates(zip_code: str, *, limit: int = 100) list[dict[str, Any]][source]

Fetch all residential rate plans for a zip code.

Parameters:
  • zip_code – US zip code to search

  • limit – Maximum number of results (default: 100)

Returns:

List of raw OpenEI rate plan dictionaries

Raises:
  • ValueError – If no API key is configured

  • aiohttp.ClientError – If the API request fails

async get_rate_plan(zip_code: str, plan_name: str, *, utility: str | None = None) dict[str, Any] | None[source]

Get a specific rate plan by name.

Returns the first matching plan. Use utility to disambiguate if multiple utilities serve the same zip code.

Parameters:
  • zip_code – US zip code to search

  • plan_name – Rate plan name (case-insensitive substring match)

  • utility – Filter by utility name (case-insensitive substring match)

Returns:

Full rate plan dictionary or None if not found

async list_rate_plans(zip_code: str, *, utility: str | None = None) list[dict[str, Any]][source]

List rate plans, optionally filtered by utility.

Parameters:
  • zip_code – US zip code to search

  • utility – Filter by utility name (case-insensitive substring match)

Returns:

name, utility, label, eiaid, approved, has_tou_schedule

Return type:

List of rate plan dictionaries with keys

async list_utilities(zip_code: str) list[str][source]

List unique utility providers for a zip code.

Parameters:

zip_code – US zip code to search

Returns:

Sorted list of unique utility names

nwp500.reservations module

Reservation schedule management helpers.

This module provides high-level helpers for managing individual reservation entries on a Navien device. The device protocol requires sending the full schedule for every change, so each helper follows a read-modify-write pattern: fetch the current schedule, apply the change, then send the updated list back.

All functions are async and require a connected NavienMqttClient.

async nwp500.reservations.add_reservation(mqtt: NavienMqttClient, device: Device, *, enabled: bool, days: Sequence[str | int], hour: int, minute: int, mode: int, temperature: float) None[source]

Add a single reservation entry to the device schedule.

Fetches the current schedule, appends the new entry, and sends the updated list back to the device. The schedule is automatically enabled after a successful add.

Parameters:
  • mqtt – Connected MQTT client.

  • device – Target device.

  • enabled – Whether the new reservation is active.

  • days – Days of the week. Accepts full names ("Monday"), 2-letter abbreviations ("MO"), or integer indices where 0 = Monday and 6 = Sunday.

  • hour – Hour of the day in 24-hour format (0–23).

  • minute – Minute of the hour (0–59).

  • mode – DHW operation mode (1–6).

  • temperature – Target temperature in the user’s preferred unit.

Raises:
async nwp500.reservations.delete_reservation(mqtt: NavienMqttClient, device: Device, index: int) None[source]

Delete a single reservation entry by 1-based index.

Fetches the current schedule, removes the entry at index, and sends the updated list back. If the schedule becomes empty, it is automatically disabled.

Parameters:
  • mqtt – Connected MQTT client.

  • device – Target device.

  • index – 1-based position of the reservation to delete.

Raises:
  • ValueError – If index is out of the valid range.

  • TimeoutError – If the current schedule cannot be fetched.

async nwp500.reservations.fetch_reservations(mqtt: NavienMqttClient, device: Device, *, timeout: float = 10.0) ReservationSchedule | None[source]

Fetch the current reservation schedule from a device.

Sends a request to the device and waits for the response.

Parameters:
  • mqtt – Connected MQTT client.

  • device – Target device.

  • timeout – Seconds to wait for a response before giving up.

Returns:

The current ReservationSchedule, or None on timeout.

async nwp500.reservations.update_reservation(mqtt: NavienMqttClient, device: Device, index: int, *, enabled: bool | None = None, days: Sequence[str | int] | None = None, hour: int | None = None, minute: int | None = None, mode: int | None = None, temperature: float | None = None) None[source]

Update a single reservation entry in-place by 1-based index.

Only the fields that are explicitly provided are changed; all other fields are preserved from the existing entry.

Parameters:
  • mqtt – Connected MQTT client.

  • device – Target device.

  • index – 1-based position of the reservation to update.

  • enabled – Set the enabled state, or None to keep current.

  • days – Replace the days, or None to keep current. Accepts full names, 2-letter abbreviations, or integer indices (see add_reservation()).

  • hour – Replace the hour (0–23), or None to keep current.

  • minute – Replace the minute (0–59), or None to keep current.

  • mode – Replace the mode (1–6), or None to keep current.

  • temperature – Replace the temperature (in the user’s preferred unit), or None to keep the existing raw param value unchanged.

Raises:
  • ValueError – If index is out of the valid range, or if any of hour, minute, or mode are provided but out of range.

  • RangeValidationError – If temperature is out of the device’s range.

  • ValidationError – If the updated entry fails model validation.

  • TimeoutError – If the current schedule cannot be fetched.

nwp500.temperature module

Temperature conversion utilities for different device representations.

The Navien NWP500 uses different temperature precision formats: - HalfCelsius: 0.5°C precision (value / 2.0) - DeciCelsius: 0.1°C precision (value / 10.0)

All values are converted to preferred unit based on device preference.

class nwp500.temperature.DeciCelsius(raw_value: int | float)[source]

Bases: Temperature

Temperature in decicelsius (0.1°C precision).

Used for high-precision temperature measurements. Formula: raw_value / 10.0 converts to Celsius.

Example

>>> temp = DeciCelsius(600)  # Raw device value 600
>>> temp.to_celsius()
60.0
>>> temp.to_fahrenheit()
140.0
classmethod from_celsius(celsius: float) DeciCelsius[source]

Create DeciCelsius from Celsius (for device commands).

Parameters:

celsius – Temperature in Celsius.

Returns:

DeciCelsius instance with raw value for device.

Example

>>> temp = DeciCelsius.from_celsius(60.0)
>>> temp.raw_value
600
classmethod from_fahrenheit(fahrenheit: float) DeciCelsius[source]

Create DeciCelsius from Fahrenheit (for device commands).

Parameters:

fahrenheit – Temperature in Fahrenheit.

Returns:

DeciCelsius instance with raw value for device.

Example

>>> temp = DeciCelsius.from_fahrenheit(140.0)
>>> temp.raw_value
600
to_celsius() float[source]

Convert to Celsius.

Returns:

Temperature in Celsius.

to_fahrenheit() float[source]

Convert to Fahrenheit.

Returns:

Temperature in Fahrenheit.

class nwp500.temperature.DeciCelsiusDelta(raw_value: int | float)[source]

Bases: Temperature

Temperature delta in decicelsius (0.1°C precision).

Represents a temperature difference/delta, NOT an absolute temperature. Used for differential temperature settings (e.g., heat pump on/off Diff). Formula: raw_value / 10.0 converts to Celsius delta.

Key difference from DeciCelsius: When converting to Fahrenheit, we apply the scale factor (9/5) but NOT the offset (+32), since this is a delta not an absolute temperature.

Example

>>> temp = DeciCelsiusDelta(5)  # Raw device value 5
>>> temp.to_celsius()
0.5
>>> temp.to_fahrenheit()
0.9  # 0.5°C * 9/5 = 0.9°F, no +32 offset
classmethod from_celsius(celsius: float) DeciCelsiusDelta[source]

Create DeciCelsiusDelta from Celsius delta (for device commands).

Parameters:

celsius – Temperature delta in Celsius.

Returns:

DeciCelsiusDelta instance with raw value for device.

Example

>>> temp = DeciCelsiusDelta.from_celsius(0.5)
>>> temp.raw_value
5
classmethod from_fahrenheit(fahrenheit: float) DeciCelsiusDelta[source]

Create DeciCelsiusDelta from Fahrenheit delta (for device commands).

Parameters:

fahrenheit – Temperature delta in Fahrenheit.

Returns:

DeciCelsiusDelta instance with raw value for device.

Example

>>> temp = DeciCelsiusDelta.from_fahrenheit(0.9)
>>> temp.raw_value
5
to_celsius() float[source]

Convert to Celsius delta.

Returns:

Temperature delta in Celsius.

to_fahrenheit() float[source]

Convert to Fahrenheit delta (without +32 offset).

Returns:

Temperature delta in Fahrenheit.

class nwp500.temperature.HalfCelsius(raw_value: int | float)[source]

Bases: Temperature

Temperature in half-degree Celsius (0.5°C precision).

Used for DHW (domestic hot water) temperatures in device status. Formula: raw_value / 2.0 converts to Celsius.

Example

>>> temp = HalfCelsius(120)  # Raw device value 120
>>> temp.to_celsius()
60.0
>>> temp.to_fahrenheit()
140.0
classmethod from_celsius(celsius: float) HalfCelsius[source]

Create HalfCelsius from Celsius (for device commands).

Parameters:

celsius – Temperature in Celsius.

Returns:

HalfCelsius instance with raw value for device.

Example

>>> temp = HalfCelsius.from_celsius(60.0)
>>> temp.raw_value
120
classmethod from_fahrenheit(fahrenheit: float) HalfCelsius[source]

Create HalfCelsius from Fahrenheit (for device commands).

Parameters:

fahrenheit – Temperature in Fahrenheit.

Returns:

HalfCelsius instance with raw value for device.

Example

>>> temp = HalfCelsius.from_fahrenheit(140.0)
>>> temp.raw_value
120
to_celsius() float[source]

Convert to Celsius.

Returns:

Temperature in Celsius.

to_fahrenheit() float[source]

Convert to Fahrenheit.

Returns:

Temperature in Fahrenheit.

class nwp500.temperature.RawCelsius(raw_value: int | float)[source]

Bases: Temperature

Temperature in raw halves of Celsius (0.5°C precision).

Used for outdoor/ambient temperature measurements that require formula-specific rounding for Fahrenheit conversion. Formula: raw_value / 2.0 converts to Celsius.

The Fahrenheit conversion supports two formula types: - Type 0 (Asymmetric Rounding): Uses floor/ceil based on remainder - Type 1 (Standard Rounding): Uses standard math rounding

Example

>>> temp = RawCelsius(120)  # Raw device value 120
>>> temp.to_celsius()
60.0
>>> temp.to_fahrenheit()
140.0
classmethod from_celsius(celsius: float) RawCelsius[source]

Create RawCelsius from Celsius (for device commands).

Parameters:

celsius – Temperature in Celsius.

Returns:

RawCelsius instance with raw value for device.

Example

>>> temp = RawCelsius.from_celsius(60.0)
>>> temp.raw_value
120
classmethod from_fahrenheit(fahrenheit: float) RawCelsius[source]

Create RawCelsius from Fahrenheit (for device commands).

Parameters:

fahrenheit – Temperature in Fahrenheit.

Returns:

RawCelsius instance with raw value for device.

Example

>>> temp = RawCelsius.from_fahrenheit(140.0)
>>> temp.raw_value
120
to_celsius() float[source]

Convert to Celsius.

Returns:

Temperature in Celsius.

to_fahrenheit() float[source]

Convert to Fahrenheit using standard rounding.

Returns:

Temperature in Fahrenheit.

to_fahrenheit_with_formula(formula_type: TempFormulaType) float[source]

Convert to Fahrenheit using formula-specific rounding.

Parameters:

formula_type – Temperature formula type (ASYMMETRIC or STANDARD)

Returns:

Temperature in Fahrenheit.

class nwp500.temperature.Temperature(raw_value: int | float)[source]

Bases: ABC

Base class for temperature conversions with device protocol support.

classmethod from_celsius(celsius: float) Temperature[source]

Create instance from Celsius value (for commands).

Parameters:

celsius – Temperature in Celsius.

Returns:

Instance with raw value set for device command.

classmethod from_fahrenheit(fahrenheit: float) Temperature[source]

Create instance from Fahrenheit value (for commands).

Parameters:

fahrenheit – Temperature in Fahrenheit.

Returns:

Instance with raw value set for device command.

classmethod from_preferred(value: float, is_celsius: bool = False) Temperature[source]

Create instance from preferred unit (C or F).

Parameters:
  • value – Temperature value in preferred unit.

  • is_celsius – Whether the input value is in Celsius.

Returns:

Instance with raw value set for device command.

abstractmethod to_celsius() float[source]

Convert to Celsius.

Returns:

Temperature in Celsius.

abstractmethod to_fahrenheit() float[source]

Convert to Fahrenheit.

Returns:

Temperature in Fahrenheit.

to_preferred(is_celsius: bool = False) float[source]

Convert to preferred unit (Celsius or Fahrenheit).

Parameters:

is_celsius – Whether the preferred unit is Celsius.

Returns:

Temperature in Celsius if is_celsius is True, else Fahrenheit.

nwp500.temperature.deci_celsius_to_fahrenheit(value: Any) float[source]

Convert decicelsius to Fahrenheit.

Validator function for Pydantic fields using DeciCelsius format.

Parameters:

value – Raw device value in decicelsius format.

Returns:

Temperature in Fahrenheit.

nwp500.temperature.half_celsius_to_fahrenheit(value: Any) float[source]

Convert half-degrees Celsius to Fahrenheit.

Validator function for Pydantic fields using HalfCelsius format.

Parameters:

value – Raw device value in half-Celsius format.

Returns:

Temperature in Fahrenheit.

nwp500.topic_builder module

MQTT topic building utilities for Navien devices.

All MQTT topic construction goes through this class so that the topic schema is defined in exactly one place.

Topic schema:

Device command (ctrl/query): cmd/{device_type}/navilink-{mac}/{suffix} Device subscribe (wildcard): cmd/{device_type}/navilink-{mac}/# Response (control ack): cmd/{device_type}/navilink-{mac}/{client_id}/res Response (query result): cmd/{device_type}/{client_id}/res/{suffix} Event: evt/{device_type}/navilink-{mac}/{suffix}

class nwp500.topic_builder.MqttTopicBuilder[source]

Bases: object

Helper to construct standard MQTT topics for Navien devices.

static command_topic(device_type: str, mac_address: str, suffix: str = 'ctrl') str[source]

Build a device command topic.

Format: cmd/{device_type}/navilink-{mac}/{suffix}

static device_topic(mac_address: str) str[source]

Get the navilink device path segment from MAC address.

static event_topic(device_type: str, mac_address: str, suffix: str) str[source]

Build a device event topic.

Format: evt/{device_type}/navilink-{mac}/{suffix}

static response_ack_topic(device_type: str, mac_address: str, client_id: str) str[source]

Build the default response topic for control commands.

The device sends its acknowledgement to this topic; the client subscribes via the command_topic(..., "#") wildcard.

Format: cmd/{device_type}/navilink-{mac}/{client_id}/res

static response_topic(device_type: str, client_id: str, suffix: str) str[source]

Build a client-specific response topic for query commands.

Used when the device should reply directly to a client-keyed topic rather than the device topic (e.g. reservation reads, TOU reads, energy queries).

Format: cmd/{device_type}/{client_id}/res/{suffix}

nwp500.unit_system module

Unit system management for temperature, flow rate, and volume conversions.

This module provides context-based unit system management, allowing applications to override the device’s temperature_type setting and specify a preferred measurement system (Metric or Imperial).

The unit system preference can be set at library initialization and is used during model validation to convert device values to the user’s preferred units.

nwp500.unit_system.get_unit_system() Literal['metric', 'us_customary'] | None[source]

Get the currently configured unit system preference.

Returns:

  • “metric”: Celsius, LPM, Liters

  • ”us_customary”: Fahrenheit, GPM, Gallons

  • None: Auto-detect from device (default)

Return type:

The current unit system preference

nwp500.unit_system.is_metric_preferred(override: Literal['metric', 'us_customary'] | None = None) bool[source]

Check if metric (Celsius) is preferred.

Checks the override first, then falls back to the context-configured unit system. Used during validation to determine preferred units.

Parameters:

override – Optional override value. If provided, this takes precedence over the context-configured unit system.

Returns:

True if metric (Celsius) is preferred, False if us_customary (Fahrenheit).

nwp500.unit_system.reset_unit_system() None[source]

Reset unit system preference to auto-detect (None).

This is useful for tests or when switching between different device configurations.

nwp500.unit_system.set_unit_system(unit_system: Literal['metric', 'us_customary'] | None) None[source]

Set preferred unit system for temperature, flow, and volume conversions.

This setting overrides the device’s temperature_type setting and applies to all subsequent model validation operations in the current async context.

Parameters:

unit_system – Preferred unit system: - “metric”: Use Celsius, LPM, and Liters - “us_customary”: Use Fahrenheit, GPM, and Gallons - None: Auto-detect from device’s temperature_type (default)

Example

>>> from nwp500 import set_unit_system
>>> set_unit_system("us_customary")
>>> # All values now in F, GPM, Gallons
>>> set_unit_system(None)  # Reset to auto-detect

Note

This is context-aware and works with async code. Each async task maintains its own unit system preference.

nwp500.unit_system.unit_system_to_temperature_type(unit_system: Literal['metric', 'us_customary'] | None) TemperatureType | None[source]

Convert unit system preference to TemperatureType enum.

Parameters:

unit_system – Unit system preference (“metric”, “us_customary”, or None)

Returns:

  • TemperatureType.CELSIUS for “metric”

  • TemperatureType.FAHRENHEIT for “us_customary”

  • None for None (auto-detect)

nwp500.utils module

General utility functions for the nwp500 library.

This module provides utilities that are used across multiple components, including performance monitoring decorators and helper functions.

nwp500.utils.log_performance(func: F) F[source]

Log execution time for async functions at DEBUG level.

This decorator measures the execution time of async functions and logs the duration when DEBUG logging is enabled. It’s useful for identifying performance bottlenecks and monitoring critical paths.

Parameters:

func – Async function to wrap

Returns:

Wrapped function that logs its execution time

Example:

@log_performance
async def fetch_device_status(device_id: str) -> dict:
    # ... expensive operation ...
    return status

# When called, logs: "fetch_device_status completed in 0.234s"

Note

  • Only logs when DEBUG level is enabled to minimize overhead in production

  • Uses time.perf_counter() for high-resolution timing

  • Preserves function metadata (name, docstring, etc.)

Module contents

Navien NWP500 water heater control library.

This package provides Python bindings for Navien Smart Control API and MQTT communication for NWP500 heat pump water heaters.

exception nwp500.APIError(message: str, code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: Nwp500Error

Raised when API returns an error response.

This exception is raised for various API-related failures including network errors, invalid responses, and API endpoint errors.

message

Error message describing the failure

code

HTTP or API error code

response

Complete API response dictionary (optional)

class nwp500.AuthTokens(*, idToken: str = '', accessToken: str = '', refreshToken: str = '', authenticationExpiresIn: int = 3600, accessKeyId: str | None = None, secretKey: str | None = None, sessionToken: str | None = None, authorizationExpiresIn: int | None = None, issuedAt: datetime = <factory>)[source]

Bases: NavienBaseModel

Authentication tokens and AWS credentials returned from the API.

access_key_id: str | None
access_token: str
property are_aws_credentials_expired: bool

Check if AWS credentials have expired.

AWS credentials have a separate expiration time from JWT tokens. If AWS credentials are expired, a full re-authentication is needed since the token refresh endpoint doesn’t provide new AWS credentials.

Returns:

True if AWS credentials are expired, False if expiration time is unknown or credentials are still valid

authentication_expires_in: int
authorization_expires_in: int | None
property bearer_token: str

Get the formatted Bearer token for Authorization header.

property expires_at: datetime

Get the cached expiration timestamp.

classmethod handle_empty_aliases(data: Any) Any[source]

Handle empty camelCase aliases with snake_case fallbacks.

id_token: str
property is_expired: bool

Check if the access token has expired (cached calculation).

issued_at: datetime
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(_AuthTokens__context: Any) None[source]

Cache the expiration timestamp after initialization.

refresh_token: str
secret_key: str | None
session_token: str | None
property time_until_expiry: timedelta

Get time remaining until token expiration.

Uses cached expiration time for efficiency.

to_dict() dict[str, Any][source]

Convert tokens to a dictionary for serialization.

This includes the calculated issued_at timestamp, which is needed to maintain the correct expiration time when restoring tokens.

exception nwp500.AuthenticationError(message: str, status_code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: Nwp500Error

Base exception for authentication errors.

Raised when authentication-related operations fail, including sign-in, token management, and credential validation.

message

Error message describing the failure

status_code

HTTP status code (optional)

response

Complete API response dictionary (optional)

class nwp500.AuthenticationResponse(*, userInfo: UserInfo, tokens: AuthTokens, legal: list[Any] = <factory>, code: int = 200, msg: str = 'SUCCESS')[source]

Bases: NavienBaseModel

Complete authentication response including user info and tokens.

code: int
legal: list[Any]
message: str
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

tokens: AuthTokens
user_info: UserInfo
classmethod wrap_api_response(data: Any) Any[source]

Handle nested ‘data’ wrapper in API responses.

class nwp500.CommandCode(*values)[source]

Bases: IntEnum

MQTT Command codes for Navien device control.

These command codes are used for MQTT communication with Navien devices. Commands are organized into two categories:

  • Query commands (16777xxx): Request device information

  • Control commands (33554xxx): Change device settings

All commands and their expected payloads are documented in docs/protocol/mqtt_protocol.rst under the “Control Messages” section.

AIR_FILTER_LIFE = 33554474
AIR_FILTER_RESET = 33554473
ANTI_LEGIONELLA_OFF = 33554471
ANTI_LEGIONELLA_ON = 33554472
DEVICE_INFO_REQUEST = 16777217
DHW_MODE = 33554437
DHW_TEMPERATURE = 33554464
DR_OFF = 33554469
DR_ON = 33554470
ENERGY_USAGE_QUERY = 16777225
FREZ_TEMP = 33554451
GOOUT_DAY = 33554466
OTA_CHECK = 33554443
OTA_COMMIT = 33554442
POWER_OFF = 33554433
POWER_ON = 33554434
RECIR_HOT_BTN = 33554444
RECIR_MODE = 33554445
RECIR_RESERVATION = 33554440
RESERVATION_INTELLIGENT_OFF = 33554467
RESERVATION_INTELLIGENT_ON = 33554468
RESERVATION_MANAGEMENT = 16777226
RESERVATION_READ = 16777222
RESERVATION_WATER_PROGRAM = 33554441
RESERVATION_WEEKLY = 33554438
SMART_DIAGNOSTIC = 33554455
STATUS_REQUEST = 16777219
TOU_OFF = 33554475
TOU_ON = 33554476
TOU_RESERVATION = 33554439
WIFI_RECONNECT = 33554446
WIFI_RESET = 33554447
class nwp500.ConnectionDropEvent(timestamp: str, error_name: str | None = None, error_message: str | None = None, error_code: int | None = None, reconnect_attempt: int = 0, duration_connected_seconds: float | None = None, active_subscriptions: int = 0, queued_commands: int = 0)[source]

Bases: object

Record of a single connection drop event.

active_subscriptions: int = 0
duration_connected_seconds: float | None = None
error_code: int | None = None
error_message: str | None = None
error_name: str | None = None
queued_commands: int = 0
reconnect_attempt: int = 0
timestamp: str
to_dict() dict[str, Any][source]

Convert to dictionary.

class nwp500.ConnectionEvent(timestamp: str, event_type: str, session_present: bool = False, return_code: int | None = None, attempt_number: int = 0, time_to_reconnect_seconds: float | None = None)[source]

Bases: object

Record of a connection success/resumption event.

attempt_number: int = 0
event_type: str
return_code: int | None = None
session_present: bool = False
time_to_reconnect_seconds: float | None = None
timestamp: str
to_dict() dict[str, Any][source]

Convert to dictionary.

class nwp500.ConvertedTOUPlan(*, utility: str = '', name: str = '', schedule: list[TOUSchedule] = <factory>)[source]

Bases: NavienBaseModel

A rate plan converted by the Navien backend from OpenEI format.

Returned by POST /device/tou/convert. Contains the utility name, plan name, and device-ready schedule with season/week bitfields and scaled pricing.

model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str
schedule: list[TOUSchedule]
utility: str
class nwp500.CurrentOperationMode(*values)[source]

Bases: IntEnum

Current operation mode (real-time operational state).

This enum represents the device’s current actual operational state - what the device is doing RIGHT NOW. These values appear in the operation_mode field and change automatically based on heating demand.

HEAT_PUMP_MODE = 32
HYBRID_BOOST_MODE = 96
HYBRID_EFFICIENCY_MODE = 64
STANDBY = 0
class nwp500.DREvent(*values)[source]

Bases: IntEnum

Demand Response event status.

Allows utilities to manage grid load by signaling water heaters to reduce consumption (shed) or pre-heat (load up) before peak periods.

CPE = 5
LOADUP = 3
LOADUP_ADV = 4
RUN_NORMAL = 1
SHED = 2
UNKNOWN = 0
class nwp500.Device(*, deviceInfo: DeviceInfo, location: Location)[source]

Bases: NavienBaseModel

Complete device information including location.

device_info: DeviceInfo
location: Location
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

with_info(info: DeviceInfo) Self[source]

Return a new Device instance with updated DeviceInfo.

exception nwp500.DeviceCapabilityError(feature_name: str, message: str | None = None)[source]

Bases: DeviceError

Device does not support a requested capability.

Raised when an MQTT command requires a device capability that the device does not support. This may occur when trying to use features that are not available on specific device models or hardware revisions.

feature_name

Name of the unsupported feature

exception nwp500.DeviceError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: Nwp500Error

Base exception for device operations.

All device-related errors inherit from this base class.

class nwp500.DeviceFeature(*, temperatureType: ~nwp500.enums.TemperatureType = TemperatureType.FAHRENHEIT, macAddress: str | None = None, countryCode: int, modelTypeCode: ~nwp500.enums.UnitType | int, controlTypeCode: int, volumeCode: ~typing.Annotated[~nwp500.enums.VolumeCode, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.enum_validator.<locals>.validate, json_schema_input_type=PydanticUndefined)], controllerSwVersion: int, panelSwVersion: int, wifiSwVersion: int, controllerSwCode: int, panelSwCode: int, wifiSwCode: int, recircSwVersion: int, recircModelTypeCode: int, controllerSerialNumber: str, powerUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, holidayUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, programReservationUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, dhwUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, dhwTemperatureSettingUse: ~nwp500.enums.DHWControlTypeFlag, smartDiagnosticUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, wifiRssiUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, tempFormulaType: ~nwp500.enums.TempFormulaType = TempFormulaType.ASYMMETRIC, energyUsageUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, freezeProtectionUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, mixingValveUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, drSettingUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, antiLegionellaSettingUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, hpwhUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, dhwRefillUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, ecoUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, electricUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, heatpumpUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, energySaverUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, highDemandUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, recirculationUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, recircReservationUse: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, title24Use: ~typing.Annotated[bool, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, dhwTemperatureMin: int = None, dhwTemperatureMax: int = None, freezeProtectionTempMin: int = None, freezeProtectionTempMax: int = None, recircTemperatureMin: int = None, recircTemperatureMax: int = None)[source]

Bases: NavienBaseModel

Device capabilities, configuration, and firmware info.

anti_legionella_setting_use: CapabilityFlag
control_type_code: int
controller_serial_number: str
controller_sw_code: int
controller_sw_version: int
country_code: int
dhw_refill_use: CapabilityFlag
property dhw_temperature_max: float
dhw_temperature_max_raw: int
property dhw_temperature_min: float
dhw_temperature_min_raw: int
dhw_temperature_setting_use: DHWControlTypeFlag
dhw_use: CapabilityFlag
dr_setting_use: CapabilityFlag
eco_use: CapabilityFlag
electric_use: CapabilityFlag
energy_saver_use: CapabilityFlag
energy_usage_use: CapabilityFlag
property freeze_protection_temp_max: float
freeze_protection_temp_max_raw: int
property freeze_protection_temp_min: float
freeze_protection_temp_min_raw: int
freeze_protection_use: CapabilityFlag
get_field_unit(field_name: str) str[source]

Get the correct unit suffix based on temperature preference.

Resolves dynamic units for temperature, flow rate, and volume fields that change based on unit system context override or the device’s temperature_type setting (Celsius or Fahrenheit).

Parameters:

field_name – Name of the field to get the unit for

Returns:

Unit string (e.g., “ °C”, “ LPM”, “ L”) or empty if field not found

heatpump_use: CapabilityFlag
high_demand_use: CapabilityFlag
holiday_use: CapabilityFlag
hpwh_use: CapabilityFlag
mac_address: str | None
mixing_valve_use: CapabilityFlag
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_type_code: UnitType | int
panel_sw_code: int
panel_sw_version: int
power_use: CapabilityFlag
program_reservation_use: CapabilityFlag
recirc_model_type_code: int
recirc_reservation_use: CapabilityFlag
recirc_sw_version: int
property recirc_temperature_max: float
recirc_temperature_max_raw: int
property recirc_temperature_min: float
recirc_temperature_min_raw: int
recirculation_use: CapabilityFlag
smart_diagnostic_use: CapabilityFlag
temp_formula_type: TempFormulaType
temperature_type: TemperatureType
title24_use: CapabilityFlag
volume_code: VolumeCodeField
wifi_rssi_use: CapabilityFlag
wifi_sw_code: int
wifi_sw_version: int
class nwp500.DeviceInfo(*, homeSeq: int = 0, macAddress: str = '', additionalValue: str = '', deviceType: DeviceType | int = DeviceType.NPF700_WIFI, deviceName: str = 'Unknown', connected: ConnectionStatus, ~pydantic.functional_validators.BeforeValidator(func=~nwp500.converters.enum_validator.<locals>.validate, json_schema_input_type=PydanticUndefined)] = ConnectionStatus.DISCONNECTED, installType: str | None = None)[source]

Bases: NavienBaseModel

Device information from API.

additional_value: str
connected: ConnectionStatusField
device_name: str
device_type: DeviceType | int
home_seq: int
install_type: str | None
mac_address: str
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

exception nwp500.DeviceNotFoundError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: DeviceError

Requested device not found.

Raised when a device cannot be found in the user’s device list or when attempting to access a non-existent device.

exception nwp500.DeviceOfflineError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: DeviceError

Device is offline or unreachable.

Raised when a device is offline and cannot respond to commands or status requests. The device may be powered off, disconnected from the network, or experiencing connectivity issues.

exception nwp500.DeviceOperationError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: DeviceError

Device operation failed.

Raised when a device operation (mode change, temperature setting, etc.) fails. This may occur due to invalid commands, device restrictions, or device-side errors.

class nwp500.DeviceStatus(*, temperatureType: TemperatureType = TemperatureType.FAHRENHEIT, macAddress: str | None = None, command: int, specialFunctionStatus: int, errorCode: ErrorCode = ErrorCode.NO_ERROR, subErrorCode: int, smartDiagnostic: int, faultStatus1: int, faultStatus2: int, wifiRssi: int = None, dhwChargePer: float, drEventStatus: DREvent = DREvent.UNKNOWN, vacationDaySetting: int, vacationDayElapsed: int, antiLegionellaPeriod: int, programReservationType: int, tempFormulaType: TempFormulaType, outsideTemperature: int = None, currentStatenum: int, targetFanRpm: int, currentFanRpm: int, fanPwm: int, mixingRate: float, eevStep: int, airFilterAlarmPeriod: int, airFilterAlarmElapsed: int, cumulatedOpTimeEvaFan: int, cumulatedDhwFlowRate: int, touStatus: Annotated[bool, BeforeValidator(func=bool, json_schema_input_type=PydanticUndefined)], drOverrideStatus: int, touOverrideStatus: Annotated[bool, BeforeValidator(func=tou_override_to_python, json_schema_input_type=PydanticUndefined)], totalEnergyCapacity: Annotated[float, BeforeValidator(func=mul_10, json_schema_input_type=PydanticUndefined)], availableEnergyCapacity: Annotated[float, BeforeValidator(func=mul_10, json_schema_input_type=PydanticUndefined)], recircOperationMode: RecirculationMode, recircPumpOperationStatus: int, recircHotBtnReady: int, recircOperationReason: int, recircErrorStatus: int, currentInstPower: float, didReload: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], operationBusy: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], freezeProtectionUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], dhwUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], dhwUseSustained: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], dhwOperationBusy: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)] = False, programReservationUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], ecoUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], compUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], eevUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], evaFanUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], shutOffValveUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], conOvrSensorUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], wtrOvrSensorUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], antiLegionellaUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], antiLegionellaOperationBusy: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], errorBuzzerUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], currentHeatUse: HeatSource, heatUpperUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], heatLowerUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], scaldUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], airFilterAlarmUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], recircOperationBusy: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], recircReservationUse: Annotated[bool, BeforeValidator(func=device_bool_to_python, json_schema_input_type=PydanticUndefined)], dhwTemperature: int = None, dhwTemperatureSetting: int = None, dhwTargetTemperatureSetting: int = None, freezeProtectionTemperature: int = None, dhwTemperature2: int = None, hpUpperOnTempSetting: int = None, hpUpperOffTempSetting: int = None, hpLowerOnTempSetting: int = None, hpLowerOffTempSetting: int = None, heUpperOnTempSetting: int = None, heUpperOffTempSetting: int = None, heLowerOnTempSetting: int = None, heLowerOffTempSetting: int = None, heatMinOpTemperature: int = None, recircTempSetting: int = None, recircTemperature: int = None, recircFaucetTemperature: int = None, currentInletTemperature: int = None, currentDhwFlowRate: int, hpUpperOnDiffTempSetting: int, hpUpperOffDiffTempSetting: int, hpLowerOnDiffTempSetting: int, hpLowerOffDiffTempSetting: int, heUpperOnDiffTempSetting: int, heUpperOffDiffTempSetting: int, heLowerOnTDiffempSetting: int, heLowerOffDiffTempSetting: int, recircDhwFlowRate: int, tankUpperTemperature: int = None, tankLowerTemperature: int = None, dischargeTemperature: int = None, suctionTemperature: int = None, evaporatorTemperature: int = None, ambientTemperature: int = None, targetSuperHeat: int = None, currentSuperHeat: int = None, operationMode: CurrentOperationMode = CurrentOperationMode.STANDBY, dhwOperationSetting: DhwOperationSetting = DhwOperationSetting.ENERGY_SAVER, freezeProtectionTempMin: int = 43, freezeProtectionTempMax: int = 65)[source]

Bases: NavienBaseModel

Represents the status of the Navien water heater device.

air_filter_alarm_elapsed: int
air_filter_alarm_period: int
air_filter_alarm_use: DeviceBool
property ambient_temperature: float
ambient_temperature_raw: int
anti_legionella_operation_busy: DeviceBool
anti_legionella_period: int
anti_legionella_use: DeviceBool
available_energy_capacity: TenWhToWh
command: int
comp_use: DeviceBool
con_ovr_sensor_use: DeviceBool
property cumulated_dhw_flow_rate: float
cumulated_dhw_flow_rate_raw: int
cumulated_op_time_eva_fan: int
property current_dhw_flow_rate: float
current_dhw_flow_rate_raw: int
current_fan_rpm: int
current_heat_use: HeatSource
property current_inlet_temperature: float
current_inlet_temperature_raw: int
current_inst_power: float
current_statenum: int
property current_super_heat: float
current_super_heat_raw: int
dhw_charge_per: float
dhw_operation_busy: DeviceBool
dhw_operation_setting: DhwOperationSetting
property dhw_target_temperature_setting: float
dhw_target_temperature_setting_raw: int
property dhw_temperature: float
property dhw_temperature2: float
dhw_temperature2_raw: int
dhw_temperature_raw: int
property dhw_temperature_setting: float
dhw_temperature_setting_raw: int
dhw_use: DeviceBool
dhw_use_sustained: DeviceBool
did_reload: DeviceBool
property discharge_temperature: float
discharge_temperature_raw: int
dr_event_status: DREvent
dr_override_status: int
eco_use: DeviceBool
eev_step: int
eev_use: DeviceBool
error_buzzer_use: DeviceBool
error_code: ErrorCode
eva_fan_use: DeviceBool
property evaporator_temperature: float
evaporator_temperature_raw: int
fan_pwm: int
fault_status1: int
fault_status2: int
property freeze_protection_temp_max: float
freeze_protection_temp_max_raw: int
property freeze_protection_temp_min: float
freeze_protection_temp_min_raw: int
property freeze_protection_temperature: float
freeze_protection_temperature_raw: int
freeze_protection_use: DeviceBool
get_field_unit(field_name: str) str[source]

Get the correct unit suffix based on temperature preference.

Resolves dynamic units for temperature, flow rate, and volume fields that change based on unit system context override or the device’s temperature_type setting (Celsius or Fahrenheit).

Parameters:

field_name – Name of the field to get the unit for

Returns:

Unit string (e.g., “ °C”, “ LPM”, “ L”) or empty if field not found

property he_lower_off_diff_temp_setting: float
he_lower_off_diff_temp_setting_raw: int
property he_lower_off_temp_setting: float
he_lower_off_temp_setting_raw: int
property he_lower_on_diff_temp_setting: float
he_lower_on_diff_temp_setting_raw: int
property he_lower_on_temp_setting: float
he_lower_on_temp_setting_raw: int
property he_upper_off_diff_temp_setting: float
he_upper_off_diff_temp_setting_raw: int
property he_upper_off_temp_setting: float
he_upper_off_temp_setting_raw: int
property he_upper_on_diff_temp_setting: float
he_upper_on_diff_temp_setting_raw: int
property he_upper_on_temp_setting: float
he_upper_on_temp_setting_raw: int
heat_lower_use: DeviceBool
property heat_min_op_temperature: float
heat_min_op_temperature_raw: int
heat_upper_use: DeviceBool
property hp_lower_off_diff_temp_setting: float
hp_lower_off_diff_temp_setting_raw: int
property hp_lower_off_temp_setting: float
hp_lower_off_temp_setting_raw: int
property hp_lower_on_diff_temp_setting: float
hp_lower_on_diff_temp_setting_raw: int
property hp_lower_on_temp_setting: float
hp_lower_on_temp_setting_raw: int
property hp_upper_off_diff_temp_setting: float
hp_upper_off_diff_temp_setting_raw: int
property hp_upper_off_temp_setting: float
hp_upper_off_temp_setting_raw: int
property hp_upper_on_diff_temp_setting: float
hp_upper_on_diff_temp_setting_raw: int
property hp_upper_on_temp_setting: float
hp_upper_on_temp_setting_raw: int
mac_address: str | None
mixing_rate: float
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

operation_busy: DeviceBool
operation_mode: CurrentOperationMode
property outside_temperature: float
outside_temperature_raw: int
program_reservation_type: int
program_reservation_use: DeviceBool
property recirc_dhw_flow_rate: float
recirc_dhw_flow_rate_raw: int
recirc_error_status: int
property recirc_faucet_temperature: float
recirc_faucet_temperature_raw: int
recirc_hot_btn_ready: int
recirc_operation_busy: DeviceBool
recirc_operation_mode: RecirculationMode
recirc_operation_reason: int
recirc_pump_operation_status: int
recirc_reservation_use: DeviceBool
property recirc_temp_setting: float
recirc_temp_setting_raw: int
property recirc_temperature: float
recirc_temperature_raw: int
scald_use: DeviceBool
shut_off_valve_use: DeviceBool
smart_diagnostic: int
special_function_status: int
sub_error_code: int
property suction_temperature: float
suction_temperature_raw: int
property tank_lower_temperature: float
tank_lower_temperature_raw: int
property tank_upper_temperature: float
tank_upper_temperature_raw: int
target_fan_rpm: int
property target_super_heat: float
target_super_heat_raw: int
temp_formula_type: TempFormulaType
temperature_type: TemperatureType
total_energy_capacity: TenWhToWh
tou_override_status: TouOverride
tou_status: TouStatus
vacation_day_elapsed: int
vacation_day_setting: int
wifi_rssi: int
wtr_ovr_sensor_use: DeviceBool
class nwp500.DhwOperationSetting(*values)[source]

Bases: IntEnum

DHW operation setting modes (user-configured heating preferences).

This enum represents the user’s configured mode preference - what heating mode the device should use when it needs to heat water. These values appear in the dhw_operation_setting field and are set via user commands.

ELECTRIC = 2
ENERGY_SAVER = 3
HEAT_PUMP = 1
HIGH_DEMAND = 4
POWER_OFF = 6
VACATION = 5
class nwp500.EnergyUsageDay(*, hpUsage: int = 0, heUsage: int = 0, hpTime: int = 0, heTime: int = 0)[source]

Bases: EnergyUsageBase

Daily energy usage data.

model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class nwp500.EnergyUsageResponse(*, total: EnergyUsageTotal, usage: list[MonthlyEnergyData])[source]

Bases: NavienBaseModel

Response for energy usage query.

get_month_data(year: int, month: int) MonthlyEnergyData | None[source]

Get energy usage data for a specific month.

Parameters:
  • year – Year (e.g., 2025)

  • month – Month (1-12)

Returns:

MonthlyEnergyData for that month, or None if not found

model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

total: EnergyUsageTotal
usage: list[MonthlyEnergyData]
class nwp500.EnergyUsageTotal(*, hpUsage: int = 0, heUsage: int = 0, hpTime: int = 0, heTime: int = 0)[source]

Bases: EnergyUsageBase

Total energy usage data.

property heat_element_percentage: float
property heat_pump_percentage: float
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

property total_time: int
class nwp500.ErrorCode(*values)[source]

Bases: IntEnum

Device error codes.

Error codes indicate specific faults detected by the device’s diagnostic system. Most errors are Level 1, allowing continued operation with reduced functionality. See docs/protocol/error_codes.rst for complete troubleshooting guide.

E096_UPPER_HEATER = 96
E097_LOWER_HEATER = 97
E326_DRY_FIRE = 326
E407_DHW_TEMP_SENSOR = 407
E445_MIXING_VALVE = 445
E480_TANK_UPPER_TEMP_SENSOR = 480
E481_TANK_LOWER_TEMP_SENSOR = 481
E515_RELAY_FAULT = 515
E517_DIP_SWITCH = 517
E593_PANEL_KEY = 593
E594_EEPROM = 594
E595_POWER_METER = 595
E596_WIFI = 596
E598_RTC = 598
E615_FEEDBACK = 615
E781_CTA2045 = 781
E798_SHUTOFF_VALVE = 798
E799_WATER_LEAK = 799
E901_ECO = 901
E907_COMPRESSOR_POWER = 907
E908_COMPRESSOR = 908
E909_EVAPORATOR_FAN = 909
E910_DISCHARGE_TEMP_SENSOR = 910
E911_DISCHARGE_TEMP_HIGH = 911
E912_SUCTION_TEMP_SENSOR = 912
E913_SUCTION_TEMP_LOW = 913
E914_EVAPORATOR_TEMP_SENSOR = 914
E915_TEMP_DIFFERENCE = 915
E916_EVAPORATOR_TEMP = 916
E920_AMBIENT_TEMP_SENSOR = 920
E940_REFRIGERANT_BLOCKAGE = 940
E990_CONDENSATE_OVERFLOW = 990
NO_ERROR = 0
class nwp500.EventEmitter[source]

Bases: object

Event emitter with support for multiple listeners per event.

Provides an event-driven architecture for device state changes with: - Multiple listeners per event - Async handler support - One-time listeners (once) - Priority-based execution order - Automatic state change detection

Example:

emitter = EventEmitter()

# Register listeners
emitter.on('temperature_changed', log_temperature)
emitter.on('temperature_changed', update_ui)

# Emit events
await emitter.emit('temperature_changed', temperature_event)

# One-time listener
emitter.once('device_ready', initialize)

# Remove listener
emitter.off('temperature_changed', log_temperature)
async emit(event: str, *args: Any, **kwargs: Any) int[source]

Emit an event to all registered listeners.

Executes listeners in priority order (highest first). One-time listeners are automatically removed after execution.

Parameters:
  • event – Event name to emit

  • *args – Positional arguments to pass to listeners

  • **kwargs – Keyword arguments to pass to listeners

Returns:

Number of listeners that were called

Example:

# Emit with an event object
await emitter.emit('temperature_changed', temperature_event)

# Emit with keyword arguments
await emitter.emit('status_updated', status=device_status)
event_count(event: str) int[source]

Get the number of times an event has been emitted.

Parameters:

event – Event name

Returns:

Number of times event was emitted

Example:

count = emitter.event_count('temperature_changed')
print(f"Event emitted {count} times")
event_names() list[str][source]

Get list of all registered event names.

Returns:

List of event names with active listeners

Example:

events = emitter.event_names()
print(f"Active events: {', '.join(events)}")
listener_count(event: str) int[source]

Get the number of listeners for an event.

Parameters:

event – Event name

Returns:

Number of registered listeners

Example:

count = emitter.listener_count('temperature_changed')
print(f"{count} listeners registered")
off(event: str, callback: Callable[[...], Any | None] | None = None) int[source]

Remove event listener(s).

Parameters:
  • event – Event name

  • callback – Specific callback to remove, or None to remove all for

  • event

Returns:

Number of listeners removed

Example:

# Remove specific listener
emitter.off('temperature_changed', log_temperature)

# Remove all listeners for event
emitter.off('temperature_changed')
on(event: str, callback: Callable[[...], Any], priority: int = 50) None[source]

Register an event listener.

Parameters:
  • event – Event name to listen for

  • callback – Function to call when event is emitted (can be async)

  • priority – Execution priority (higher = earlier, default: 50)

Example:

from nwp500.unit_system import get_unit_system

def on_temp_change(event):
    unit = "°C" if get_unit_system() == "metric" else "°F"
    print(
        f"Temperature: {event.old_temperature}{unit} → "
        f"{event.new_temperature}{unit}"
    )

emitter.on('temperature_changed', on_temp_change)

# Async handler
async def save_to_db(event):
    await db.save(event.new_temperature)

emitter.on('temperature_changed', save_to_db, priority=100)
once(event: str, callback: Callable[[...], Any], priority: int = 50) None[source]

Register a one-time event listener.

The listener will be automatically removed after first execution.

Parameters:
  • event – Event name to listen for

  • callback – Function to call when event is emitted

  • priority – Execution priority (higher = earlier, default: 50)

Example:

emitter.once('device_ready', initialize_device)
# Will only be called once, then auto-removed
remove_all_listeners(event: str | None = None) int[source]

Remove all listeners for an event, or all listeners for all events.

Parameters:

event – Event name, or None to remove all listeners

Returns:

Number of listeners removed

Example:

# Remove all listeners for specific event
emitter.remove_all_listeners('temperature_changed')

# Remove all listeners for all events
emitter.remove_all_listeners()
async wait_for(event: str, timeout: float | None = None) tuple[Any, ...][source]

Wait for an event to be emitted.

Parameters:
  • event – Event name to wait for

  • timeout – Maximum time to wait in seconds (None = wait forever)

Returns:

Tuple of arguments passed to the event

Raises:

asyncio.TimeoutError – If timeout is reached

Example:

# Wait for device to be ready
await emitter.wait_for('device_ready', timeout=30)

# Wait for specific condition
args, _ = await emitter.wait_for('temperature_changed')
temperature_event = args[0]
current_temp = temperature_event.new_temperature
class nwp500.EventListener(callback: Callable[[...], Any], once: bool = False, priority: int = 50)[source]

Bases: object

Represents a registered event listener.

callback: Callable[[...], Any]
once: bool = False
priority: int = 50
class nwp500.FilterChange(*values)[source]

Bases: IntEnum

Air filter status for heat pump models.

NORMAL = 0
REPLACE_NEED = 1
UNKNOWN = 2
class nwp500.FirmwareInfo(*, macAddress: str = '', additionalValue: str = '', deviceType: DeviceType | int = DeviceType.NPF700_WIFI, curSwCode: int = 0, curVersion: int = 0, downloadedVersion: int | None = None, deviceGroup: str | None = None)[source]

Bases: NavienBaseModel

Firmware information for a device.

additional_value: str
cur_sw_code: int
cur_version: int
device_group: str | None
device_type: DeviceType | int
downloaded_version: int | None
mac_address: str
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class nwp500.HeatSource(*values)[source]

Bases: IntEnum

Currently active heat source (read-only status).

This reflects what the device is currently using, not what mode it’s set to. In Hybrid mode, this field shows which source(s) are active at any given moment.

HEATELEMENT = 2
HEATPUMP = 1
HEATPUMP_HEATELEMENT = 3
UNKNOWN = 0
class nwp500.InstallType(*values)[source]

Bases: StrEnum

Installation type classification.

Indicates whether the device is installed for residential or commercial use. This affects warranty terms and service requirements.

COMMERCIAL = 'C'
RESIDENTIAL = 'R'
exception nwp500.InvalidCredentialsError(message: str, status_code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: AuthenticationError

Raised when user credentials are invalid.

This typically indicates a 401 Unauthorized response from the API due to incorrect email/password combination.

class nwp500.Location(*, state: str | None = None, city: str | None = None, address: str | None = None, latitude: float | None = None, longitude: float | None = None, altitude: float | None = None)[source]

Bases: NavienBaseModel

Location information for a device.

address: str | None
altitude: float | None
city: str | None
latitude: float | None
longitude: float | None
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

state: str | None
class nwp500.MonthlyEnergyData(*, year: int, month: int, data: list[EnergyUsageDay])[source]

Bases: NavienBaseModel

Monthly energy usage data grouping.

data: list[EnergyUsageDay]
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

month: int
year: int
class nwp500.MqttClientEvents[source]

Bases: object

Registry of all NavienMqttClient events.

This class provides string constants for all events emitted by NavienMqttClient, with associated event data types documented in their dataclass definitions.

Usage:

mqtt_client.on(
    MqttClientEvents.TEMPERATURE_CHANGED,
    lambda event: update_display(event.new_temperature)
)

# Wait for a specific event
args, _ = await mqtt_client.wait_for(
    MqttClientEvents.CONNECTION_RESUMED
)
connection_event = args[0]

# List all available events
events = ', '.join(MqttClientEvents.get_all_events())
print(f"Available events: {events}")

See also

../guides/event_system - Comprehensive event handling guide

CONNECTION_INTERRUPTED = 'connection_interrupted'

MQTT connection interrupted with error.

Parameters:

event (ConnectionInterruptedEvent) – Event object with the error field.

See: ConnectionInterruptedEvent

Type:

Emitted

CONNECTION_RESUMED = 'connection_resumed'

MQTT connection resumed after interruption.

Parameters:

event (ConnectionResumedEvent) – Event object with return_code and session_present fields.

See: ConnectionResumedEvent

Type:

Emitted

ERROR_CLEARED = 'error_cleared'

Device error cleared.

Parameters:

event (ErrorClearedEvent) – Event object with the error_code field.

See: ErrorClearedEvent

Type:

Emitted

ERROR_DETECTED = 'error_detected'

Device error detected.

Parameters:

event (ErrorDetectedEvent) – Event object with error_code and status fields.

See: ErrorDetectedEvent

Type:

Emitted

FEATURE_RECEIVED = 'feature_received'

Device feature information received.

Parameters:

event (FeatureReceivedEvent) – Event object with the feature field.

See: FeatureReceivedEvent

Type:

Emitted

HEATING_STARTED = 'heating_started'

Device started heating.

Parameters:

event (HeatingStartedEvent) – Event object with the status field.

See: HeatingStartedEvent

Type:

Emitted

HEATING_STOPPED = 'heating_stopped'

Device stopped heating.

Parameters:

event (HeatingStoppedEvent) – Event object with the status field.

See: HeatingStoppedEvent

Type:

Emitted

MODE_CHANGED = 'mode_changed'

Device operation mode changed.

Parameters:

event (ModeChangedEvent) – Event object with old_mode and new_mode fields.

See: ModeChangedEvent

Type:

Emitted

POWER_CHANGED = 'power_changed'

Instantaneous power consumption changed.

Parameters:

event (PowerChangedEvent) – Event object with old_power and new_power fields.

See: PowerChangedEvent

Type:

Emitted

STATUS_RECEIVED = 'status_received'

Device status message received.

Parameters:

event (StatusReceivedEvent) – Event object with the status field.

See: StatusReceivedEvent

Type:

Emitted

TEMPERATURE_CHANGED = 'temperature_changed'

DHW temperature changed.

Parameters:

event (TemperatureChangedEvent) – Event object with old_temperature and new_temperature fields.

See: TemperatureChangedEvent

Type:

Emitted

classmethod get_all_events() list[str][source]

Get list of all available event names.

Returns:

List of event constant names (not including metadata strings)

Example:

for event_name in MqttClientEvents.get_all_events():
    print(f"- {event_name}")

# Output:
# - CONNECTION_INTERRUPTED
# - CONNECTION_RESUMED
# - STATUS_RECEIVED
# - TEMPERATURE_CHANGED
# - ...
classmethod get_event_value(event_name: str) str[source]

Get the string value of an event constant.

Parameters:

event_name – Event constant name (e.g., “TEMPERATURE_CHANGED”)

Returns:

Event string value (e.g., “temperature_changed”)

Raises:

AttributeError – If event_name does not exist

Example:

value = MqttClientEvents.get_event_value("TEMPERATURE_CHANGED")
print(value)  # Output: "temperature_changed"
class nwp500.MqttCommand(*, clientID: str, sessionID: str, requestTopic: str, responseTopic: str, request: MqttRequest | dict[str, Any], protocolVersion: int = 2)[source]

Bases: NavienBaseModel

Represents an MQTT command message.

client_id: str
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

protocol_version: int
request: MqttRequest | dict[str, Any]
request_topic: str
response_topic: str
session_id: str
class nwp500.MqttConnectionConfig(endpoint: str = 'a1t30mldyslmuq-ats.iot.us-east-1.amazonaws.com', region: str = 'us-east-1', client_id: str | None = None, clean_session: bool = True, keep_alive_secs: int = 1200, auto_reconnect: bool = True, max_reconnect_attempts: int = -1, initial_reconnect_delay: float = 1.0, max_reconnect_delay: float = 120.0, reconnect_backoff_multiplier: float = 2.0, deep_reconnect_threshold: int = 10, enable_command_queue: bool = True, max_queued_commands: int = 100)[source]

Bases: object

Configuration for MQTT connection.

endpoint

AWS IoT endpoint URL

Type:

str

region

AWS region

Type:

str

client_id

MQTT client ID (auto-generated if None)

Type:

str | None

clean_session

Whether to start with a clean session

Type:

bool

keep_alive_secs

Keep-alive interval in seconds

Type:

int

auto_reconnect

Enable automatic reconnection

Type:

bool

max_reconnect_attempts

Maximum reconnection attempts (-1 for unlimited)

Type:

int

initial_reconnect_delay

Initial delay between reconnect attempts

Type:

float

max_reconnect_delay

Maximum delay between reconnect attempts

Type:

float

reconnect_backoff_multiplier

Exponential backoff multiplier

Type:

float

deep_reconnect_threshold

Attempt count to trigger full connection rebuild

Type:

int

enable_command_queue

Enable command queueing when disconnected

Type:

bool

max_queued_commands

Maximum number of queued commands

Type:

int

auto_reconnect: bool = True
clean_session: bool = True
client_id: str | None = None
deep_reconnect_threshold: int = 10
enable_command_queue: bool = True
endpoint: str = 'a1t30mldyslmuq-ats.iot.us-east-1.amazonaws.com'
initial_reconnect_delay: float = 1.0
keep_alive_secs: int = 1200
max_queued_commands: int = 100
max_reconnect_attempts: int = -1
max_reconnect_delay: float = 120.0
reconnect_backoff_multiplier: float = 2.0
region: str = 'us-east-1'
exception nwp500.MqttConnectionError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

Connection establishment or maintenance failed.

Raised when the MQTT connection to AWS IoT Core cannot be established or when an existing connection fails. This may be due to network issues, invalid credentials, or AWS service problems.

exception nwp500.MqttCredentialsError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

AWS credentials invalid or expired.

Raised when AWS IoT credentials are missing, invalid, or expired. Re-authentication may be required to obtain fresh credentials.

class nwp500.MqttDeviceCapabilityChecker[source]

Bases: object

Generalized MQTT device capability checker using a capability map.

This class uses a mapping of controllable feature names to their check functions, allowing capabilities to be validated in a centralized, extensible way without requiring individual methods for each control.

classmethod assert_supported(feature: str, device_features: DeviceFeature) None[source]

Assert that device supports control of a feature.

Parameters:
  • feature – Name of the controllable feature to check

  • device_features – Device feature information

Raises:
classmethod get_available_controls(device_features: DeviceFeature) dict[str, bool][source]

Get all controllable features available on a device.

Parameters:

device_features – Device feature information

Returns:

Dictionary mapping feature names to whether they can be controlled

classmethod register_capability(name: str, check_fn: CapabilityCheckFn) None[source]

Register a custom controllable feature check.

This allows extensions or applications to define custom capability checks without modifying the core library.

Parameters:
  • name – Feature name

  • check_fn – Function that takes DeviceFeature and returns bool

classmethod supports(feature: str, device_features: DeviceFeature) bool[source]

Check if device supports control of a specific feature.

Parameters:
  • feature – Name of the controllable feature to check

  • device_features – Device feature information

Returns:

True if feature control is supported, False otherwise

Raises:

ValueError – If feature is not recognized

class nwp500.MqttDeviceInfoCache(update_interval_minutes: int = 30)[source]

Bases: object

Manages caching of device information with periodic updates.

This cache stores device features (capabilities, firmware info, etc.) and automatically refreshes them at regular intervals to keep data synchronized with the actual device state.

The cache is keyed by device MAC address, allowing support for multiple devices connected to the same MQTT client.

async clear() None[source]

Clear all cached device information.

async get(device_mac: str) DeviceFeature | None[source]

Get cached device features if available and not expired.

Parameters:

device_mac – Device MAC address

Returns:

Cached DeviceFeature if available, None otherwise

async get_all_cached() dict[str, DeviceFeature][source]

Get all currently cached device features.

Returns:

Dictionary mapping MAC addresses to DeviceFeature objects

async get_cache_info() CacheInfoResult[source]

Get cache statistics and metadata.

Returns:

  • device_count: Number of cached devices

  • update_interval_minutes: Cache update interval in minutes

  • devices: List of device cache metadata

Return type:

Dictionary with cache info including

async invalidate(device_mac: str) None[source]

Invalidate cache entry for a device.

Forces a refresh on next request.

Parameters:

device_mac – Device MAC address

is_expired(timestamp: datetime) bool[source]

Check if a cache entry is expired.

Parameters:

timestamp – When the cache entry was created

Returns:

True if expired, False if still fresh

async set(device_mac: str, features: DeviceFeature) None[source]

Cache device features with current timestamp.

Parameters:
  • device_mac – Device MAC address

  • features – Device feature information to cache

class nwp500.MqttDiagnosticsCollector(max_events_retained: int = 1000, enable_verbose_logging: bool = False)[source]

Bases: object

Collects detailed diagnostics and metrics for MQTT connection analysis.

This collector tracks: - Connection drop events with error details - Connection recovery timeline - Error frequency and patterns - Session duration statistics - Network topology and timing information

For debugging: - Export logs to JSON for correlation with AWS CloudWatch - Enables continuous monitoring with configurable retention

export_json() str[source]

Export all collected diagnostics as JSON.

Returns:

JSON string suitable for storing or sending to monitoring systems

get_metrics() MqttMetrics[source]

Get current aggregate metrics.

get_recent_connections(limit: int = 10) list[ConnectionEvent][source]

Get the N most recent connection events.

get_recent_drops(limit: int = 10) list[ConnectionDropEvent][source]

Get the N most recent connection drop events.

on_connection_drop(callback: Callable[[ConnectionDropEvent], None]) None[source]

Register a callback to be invoked on each connection drop event.

Parameters:

callback – Function that receives ConnectionDropEvent

print_summary() None[source]

Print a human-readable summary of diagnostics.

async record_connection_drop(error: Exception | None = None, reconnect_attempt: int = 0, active_subscriptions: int = 0, queued_commands: int = 0) None[source]

Record a connection drop event.

Parameters:
  • error – The exception that caused the drop

  • reconnect_attempt – Which reconnection attempt this is (0 = initial)

  • active_subscriptions – Number of active subscriptions at time of drop

  • queued_commands – Number of commands in the queue

async record_connection_success(event_type: str = 'connected', session_present: bool = False, return_code: int | None = None, attempt_number: int = 0) None[source]

Record a successful connection or reconnection event.

Parameters:
  • event_type – “connected”, “resumed”, or “deep_reconnected”

  • session_present – Whether MQTT session was present

  • return_code – MQTT return code

  • attempt_number – Reconnection attempt number (0 = initial connect)

record_publish(queued: bool = False) None[source]

Record a publish/queue operation.

async update_metrics() None[source]

Update current metrics (e.g., current session uptime).

exception nwp500.MqttError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: Nwp500Error

Base exception for MQTT operations.

All MQTT-related errors inherit from this base class, allowing consumers to handle all MQTT issues with a single exception handler.

class nwp500.MqttMetrics(total_connections: int = 0, total_disconnects: int = 0, total_connection_drops: int = 0, total_reconnect_attempts: int = 0, longest_session_seconds: float = 0.0, shortest_session_seconds: float = inf, average_session_seconds: float = 0.0, current_session_uptime_seconds: float = 0.0, connection_drops_by_error: dict[str, int]=<factory>, reconnection_attempts_distribution: dict[str, int]=<factory>, last_drop_timestamp: str | None = None, last_successful_connect_timestamp: str | None = None, connection_recovered: int = 0, messages_published: int = 0, messages_queued: int = 0)[source]

Bases: object

Aggregate metrics for MQTT connection stability.

average_session_seconds: float = 0.0
connection_drops_by_error: dict[str, int]
connection_recovered: int = 0
current_session_uptime_seconds: float = 0.0
last_drop_timestamp: str | None = None
last_successful_connect_timestamp: str | None = None
longest_session_seconds: float = 0.0
messages_published: int = 0
messages_queued: int = 0
reconnection_attempts_distribution: dict[str, int]
shortest_session_seconds: float = inf
to_dict() dict[str, Any][source]

Convert to dictionary for JSON serialization.

total_connection_drops: int = 0
total_connections: int = 0
total_disconnects: int = 0
total_reconnect_attempts: int = 0
exception nwp500.MqttNotConnectedError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

Operation requires active MQTT connection.

Raised when attempting MQTT operations (publish, subscribe, etc.) without an established connection. Call connect() before performing MQTT operations.

Example:

mqtt_client = NavienMqttClient(auth_client)
# Must connect first
await mqtt_client.connect()
await mqtt_client.request_device_status(device)
exception nwp500.MqttPublishError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

Failed to publish message to MQTT broker.

Raised when a message cannot be published to an MQTT topic. This may occur during connection interruptions or when the broker rejects the message.

class nwp500.MqttRequest(*, command: int, deviceType: DeviceType | int, macAddress: str, additionalValue: str = '...', mode: str | None = None, param: list[int | float] = <factory>, paramStr: str = '', month: list[int] | None = None, year: int | None = None)[source]

Bases: NavienBaseModel

MQTT command request payload.

additional_value: str
command: int
device_type: DeviceType | int
mac_address: str
mode: str | None
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

month: list[int] | None
param: list[int | float]
param_str: str
year: int | None
exception nwp500.MqttSubscriptionError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: MqttError

Failed to subscribe to MQTT topic.

Raised when subscription to an MQTT topic fails. This may occur if the connection is interrupted or if the client lacks permissions for the topic.

class nwp500.NavienAPIClient(auth_client: NavienAuthClient, base_url: str = 'https://nlus.naviensmartcontrol.com/api/v2.1', session: ClientSession | None = None, unit_system: Literal['metric', 'us_customary'] | None = None)[source]

Bases: object

High-level client for Navien Smart Control REST API.

This client implements all endpoints from the OpenAPI specification and automatically handles authentication, token refresh, and error handling.

The client requires an authenticated NavienAuthClient to be provided.

Example

>>> async with NavienAuthClient() as auth_client:
...     await auth_client.sign_in("user@example.com", "password")
...     api_client = NavienAPIClient(auth_client=auth_client)
...     devices = await api_client.list_devices()
async convert_tou(source_data: list[dict[str, Any]], source_type: str = 'openei', source_version: int = 7) list[ConvertedTOUPlan][source]

Convert OpenEI rate plans to device TOU format.

Sends raw OpenEI rate plan data to the Navien backend for conversion into device-ready TOU schedules with season/week bitfields and scaled pricing.

Parameters:
  • source_data – List of OpenEI rate plan dictionaries

  • source_type – Data source type (default: “openei”)

  • source_version – API version (default: 7)

Returns:

List of ConvertedTOUPlan objects with device-ready schedules

Raises:
async get_device_info(mac_address: str, additional_value: str = '') Device[source]

Get detailed information about a specific device.

Parameters:
  • mac_address – Device MAC address

  • additional_value – Additional device identifier (optional)

Returns:

Device object with detailed information

Raises:
async get_firmware_info(mac_address: str, additional_value: str = '') list[FirmwareInfo][source]

Get firmware information for a specific device.

Parameters:
  • mac_address – Device MAC address

  • additional_value – Additional device identifier (optional)

Returns:

List of FirmwareInfo objects

Raises:
async get_first_device() Device | None[source]

Get the first device associated with the user.

Returns:

First Device object or None if no devices

async get_tou_info(mac_address: str, additional_value: str, controller_id: str, user_type: str = 'O') TOUInfo[source]

Get Time of Use (TOU) information for a device.

Parameters:
  • mac_address – Device MAC address

  • additional_value – Additional device identifier

  • controller_id – Controller ID

  • user_type – User type (default: “O”)

Returns:

TOUInfo object

Raises:
property is_authenticated: bool

Check if client is authenticated.

async list_devices(offset: int = 0, count: int = 20) list[Device][source]

List all devices associated with the user.

Parameters:
  • offset – Pagination offset (default: 0)

  • count – Number of devices to return (default: 20)

Returns:

List of Device objects

Raises:
async update_push_token(push_token: str, model_name: str = 'Python Client', app_version: str = '1.0.0', os: str = 'Python', os_version: str = '3.8+') bool[source]

Update push notification token.

Parameters:
  • push_token – Push notification token

  • model_name – Device model name (default: “Python Client”)

  • app_version – Application version (default: “1.0.0”)

  • os – Operating system (default: “Python”)

  • os_version – OS version (default: “3.8+”)

Returns:

True if successful

Raises:
async update_tou(mac_address: str, additional_value: str, tou_info: dict[str, Any], source_data: dict[str, Any], zip_code: str, register_path: str = 'wifi', source_type: str = 'openei', user_type: str = 'O') TOUInfo[source]

Apply a TOU rate plan to a device.

Parameters:
  • mac_address – Device MAC address

  • additional_value – Additional device identifier

  • tou_info – Converted TOU info dict (name, schedule, utility, zipCode)

  • source_data – Original OpenEI rate plan dictionary

  • zip_code – Service area zip code

  • register_path – Device connection type (default: “wifi”)

  • source_type – Data source type (default: “openei”)

  • user_type – User type (default: “O”)

Returns:

TOUInfo object with the applied configuration

Raises:
property user_email: str | None

Get current user email.

class nwp500.NavienAuthClient(user_id: str, password: str, base_url: str = 'https://nlus.naviensmartcontrol.com/api/v2.1', session: ClientSession | None = None, timeout: int = 30, stored_tokens: AuthTokens | None = None, unit_system: Literal['metric', 'us_customary'] | None = None)[source]

Bases: object

Asynchronous client for Navien Smart Control API authentication.

This client handles: - User authentication with email/password - Token management and automatic refresh - Session management via aiohttp ClientSession - AWS credentials (if provided by API)

Session and Context Manager

The auth client manages an aiohttp session that is shared with other clients (API, MQTT). The session is created when entering the context manager and closed when exiting.

Authentication is performed automatically when entering the async context manager, unless valid stored tokens are provided.

Important: All API and MQTT clients must be created and used within the context manager. Once the context manager exits, the session is closed and clients can no longer be used.

Example

>>> async with NavienAuthClient(user_id="user@example.com",
password="password") as client:
...     print(f"Welcome {client.current_user.full_name}")
...     # Token is securely stored and not printed in production
...
...     # Create other clients within the context
...     api_client = NavienAPIClient(auth_client=client)
...     mqtt_client = NavienMqttClient(auth_client=client)
...
...     # Use the clients
...     devices = await api_client.list_devices()
...     await mqtt_client.connect()

Restore session from stored tokens: >>> stored_tokens = AuthTokens.model_validate(saved_data) >>> async with NavienAuthClient( … user_id=”user@example.com”, … password=”password”, … stored_tokens=stored_tokens … ) as client: … # Authentication skipped if tokens are still valid … print(f”Welcome {client.current_user.full_name}”)

property auth_response: AuthenticationResponse | None

Get the complete authentication response.

async close() None[source]

Close the aiohttp session if we own it.

property current_tokens: AuthTokens | None

Get current authentication tokens.

property current_user: UserInfo | None

Get current authenticated user info.

async ensure_valid_token() AuthTokens | None[source]

Ensure we have a valid access token, refreshing if necessary.

This method checks both JWT token and AWS credentials expiration. If AWS credentials are expired, it triggers a full re-authentication since the token refresh endpoint doesn’t provide new AWS credentials.

Returns:

Valid AuthTokens or None if not authenticated

Raises:
get_auth_headers() dict[str, str][source]

Get headers for authenticated requests.

Returns:

Dictionary of headers to include in requests

Note

Based on HAR analysis of actual API traffic, the authorization header uses the raw token without ‘Bearer ‘ prefix (lowercase ‘authorization’). This is different from standard Bearer token authentication.

property has_stored_credentials: bool

Check if user credentials are stored for re-authentication.

Returns:

True if both user_id and password are available for re-auth

property has_valid_tokens: bool

Check if both JWT and AWS credentials are valid and not expired.

Returns True only if tokens exist AND neither JWT tokens nor AWS credentials have expired. This is useful for pre-flight checks before operations that require valid credentials (e.g., MQTT connection).

Returns:

True if tokens exist AND not expired (JWT + AWS creds), False otherwise

property is_authenticated: bool

Check if client is currently authenticated.

async re_authenticate() AuthenticationResponse[source]

Re-authenticate using stored credentials.

This is a convenience method that uses the stored user_id and password from initialization to perform a fresh sign-in. Useful for recovering from expired tokens or connection issues.

Returns:

AuthenticationResponse with fresh tokens and user info

Raises:

Example

>>> client = NavienAuthClient(email, password)
>>> await client.re_authenticate()  # Uses stored credentials
async refresh_token(refresh_token: str | None = None) AuthTokens[source]

Refresh access token using refresh token.

Parameters:

refresh_token – The refresh token obtained from sign-in. If not provided, uses the stored refresh token.

Returns:

New AuthTokens with refreshed access token

Raises:

TokenRefreshError – If token refresh fails or no token available

property session: ClientSession | None

Get the active aiohttp session.

async sign_in(user_id: str, password: str) AuthenticationResponse[source]

Authenticate user and obtain tokens.

Parameters:
  • user_id – User email address

  • password – User password

Returns:

AuthenticationResponse containing user info and tokens

Raises:
property user_email: str | None

Get the email address of the authenticated user.

class nwp500.NavienMqttClient(auth_client: NavienAuthClient, config: MqttConnectionConfig | None = None, unit_system: Literal['metric', 'us_customary'] | None = None)[source]

Bases: EventEmitter

Async MQTT client for Navien device communication over AWS IoT.

This client establishes WebSocket connections to AWS IoT Core using temporary AWS credentials from the authentication API. It handles: - Connection management with automatic reconnection and exponential backoff - Topic subscriptions for device events and responses - Command publishing for device control - Message routing and callbacks - Command queuing when disconnected (sends when reconnected) - Event-driven architecture with state change detection

The client extends EventEmitter to provide an event-driven architecture: - Multiple listeners per event - State change detection (temperature_changed, mode_changed, etc.) - Async handler support - Priority-based execution

The client automatically reconnects when the connection is interrupted, using exponential backoff (default: 1s, 2s, 4s, 8s, … up to 120s). Reconnection behavior can be customized via MqttConnectionConfig.

When enabled, the command queue stores commands sent while disconnected and automatically sends them when the connection is restored. This ensures commands are not lost during temporary network interruptions.

Example (Traditional Callbacks):

>>> async with NavienAuthClient(email, password) as auth_client:
...     mqtt_client = NavienMqttClient(auth_client)
...     await mqtt_client.connect()
...
...     # Traditional callback style
...     await mqtt_client.subscribe_device_status(device, on_status)

Example (Event Emitter):

>>> from nwp500.mqtt_events import MqttClientEvents
>>> mqtt_client = NavienMqttClient(auth_client)
...
... # Type-safe event listeners with IDE autocomplete
... mqtt_client.on(
...     MqttClientEvents.TEMPERATURE_CHANGED,
...     lambda event: log_temperature(event.new_temperature),
... )
... mqtt_client.on(MqttClientEvents.TEMPERATURE_CHANGED, update_ui)
... mqtt_client.on(
...     MqttClientEvents.MODE_CHANGED, handle_mode_change
... )
...
... # One-time listener
... mqtt_client.once(MqttClientEvents.STATUS_RECEIVED, initialize)
...
... await mqtt_client.connect()
Events Emitted:

See nwp500.mqtt_events.MqttClientEvents for a complete, type-safe registry of all events with full documentation.

Key events include: - status_received: Raw status update - feature_received: Device feature/capability information - temperature_changed: DHW temperature changed - mode_changed: Operation mode changed - power_changed: Power consumption changed - heating_started: Device started heating - heating_stopped: Device stopped heating - error_detected: Device error occurred - error_cleared: Device error resolved - connection_interrupted: Connection lost - connection_resumed: Connection restored

async check_firmware_update(device: Device) int[source]

Check for available over-the-air firmware updates.

clear_command_queue() int[source]

Clear all queued commands. …

property client_id: str

Get client ID.

async commit_firmware_update(device: Device, payload: OtaCommitPayload) int[source]

Commit a previously downloaded firmware update.

async configure_recirculation_schedule(device: Device, schedule: RecirculationSchedule) int[source]

Configure the recirculation pump timed schedule.

async configure_reservation_water_program(device: Device) int[source]

Enable/configure water program reservation mode.

async configure_tou_schedule(device: Device, controller_serial_number: str, periods: Sequence[dict[str, Any]], *, enabled: bool = True) int[source]

Configure the Time-of-Use rate schedule.

async connect() bool[source]

Establish connection to AWS IoT Core.

Ensures tokens are valid before connecting and refreshes if necessary.

Returns:

True if connection successful

Raises:

Exception – If connection fails

property control: MqttDeviceController

Deprecated access to device controller.

property diagnostics: MqttDiagnosticsCollector

Get the diagnostics collector instance.

async disable_anti_legionella(device: Device) int[source]

Disable the Anti-Legionella disinfection cycle.

async disable_demand_response(device: Device) int[source]

Disable utility demand response participation.

async disable_intelligent_scheduling(device: Device) int[source]

Disable intelligent/adaptive heating mode.

async disconnect() None[source]

Disconnect from AWS IoT Core and stop all periodic tasks.

async enable_anti_legionella(device: Device, period_days: int) int[source]

Enable Anti-Legionella disinfection.

async enable_demand_response(device: Device) int[source]

Enable utility demand response participation.

async enable_intelligent_scheduling(device: Device) int[source]

Enable intelligent/adaptive heating mode.

async ensure_device_info_cached(device: Device, timeout: float = 30.0) bool[source]

Ensure device info is cached, requesting if necessary.

Called by control commands and CLI to ensure device capabilities are available before execution.

Parameters:
  • device – Device to ensure info for

  • timeout – Maximum time to wait for response (default: 30 seconds)

Returns:

True if device info was successfully cached, False on timeout

Raises:

MqttNotConnectedError – If not connected

property is_connected: bool

Check if client is connected.

property is_reconnecting: bool

Check if client is currently attempting to reconnect.

async publish(topic: str, payload: dict[str, Any], qos: QoS = QoS.AT_LEAST_ONCE) int[source]

Publish a message to an MQTT topic.

If not connected and command queue is enabled, the command will be queued and sent automatically when the connection is restored.

Parameters:
  • topic – MQTT topic to publish to

  • payload – Message payload (will be JSON-encoded)

  • qos – Quality of Service level

Returns:

Publish packet ID (or 0 if queued)

Raises:

RuntimeError – If not connected and command queue is disabled

property queued_commands_count: int

Get the number of commands currently queued.

property reconnect_attempts: int

Get the number of reconnection attempts made.

async reconnect_wifi(device: Device) int[source]

Trigger a WiFi reconnection on the device.

async recover_connection() bool[source]

Recover from authentication-related connection failures.

This method is useful when MQTT connection fails due to stale/expired authentication tokens. It refreshes the tokens and attempts to reconnect the MQTT client.

Returns:

True if recovery was successful and MQTT is reconnected, False otherwise

Raises:

Example

>>> mqtt_client = NavienMqttClient(auth_client)
>>> try:
...     await mqtt_client.connect()
... except MqttConnectionError:
...     # Connection may have failed due to stale tokens
...     if await mqtt_client.recover_connection():
...         print("Successfully recovered connection")
...     else:
...         print("Recovery failed, check logs")
async request_device_info(device: Device) int[source]

Request device information (features, firmware, etc.).

async request_device_status(device: Device) int[source]

Request general device status.

async request_energy_usage(device: Device, year: int, months: list[int]) int[source]

Request daily energy usage data for specified month(s).

async request_reservations(device: Device) int[source]

Request the current reservation program from the device.

async request_tou_settings(device: Device, controller_serial_number: str) int[source]

Request the current TOU settings from the device.

async reset_air_filter(device: Device) int[source]

Reset air filter maintenance timer.

async reset_reconnect() None[source]

Reset reconnection state and trigger a new reconnection attempt. …

async reset_wifi(device: Device) int[source]

Reset WiFi settings to factory defaults.

async run_smart_diagnostic(device: Device) int[source]

Trigger the smart diagnostic routine on the device.

property session_id: str

Get session ID.

async set_dhw_mode(device: Device, mode_id: int, vacation_days: int | None = None) int[source]

Set DHW operation mode.

async set_dhw_temperature(device: Device, temperature: float) int[source]

Set DHW target temperature in the user’s preferred unit.

async set_freeze_protection_temperature(device: Device, temperature: float) int[source]

Set the freeze protection activation temperature.

async set_power(device: Device, power_on: bool) int[source]

Turn device on or off.

async set_recirculation_mode(device: Device, mode: int) int[source]

Set recirculation pump operation mode (1-4).

async set_tou_enabled(device: Device, enabled: bool) int[source]

Enable or disable Time-of-Use optimization.

async set_vacation_days(device: Device, days: int) int[source]

Set vacation/away mode duration (1-30 days).

async signal_app_connection(device: Device) int[source]

Signal that the app has connected.

async start_periodic_requests(device: Device, request_type: PeriodicRequestType = PeriodicRequestType.DEVICE_STATUS, period_seconds: float = 300.0) None[source]

Start sending periodic requests for device information or status. …

async stop_all_periodic_tasks(_reason: str | None = None) None[source]

Stop all periodic request tasks. …

async stop_periodic_requests(device: Device, request_type: PeriodicRequestType | None = None) None[source]

Stop sending periodic requests for a device. …

async subscribe(topic: str, callback: Callable[[str, dict[str, Any]], None], qos: QoS = QoS.AT_LEAST_ONCE) int[source]

Subscribe to an MQTT topic.

Parameters:
  • topic – MQTT topic to subscribe to (can include wildcards)

  • callback – Function to call when messages arrive (topic, message)

  • qos – Quality of Service level

Returns:

Subscription packet ID

Raises:

Exception – If subscription fails

async subscribe_device(device: Device, callback: Callable[[str, dict[str, Any]], None]) int[source]

Subscribe to all messages from a specific device.

Parameters:
  • device – Device object

  • callback – Message handler

Returns:

Subscription packet ID

async subscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) int[source]

Subscribe to device feature/info messages with automatic parsing.

async subscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) int[source]

Subscribe to device status messages with automatic parsing.

async subscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) int[source]

Subscribe to energy usage query responses with automatic parsing.

async subscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) int[source]

Subscribe to recirculation schedule read responses.

async subscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) int[source]

Subscribe to reservation read responses with automatic parsing.

async subscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) int[source]

Subscribe to Time-of-Use schedule read responses with automatic parsing.

Subscribes to the tou/rd response topic for the given device. The callback receives a fully-parsed TOUReservationSchedule whenever the device responds to a TOU read or configure request (triggered by request_tou_settings() or configure_tou_schedule()).

Parameters:
  • device – Device whose TOU responses to receive.

  • callback – Called with the parsed schedule on each response.

Returns:

Publish packet ID from the MQTT subscribe call.

async subscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) int[source]

Subscribe to weekly reservation read responses.

async trigger_recirculation_hot_button(device: Device) int[source]

Manually trigger the recirculation pump hot button.

async unsubscribe(topic: str) int[source]

Unsubscribe from an MQTT topic.

Parameters:

topic – MQTT topic to unsubscribe from

Returns:

Unsubscribe packet ID

Raises:

Exception – If unsubscribe fails

async unsubscribe_device_feature(device: Device, callback: Callable[[DeviceFeature], None]) None[source]

Unsubscribe a specific device feature callback.

async unsubscribe_device_status(device: Device, callback: Callable[[DeviceStatus], None]) None[source]

Unsubscribe a specific device status callback.

async unsubscribe_energy_usage(device: Device, callback: Callable[[EnergyUsageResponse], None]) None[source]

Unsubscribe a specific energy usage callback.

async unsubscribe_recirculation_schedule_response(device: Device, callback: Callable[[RecirculationSchedule], None]) None[source]

Unsubscribe a specific recirculation schedule callback.

async unsubscribe_reservation_response(device: Device, callback: Callable[[ReservationSchedule], None]) None[source]

Unsubscribe a specific reservation response callback.

async unsubscribe_tou_response(device: Device, callback: Callable[[TOUReservationSchedule], None]) None[source]

Unsubscribe a specific TOU response callback.

async unsubscribe_weekly_reservation_response(device: Device, callback: Callable[[WeeklyReservationSchedule], None]) None[source]

Unsubscribe a specific weekly reservation callback.

async update_reservations(device: Device, reservations: Sequence[dict[str, Any]], *, enabled: bool = True) int[source]

Update programmed reservations.

async update_weekly_reservation(device: Device, schedule: WeeklyReservationSchedule) int[source]

Configure the weekly temperature reservation schedule.

exception nwp500.Nwp500Error(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: Exception

Base exception for all nwp500 library errors.

All custom exceptions in the nwp500 library inherit from this base class, allowing consumers to catch all library-specific errors with a single exception handler if desired.

message

Human-readable error message

error_code

Machine-readable error code (optional)

details

Additional context as a dictionary (optional)

retriable

Whether the operation can be retried (optional)

to_dict() dict[str, Any][source]

Serialize exception for logging/monitoring.

Returns:

Dictionary with error type, message, code, details, and retriability

class nwp500.OnOffFlag(*values)[source]

Bases: IntEnum

Generic on/off flag used throughout status fields.

Used for: Power status, TOU status, recirculation status, vacation mode, anti-legionella, and many other boolean device settings.

OFF = 1
ON = 2
class nwp500.OpenEIClient(api_key: str | None = None, session: ClientSession | None = None)[source]

Bases: object

Async client for the OpenEI Utility Rates API.

Queries residential electricity rate plans by zip code. Requires an API key from https://openei.org/services/api/signup/

The API key is resolved in this order: 1. api_key constructor parameter 2. OPENEI_API_KEY environment variable

Example

>>> async with OpenEIClient() as client:
...     plans = await client.list_rate_plans("94903")
...     for plan in plans:
...         print(f"{plan['utility']}: {plan['name']}")
async fetch_rates(zip_code: str, *, limit: int = 100) list[dict[str, Any]][source]

Fetch all residential rate plans for a zip code.

Parameters:
  • zip_code – US zip code to search

  • limit – Maximum number of results (default: 100)

Returns:

List of raw OpenEI rate plan dictionaries

Raises:
  • ValueError – If no API key is configured

  • aiohttp.ClientError – If the API request fails

async get_rate_plan(zip_code: str, plan_name: str, *, utility: str | None = None) dict[str, Any] | None[source]

Get a specific rate plan by name.

Returns the first matching plan. Use utility to disambiguate if multiple utilities serve the same zip code.

Parameters:
  • zip_code – US zip code to search

  • plan_name – Rate plan name (case-insensitive substring match)

  • utility – Filter by utility name (case-insensitive substring match)

Returns:

Full rate plan dictionary or None if not found

async list_rate_plans(zip_code: str, *, utility: str | None = None) list[dict[str, Any]][source]

List rate plans, optionally filtered by utility.

Parameters:
  • zip_code – US zip code to search

  • utility – Filter by utility name (case-insensitive substring match)

Returns:

name, utility, label, eiaid, approved, has_tou_schedule

Return type:

List of rate plan dictionaries with keys

async list_utilities(zip_code: str) list[str][source]

List unique utility providers for a zip code.

Parameters:

zip_code – US zip code to search

Returns:

Sorted list of unique utility names

class nwp500.Operation(*values)[source]

Bases: IntEnum

Device operation state.

OPERATION = 1
STOP = 2
UNKNOWN = 0
class nwp500.OtaCommitPayload(*, swCode: int, swVersion: int)[source]

Bases: NavienBaseModel

Payload for committing a firmware component update.

Used with the OTA_COMMIT command (33554442). This command uses a special commitOta structure instead of the standard mode/param format.

Parameters:
  • sw_code – Software component code identifying which firmware to commit. 1 = Controller, 2 = Panel, 4 = WiFi/communication module.

  • sw_version – Version number to commit (as reported by the OTA check).

model_config = {'alias_generator': None, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

sw_code: int
sw_version: int
exception nwp500.ParameterValidationError(message: str, parameter: str | None = None, value: Any = None, **kwargs: Any)[source]

Bases: ValidationError

Invalid parameter value provided.

Raised when a parameter value is invalid for reasons other than being out of range (e.g., wrong type, invalid format).

parameter

Name of the invalid parameter

value

The invalid value provided

class nwp500.PeriodicRequestType(*values)[source]

Bases: Enum

Types of periodic requests that can be sent.

DEVICE_INFO

Request device information periodically

DEVICE_STATUS

Request device status periodically

DEVICE_INFO = 'device_info'
DEVICE_STATUS = 'device_status'
exception nwp500.RangeValidationError(message: str, field: str | None = None, value: Any = None, min_value: Any = None, max_value: Any = None, **kwargs: Any)[source]

Bases: ValidationError

Value outside acceptable range.

Raised when a numeric value is outside its valid range.

field

Name of the field

value

The invalid value provided

min_value

Minimum acceptable value

max_value

Maximum acceptable value

Example:

try:
    set_temperature(200)
except RangeValidationError as e:
    print(f"Invalid {e.field}: must be {e.min_value}-{e.max_value}")
class nwp500.RecirculationMode(*values)[source]

Bases: IntEnum

Recirculation pump operation mode.

ALWAYS = 1
BUTTON = 2
SCHEDULE = 3
TEMPERATURE = 4
UNKNOWN = 0
class nwp500.RecirculationSchedule(*, schedule: list[RecirculationScheduleEntry] = <factory>)[source]

Bases: NavienBaseModel

Complete recirculation pump schedule (RECIR_RESERVATION command).

Used with command code 33554444 to configure timed recirculation pump operation windows.

model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

schedule: list[RecirculationScheduleEntry]
class nwp500.RecirculationScheduleEntry(*, enable: int = 2, week: int = 0, startHour: int = 0, startMin: int = 0, endHour: int = 0, endMin: int = 0, mode: int = 1)[source]

Bases: NavienBaseModel

A single entry in a recirculation pump schedule.

Used with the RECIR_RESERVATION command (33554444) to set timed recirculation cycles. Each entry defines a time window and pump mode.

Fields:
  • enable: 2=enabled, 1=disabled (device boolean)

  • week: bitfield of active days (Sun=bit7, Mon=bit6, …, Sat=bit1)

  • start_hour: 0-23

  • start_min: 0-59

  • end_hour: 0-23

  • end_min: 0-59

  • mode: recirculation mode (1=Constant, 2=Timer, 3=Temperature, 4=Sensor)

property days: list[str]

Weekday names for this entry.

enable: int
property enabled: bool

2=on, 1=off).

Type:

Whether this entry is active (device bool

end_hour: int
end_min: int
property end_time: str

MM).

Type:

Formatted end time string (HH

mode: int
property mode_name: str

Human-readable recirculation mode name.

model_config = {'alias_generator': None, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

start_hour: int
start_min: int
property start_time: str

MM).

Type:

Formatted start time string (HH

week: int
class nwp500.ReservationEntry(*, enable: int = 2, week: int = 0, hour: int = 0, min: int = 0, mode: int = 1, param: int = 0)[source]

Bases: NavienBaseModel

A single scheduled reservation entry.

Wraps the raw 6-byte protocol fields and provides computed properties for display-ready values including unit-aware temperature conversion.

The raw protocol fields are:
  • enable: 2=enabled, 1=disabled (device boolean)

  • week: bitfield of active days (Sun=bit7, Mon=bit6, …, Sat=bit1)

  • hour: 0-23

  • min: 0-59

  • mode: DHW operation mode ID (1-6)

  • param: temperature in half-degrees Celsius

property days: list[str]

Weekday names for this reservation.

enable: int
property enabled: bool

2=on, 1=off).

Type:

Whether this reservation is active (device bool

hour: int
min: int
mode: int
property mode_name: str

Human-readable operation mode name.

model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

param: int
property temperature: float

Temperature in the user’s preferred unit.

property time: str

MM).

Type:

Formatted time string (HH

property unit: str

Temperature unit symbol.

week: int
class nwp500.ReservationSchedule(*, reservationUse: int = 0, reservation: list[ReservationEntry] = <factory>)[source]

Bases: NavienBaseModel

Complete reservation schedule from the device.

Can be constructed from raw MQTT response data. The reservation field accepts either a hex string (from GET responses) or a list of dicts/ReservationEntry objects.

property enabled: bool

Whether the reservation system is globally enabled.

Device bool convention: 2=on, 1=off.

model_config = {'alias_generator': None, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

reservation: list[ReservationEntry]
reservation_use: int
class nwp500.TOUInfo(*, registerPath: str = '', sourceType: str = '', controllerId: str = '', manufactureId: str = '', name: str = '', utility: str = '', zipCode: int = 0, schedule: list[TOUSchedule] = <factory>)[source]

Bases: NavienBaseModel

Time of Use information.

controller_id: str
manufacture_id: str
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str
register_path: str
schedule: list[TOUSchedule]
source_type: str
utility: str
zip_code: int
class nwp500.TOUPeriod(*, season: int = 0, week: int = 0, startHour: int = 0, startMinute: int = 0, endHour: int = 0, endMinute: int = 0, priceMin: int = 0, priceMax: int = 0, decimalPoint: int = 5)[source]

Bases: NavienBaseModel

A single TOU pricing period from an MQTT tou/rd response.

Each period defines a time window, active season/week bitfields, and the pricing range for that window.

Fields use camelCase aliases to match the raw MQTT payload:
  • season: bitfield of active months (bit N-1 set for month N)

  • week: bitfield of active weekdays (Sun=bit7, …, Sat=bit1)

  • startHour / startMinute: start of the time window (0-23 / 0-59)

  • endHour / endMinute: end of the time window (0-23 / 0-59)

  • priceMin / priceMax: encoded integer prices (divide by 10^decimalPoint)

  • decimalPoint: number of decimal places for price values

decimal_point: int
property decoded_price_max: float

Maximum price decoded to a float (price_max / 10^decimal_point).

property decoded_price_min: float

Minimum price decoded to a float (price_min / 10^decimal_point).

end_hour: int
end_min: int
property end_time: str

MM).

Type:

Formatted end time (HH

model_config = {'alias_generator': None, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

price_max: int
price_min: int
season: int
start_hour: int
start_min: int
property start_time: str

MM).

Type:

Formatted start time (HH

week: int
class nwp500.TOUReservationSchedule(*, reservationUse: int = 0, reservation: list[TOUPeriod] = <factory>)[source]

Bases: NavienBaseModel

TOU schedule as returned by the MQTT tou/rd response topic.

This model matches the raw MQTT payload for both request_tou_settings() read responses and configure_tou_schedule() write confirmations — both use CommandCode.TOU_RESERVATION and the tou/rd response topic.

The payload structure is:

{
    "reservationUse": 2,          # 0=disabled, 2=enabled
    "reservation": [              # list of TOU period dicts
        {
            "season": 4095, "week": 254,
            "startHour": 0, "startMinute": 0,
            "endHour": 23, "endMinute": 59,
            "priceMin": 10, "priceMax": 25,
            "decimalPoint": 2
        },
        ...
    ]
}
property enabled: bool

Whether TOU scheduling is globally enabled.

Protocol convention: 0=disabled, 2=enabled.

model_config = {'alias_generator': None, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

reservation: list[TOUPeriod]
reservation_use: int
class nwp500.TOUSchedule(*, season: int = 0, interval: list[dict[str, ~typing.Any]]=<factory>)[source]

Bases: NavienBaseModel

Time of Use schedule information.

intervals: list[dict[str, Any]]
model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

season: int
class nwp500.TempFormulaType(*values)[source]

Bases: IntEnum

Temperature conversion formula type.

Different device models use slightly different rounding algorithms when converting internal Celsius values to Fahrenheit. This ensures the mobile app matches the device’s built-in display.

Type 0: Asymmetric ceiling/floor rounding based on raw value remainder Type 1: Standard rounding to nearest integer

ASYMMETRIC = 0
STANDARD = 1
class nwp500.TemperatureType(*values)[source]

Bases: IntEnum

Temperature display unit preference.

CELSIUS = 1
FAHRENHEIT = 2
exception nwp500.TokenExpiredError(message: str, status_code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: AuthenticationError

Raised when an authentication token has expired.

Tokens have a limited lifetime and must be refreshed periodically. This exception indicates that a token has passed its expiration time.

exception nwp500.TokenRefreshError(message: str, status_code: int | None = None, response: dict[str, Any | None] | None = None, **kwargs: Any)[source]

Bases: AuthenticationError

Raised when token refresh operation fails.

Token refresh can fail due to invalid refresh tokens, network issues, or API errors. When this occurs, full re-authentication may be required.

class nwp500.TouRateType(*values)[source]

Bases: IntEnum

Electricity rate period type.

Device behavior during each rate period can be configured. Typically, devices heat aggressively during off-peak, maintain temperature during mid-peak, and avoid heating during on-peak unless necessary.

MID_PEAK = 2
OFF_PEAK = 1
ON_PEAK = 3
UNKNOWN = 0
class nwp500.TouWeekType(*values)[source]

Bases: IntEnum

Day grouping for TOU schedules.

TOU schedules can be configured separately for weekdays and weekends to account for different electricity rates and usage patterns.

WEEK_DAY = 0
WEEK_END = 1
class nwp500.UnitType(*values)[source]

Bases: IntEnum

Navien device/unit model types.

CAS_NFB = 7
CAS_NFB_700 = 21
CAS_NHB = 5
CAS_NHB_H = 17
CAS_NPE = 4
CAS_NPE2 = 12
CAS_NPN = 10
CAS_NVW = 15
NCB = 2
NCB_H = 13
NFB = 6
NFB_700 = 20
NFC = 8
NHB = 3
NHB_H = 16
NO_DEVICE = 0
NPE = 1
NPE2 = 11
NPF = 513
NPN = 9
NVW = 14
TWC = 257
class nwp500.UserInfo(*, userType: str = '', userFirstName: str = '', userLastName: str = '', userStatus: str = '', userSeq: int = 0)[source]

Bases: NavienBaseModel

User information returned from authentication.

property full_name: str

Return the user’s full name.

model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

user_first_name: str
user_last_name: str
user_seq: int
user_status: str
user_type: str
exception nwp500.ValidationError(message: str, *, error_code: str | None = None, details: dict[str, Any | None] | None = None, retriable: bool = False)[source]

Bases: Nwp500Error

Base exception for validation failures.

Raised when input parameters or data fail validation checks.

class nwp500.VolumeCode(*values)[source]

Bases: IntEnum

Tank volume capacity codes for NWP500 heat pump water heater models.

Represents the nominal tank capacity in gallons for NWP500 series devices. These correspond to the different model variants available.

VOLUME_50 = 1
VOLUME_65 = 2
VOLUME_80 = 3
class nwp500.WeeklyReservationEntry(*, enable: int = 2, week: int = 0, hour: int = 0, min: int = 0, mode: int = 1, param: int = 0)[source]

Bases: NavienBaseModel

A single entry in a weekly temperature reservation schedule.

Similar to ReservationEntry but used with the RESERVATION_WEEKLY command (33554438), which configures a separate weekly temperature schedule independent of the timed reservation system.

The raw protocol fields mirror the standard reservation format:
  • enable: 2=enabled, 1=disabled (device boolean)

  • week: bitfield of active days (Sun=bit7, Mon=bit6, …, Sat=bit1)

  • hour: 0-23

  • min: 0-59

  • mode: DHW operation mode ID (1-6)

  • param: temperature in half-degrees Celsius

property days: list[str]

Weekday names for this entry.

enable: int
property enabled: bool

2=on, 1=off).

Type:

Whether this entry is active (device bool

hour: int
min: int
mode: int
property mode_name: str

Human-readable operation mode name.

model_config = {'alias_generator': <function to_camel>, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

param: int
property temperature: float

Temperature in the user’s preferred unit.

property time: str

MM).

Type:

Formatted time string (HH

property unit: str

Temperature unit symbol.

week: int
class nwp500.WeeklyReservationSchedule(*, reservationUse: int = 0, reservation: list[WeeklyReservationEntry] = <factory>)[source]

Bases: NavienBaseModel

Complete weekly reservation schedule (RESERVATION_WEEKLY command).

Used with command code 33554438 to configure a temperature schedule that repeats weekly. Accepts the same hex-encoded format as the standard reservation schedule.

property enabled: bool

Whether the weekly reservation system is globally enabled.

Device bool convention: 2=on, 1=off.

model_config = {'alias_generator': None, 'extra': 'ignore', 'populate_by_name': True, 'use_enum_values': False, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

reservation: list[WeeklyReservationEntry]
reservation_use: int
async nwp500.add_reservation(mqtt: NavienMqttClient, device: Device, *, enabled: bool, days: Sequence[str | int], hour: int, minute: int, mode: int, temperature: float) None[source]

Add a single reservation entry to the device schedule.

Fetches the current schedule, appends the new entry, and sends the updated list back to the device. The schedule is automatically enabled after a successful add.

Parameters:
  • mqtt – Connected MQTT client.

  • device – Target device.

  • enabled – Whether the new reservation is active.

  • days – Days of the week. Accepts full names ("Monday"), 2-letter abbreviations ("MO"), or integer indices where 0 = Monday and 6 = Sunday.

  • hour – Hour of the day in 24-hour format (0–23).

  • minute – Minute of the hour (0–59).

  • mode – DHW operation mode (1–6).

  • temperature – Target temperature in the user’s preferred unit.

Raises:
async nwp500.authenticate(user_id: str, password: str) AuthenticationResponse[source]

Authenticate user and obtain tokens.

This is a convenience function that creates a temporary auth client, authenticates, and returns the response.

Parameters:
  • user_id – User email address

  • password – User password

Returns:

AuthenticationResponse with user info and tokens

Example

>>> response = await authenticate("user@example.com", "password")
>>> print(f"Welcome {response.user.full_name}")
>>> # Use the bearer token for API requests
>>> # Do not print tokens in production code
nwp500.build_reservation_entry(*, enabled: bool | int, days: Iterable[str | int], hour: int, minute: int, mode_id: int, temperature: float, temperature_min: float | None = None, temperature_max: float | None = None) dict[str, int][source]

Build a reservation payload entry matching the documented MQTT format.

Parameters:
  • enabled – Enable flag (True/False or 2=enabled/1=disabled per device boolean convention)

  • days – Collection of weekday names or indices

  • hour – Hour (0-23)

  • minute – Minute (0-59)

  • mode_id – DHW operation mode ID (1-6, see DhwOperationSetting)

  • temperature – Target temperature in the user’s preferred unit (Celsius or Fahrenheit based on global context). Automatically converted to half-degrees Celsius for the device.

  • temperature_min – Minimum allowed temperature. If not provided, defaults are used: 95°F or ~35°C.

  • temperature_max – Maximum allowed temperature. If not provided, defaults are used: 150°F or ~65°C.

Returns:

Dictionary with reservation entry fields

Raises:

Examples

>>> build_reservation_entry(
...     enabled=True,
...     days=["Monday", "Wednesday", "Friday"],
...     hour=6,
...     minute=30,
...     mode_id=3,
...     temperature=140.0
... )
{
    'enable': 2,
    'week': 158,
    'hour': 6,
    'min': 30,
    'mode': 3,
    'param': 120,
}
nwp500.build_tou_period(*, season_months: Iterable[int], week_days: Iterable[str | int], start_hour: int, start_minute: int, end_hour: int, end_minute: int, price_min: int | Real, price_max: int | Real, decimal_point: int) dict[str, int][source]

Build a TOU (Time of Use) period entry.

Consistent with MQTT command requirements.

Parameters:
  • season_months – Collection of month numbers (1-12) for this period

  • week_days – Collection of weekday names or indices

  • start_hour – Starting hour (0-23)

  • start_minute – Starting minute (0-59)

  • end_hour – Ending hour (0-23)

  • end_minute – Ending minute (0-59)

  • price_min – Minimum price (float or pre-encoded int)

  • price_max – Maximum price (float or pre-encoded int)

  • decimal_point – Number of decimal places for prices

Returns:

Dictionary with TOU period fields

Raises:

ValueError – If any parameter is out of valid range

Examples

>>> build_tou_period(
...     season_months=[6, 7, 8],
...     week_days=["Monday", "Tuesday", "Wednesday", "Thursday",
"Friday"],
...     start_hour=9,
...     start_minute=0,
...     end_hour=17,
...     end_minute=0,
...     price_min=0.10,
...     price_max=0.25,
...     decimal_point=2
... )
{'season': 448, 'week': 62, 'startHour': 9, 'startMinute': 0, ...}
async nwp500.create_navien_clients(email: str, password: str) tuple[NavienAuthClient, NavienAPIClient, NavienMqttClient][source]

Create and authenticate all Navien clients with one call.

This factory function handles the complete initialization sequence: 1. Creates an auth client with the provided credentials 2. Authenticates with the Navien API (via context manager) 3. Creates API and MQTT clients using the authenticated session 4. Returns all clients ready to use

Parameters:
  • email – Navien account email address

  • password – Navien account password

Returns:

Tuple of (auth_client, api_client, mqtt_client) ready to use

Raises:

Example

>>> auth, api, mqtt = await create_navien_clients(
...     email="user@example.com",
...     password="password"
... )
>>> async with auth:
...     # All clients are ready to use
...     devices = await api.list_devices()
...     await mqtt.connect()
...     # Use clients ...

Note

You must still use the auth client as a context manager to ensure the session is properly cleaned up:

>>> auth, api, mqtt = await create_navien_clients(email, password)
>>> async with auth:
...     # Use api and mqtt clients here
...     ...
>>> # Session is automatically closed when exiting the context
nwp500.decode_price(value: int, decimal_point: int) float[source]

Decode an integer price value using the provided decimal point.

Parameters:
  • value – Integer price value from device

  • decimal_point – Number of decimal places (0-10, typically 2-5)

Returns:

Floating-point price value

Raises:

RangeValidationError – If decimal_point is not in range 0-10

Examples

>>> decode_price(1234, 2)
12.34
>>> decode_price(500, 3)
0.5
>>> decode_price(100, 0)
100.0
nwp500.decode_season_bitfield(bitfield: int) list[int][source]

Decode a TOU season bitfield into the corresponding month numbers.

Parameters:

bitfield – Integer bitfield where each bit represents a month

Returns:

Sorted list of month numbers (1-12)

Examples

>>> decode_season_bitfield(448)
[6, 7, 8]
>>> decode_season_bitfield(4095)  # All months
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
nwp500.decode_week_bitfield(bitfield: int) list[str][source]

Decode a reservation bitfield back into a list of weekday names.

Parameters:

bitfield – Integer bitfield where each bit represents a day

Returns:

List of weekday names in order (Monday through Sunday)

Examples

>>> decode_week_bitfield(84)
['Monday', 'Wednesday', 'Friday']
>>> decode_week_bitfield(254)  # All days
['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday',
'Saturday', 'Sunday']
>>> decode_week_bitfield(130)
['Saturday', 'Sunday']
async nwp500.delete_reservation(mqtt: NavienMqttClient, device: Device, index: int) None[source]

Delete a single reservation entry by 1-based index.

Fetches the current schedule, removes the entry at index, and sends the updated list back. If the schedule becomes empty, it is automatically disabled.

Parameters:
  • mqtt – Connected MQTT client.

  • device – Target device.

  • index – 1-based position of the reservation to delete.

Raises:
  • ValueError – If index is out of the valid range.

  • TimeoutError – If the current schedule cannot be fetched.

nwp500.encode_price(value: Real, decimal_point: int) int[source]

Encode a price into the integer representation expected by the device.

The device stores prices as integers with a separate decimal point indicator. For example, $12.34 with decimal_point=2 is stored as 1234.

Parameters:
  • value – Price value (float or Decimal)

  • decimal_point – Number of decimal places (0-10, typically 2-5)

Returns:

Integer representation of the price

Raises:

RangeValidationError – If decimal_point is not in range 0-10

Examples

>>> encode_price(12.34, 2)
1234
>>> encode_price(0.5, 3)
500
>>> encode_price(100, 0)
100
nwp500.encode_season_bitfield(months: Iterable[int]) int[source]

Encode a collection of month numbers (1-12) into a TOU season bitfield.

Parameters:

months – Collection of month numbers (1=January, 12=December)

Returns:

Integer bitfield where each bit represents a month (January=bit 0, etc.)

Raises:

ValueError – If month number is not in range 1-12

Examples

>>> encode_season_bitfield([6, 7, 8])  # Summer: June, July, August
448  # 0b111000000
>>> encode_season_bitfield([12, 1, 2])  # Winter: Dec, Jan, Feb
4099  # 0b1000000000011
nwp500.encode_week_bitfield(days: Iterable[str | int]) int[source]

Convert a collection of day names or indices into a reservation bitfield.

Parameters:

days – Collection of weekday names (full or 2-letter abbreviations, case-insensitive) or 0-based indices (Monday=0, Sunday=6)

Returns:

Sun=bit7, Mon=bit6, …, Sat=bit1)

Return type:

Integer bitfield (MGPP encoding

Raises:

Examples

>>> encode_week_bitfield(["Monday", "Wednesday", "Friday"])
84  # 64 + 16 + 4
>>> encode_week_bitfield(["MO", "WE", "FR"])  # 2-letter abbreviations
84
>>> encode_week_bitfield([0, 2, 4])  # 0-indexed: Mon, Wed, Fri
84
>>> encode_week_bitfield([5, 6])  # Saturday and Sunday
130  # 2 + 128
nwp500.fahrenheit_to_half_celsius(fahrenheit: float) int[source]

Convert Fahrenheit to half-degrees Celsius (for device commands).

Parameters:

fahrenheit – Temperature in Fahrenheit.

Returns:

Raw device value in half-Celsius format.

Example

>>> fahrenheit_to_half_celsius(140.0)
120
async nwp500.fetch_reservations(mqtt: NavienMqttClient, device: Device, *, timeout: float = 10.0) ReservationSchedule | None[source]

Fetch the current reservation schedule from a device.

Sends a request to the device and waits for the response.

Parameters:
  • mqtt – Connected MQTT client.

  • device – Target device.

  • timeout – Seconds to wait for a response before giving up.

Returns:

The current ReservationSchedule, or None on timeout.

nwp500.get_unit_system() Literal['metric', 'us_customary'] | None[source]

Get the currently configured unit system preference.

Returns:

  • “metric”: Celsius, LPM, Liters

  • ”us_customary”: Fahrenheit, GPM, Gallons

  • None: Auto-detect from device (default)

Return type:

The current unit system preference

nwp500.log_performance(func: F) F[source]

Log execution time for async functions at DEBUG level.

This decorator measures the execution time of async functions and logs the duration when DEBUG logging is enabled. It’s useful for identifying performance bottlenecks and monitoring critical paths.

Parameters:

func – Async function to wrap

Returns:

Wrapped function that logs its execution time

Example:

@log_performance
async def fetch_device_status(device_id: str) -> dict:
    # ... expensive operation ...
    return status

# When called, logs: "fetch_device_status completed in 0.234s"

Note

  • Only logs when DEBUG level is enabled to minimize overhead in production

  • Uses time.perf_counter() for high-resolution timing

  • Preserves function metadata (name, docstring, etc.)

nwp500.preferred_to_half_celsius(temperature: float) int[source]

Convert temperature from preferred unit to half-degrees Celsius.

Converts temperature from the user’s preferred unit (Celsius or Fahrenheit, based on global unit system context) to the half-Celsius format used by the device for commands and reservations.

Parameters:

temperature – Temperature in user’s preferred unit (Celsius or Fahrenheit).

Returns:

Raw device value in half-Celsius format.

Example

>>> # With us_customary unit system
>>> preferred_to_half_celsius(140.0)  # 140°F
120
>>> # With metric unit system
>>> preferred_to_half_celsius(60.0)  # 60°C
120
async nwp500.refresh_access_token(refresh_token: str) AuthTokens[source]

Refresh an access token using a refresh token.

This is a convenience function that creates a temporary session to perform the token refresh operation without requiring full authentication.

Parameters:

refresh_token – The refresh token

Returns:

New AuthTokens

Example

>>> new_tokens = await refresh_access_token(old_tokens.refresh_token)

Note

This function creates a temporary client without authentication to perform the token refresh operation.

nwp500.requires_capability(feature: str) Callable[[F], F][source]

Decorator that validates device capability before executing command.

This decorator automatically checks if a device supports a specific controllable feature before allowing the command to execute. If the device doesn’t support the feature, a DeviceCapabilityError is raised.

The decorator automatically caches device info on first call using _get_device_features(), which internally calls ensure_device_info_cached(). This means capability validation is transparent to the caller - no manual caching is required.

The decorator expects the command method to: 1. Have ‘self’ (controller instance with _device_info_cache) 2. Have ‘device’ parameter (Device object with mac_address)

Parameters:

feature – Name of the required capability (e.g., “recirculation_mode”)

Returns:

Decorator function

Raises:

Example

>>> class MyController:
...     def __init__(self, cache):
...         self._device_info_cache = cache
...
...     @requires_capability("recirculation_mode")
...     async def set_recirculation_mode(self, device, mode):
...         # Device info automatically cached on first call
...         # Capability automatically validated before execution
...         return await self._publish(...)
nwp500.reservation_param_to_preferred(param: int) float[source]

Convert reservation param to user’s preferred temperature unit.

Device returns reservation temperatures as half-degrees Celsius (param). This converts them to the user’s preferred unit (Celsius or Fahrenheit) based on the global unit system context.

Parameters:

param – Raw device value in half-Celsius format.

Returns:

Temperature in user’s preferred unit (Celsius or Fahrenheit).

Example

>>> # With metric (Celsius) unit system
>>> reservation_param_to_preferred(120)
60.0
>>> # With us_customary (Fahrenheit) unit system
>>> reservation_param_to_preferred(120)
140.0
nwp500.reset_unit_system() None[source]

Reset unit system preference to auto-detect (None).

This is useful for tests or when switching between different device configurations.

nwp500.set_unit_system(unit_system: Literal['metric', 'us_customary'] | None) None[source]

Set preferred unit system for temperature, flow, and volume conversions.

This setting overrides the device’s temperature_type setting and applies to all subsequent model validation operations in the current async context.

Parameters:

unit_system – Preferred unit system: - “metric”: Use Celsius, LPM, and Liters - “us_customary”: Use Fahrenheit, GPM, and Gallons - None: Auto-detect from device’s temperature_type (default)

Example

>>> from nwp500 import set_unit_system
>>> set_unit_system("us_customary")
>>> # All values now in F, GPM, Gallons
>>> set_unit_system(None)  # Reset to auto-detect

Note

This is context-aware and works with async code. Each async task maintains its own unit system preference.

async nwp500.update_reservation(mqtt: NavienMqttClient, device: Device, index: int, *, enabled: bool | None = None, days: Sequence[str | int] | None = None, hour: int | None = None, minute: int | None = None, mode: int | None = None, temperature: float | None = None) None[source]

Update a single reservation entry in-place by 1-based index.

Only the fields that are explicitly provided are changed; all other fields are preserved from the existing entry.

Parameters:
  • mqtt – Connected MQTT client.

  • device – Target device.

  • index – 1-based position of the reservation to update.

  • enabled – Set the enabled state, or None to keep current.

  • days – Replace the days, or None to keep current. Accepts full names, 2-letter abbreviations, or integer indices (see add_reservation()).

  • hour – Replace the hour (0–23), or None to keep current.

  • minute – Replace the minute (0–59), or None to keep current.

  • mode – Replace the mode (1–6), or None to keep current.

  • temperature – Replace the temperature (in the user’s preferred unit), or None to keep the existing raw param value unchanged.

Raises:
  • ValueError – If index is out of the valid range, or if any of hour, minute, or mode are provided but out of range.

  • RangeValidationError – If temperature is out of the device’s range.

  • ValidationError – If the updated entry fails model validation.

  • TimeoutError – If the current schedule cannot be fetched.