44 lines
1.5 KiB
Python
44 lines
1.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
class PolicyViolation(PermissionError):
|
|
"""Raised when runtime behavior violates declared workflow policy."""
|
|
|
|
|
|
def _normalize_path(path: str) -> str:
|
|
return path.replace("\\", "/").lstrip("./")
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class RuntimePolicy:
|
|
safe_outputs: dict[str, dict[str, int | str | bool]]
|
|
path_scope: list[str]
|
|
_operation_counts: dict[str, int] = field(default_factory=dict)
|
|
|
|
def assert_operation_allowed(self, action: str) -> None:
|
|
config = self.safe_outputs.get(action)
|
|
if config is None:
|
|
raise PolicyViolation(f"write action '{action}' is not declared in safe_outputs")
|
|
|
|
current_count = self._operation_counts.get(action, 0) + 1
|
|
max_count = int(config.get("max", current_count))
|
|
if current_count > max_count:
|
|
raise PolicyViolation(f"write action '{action}' exceeded max count {max_count}")
|
|
|
|
self._operation_counts[action] = current_count
|
|
|
|
def assert_path_allowed(self, path: str) -> None:
|
|
normalized = _normalize_path(path)
|
|
if not self.path_scope:
|
|
raise PolicyViolation("file writes are not allowed without an explicit path scope")
|
|
|
|
for allowed_prefix in self.path_scope:
|
|
if normalized.startswith(_normalize_path(allowed_prefix)):
|
|
return
|
|
|
|
raise PolicyViolation(
|
|
f"path '{normalized}' is outside allowed path scope {self.path_scope}"
|
|
)
|