Source code for nwp500.device_info_cache
"""Device information caching with periodic updates.
This module manages caching of device information (features, capabilities)
with automatic periodic updates to keep data synchronized with the device.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, ReadOnly, TypedDict
if TYPE_CHECKING:
from .models import DeviceFeature
__author__ = "Emmanuel Levijarvi"
_logger = logging.getLogger(__name__)
[docs]
class CachedDeviceInfo(TypedDict):
    """Metadata describing a single cache entry (the features themselves are not included).

    Dictionaries of this shape are produced by
    ``MqttDeviceInfoCache.get_cache_info``.
    """

    # Key under which the entry is stored in the cache (device MAC address).
    mac: ReadOnly[str]
    # ISO 8601 string of the time the entry was stored.
    cached_at: ReadOnly[str]
    # ISO 8601 string of storage time plus the update interval, or None
    # when the update interval is not positive.
    expires_at: ReadOnly[str | None]
    # Result of ``MqttDeviceInfoCache.is_expired`` for the entry's timestamp.
    is_expired: ReadOnly[bool]
[docs]
class CacheInfoResult(TypedDict):
    """Shape of the dictionary returned by ``MqttDeviceInfoCache.get_cache_info``."""

    # Number of entries listed in ``devices``.
    device_count: ReadOnly[int]
    # Update interval expressed in minutes; 0 when the interval is not positive.
    update_interval_minutes: ReadOnly[float]
    # One metadata record per entry currently held in the cache.
    devices: ReadOnly[list[CachedDeviceInfo]]
[docs]
class MqttDeviceInfoCache:
"""Manages caching of device information with periodic updates.
This cache stores device features (capabilities, firmware info, etc.)
and automatically refreshes them at regular intervals to keep data
synchronized with the actual device state.
The cache is keyed by device MAC address, allowing support for
multiple devices connected to the same MQTT client.
"""
def __init__(self, update_interval_minutes: int = 30) -> None:
"""Initialize the device info cache.
Args:
update_interval_minutes: How often to refresh device info
(default: 30 minutes). Set to 0 to disable auto-updates.
"""
self.update_interval = timedelta(minutes=update_interval_minutes)
# Cache: {mac_address: (feature, timestamp)}
self._cache: dict[str, tuple[DeviceFeature, datetime]] = {}
self._lock = asyncio.Lock()
[docs]
async def get(self, device_mac: str) -> DeviceFeature | None:
    """Look up the cached features for one device.

    An entry that has expired is removed from the cache as a side effect
    of the lookup.

    Args:
        device_mac: Device MAC address

    Returns:
        The cached DeviceFeature, or None when there is no entry or the
        entry has expired.
    """
    async with self._lock:
        entry = self._cache.get(device_mac)
        if entry is None:
            return None

        cached_features, stored_at = entry
        if not self.is_expired(stored_at):
            return cached_features

        # Stale entry: evict it so the caller sees a plain cache miss.
        del self._cache[device_mac]
        return None
[docs]
async def set(self, device_mac: str, features: DeviceFeature) -> None:
    """Store features for a device, stamped with the current UTC time.

    Any existing entry for the same MAC address is overwritten.

    Args:
        device_mac: Device MAC address
        features: Device feature information to cache
    """
    async with self._lock:
        stored_at = datetime.now(UTC)
        self._cache[device_mac] = (features, stored_at)
        _logger.debug("Device info cached")
[docs]
async def invalidate(self, device_mac: str) -> None:
"""Invalidate cache entry for a device.
Forces a refresh on next request.
Args:
device_mac: Device MAC address
"""
async with self._lock:
if device_mac in self._cache:
del self._cache[device_mac]
from .mqtt.utils import redact_mac
redacted = redact_mac(device_mac)
_logger.debug(f"Invalidated cache for {redacted}")
[docs]
async def clear(self) -> None:
    """Remove every entry from the cache.

    The cache dictionary is emptied in place while the cache lock is
    held, and a debug message is logged.
    """
    async with self._lock:
        # Empties the existing dict object rather than rebinding the attribute.
        self._cache.clear()
        _logger.debug("Cleared device info cache")
[docs]
def is_expired(self, timestamp: datetime) -> bool:
"""Check if a cache entry is expired.
Args:
timestamp: When the cache entry was created
Returns:
True if expired, False if still fresh
"""
if self.update_interval.total_seconds() == 0:
# Auto-updates disabled
return False
age = datetime.now(UTC) - timestamp
return age > self.update_interval
[docs]
async def get_all_cached(self) -> dict[str, DeviceFeature]:
"""Get all currently cached device features.
Returns:
Dictionary mapping MAC addresses to DeviceFeature objects
"""
async with self._lock:
# Filter out expired entries and purge them from cache
expired_keys = [
mac
for mac, (_, timestamp) in self._cache.items()
if self.is_expired(timestamp)
]
for mac in expired_keys:
del self._cache[mac]
return {mac: features for mac, (features, _) in self._cache.items()}
[docs]
async def get_cache_info(
self,
) -> CacheInfoResult:
"""Get cache statistics and metadata.
Returns:
Dictionary with cache info including:
- device_count: Number of cached devices
- update_interval_minutes: Cache update interval in minutes
- devices: List of device cache metadata
"""
async with self._lock:
devices: list[CachedDeviceInfo] = []
for mac, (_features, timestamp) in self._cache.items():
expires_at = (
timestamp + self.update_interval
if self.update_interval.total_seconds() > 0
else None
)
device_info: CachedDeviceInfo = {
"mac": mac,
"cached_at": timestamp.isoformat(),
"expires_at": expires_at.isoformat()
if expires_at
else None,
"is_expired": self.is_expired(timestamp),
}
devices.append(device_info)
result: CacheInfoResult = {
"device_count": len(devices),
"update_interval_minutes": (
self.update_interval.total_seconds() / 60
if self.update_interval.total_seconds() > 0
else 0
),
"devices": devices,
}
return result