feat: Implement first pass at Radius Device Management suite

This commit is contained in:
UGAIF
2026-06-17 16:20:24 +00:00
parent 1a77266a2f
commit 5c8fd52e1b
39 changed files with 1149 additions and 2 deletions
+36
View File
@@ -2,6 +2,42 @@
Device Manager is a device registration and access management portal for laboratory, research, and institutional networks. It enables users to register and manage devices, supports RADIUS-based authentication workflows, and facilitates secure network access through WPA3-Enterprise authorization policies and credential management.
### FreeRADIUS module
Device Manager includes an `rlm_python` bridge at `device_manager.freeradius`. FreeRADIUS stays responsible for EAP/password authentication; this module evaluates registered device identity, records the RADIUS request, creates an auditable access decision, and returns VLAN or reply attributes for allow, quarantine, or reject outcomes.
Set these environment variables for the FreeRADIUS service:
```bash
DEVICE_MANAGER_BENCH_PATH=/home/frappe/frappe-bench
DEVICE_MANAGER_SITE=your-site-name
```
Example `mods-available/python3` module stanza:
```text
python3 device_manager {
module = device_manager.freeradius
instantiate = ${.module}
authorize = ${.module}
post_auth = ${.module}
}
```
Then call the module from the relevant virtual server:
```text
authorize {
device_manager
}
post-auth {
device_manager
}
```
The module reads common request attributes such as `Calling-Station-Id`, `User-Name`, `NAS-Identifier`, `NAS-IP-Address`, and SSID attributes, then writes `DM Radius Auth Event`, `DM Access Decision`, and `DM Device Audit Event` records.
### Installation
You can install this app using the [bench](https://github.com/frappe/bench) CLI:
+36
View File
@@ -0,0 +1,36 @@
import frappe
from device_manager.lifecycle import create_device_from_registration, reject_registration, transition_device
from device_manager.radius import record_radius_auth_event
@frappe.whitelist()
def approve_registration(registration_name: str):
return create_device_from_registration(registration_name)
@frappe.whitelist()
def reject_device_registration(registration_name: str, reason: str | None = None):
return reject_registration(registration_name, reason=reason)
@frappe.whitelist()
def quarantine_device(device_name: str, reason: str | None = None):
return transition_device(device_name, "Quarantined", reason=reason)
@frappe.whitelist(allow_guest=True)
def radius_authorize(
calling_station_id: str | None = None,
username: str | None = None,
nas_identifier: str | None = None,
nas_ip_address: str | None = None,
ssid: str | None = None,
):
return record_radius_auth_event(
calling_station_id=calling_station_id,
username=username,
nas_identifier=nas_identifier,
nas_ip_address=nas_ip_address,
ssid=ssid,
)
+33
View File
@@ -0,0 +1,33 @@
import json
import frappe
def emit_audit_event(
event_type: str,
subject_doctype: str,
subject_name: str,
*,
device: str | None = None,
actor: str | None = None,
decision: str | None = None,
reason: str | None = None,
payload: dict | None = None,
commit: bool = False,
):
event = frappe.new_doc("DM Device Audit Event")
event.event_type = event_type
event.subject_doctype = subject_doctype
event.subject_name = subject_name
event.device = device or (subject_name if subject_doctype == "DM Device" else None)
event.actor = actor or frappe.session.user
event.decision = decision
event.reason = reason
if payload:
event.payload = json.dumps(payload, sort_keys=True, default=str)
event.insert(ignore_permissions=True)
if commit:
frappe.db.commit()
return event.name
+40
View File
@@ -0,0 +1,40 @@
DEVICE_STATUSES = (
"Draft",
"Pending Approval",
"Approved",
"Active",
"Suspended",
"Quarantined",
"Retired",
)
AUTHORIZATION_STATUSES = (
"Not Requested",
"Pending",
"Authorized",
"Denied",
"Expired",
"Revoked",
)
ACCESS_RESULTS = ("Allow", "Deny", "Quarantine")
DEVICE_TYPES = (
"Personal Computer",
"Laboratory Equipment",
"IoT Device",
"Sensor",
"Server",
"Other",
)
IDENTIFIER_TYPES = ("MAC Address", "Serial Number", "Certificate Subject", "Asset Tag", "Hostname", "Other")
NSF_CICI_DATASET_USE_CATEGORIES = (
"None",
"Collection",
"Processing",
"Storage",
"Transmission",
"Analysis",
)
@@ -0,0 +1 @@
@@ -0,0 +1,31 @@
{
"actions": [],
"autoname": "DM-DEC-.#####",
"doctype": "DocType",
"document_type": "Document",
"engine": "InnoDB",
"field_order": ["device", "radius_auth_event", "mac_address", "decision", "reason", "access_policy", "network_segment", "vlan_id", "radius_reply_attributes"],
"fields": [
{"fieldname": "device", "fieldtype": "Link", "in_list_view": 1, "label": "Device", "options": "DM Device", "read_only": 1},
{"fieldname": "radius_auth_event", "fieldtype": "Link", "label": "RADIUS Auth Event", "options": "DM Radius Auth Event", "read_only": 1},
{"fieldname": "mac_address", "fieldtype": "Data", "in_list_view": 1, "label": "MAC Address", "read_only": 1},
{"fieldname": "decision", "fieldtype": "Select", "in_list_view": 1, "label": "Decision", "options": "Allow\nDeny\nQuarantine", "read_only": 1},
{"fieldname": "reason", "fieldtype": "Small Text", "label": "Reason", "read_only": 1},
{"fieldname": "access_policy", "fieldtype": "Link", "label": "Access Policy", "options": "DM Access Policy", "read_only": 1},
{"fieldname": "network_segment", "fieldtype": "Link", "label": "Network Segment", "options": "DM Network Segment", "read_only": 1},
{"fieldname": "vlan_id", "fieldtype": "Int", "label": "VLAN ID", "read_only": 1},
{"fieldname": "radius_reply_attributes", "fieldtype": "JSON", "label": "RADIUS Reply Attributes", "read_only": 1}
],
"grid_page_length": 50,
"links": [],
"module": "Device Manager",
"name": "DM Access Decision",
"permissions": [
{"email": 1, "export": 1, "print": 1, "read": 1, "report": 1, "role": "System Manager"}
],
"row_format": "Dynamic",
"sort_field": "creation",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
@@ -0,0 +1,5 @@
from frappe.model.document import Document
class DMAccessDecision(Document):
pass
@@ -0,0 +1,34 @@
{
"actions": [],
"allow_rename": 1,
"autoname": "field:policy_name",
"doctype": "DocType",
"document_type": "Document",
"engine": "InnoDB",
"field_order": ["policy_name", "enabled", "priority", "access_result", "network_segment", "criteria_section", "target_device_type", "target_lifecycle_status", "target_authorization_status", "requires_active_dataset_authorization", "notes"],
"fields": [
{"fieldname": "policy_name", "fieldtype": "Data", "in_list_view": 1, "label": "Policy Name", "reqd": 1, "unique": 1},
{"default": "1", "fieldname": "enabled", "fieldtype": "Check", "in_list_view": 1, "label": "Enabled"},
{"default": "100", "fieldname": "priority", "fieldtype": "Int", "in_list_view": 1, "label": "Priority"},
{"default": "Allow", "fieldname": "access_result", "fieldtype": "Select", "in_list_view": 1, "label": "Access Result", "options": "Allow\nDeny\nQuarantine", "reqd": 1},
{"fieldname": "network_segment", "fieldtype": "Link", "in_list_view": 1, "label": "Network Segment", "options": "DM Network Segment"},
{"fieldname": "criteria_section", "fieldtype": "Section Break", "label": "Criteria"},
{"default": "Any", "fieldname": "target_device_type", "fieldtype": "Select", "label": "Target Device Type", "options": "Any\nPersonal Computer\nLaboratory Equipment\nIoT Device\nSensor\nServer\nOther"},
{"default": "Any", "fieldname": "target_lifecycle_status", "fieldtype": "Select", "label": "Target Lifecycle Status", "options": "Any\nDraft\nPending Approval\nApproved\nActive\nSuspended\nQuarantined\nRetired"},
{"default": "Any", "fieldname": "target_authorization_status", "fieldtype": "Select", "label": "Target Authorization Status", "options": "Any\nNot Requested\nPending\nAuthorized\nDenied\nExpired\nRevoked"},
{"default": "0", "fieldname": "requires_active_dataset_authorization", "fieldtype": "Check", "label": "Requires Active Dataset Authorization"},
{"fieldname": "notes", "fieldtype": "Small Text", "label": "Notes"}
],
"grid_page_length": 50,
"links": [],
"module": "Device Manager",
"name": "DM Access Policy",
"permissions": [
{"create": 1, "delete": 1, "email": 1, "export": 1, "print": 1, "read": 1, "report": 1, "role": "System Manager", "share": 1, "write": 1}
],
"row_format": "Dynamic",
"sort_field": "priority",
"sort_order": "ASC",
"states": [],
"track_changes": 1
}
@@ -0,0 +1,5 @@
from frappe.model.document import Document
class DMAccessPolicy(Document):
pass
@@ -0,0 +1,30 @@
{
"actions": [],
"autoname": "DM-DATASET-AUTH-.#####",
"doctype": "DocType",
"document_type": "Document",
"engine": "InnoDB",
"field_order": ["device", "status", "dataset_name", "authorized_use", "authorized_by", "authorization_starts_on", "authorization_expires_on", "conditions"],
"fields": [
{"fieldname": "device", "fieldtype": "Link", "in_list_view": 1, "label": "Device", "options": "DM Device", "reqd": 1},
{"default": "Draft", "fieldname": "status", "fieldtype": "Select", "in_list_view": 1, "label": "Status", "options": "Draft\nActive\nExpired\nRevoked", "reqd": 1},
{"fieldname": "dataset_name", "fieldtype": "Data", "in_list_view": 1, "label": "Dataset Name", "reqd": 1},
{"fieldname": "authorized_use", "fieldtype": "Small Text", "label": "Authorized Use", "reqd": 1},
{"fieldname": "authorized_by", "fieldtype": "Link", "label": "Authorized By", "options": "User"},
{"fieldname": "authorization_starts_on", "fieldtype": "Date", "label": "Authorization Starts On"},
{"fieldname": "authorization_expires_on", "fieldtype": "Date", "label": "Authorization Expires On"},
{"fieldname": "conditions", "fieldtype": "Small Text", "label": "Conditions"}
],
"grid_page_length": 50,
"links": [],
"module": "Device Manager",
"name": "DM Dataset Authorization",
"permissions": [
{"create": 1, "delete": 1, "email": 1, "export": 1, "print": 1, "read": 1, "report": 1, "role": "System Manager", "share": 1, "write": 1}
],
"row_format": "Dynamic",
"sort_field": "modified",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
@@ -0,0 +1,5 @@
from frappe.model.document import Document
class DMDatasetAuthorization(Document):
pass
@@ -0,0 +1 @@
@@ -0,0 +1,65 @@
{
"actions": [],
"allow_rename": 1,
"autoname": "DM-DEV-.#####",
"doctype": "DocType",
"document_type": "Document",
"engine": "InnoDB",
"field_order": [
"identity_section",
"device_label",
"device_type",
"primary_mac_address",
"column_break_identity",
"lifecycle_status",
"authorization_status",
"owner_user",
"ownership_section",
"project",
"laboratory",
"organizational_unit",
"dataset_use_category",
"dataset_authorization",
"network_section",
"network_segment",
"last_seen_on",
"risk_notes",
"identifiers_section",
"identifiers"
],
"fields": [
{"fieldname": "identity_section", "fieldtype": "Section Break", "label": "Identity"},
{"fieldname": "device_label", "fieldtype": "Data", "in_list_view": 1, "label": "Device Label", "reqd": 1},
{"fieldname": "device_type", "fieldtype": "Select", "in_list_view": 1, "label": "Device Type", "options": "Personal Computer\nLaboratory Equipment\nIoT Device\nSensor\nServer\nOther", "reqd": 1},
{"fieldname": "primary_mac_address", "fieldtype": "Data", "in_list_view": 1, "label": "Primary MAC Address"},
{"fieldname": "column_break_identity", "fieldtype": "Column Break"},
{"default": "Draft", "fieldname": "lifecycle_status", "fieldtype": "Select", "in_list_view": 1, "label": "Lifecycle Status", "options": "Draft\nPending Approval\nApproved\nActive\nSuspended\nQuarantined\nRetired", "reqd": 1},
{"default": "Not Requested", "fieldname": "authorization_status", "fieldtype": "Select", "in_list_view": 1, "label": "Authorization Status", "options": "Not Requested\nPending\nAuthorized\nDenied\nExpired\nRevoked", "reqd": 1},
{"fieldname": "owner_user", "fieldtype": "Link", "label": "Owner User", "options": "User"},
{"fieldname": "ownership_section", "fieldtype": "Section Break", "label": "Ownership and Research Context"},
{"fieldname": "project", "fieldtype": "Data", "in_list_view": 1, "label": "Project"},
{"fieldname": "laboratory", "fieldtype": "Data", "in_list_view": 1, "label": "Laboratory"},
{"fieldname": "organizational_unit", "fieldtype": "Data", "label": "Organizational Unit"},
{"default": "None", "fieldname": "dataset_use_category", "fieldtype": "Select", "label": "NSF CICI Dataset Use Category", "options": "None\nCollection\nProcessing\nStorage\nTransmission\nAnalysis"},
{"fieldname": "dataset_authorization", "fieldtype": "Link", "label": "Current Dataset Authorization", "options": "DM Dataset Authorization", "read_only": 1},
{"fieldname": "network_section", "fieldtype": "Section Break", "label": "Network Access"},
{"fieldname": "network_segment", "fieldtype": "Link", "label": "Preferred Network Segment", "options": "DM Network Segment"},
{"fieldname": "last_seen_on", "fieldtype": "Datetime", "label": "Last Seen On", "read_only": 1},
{"fieldname": "risk_notes", "fieldtype": "Small Text", "label": "Risk Notes"},
{"fieldname": "identifiers_section", "fieldtype": "Section Break", "label": "Identifiers"},
{"fieldname": "identifiers", "fieldtype": "Table", "label": "Identifiers", "options": "DM Device Identifier"}
],
"grid_page_length": 50,
"index_web_pages_for_search": 1,
"links": [],
"module": "Device Manager",
"name": "DM Device",
"permissions": [
{"create": 1, "delete": 1, "email": 1, "export": 1, "print": 1, "read": 1, "report": 1, "role": "System Manager", "share": 1, "write": 1}
],
"row_format": "Dynamic",
"sort_field": "modified",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
@@ -0,0 +1,27 @@
import frappe
from frappe import _
from frappe.model.document import Document
from device_manager.constants import AUTHORIZATION_STATUSES, DEVICE_STATUSES
from device_manager.identity import append_identifier, normalize_mac_address
class DMDevice(Document):
def validate(self):
if self.lifecycle_status not in DEVICE_STATUSES:
frappe.throw(_("Unsupported lifecycle status: {0}").format(self.lifecycle_status))
if self.authorization_status not in AUTHORIZATION_STATUSES:
frappe.throw(_("Unsupported authorization status: {0}").format(self.authorization_status))
self.primary_mac_address = normalize_mac_address(self.primary_mac_address)
if self.primary_mac_address:
append_identifier(self, "MAC Address", self.primary_mac_address, is_primary=True)
seen = set()
for row in self.get("identifiers") or []:
if row.identifier_type == "MAC Address":
row.identifier_value = normalize_mac_address(row.identifier_value)
key = (row.identifier_type, row.identifier_value)
if key in seen:
frappe.throw(_("Duplicate device identifier: {0} {1}").format(row.identifier_type, row.identifier_value))
seen.add(key)
@@ -0,0 +1,30 @@
{
"actions": [],
"autoname": "DM-AUDIT-.#####",
"doctype": "DocType",
"document_type": "Document",
"engine": "InnoDB",
"field_order": ["event_type", "subject_doctype", "subject_name", "device", "actor", "decision", "reason", "payload"],
"fields": [
{"fieldname": "event_type", "fieldtype": "Data", "in_list_view": 1, "label": "Event Type", "read_only": 1},
{"fieldname": "subject_doctype", "fieldtype": "Data", "label": "Subject DocType", "read_only": 1},
{"fieldname": "subject_name", "fieldtype": "Dynamic Link", "label": "Subject", "options": "subject_doctype", "read_only": 1},
{"fieldname": "device", "fieldtype": "Link", "in_list_view": 1, "label": "Device", "options": "DM Device", "read_only": 1},
{"fieldname": "actor", "fieldtype": "Link", "label": "Actor", "options": "User", "read_only": 1},
{"fieldname": "decision", "fieldtype": "Data", "in_list_view": 1, "label": "Decision", "read_only": 1},
{"fieldname": "reason", "fieldtype": "Small Text", "label": "Reason", "read_only": 1},
{"fieldname": "payload", "fieldtype": "JSON", "label": "Payload", "read_only": 1}
],
"grid_page_length": 50,
"links": [],
"module": "Device Manager",
"name": "DM Device Audit Event",
"permissions": [
{"email": 1, "export": 1, "print": 1, "read": 1, "report": 1, "role": "System Manager"}
],
"row_format": "Dynamic",
"sort_field": "creation",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
@@ -0,0 +1,5 @@
from frappe.model.document import Document
class DMDeviceAuditEvent(Document):
pass
@@ -0,0 +1,24 @@
{
"actions": [],
"doctype": "DocType",
"document_type": "Document",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": ["identifier_type", "identifier_value", "is_primary", "verified_on"],
"fields": [
{"fieldname": "identifier_type", "fieldtype": "Select", "in_list_view": 1, "label": "Identifier Type", "options": "MAC Address\nSerial Number\nCertificate Subject\nAsset Tag\nHostname\nOther", "reqd": 1},
{"fieldname": "identifier_value", "fieldtype": "Data", "in_list_view": 1, "label": "Identifier Value", "reqd": 1},
{"default": "0", "fieldname": "is_primary", "fieldtype": "Check", "in_list_view": 1, "label": "Primary"},
{"fieldname": "verified_on", "fieldtype": "Datetime", "label": "Verified On"}
],
"grid_page_length": 50,
"istable": 1,
"links": [],
"module": "Device Manager",
"name": "DM Device Identifier",
"permissions": [],
"row_format": "Dynamic",
"sort_field": "modified",
"sort_order": "DESC",
"states": []
}
@@ -0,0 +1,5 @@
from frappe.model.document import Document
class DMDeviceIdentifier(Document):
pass
@@ -0,0 +1,61 @@
{
"actions": [],
"allow_rename": 1,
"autoname": "DM-REG-.#####",
"doctype": "DocType",
"document_type": "Document",
"engine": "InnoDB",
"field_order": [
"request_section",
"status",
"requested_by",
"device_label",
"device_type",
"primary_mac_address",
"hostname",
"serial_number",
"research_context_section",
"project",
"laboratory",
"organizational_unit",
"dataset_use_category",
"justification",
"approval_section",
"approved_by",
"approved_device",
"approval_notes"
],
"fields": [
{"fieldname": "request_section", "fieldtype": "Section Break", "label": "Request"},
{"default": "Submitted", "fieldname": "status", "fieldtype": "Select", "in_list_view": 1, "label": "Status", "options": "Draft\nSubmitted\nUnder Review\nApproved\nRejected\nCancelled", "reqd": 1},
{"fieldname": "requested_by", "fieldtype": "Link", "in_list_view": 1, "label": "Requested By", "options": "User", "reqd": 1},
{"fieldname": "device_label", "fieldtype": "Data", "in_list_view": 1, "label": "Device Label", "reqd": 1},
{"fieldname": "device_type", "fieldtype": "Select", "label": "Device Type", "options": "Personal Computer\nLaboratory Equipment\nIoT Device\nSensor\nServer\nOther", "reqd": 1},
{"fieldname": "primary_mac_address", "fieldtype": "Data", "in_list_view": 1, "label": "Primary MAC Address", "reqd": 1},
{"fieldname": "hostname", "fieldtype": "Data", "label": "Hostname"},
{"fieldname": "serial_number", "fieldtype": "Data", "label": "Serial Number"},
{"fieldname": "research_context_section", "fieldtype": "Section Break", "label": "Research Context"},
{"fieldname": "project", "fieldtype": "Data", "label": "Project"},
{"fieldname": "laboratory", "fieldtype": "Data", "label": "Laboratory"},
{"fieldname": "organizational_unit", "fieldtype": "Data", "label": "Organizational Unit"},
{"default": "None", "fieldname": "dataset_use_category", "fieldtype": "Select", "label": "NSF CICI Dataset Use Category", "options": "None\nCollection\nProcessing\nStorage\nTransmission\nAnalysis"},
{"fieldname": "justification", "fieldtype": "Small Text", "label": "Justification"},
{"fieldname": "approval_section", "fieldtype": "Section Break", "label": "Approval"},
{"fieldname": "approved_by", "fieldtype": "Link", "label": "Approved By", "options": "User", "read_only": 1},
{"fieldname": "approved_device", "fieldtype": "Link", "label": "Approved Device", "options": "DM Device", "read_only": 1},
{"fieldname": "approval_notes", "fieldtype": "Small Text", "label": "Approval Notes"}
],
"grid_page_length": 50,
"index_web_pages_for_search": 1,
"links": [],
"module": "Device Manager",
"name": "DM Device Registration",
"permissions": [
{"create": 1, "delete": 1, "email": 1, "export": 1, "print": 1, "read": 1, "report": 1, "role": "System Manager", "share": 1, "write": 1}
],
"row_format": "Dynamic",
"sort_field": "creation",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
@@ -0,0 +1,18 @@
import frappe
from frappe import _
from frappe.model.document import Document
from device_manager.identity import find_device_by_identifier, normalize_mac_address
class DMDeviceRegistration(Document):
def validate(self):
self.primary_mac_address = normalize_mac_address(self.primary_mac_address)
if self.primary_mac_address:
existing_device = find_device_by_identifier("MAC Address", self.primary_mac_address)
if existing_device and existing_device != self.approved_device:
frappe.throw(_("A device already exists for MAC address {0}: {1}").format(self.primary_mac_address, existing_device))
if self.status == "Draft":
self.status = "Submitted"
@@ -0,0 +1,28 @@
{
"actions": [],
"allow_rename": 1,
"autoname": "field:segment_name",
"doctype": "DocType",
"document_type": "Document",
"engine": "InnoDB",
"field_order": ["segment_name", "vlan_id", "is_quarantine_segment", "radius_reply_attributes", "description"],
"fields": [
{"fieldname": "segment_name", "fieldtype": "Data", "in_list_view": 1, "label": "Segment Name", "reqd": 1, "unique": 1},
{"fieldname": "vlan_id", "fieldtype": "Int", "in_list_view": 1, "label": "VLAN ID"},
{"default": "0", "fieldname": "is_quarantine_segment", "fieldtype": "Check", "in_list_view": 1, "label": "Quarantine Segment"},
{"fieldname": "radius_reply_attributes", "fieldtype": "JSON", "label": "RADIUS Reply Attributes"},
{"fieldname": "description", "fieldtype": "Small Text", "label": "Description"}
],
"grid_page_length": 50,
"links": [],
"module": "Device Manager",
"name": "DM Network Segment",
"permissions": [
{"create": 1, "delete": 1, "email": 1, "export": 1, "print": 1, "read": 1, "report": 1, "role": "System Manager", "share": 1, "write": 1}
],
"row_format": "Dynamic",
"sort_field": "modified",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
@@ -0,0 +1,16 @@
import json
import frappe
from frappe import _
from frappe.model.document import Document
class DMNetworkSegment(Document):
def validate(self):
if self.vlan_id is not None and (self.vlan_id < 1 or self.vlan_id > 4094):
frappe.throw(_("VLAN ID must be between 1 and 4094."))
if self.radius_reply_attributes:
try:
json.loads(self.radius_reply_attributes)
except ValueError:
frappe.throw(_("RADIUS reply attributes must be valid JSON."))
@@ -0,0 +1,31 @@
{
"actions": [],
"autoname": "DM-RADIUS-.#####",
"doctype": "DocType",
"document_type": "Document",
"engine": "InnoDB",
"field_order": ["device", "calling_station_id", "username", "nas_identifier", "nas_ip_address", "ssid", "result", "access_decision", "raw_request"],
"fields": [
{"fieldname": "device", "fieldtype": "Link", "in_list_view": 1, "label": "Device", "options": "DM Device", "read_only": 1},
{"fieldname": "calling_station_id", "fieldtype": "Data", "in_list_view": 1, "label": "Calling Station ID", "read_only": 1},
{"fieldname": "username", "fieldtype": "Data", "in_list_view": 1, "label": "Username", "read_only": 1},
{"fieldname": "nas_identifier", "fieldtype": "Data", "label": "NAS Identifier", "read_only": 1},
{"fieldname": "nas_ip_address", "fieldtype": "Data", "label": "NAS IP Address", "read_only": 1},
{"fieldname": "ssid", "fieldtype": "Data", "label": "SSID", "read_only": 1},
{"fieldname": "result", "fieldtype": "Select", "in_list_view": 1, "label": "Result", "options": "\nAllow\nDeny\nQuarantine", "read_only": 1},
{"fieldname": "access_decision", "fieldtype": "Link", "label": "Access Decision", "options": "DM Access Decision", "read_only": 1},
{"fieldname": "raw_request", "fieldtype": "JSON", "label": "Raw Request", "read_only": 1}
],
"grid_page_length": 50,
"links": [],
"module": "Device Manager",
"name": "DM Radius Auth Event",
"permissions": [
{"email": 1, "export": 1, "print": 1, "read": 1, "report": 1, "role": "System Manager"}
],
"row_format": "Dynamic",
"sort_field": "creation",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
@@ -0,0 +1,5 @@
from frappe.model.document import Document
class DMRadiusAuthEvent(Document):
pass
+168
View File
@@ -0,0 +1,168 @@
"""FreeRADIUS rlm_python bridge for Device Manager.
Configure FreeRADIUS to import this module from the bench app path. The module
boots Frappe lazily, records each request, evaluates the registered device, and
returns RADIUS reply/control attributes for allow, quarantine, or reject.
"""
from __future__ import annotations
import os
from contextlib import suppress
from typing import Iterable
try:
import radiusd
except ImportError: # pragma: no cover - radiusd exists only inside FreeRADIUS.
radiusd = None
_frappe_initialized = False
RLM_MODULE_OK = getattr(radiusd, "RLM_MODULE_OK", 2)
RLM_MODULE_REJECT = getattr(radiusd, "RLM_MODULE_REJECT", 0)
RLM_MODULE_FAIL = getattr(radiusd, "RLM_MODULE_FAIL", 1)
RLM_MODULE_NOOP = getattr(radiusd, "RLM_MODULE_NOOP", 7)
REQUEST_MAC_ATTRIBUTES = (
"Calling-Station-Id",
"TLS-Client-Cert-Common-Name",
"Stripped-User-Name",
"User-Name",
)
def _log(message: str):
if radiusd:
radiusd.radlog(radiusd.L_INFO, f"device_manager: {message}")
def _error(message: str):
if radiusd:
radiusd.radlog(radiusd.L_ERR, f"device_manager: {message}")
def _as_request_dict(packet: Iterable[tuple[str, str]]) -> dict[str, str]:
request = {}
for key, value in packet or ():
request.setdefault(key, value)
return request
def _reply_attributes_from_decision(decision: dict) -> tuple[tuple[str, str], ...]:
attributes = []
if decision.get("vlan_id"):
vlan_id = str(decision["vlan_id"])
attributes.extend(
[
("Tunnel-Type", "VLAN"),
("Tunnel-Medium-Type", "IEEE-802"),
("Tunnel-Private-Group-Id", vlan_id),
]
)
if decision.get("radius_reply_attributes"):
import json
reply_attributes = json.loads(decision["radius_reply_attributes"])
if not isinstance(reply_attributes, dict):
raise ValueError("radius_reply_attributes must be a JSON object")
for key, value in reply_attributes.items():
attributes.append((key, str(value)))
return tuple(attributes)
def _get_first(request: dict[str, str], *keys: str) -> str | None:
for key in keys:
if request.get(key):
return request[key]
return None
def _bootstrap_frappe():
global _frappe_initialized
if _frappe_initialized:
return
bench_path = os.environ.get("DEVICE_MANAGER_BENCH_PATH") or os.environ.get("FRAPPE_BENCH_PATH")
site = os.environ.get("DEVICE_MANAGER_SITE") or os.environ.get("FRAPPE_SITE")
if not site:
raise RuntimeError("Set DEVICE_MANAGER_SITE or FRAPPE_SITE for the FreeRADIUS Device Manager module.")
if bench_path:
import sys
apps_path = os.path.join(bench_path, "apps")
for app_name in ("frappe", "device_manager"):
app_path = os.path.join(apps_path, app_name)
if app_path not in sys.path:
sys.path.insert(0, app_path)
import frappe
frappe.init(site=site, sites_path=os.path.join(bench_path, "sites") if bench_path else None)
frappe.connect()
_frappe_initialized = True
_log(f"connected to Frappe site {site}")
def instantiate(_config):
try:
_bootstrap_frappe()
except Exception as exc:
_error(f"failed to initialize Frappe: {exc}")
return RLM_MODULE_FAIL
return RLM_MODULE_OK
def authorize(packet):
return _evaluate_packet(packet)
def post_auth(packet):
return _evaluate_packet(packet)
def authenticate(_packet):
# EAP/password authentication remains owned by FreeRADIUS. Device Manager
# contributes device authorization and segmentation decisions.
return RLM_MODULE_NOOP
def detach():
if not _frappe_initialized:
return RLM_MODULE_OK
with suppress(Exception):
import frappe
frappe.destroy()
return RLM_MODULE_OK
def _evaluate_packet(packet):
try:
_bootstrap_frappe()
request = _as_request_dict(packet)
calling_station_id = _get_first(request, *REQUEST_MAC_ATTRIBUTES)
from device_manager.radius import record_radius_auth_event
decision = record_radius_auth_event(
calling_station_id=calling_station_id,
username=_get_first(request, "User-Name", "Stripped-User-Name"),
nas_identifier=request.get("NAS-Identifier"),
nas_ip_address=request.get("NAS-IP-Address"),
ssid=_get_first(request, "Called-Station-SSID", "WLAN-SSID"),
raw_request=request,
)
except Exception as exc:
_error(f"authorization failed: {exc}")
return RLM_MODULE_FAIL
reply = _reply_attributes_from_decision(decision)
if decision["result"] == "Deny":
return RLM_MODULE_REJECT, reply, ()
return RLM_MODULE_OK, reply, ()
+1 -2
View File
@@ -86,7 +86,7 @@ app_license = "apache-2.0"
# ------------
# before_install = "device_manager.install.before_install"
# after_install = "device_manager.install.after_install"
after_install = "device_manager.install.after_install"
# Uninstallation
# ------------
@@ -255,4 +255,3 @@ app_license = "apache-2.0"
# ------------
# List of apps whose translatable strings should be excluded from this app's translations.
# ignore_translatable_strings_from = []
+57
View File
@@ -0,0 +1,57 @@
import re
import frappe
from frappe import _
MAC_RE = re.compile(r"^[0-9a-f]{12}$")
def normalize_mac_address(mac_address: str | None) -> str:
if not mac_address:
return ""
value = re.sub(r"[^0-9A-Fa-f]", "", mac_address).lower()
if not value:
return ""
if not MAC_RE.match(value):
frappe.throw(_("MAC address must contain exactly 12 hexadecimal characters."))
return ":".join(value[index : index + 2] for index in range(0, 12, 2))
def find_device_by_identifier(identifier_type: str, identifier_value: str):
if not identifier_type or not identifier_value:
return None
if identifier_type == "MAC Address":
identifier_value = normalize_mac_address(identifier_value)
row = frappe.db.get_value(
"DM Device Identifier",
{"identifier_type": identifier_type, "identifier_value": identifier_value},
["parent", "name"],
as_dict=True,
)
return row.parent if row else None
def append_identifier(device_doc, identifier_type: str, identifier_value: str, is_primary: bool = False):
if identifier_type == "MAC Address":
identifier_value = normalize_mac_address(identifier_value)
if not identifier_value:
return None
for row in device_doc.get("identifiers") or []:
if row.identifier_type == identifier_type and row.identifier_value == identifier_value:
if is_primary:
row.is_primary = 1
return row
return device_doc.append(
"identifiers",
{
"identifier_type": identifier_type,
"identifier_value": identifier_value,
"is_primary": 1 if is_primary else 0,
},
)
+39
View File
@@ -0,0 +1,39 @@
import frappe
def after_install():
_create_network_segment(
segment_name="Quarantine",
vlan_id=999,
is_quarantine_segment=1,
description="Default restricted network for unknown, suspended, or denied devices.",
)
_create_network_segment(
segment_name="Research Devices",
vlan_id=100,
description="Default managed segment for approved research and laboratory devices.",
)
_create_access_policy(
policy_name="Approved Authorized Devices",
priority=100,
access_result="Allow",
network_segment="Research Devices",
target_lifecycle_status="Active",
target_authorization_status="Authorized",
)
def _create_network_segment(**values):
if frappe.db.exists("DM Network Segment", values["segment_name"]):
return
doc = frappe.new_doc("DM Network Segment")
doc.update(values)
doc.insert(ignore_permissions=True)
def _create_access_policy(**values):
if frappe.db.exists("DM Access Policy", values["policy_name"]):
return
doc = frappe.new_doc("DM Access Policy")
doc.update(values)
doc.insert(ignore_permissions=True)
+94
View File
@@ -0,0 +1,94 @@
import frappe
from frappe import _
from device_manager.audit import emit_audit_event
from device_manager.identity import append_identifier, normalize_mac_address
def create_device_from_registration(registration_name: str, *, approver: str | None = None):
registration = frappe.get_doc("DM Device Registration", registration_name)
if registration.status not in ("Submitted", "Under Review"):
frappe.throw(_("Only submitted registrations can be approved."))
device = frappe.new_doc("DM Device")
device.device_label = registration.device_label
device.device_type = registration.device_type
device.primary_mac_address = normalize_mac_address(registration.primary_mac_address)
device.owner_user = registration.requested_by
device.project = registration.project
device.laboratory = registration.laboratory
device.organizational_unit = registration.organizational_unit
device.dataset_use_category = registration.dataset_use_category
device.lifecycle_status = "Approved"
device.authorization_status = "Authorized" if registration.dataset_use_category == "None" else "Pending"
device.risk_notes = registration.justification
append_identifier(device, "MAC Address", device.primary_mac_address, is_primary=True)
if registration.hostname:
append_identifier(device, "Hostname", registration.hostname)
if registration.serial_number:
append_identifier(device, "Serial Number", registration.serial_number)
device.insert(ignore_permissions=True)
registration.status = "Approved"
registration.approved_by = approver or frappe.session.user
registration.approved_device = device.name
registration.save(ignore_permissions=True)
emit_audit_event(
"Registration Approved",
"DM Device Registration",
registration.name,
device=device.name,
actor=approver,
decision="Approved",
reason=registration.approval_notes,
payload={"registration": registration.name, "device": device.name},
)
emit_audit_event(
"Device Created",
"DM Device",
device.name,
device=device.name,
actor=approver,
payload={"source_registration": registration.name},
)
return device.name
def reject_registration(registration_name: str, *, reason: str | None = None, actor: str | None = None):
registration = frappe.get_doc("DM Device Registration", registration_name)
registration.status = "Rejected"
registration.approval_notes = reason
registration.save(ignore_permissions=True)
return emit_audit_event(
"Registration Rejected",
"DM Device Registration",
registration.name,
actor=actor,
decision="Rejected",
reason=reason,
)
def transition_device(device_name: str, lifecycle_status: str, *, reason: str | None = None, actor: str | None = None):
device = frappe.get_doc("DM Device", device_name)
previous_status = device.lifecycle_status
device.lifecycle_status = lifecycle_status
if lifecycle_status == "Quarantined":
device.authorization_status = "Denied"
if lifecycle_status == "Retired":
device.authorization_status = "Revoked"
device.save(ignore_permissions=True)
return emit_audit_event(
"Device Lifecycle Transition",
"DM Device",
device.name,
device=device.name,
actor=actor,
decision=lifecycle_status,
reason=reason,
payload={"from": previous_status, "to": lifecycle_status},
)
+111
View File
@@ -0,0 +1,111 @@
from __future__ import annotations
from dataclasses import dataclass
import frappe
@dataclass(frozen=True)
class AccessEvaluation:
result: str
reason: str
policy: str | None = None
network_segment: str | None = None
vlan_id: int | None = None
radius_reply_attributes: str | None = None
def get_current_dataset_authorization(device_name: str) -> str | None:
return frappe.db.get_value(
"DM Dataset Authorization",
{
"device": device_name,
"status": "Active",
},
"name",
order_by="authorization_expires_on desc",
)
def _policy_matches(policy, device) -> bool:
target_device_type = policy.get("target_device_type")
if target_device_type and target_device_type != "Any" and target_device_type != device.get("device_type"):
return False
target_lifecycle_status = policy.get("target_lifecycle_status")
if (
target_lifecycle_status
and target_lifecycle_status != "Any"
and target_lifecycle_status != device.get("lifecycle_status")
):
return False
target_authorization_status = policy.get("target_authorization_status")
if (
target_authorization_status
and target_authorization_status != "Any"
and target_authorization_status != device.get("authorization_status")
):
return False
if policy.get("requires_active_dataset_authorization") and not get_current_dataset_authorization(device.name):
return False
return True
def evaluate_device_access(device_name: str) -> AccessEvaluation:
device = frappe.get_doc("DM Device", device_name)
if device.lifecycle_status in ("Draft", "Pending Approval", "Retired"):
return AccessEvaluation("Deny", f"Device lifecycle status is {device.lifecycle_status}.")
if device.lifecycle_status in ("Suspended", "Quarantined") or device.authorization_status in ("Denied", "Revoked"):
segment = frappe.db.get_value("DM Network Segment", {"is_quarantine_segment": 1}, ["name", "vlan_id", "radius_reply_attributes"], as_dict=True)
return AccessEvaluation(
"Quarantine",
"Device is suspended, quarantined, denied, or revoked.",
network_segment=segment.name if segment else None,
vlan_id=segment.vlan_id if segment else None,
radius_reply_attributes=segment.radius_reply_attributes if segment else None,
)
policies = frappe.get_all(
"DM Access Policy",
filters={"enabled": 1},
fields=[
"name",
"priority",
"target_device_type",
"target_lifecycle_status",
"target_authorization_status",
"requires_active_dataset_authorization",
"access_result",
"network_segment",
],
order_by="priority asc, modified desc",
)
for policy in policies:
if not _policy_matches(policy, device):
continue
segment = None
if policy.network_segment:
segment = frappe.db.get_value(
"DM Network Segment",
policy.network_segment,
["name", "vlan_id", "radius_reply_attributes"],
as_dict=True,
)
return AccessEvaluation(
policy.access_result or "Allow",
f"Matched access policy {policy.name}.",
policy=policy.name,
network_segment=segment.name if segment else None,
vlan_id=segment.vlan_id if segment else None,
radius_reply_attributes=segment.radius_reply_attributes if segment else None,
)
return AccessEvaluation("Deny", "No enabled access policy matched the device.")
+99
View File
@@ -0,0 +1,99 @@
import frappe
from frappe import _
from device_manager.audit import emit_audit_event
from device_manager.identity import find_device_by_identifier, normalize_mac_address
from device_manager.policy import evaluate_device_access
def _record_access_decision(device_name: str | None, event_name: str | None, evaluation, mac_address: str | None = None):
decision = frappe.new_doc("DM Access Decision")
decision.device = device_name
decision.radius_auth_event = event_name
decision.mac_address = mac_address
decision.decision = evaluation.result
decision.reason = evaluation.reason
decision.access_policy = evaluation.policy
decision.network_segment = evaluation.network_segment
decision.vlan_id = evaluation.vlan_id
decision.radius_reply_attributes = evaluation.radius_reply_attributes
decision.insert(ignore_permissions=True)
emit_audit_event(
"Access Decision",
"DM Access Decision",
decision.name,
device=device_name,
decision=evaluation.result,
reason=evaluation.reason,
payload={
"radius_auth_event": event_name,
"access_policy": evaluation.policy,
"network_segment": evaluation.network_segment,
"vlan_id": evaluation.vlan_id,
"mac_address": mac_address,
},
)
return decision
def record_radius_auth_event(
*,
calling_station_id: str | None = None,
username: str | None = None,
nas_identifier: str | None = None,
nas_ip_address: str | None = None,
ssid: str | None = None,
raw_request: dict | None = None,
):
mac_address = normalize_mac_address(calling_station_id)
device_name = find_device_by_identifier("MAC Address", mac_address) if mac_address else None
event = frappe.new_doc("DM Radius Auth Event")
event.device = device_name
event.calling_station_id = mac_address
event.username = username
event.nas_identifier = nas_identifier
event.nas_ip_address = nas_ip_address
event.ssid = ssid
if raw_request:
event.raw_request = frappe.as_json(raw_request)
event.insert(ignore_permissions=True)
if not device_name:
segment = frappe.db.get_value(
"DM Network Segment",
{"is_quarantine_segment": 1},
["name", "vlan_id", "radius_reply_attributes"],
as_dict=True,
)
evaluation = type(
"UnknownDeviceEvaluation",
(),
{
"result": "Quarantine",
"reason": _("No registered device matched the presented MAC address."),
"policy": None,
"network_segment": segment.name if segment else None,
"vlan_id": segment.vlan_id if segment else None,
"radius_reply_attributes": segment.radius_reply_attributes if segment else None,
},
)()
else:
evaluation = evaluate_device_access(device_name)
frappe.db.set_value("DM Device", device_name, "last_seen_on", frappe.utils.now(), update_modified=False)
decision = _record_access_decision(device_name, event.name, evaluation, mac_address=mac_address)
event.access_decision = decision.name
event.result = decision.decision
event.save(ignore_permissions=True)
return {
"event": event.name,
"decision": decision.name,
"device": device_name,
"result": decision.decision,
"reason": decision.reason,
"network_segment": decision.network_segment,
"vlan_id": decision.vlan_id,
"radius_reply_attributes": decision.radius_reply_attributes,
}