3 Commits

Author SHA1 Message Date
jamie dddfa347e6 refactor: 🎨 use shared helpers 2026-05-23 16:24:34 +00:00
jamie bd5f2e7e32 refactor: 🎨 consolidate to a single file 2026-05-23 16:16:51 +00:00
jamie c5406e2c7c refactor: 🎨 remove caching 2026-05-23 16:07:09 +00:00
6 changed files with 4768 additions and 5907 deletions
+4764 -22
View File
File diff suppressed because it is too large Load Diff
-191
View File
@@ -1,191 +0,0 @@
"""
In-memory caching module with TTL support and cache invalidation
"""
import time
import sys
from threading import Lock
from functools import wraps
class Cache:
"""Simple in-memory cache with TTL support and size limiting"""
def __init__(self, max_size_mb=50):
self._cache = {}
self._lock = Lock()
self._max_size_bytes = max_size_mb * 1024 * 1024 # Convert MB to bytes
self._access_order = [] # Track access order for LRU eviction
def _get_size(self, obj):
"""Estimate size of an object in bytes"""
size = sys.getsizeof(obj)
if isinstance(obj, dict):
size += sum(self._get_size(k) + self._get_size(v) for k, v in obj.items())
elif isinstance(obj, (list, tuple)):
size += sum(self._get_size(item) for item in obj)
elif isinstance(obj, str):
size += sys.getsizeof(obj) - sys.getsizeof('')
return size
def _get_cache_size(self):
"""Get approximate total size of cache in bytes"""
total_size = sys.getsizeof(self._cache)
for key, (value, expiry) in self._cache.items():
total_size += self._get_size(key) + self._get_size(value) + sys.getsizeof(expiry)
return total_size
def _evict_if_needed(self):
"""Evict entries if cache exceeds size limit"""
current_size = self._get_cache_size()
if current_size <= self._max_size_bytes:
return
# First, remove expired entries
current_time = time.time()
expired_keys = []
for key in list(self._cache.keys()):
_, expiry = self._cache[key]
if expiry is not None and current_time >= expiry:
expired_keys.append(key)
for key in expired_keys:
if key in self._cache:
del self._cache[key]
if key in self._access_order:
self._access_order.remove(key)
# If still over limit, remove oldest entries (LRU)
current_size = self._get_cache_size()
while current_size > self._max_size_bytes and self._access_order:
oldest_key = self._access_order.pop(0)
if oldest_key in self._cache:
del self._cache[oldest_key]
current_size = self._get_cache_size()
def get(self, key):
"""Get value from cache if it exists and hasn't expired"""
with self._lock:
if key in self._cache:
value, expiry = self._cache[key]
if expiry is None or time.time() < expiry:
# Update access order (move to end for LRU)
if key in self._access_order:
self._access_order.remove(key)
self._access_order.append(key)
return value
else:
# Expired, remove it
del self._cache[key]
if key in self._access_order:
self._access_order.remove(key)
return None
def set(self, key, value, ttl=None):
"""Set value in cache with optional TTL (time to live in seconds)"""
with self._lock:
# Remove old entry if it exists
if key in self._cache:
if key in self._access_order:
self._access_order.remove(key)
expiry = None if ttl is None else time.time() + ttl
self._cache[key] = (value, expiry)
self._access_order.append(key)
# Evict if needed to stay under size limit
self._evict_if_needed()
def delete(self, key):
"""Delete a key from cache"""
with self._lock:
if key in self._cache:
del self._cache[key]
if key in self._access_order:
self._access_order.remove(key)
def clear(self, pattern=None):
"""Clear cache entries. If pattern is provided, only clear keys matching the pattern."""
with self._lock:
if pattern is None:
self._cache.clear()
self._access_order.clear()
else:
keys_to_delete = [key for key in self._cache.keys() if pattern in key]
for key in keys_to_delete:
del self._cache[key]
if key in self._access_order:
self._access_order.remove(key)
def invalidate_subnet(self, subnet_id):
"""Invalidate all cache entries related to a specific subnet"""
patterns = [
f'subnet:{subnet_id}',
f'subnet_list',
f'index',
f'admin',
f'utilization:{subnet_id}'
]
with self._lock:
keys_to_delete = []
for key in self._cache.keys():
for pattern in patterns:
if pattern in key:
keys_to_delete.append(key)
break
for key in keys_to_delete:
del self._cache[key]
if key in self._access_order:
self._access_order.remove(key)
def invalidate_device(self, device_id):
"""Invalidate all cache entries related to a specific device"""
patterns = [
f'device:{device_id}',
f'device_list',
f'devices',
f'device_types'
]
with self._lock:
keys_to_delete = []
for key in self._cache.keys():
for pattern in patterns:
if pattern in key:
keys_to_delete.append(key)
break
for key in keys_to_delete:
del self._cache[key]
if key in self._access_order:
self._access_order.remove(key)
def invalidate_all(self):
"""Invalidate all cache entries"""
self.clear()
# Global cache instance
cache = Cache()
def cached(ttl=None, key_prefix=''):
"""
Decorator to cache function results
Args:
ttl: Time to live in seconds (None = no expiration)
key_prefix: Prefix for cache key
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Create cache key from function name, args, and kwargs
cache_key = f"{key_prefix}{func.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
# Try to get from cache
cached_value = cache.get(cache_key)
if cached_value is not None:
return cached_value
# Call function and cache result
result = func(*args, **kwargs)
cache.set(cache_key, result, ttl)
return result
return wrapper
return decorator
+1
View File
@@ -6,6 +6,7 @@ import mysql.connector
import logging
from flask import current_app
# ── Connection, crypto, schema init ─────────────────────────────────────────
def hash_password(password, salt=None):
if salt is None:
salt = base64.b64encode(os.urandom(16)).decode('utf-8')
-1
View File
@@ -5,4 +5,3 @@ gunicorn
requests
pyotp
qrcode[pil]
Flask-Limiter
-5620
View File
File diff suppressed because it is too large Load Diff
-70
View File
@@ -1,70 +0,0 @@
import pyotp
import qrcode
import secrets
import json
import base64
from io import BytesIO
from flask import current_app
def generate_totp_secret():
"""Generate a new TOTP secret"""
return pyotp.random_base32()
def get_totp_uri(secret, email, issuer_name="IPAM"):
"""Generate TOTP URI for QR code"""
totp = pyotp.TOTP(secret)
return totp.provisioning_uri(
name=email,
issuer_name=issuer_name
)
def generate_qr_code(uri):
"""Generate QR code image from URI"""
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format='PNG')
buffer.seek(0)
return base64.b64encode(buffer.getvalue()).decode('utf-8')
def verify_totp(secret, code):
"""Verify a TOTP code"""
if not secret or not code:
return False
try:
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=1) # Allow 1 time step window for clock skew
except Exception:
return False
def generate_backup_codes(count=10):
"""Generate backup codes for 2FA"""
return [secrets.token_urlsafe(8).upper() for _ in range(count)]
def verify_backup_code(backup_codes_json, code):
"""Verify a backup code and remove it if valid"""
if not backup_codes_json or not code:
return False, None
try:
codes = json.loads(backup_codes_json)
code_upper = code.upper().strip()
if code_upper in codes:
codes.remove(code_upper)
return True, json.dumps(codes) if codes else None
return False, None
except (json.JSONDecodeError, AttributeError):
return False, None
def format_backup_codes(codes):
"""Format backup codes for display (group in pairs)"""
formatted = []
for i in range(0, len(codes), 2):
if i + 1 < len(codes):
formatted.append(f"{codes[i]} {codes[i+1]}")
else:
formatted.append(codes[i])
return formatted