Performance Optimization
Guidelines for maintaining and improving integration performance.
Performance Goals
Target metrics:
- Coordinator update: <500ms (typical: 200-300ms)
- Sensor update: <10ms per sensor
- Period calculation: <100ms (typical: 20-50ms)
- Memory footprint: <10MB per home
- API calls: <100 per day per home
Profiling
Timing Decorator
Use for performance-critical functions:
import time
import functools
def timing(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
duration = time.perf_counter() - start
_LOGGER.debug("%s took %.3fms", func.__name__, duration * 1000)
return result
return wrapper
@timing
def expensive_calculation():
# Your code here
Memory Profiling
import tracemalloc
tracemalloc.start()
# Run your code
current, peak = tracemalloc.get_traced_memory()
_LOGGER.info("Memory: current=%.2fMB peak=%.2fMB",
current / 1024**2, peak / 1024**2)
tracemalloc.stop()
Async Profiling
# Install aioprof
uv pip install aioprof
# Run with profiling
python -m aioprof homeassistant -c config
Optimization Patterns
Caching
1. Persistent Cache (API data):
# Already implemented in coordinator/cache.py
store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
data = await store.async_load()
2. Translation Cache (in-memory):
# Already implemented in const.py
_TRANSLATION_CACHE: dict[str, dict] = {}
def get_translation(path: str, language: str) -> dict:
cache_key = f"{path}_{language}"
if cache_key not in _TRANSLATION_CACHE:
_TRANSLATION_CACHE[cache_key] = load_translation(path, language)
return _TRANSLATION_CACHE[cache_key]
3. Config Cache (invalidated on options change):
class DataTransformer:
def __init__(self):
self._config_cache: dict | None = None
def get_config(self) -> dict:
if self._config_cache is None:
self._config_cache = self._build_config()
return self._config_cache
def invalidate_config_cache(self):
self._config_cache = None
Lazy Loading
Load data only when needed:
@property
def extra_state_attributes(self) -> dict | None:
"""Return attributes."""
# Calculate only when accessed
if self.entity_description.key == "complex_sensor":
return self._calculate_complex_attributes()
return None
Bulk Operations
Process multiple items at once:
# ❌ Slow - loop with individual operations
for interval in intervals:
enriched = enrich_single_interval(interval)
results.append(enriched)
# ✅ Fast - bulk processing
results = enrich_intervals_bulk(intervals)
Async Best Practices
1. Concurrent API calls:
# ❌ Sequential (slow)
user_data = await fetch_user_data()
price_data = await fetch_price_data()
# ✅ Concurrent (fast)
user_data, price_data = await asyncio.gather(
fetch_user_data(),
fetch_price_data()
)
2. Don't block event loop:
# ❌ Blocking
result = heavy_computation() # Blocks for seconds
# ✅ Non-blocking
result = await hass.async_add_executor_job(heavy_computation)
Memory Management
Avoid Memory Leaks
1. Clear references:
class Coordinator:
async def async_shutdown(self):
"""Clean up resources."""
self._listeners.clear()
self._data = None
self._cache = None
2. Use weak references for callbacks:
import weakref
class Manager:
def __init__(self):
self._callbacks: list[weakref.ref] = []
def register(self, callback):
self._callbacks.append(weakref.ref(callback))
Efficient Data Structures
Use appropriate types:
# ❌ List for lookups (O(n))
if timestamp in timestamp_list:
...
# ✅ Set for lookups (O(1))
if timestamp in timestamp_set:
...
# ❌ List comprehension with filter
results = [x for x in items if condition(x)]
# ✅ Generator for large datasets
results = (x for x in items if condition(x))
Coordinator Optimization
Minimize API Calls
Already implemented:
- Cache valid until midnight
- User data cached for 24h
- Only poll when tomorrow data expected
Monitor API usage:
_LOGGER.debug("API call: %s (cache_age=%s)",
endpoint, cache_age)
Smart Updates
Only update when needed:
async def _async_update_data(self) -> dict:
"""Fetch data from API."""
if self._is_cache_valid():
_LOGGER.debug("Using cached data")
return self.data
# Fetch new data
return await self._fetch_data()
Database Impact
State Class Selection
Affects long-term statistics storage:
# ❌ MEASUREMENT for prices (stores every change)
state_class=SensorStateClass.MEASUREMENT # ~35K records/year
# ✅ None for prices (no long-term stats)
state_class=None # Only current state
# ✅ TOTAL for counters only
state_class=SensorStateClass.TOTAL # For cumulative values
Attribute Size
Keep attributes minimal:
# ❌ Large nested structures (KB per update)
attributes = {
"all_intervals": [...], # 384 intervals
"full_history": [...], # Days of data
}
# ✅ Essential data only (bytes per update)
attributes = {
"timestamp": "...",
"rating_level": "...",
"next_interval": "...",
}
Testing Performance
Benchmark Tests
import pytest
import time
@pytest.mark.benchmark
def test_period_calculation_performance(coordinator):
"""Period calculation should complete in <100ms."""
start = time.perf_counter()
periods = calculate_periods(coordinator.data)
duration = time.perf_counter() - start
assert duration < 0.1, f"Too slow: {duration:.3f}s"
Load Testing
@pytest.mark.integration
async def test_multiple_homes_performance(hass):
"""Test with 10 homes."""
coordinators = []
for i in range(10):
coordinator = create_coordinator(hass, home_id=f"home_{i}")
await coordinator.async_refresh()
coordinators.append(coordinator)
# Verify memory usage
# Verify update times
Monitoring in Production
Log Performance Metrics
@timing
async def _async_update_data(self) -> dict:
"""Fetch data with timing."""
result = await self._fetch_data()
_LOGGER.info("Update completed in %.2fs", timing_duration)
return result
Memory Tracking
import psutil
import os
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024**2
_LOGGER.debug("Current memory usage: %.2f MB", memory_mb)
💡 Related:
- Caching Strategy - Cache layers
- Architecture - System design
- Debugging - Profiling tools