Merge pull request #960 from pocketpaw/fix/sec-b-auth-bypass

fix(security): require_scope fails closed + tool_profile raises on typos
This commit is contained in:
Prakash Dalai
2026-04-21 13:34:30 +05:30
committed by GitHub
7 changed files with 219 additions and 36 deletions

View File

@@ -403,3 +403,6 @@ dev = [
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "--ignore=tests/cloud --ignore=tests/e2e"
markers = [
"enforce_scope: opt a test out of the global _TESTING_FULL_ACCESS bypass so require_scope fails closed as in production",
]

View File

@@ -1,53 +1,80 @@
# Shared FastAPI dependencies for the API layer.
# Created: 2026-02-20
# 2026-04-16: require_scope now fails closed. Master/session/cookie/localhost
# auth must set request.state.full_access = True explicitly — no implicit
# bypass. Closes #888.
from __future__ import annotations
from fastapi import HTTPException, Request
# Testing escape hatch — set to True by tests/v1/conftest.py so that v1
# router-only tests (which mount routers without the dashboard middleware
# that normally sets full_access) can exercise route logic without having
# to install middleware in every fixture. Always False in production.
_TESTING_FULL_ACCESS: bool = False
def require_scope(*scopes: str):
"""FastAPI dependency that checks API key scopes.
"""FastAPI dependency that checks scopes against the authenticated caller.
Usage::
@router.put("/settings", dependencies=[Depends(require_scope("settings:write"))])
async def update_settings(...): ...
If the request was authenticated via API key, verifies the key has at least
one of the required scopes. Master token, session token, cookie, and
localhost auth bypass scope checks (they have full access).
The check is fail-closed. A request is accepted only when one of the
following is true:
* ``request.state.full_access`` is truthy (set by the master token,
session token, cookie, and localhost auth paths in
``dashboard_auth.py`` — explicit "this caller is fully trusted").
* ``request.state.api_key`` is a key whose scopes include one of the
required scopes or ``admin``.
* ``request.state.oauth_token`` carries one of the required scopes or
``admin``.
If none of those hold, the request is rejected with ``403``.
"""
async def _check(request: Request) -> None:
# Testing escape hatch (flag is always False in production).
if _TESTING_FULL_ACCESS:
return
# Explicit full-access marker — set by dashboard_auth when master,
# session, cookie, or localhost auth succeeds.
if getattr(request.state, "full_access", False):
return
required = set(scopes)
# Check API key scopes
api_key = getattr(request.state, "api_key", None)
if api_key is not None:
key_scopes = set(api_key.scopes)
if "admin" in key_scopes:
if "admin" in key_scopes or key_scopes & required:
return
required = set(scopes)
if not key_scopes & required:
raise HTTPException(
status_code=403,
detail=f"API key missing required scope: {' or '.join(sorted(required))}",
)
return
raise HTTPException(
status_code=403,
detail=f"API key missing required scope: {' or '.join(sorted(required))}",
)
# Check OAuth2 token scopes
oauth_token = getattr(request.state, "oauth_token", None)
if oauth_token is not None:
token_scopes = set(oauth_token.scope.split()) if oauth_token.scope else set()
if "admin" in token_scopes:
if "admin" in token_scopes or token_scopes & required:
return
required = set(scopes)
if not token_scopes & required:
raise HTTPException(
status_code=403,
detail=f"OAuth token missing required scope: {' or '.join(sorted(required))}",
)
return
raise HTTPException(
status_code=403,
detail=f"OAuth token missing required scope: {' or '.join(sorted(required))}",
)
# Not an API key or OAuth auth — master/session/cookie/localhost have full access
# No full-access marker, no API key, no OAuth token — fail closed.
raise HTTPException(
status_code=403,
detail=f"Missing required scope: {' or '.join(sorted(required))}",
)
return _check

View File

@@ -377,13 +377,18 @@ async def _auth_dispatch(request: Request) -> Response | None:
current_token = get_access_token()
is_valid = False
# full_access means "bypass scope checks" (issue #888). Set by the
# master/session/cookie/localhost paths — NOT by API key or OAuth auth.
request.state.full_access = False
# 1. Check Query Param (master token or session token)
if token:
if hmac.compare_digest(token, current_token):
is_valid = True
request.state.full_access = True
elif ":" in token and verify_session_token(token, current_token):
is_valid = True
request.state.full_access = True
# 2. Check Header
elif auth_header:
@@ -392,8 +397,10 @@ async def _auth_dispatch(request: Request) -> Response | None:
)
if hmac.compare_digest(bearer_value, current_token):
is_valid = True
request.state.full_access = True
elif ":" in bearer_value and verify_session_token(bearer_value, current_token):
is_valid = True
request.state.full_access = True
# 3. Check HTTP-only session cookie
if not is_valid:
@@ -401,8 +408,10 @@ async def _auth_dispatch(request: Request) -> Response | None:
if cookie_token:
if hmac.compare_digest(cookie_token, current_token):
is_valid = True
request.state.full_access = True
elif ":" in cookie_token and verify_session_token(cookie_token, current_token):
is_valid = True
request.state.full_access = True
# 4. Check API key (pp_* prefix)
if not is_valid:
@@ -465,6 +474,7 @@ async def _auth_dispatch(request: Request) -> Response | None:
# 6. Allow genuine localhost (not tunneled proxies)
if not is_valid and _is_genuine_localhost(request):
is_valid = True
request.state.full_access = True
# Allow frontend assets (/, /static/*, /uploads/*) through for SPA bootstrap.
if (

View File

@@ -202,14 +202,13 @@ class ToolPolicy:
return result
def _resolve(self) -> set[str]:
"""Build the final allowed set from profile + explicit allow list."""
# Start with the profile
try:
profile_set = self.resolve_profile(self.profile)
except ValueError:
logger.warning("Unknown profile '%s', falling back to 'full'", self.profile)
profile_set = set() # full = no restrictions
"""Build the final allowed set from profile + explicit allow list.
# Merge in explicit allow list
Raises ValueError when the profile name is not recognised. The
previous silent fallback to ``set()`` (equivalent to the ``full``
profile) meant a typo in ``tool_profile`` lifted all restrictions —
see issue #889.
"""
profile_set = self.resolve_profile(self.profile)
explicit = self._expand_names(self._allow_raw)
return profile_set | explicit

View File

@@ -23,6 +23,21 @@ def _setup_asyncio_child_watcher():
yield
@pytest.fixture(autouse=True)
def _enable_test_full_access(request, monkeypatch):
"""Flip the require_scope testing-bypass on for all tests by default.
Router-only tests (which mount FastAPI routers without the dashboard
middleware) can't set request.state.full_access on their own — this
fixture lets them exercise route logic without every fixture having
to install middleware. Tests that explicitly verify fail-closed
scope behaviour use the ``enforce_scope`` marker to opt out.
"""
if "enforce_scope" in request.keywords:
return
monkeypatch.setattr("pocketpaw.api.deps._TESTING_FULL_ACCESS", True)
@pytest.fixture(autouse=True)
def _isolate_audit_log(tmp_path):
"""Prevent tests from writing to the real ~/.pocketpaw/audit.jsonl.

View File

@@ -0,0 +1,131 @@
# Scope enforcement + tool profile fail-closed tests.
# Added: 2026-04-16 for security sprint cluster B (#888, #889).
from __future__ import annotations
import pytest
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from pocketpaw.api.deps import require_scope
# Every test in this module needs the real fail-closed behaviour — opt out
# of the _TESTING_FULL_ACCESS bypass that the root conftest sets up.
pytestmark = pytest.mark.enforce_scope
class _APIKey:
def __init__(self, scopes: list[str]):
self.scopes = scopes
class _OAuthToken:
def __init__(self, scope: str):
self.scope = scope
def _build_app_with_state(**state_kwargs):
"""FastAPI app that sets request.state from kwargs and has one protected route."""
app = FastAPI()
@app.middleware("http")
async def _inject(request, call_next):
for k, v in state_kwargs.items():
setattr(request.state, k, v)
return await call_next(request)
@app.get("/protected", dependencies=[Depends(require_scope("memory"))])
async def protected():
return {"ok": True}
return app
# ---------------------------------------------------------------------------
# #888 — scope bypass via master/session/cookie auth
# ---------------------------------------------------------------------------
class TestRequireScopeNoFullAccessMarker:
"""Without an explicit full_access marker, scopeless requests must be rejected.
Today the silent fallback at the end of require_scope() lets master,
session, cookie, and localhost auth through without any check.
After the fix, they must set request.state.full_access = True explicitly.
"""
def test_request_with_no_auth_markers_is_rejected(self):
app = _build_app_with_state(api_key=None, oauth_token=None)
resp = TestClient(app).get("/protected")
assert resp.status_code == 403, (
"require_scope must fail closed when no auth marker is set"
)
def test_request_with_full_access_marker_is_allowed(self):
app = _build_app_with_state(api_key=None, oauth_token=None, full_access=True)
resp = TestClient(app).get("/protected")
assert resp.status_code == 200
def test_apikey_without_required_scope_is_rejected(self):
app = _build_app_with_state(
api_key=_APIKey(scopes=["chat"]), oauth_token=None
)
resp = TestClient(app).get("/protected")
assert resp.status_code == 403
def test_apikey_with_required_scope_is_allowed(self):
app = _build_app_with_state(
api_key=_APIKey(scopes=["memory"]), oauth_token=None
)
resp = TestClient(app).get("/protected")
assert resp.status_code == 200
def test_apikey_with_admin_scope_is_allowed(self):
app = _build_app_with_state(
api_key=_APIKey(scopes=["admin"]), oauth_token=None
)
resp = TestClient(app).get("/protected")
assert resp.status_code == 200
def test_oauth_without_required_scope_is_rejected(self):
app = _build_app_with_state(
api_key=None, oauth_token=_OAuthToken(scope="chat")
)
resp = TestClient(app).get("/protected")
assert resp.status_code == 403
def test_oauth_with_required_scope_is_allowed(self):
app = _build_app_with_state(
api_key=None, oauth_token=_OAuthToken(scope="memory chat")
)
resp = TestClient(app).get("/protected")
assert resp.status_code == 200
# ---------------------------------------------------------------------------
# #889 — tool_profile fail-open on unknown name
# ---------------------------------------------------------------------------
class TestToolPolicyUnknownProfile:
def test_unknown_profile_raises_at_construction(self):
from pocketpaw.tools.policy import ToolPolicy
with pytest.raises(ValueError, match="Unknown tool profile"):
ToolPolicy(profile="this-profile-does-not-exist")
def test_valid_profile_constructs(self):
from pocketpaw.tools.policy import ToolPolicy
pol = ToolPolicy(profile="minimal")
# minimal allows memory + sessions + explorer
assert pol.is_tool_allowed("remember") is True
# but not shell
assert pol.is_tool_allowed("shell") is False
def test_full_profile_is_unrestricted(self):
from pocketpaw.tools.policy import ToolPolicy
pol = ToolPolicy(profile="full")
assert pol.is_tool_allowed("shell") is True
assert pol.is_tool_allowed("any_unknown_tool") is True

View File

@@ -140,14 +140,12 @@ class TestToolPolicyDeny:
class TestToolPolicyFallback:
"""Test unknown profile fallback."""
"""Unknown profile names fail closed (#889) — previously they silently
fell back to 'full', which lifted every tool restriction on a typo."""
def test_unknown_profile_falls_back_to_full(self):
"""Unknown profile logs a warning and acts like 'full'."""
policy = ToolPolicy(profile="nonexistent_profile")
# Should allow everything (full fallback)
assert policy.is_tool_allowed("shell") is True
assert policy.is_tool_allowed("browser") is True
def test_unknown_profile_raises(self):
with pytest.raises(ValueError, match="Unknown tool profile"):
ToolPolicy(profile="nonexistent_profile")
class TestFilterToolNames: