Files
device_manager/radius_client/device_manager_radius.py
T

352 lines
9.8 KiB
Python

"""FreeRADIUS rlm_python bridge for remote Device Manager instances.
This standalone module calls a Frappe Device Manager API over token-authenticated
HTTP(S) and keeps a local SQLite credential cache for long-lived IoT devices when
Frappe is temporarily unavailable.
This module does NOT require Frappe or device_manager to be installed locally.
"""
from __future__ import annotations
import json
import os
import sqlite3
import time
from collections.abc import Iterable
from contextlib import closing
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
try:
import radiusd
except ImportError: # pragma: no cover - radiusd exists only inside FreeRADIUS.
radiusd = None
_cache_initialized = False
RLM_MODULE_OK = getattr(radiusd, "RLM_MODULE_OK", 2)
RLM_MODULE_REJECT = getattr(radiusd, "RLM_MODULE_REJECT", 0)
RLM_MODULE_FAIL = getattr(radiusd, "RLM_MODULE_FAIL", 1)
RLM_MODULE_NOOP = getattr(radiusd, "RLM_MODULE_NOOP", 7)
REQUEST_MAC_ATTRIBUTES = (
"Calling-Station-Id",
"TLS-Client-Cert-Common-Name",
)
USERNAME_ATTRIBUTES = ("User-Name", "Stripped-User-Name")
def _log(message: str):
if radiusd:
radiusd.radlog(radiusd.L_INFO, f"device_manager_radius: {message}")
def _error(message: str):
if radiusd:
radiusd.radlog(radiusd.L_ERR, f"device_manager_radius: {message}")
def _as_request_dict(packet: Iterable[tuple[str, str]]) -> dict[str, str]:
request = {}
for key, value in packet or ():
request.setdefault(key, value)
return request
def _get_first(request: dict[str, str], *keys: str) -> str | None:
for key in keys:
if request.get(key):
return request[key]
return None
def _remote_api_url() -> str:
explicit_url = os.environ.get("DEVICE_MANAGER_API_URL")
if explicit_url:
return explicit_url
frappe_url = (os.environ.get("DEVICE_MANAGER_FRAPPE_URL") or "").rstrip("/")
if not frappe_url:
raise RuntimeError(
"Set DEVICE_MANAGER_FRAPPE_URL or DEVICE_MANAGER_API_URL to the Frappe server URL."
)
return f"{frappe_url}/api/method/device_manager.api.radius_authorize"
def _cache_path() -> str:
return os.environ.get("DEVICE_MANAGER_CACHE_PATH") or "/var/lib/freeradius/device_manager_cache.sqlite3"
def _http_timeout() -> float:
return float(os.environ.get("DEVICE_MANAGER_HTTP_TIMEOUT") or "2.5")
def _cache_max_stale_seconds() -> int:
# 0 means cached credentials remain usable until Frappe returns a newer deny
# decision or the device-specific cache expiration date is reached.
return int(os.environ.get("DEVICE_MANAGER_CACHE_MAX_STALE_SECONDS") or "0")
def _reply_attributes_from_decision(decision: dict) -> tuple[tuple[str, str], ...]:
attributes = []
if decision.get("vlan_id"):
vlan_id = str(decision["vlan_id"])
attributes.extend(
[
("Tunnel-Type", "VLAN"),
("Tunnel-Medium-Type", "IEEE-802"),
("Tunnel-Private-Group-Id", vlan_id),
]
)
if decision.get("radius_reply_attributes"):
reply_attributes = decision["radius_reply_attributes"]
if isinstance(reply_attributes, str):
reply_attributes = json.loads(reply_attributes)
if not isinstance(reply_attributes, dict):
raise ValueError("radius_reply_attributes must be a JSON object")
for key, value in reply_attributes.items():
attributes.append((key, str(value)))
return tuple(attributes)
def _control_attributes_from_decision(decision: dict) -> tuple[tuple[str, str], ...]:
credentials = decision.get("cacheable_credentials") or {}
control_attributes = credentials.get("control_attributes") or {}
if not control_attributes:
return ()
return tuple((key, str(value)) for key, value in control_attributes.items())
def _initialize_cache():
global _cache_initialized
if _cache_initialized:
return
path = _cache_path()
parent = os.path.dirname(path)
if parent:
os.makedirs(parent, exist_ok=True)
with closing(sqlite3.connect(path)) as connection:
connection.execute(
"""
create table if not exists radius_verifier_cache (
username text primary key,
control_attributes text not null,
device text,
result text not null,
reason text,
vlan_id integer,
radius_reply_attributes text,
cache_expires_on text,
last_synced integer not null
)
"""
)
connection.commit()
_cache_initialized = True
def _cache_decision(decision: dict):
credentials = decision.get("cacheable_credentials") or {}
username = credentials.get("username")
control_attributes = credentials.get("control_attributes")
if not username:
return
_initialize_cache()
with closing(sqlite3.connect(_cache_path())) as connection:
if decision.get("result") == "Deny" or not control_attributes or not credentials.get("cache_allowed"):
connection.execute("delete from radius_verifier_cache where username = ?", (username,))
else:
connection.execute(
"""
insert into radius_verifier_cache (
username,
control_attributes,
device,
result,
reason,
vlan_id,
radius_reply_attributes,
cache_expires_on,
last_synced
)
values (?, ?, ?, ?, ?, ?, ?, ?, ?)
on conflict(username) do update set
control_attributes = excluded.control_attributes,
device = excluded.device,
result = excluded.result,
reason = excluded.reason,
vlan_id = excluded.vlan_id,
radius_reply_attributes = excluded.radius_reply_attributes,
cache_expires_on = excluded.cache_expires_on,
last_synced = excluded.last_synced
""",
(
username,
json.dumps(control_attributes, sort_keys=True),
decision.get("device"),
decision.get("result") or "Allow",
decision.get("reason"),
decision.get("vlan_id"),
decision.get("radius_reply_attributes"),
credentials.get("cache_expires_on"),
int(time.time()),
),
)
connection.commit()
def _cached_decision(username: str | None) -> dict | None:
if not username:
return None
_initialize_cache()
with closing(sqlite3.connect(_cache_path())) as connection:
connection.row_factory = sqlite3.Row
row = connection.execute(
"select * from radius_verifier_cache where username = ?",
(username,),
).fetchone()
if not row:
return None
now = int(time.time())
max_stale = _cache_max_stale_seconds()
if max_stale and now - int(row["last_synced"]) > max_stale:
return None
if row["cache_expires_on"]:
expiry = time.strptime(row["cache_expires_on"], "%Y-%m-%d")
if time.mktime(expiry) < now:
return None
return {
"event": None,
"decision": None,
"device": row["device"],
"result": row["result"],
"reason": row["reason"] or "Frappe unavailable; using cached static RADIUS credentials.",
"network_segment": None,
"vlan_id": row["vlan_id"],
"radius_reply_attributes": row["radius_reply_attributes"],
"cacheable_credentials": {
"username": row["username"],
"control_attributes": json.loads(row["control_attributes"]),
"cache_expires_on": row["cache_expires_on"],
},
"from_cache": True,
}
def _evaluate_remotely(request: dict[str, str]) -> dict:
api_url = _remote_api_url()
api_key = os.environ.get("DEVICE_MANAGER_API_KEY")
api_secret = os.environ.get("DEVICE_MANAGER_API_SECRET")
if not api_key or not api_secret:
raise RuntimeError("Set DEVICE_MANAGER_API_KEY and DEVICE_MANAGER_API_SECRET for authentication.")
payload = urlencode(
{
"calling_station_id": _get_first(request, *REQUEST_MAC_ATTRIBUTES) or "",
"username": _get_first(request, *USERNAME_ATTRIBUTES) or "",
"nas_identifier": request.get("NAS-Identifier") or "",
"nas_ip_address": request.get("NAS-IP-Address") or "",
"ssid": _get_first(request, "Called-Station-SSID", "WLAN-SSID") or "",
"raw_request": json.dumps(request, sort_keys=True),
}
).encode()
http_request = Request(
api_url,
data=payload,
headers={
"Authorization": f"token {api_key}:{api_secret}",
"Content-Type": "application/x-www-form-urlencoded",
},
method="POST",
)
with urlopen(http_request, timeout=_http_timeout()) as response:
response_payload = json.loads(response.read().decode())
return response_payload.get("message") or response_payload
def instantiate(_config):
try:
_initialize_cache()
api_url = _remote_api_url()
_log(f"initialized remote Device Manager mode: {api_url}")
_log("SQLite credential cache enabled for offline fallback")
except Exception as exc:
_error(f"failed to initialize: {exc}")
return RLM_MODULE_FAIL
return RLM_MODULE_OK
def authorize(packet):
return _evaluate_packet(packet, allow_cache_fallback=True)
def post_auth(packet):
if os.environ.get("DEVICE_MANAGER_POST_AUTH_EVALUATE") == "1":
return _evaluate_packet(packet, allow_cache_fallback=False)
return RLM_MODULE_NOOP
def authenticate(_packet):
# EAP/password authentication remains owned by FreeRADIUS. Device Manager
# contributes static credential material, authorization, and segmentation.
return RLM_MODULE_NOOP
def detach():
return RLM_MODULE_OK
def _evaluate_packet(packet, *, allow_cache_fallback: bool):
request = _as_request_dict(packet)
username = _get_first(request, *USERNAME_ATTRIBUTES)
try:
decision = _evaluate_remotely(request)
_cache_decision(decision)
except (HTTPError, URLError, TimeoutError, OSError, RuntimeError) as exc:
if not allow_cache_fallback:
_error(f"authorization failed: {exc}")
return RLM_MODULE_FAIL
decision = _cached_decision(username)
if not decision:
_error(
f"authorization failed and no cached credentials matched {username or '<missing username>'}: {exc}"
)
return RLM_MODULE_FAIL
_log(f"using cached credentials for {username}")
except Exception as exc:
_error(f"authorization failed: {exc}")
return RLM_MODULE_FAIL
try:
reply = _reply_attributes_from_decision(decision)
control = _control_attributes_from_decision(decision)
except Exception as exc:
_error(f"failed to build RADIUS attributes: {exc}")
return RLM_MODULE_FAIL
if decision["result"] == "Deny":
return RLM_MODULE_REJECT, reply, control
return RLM_MODULE_OK, reply, control