diff --git a/pyproject.toml b/pyproject.toml index 79746cec..1b9de15b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -403,3 +403,6 @@ dev = [ asyncio_mode = "auto" testpaths = ["tests"] addopts = "--ignore=tests/cloud --ignore=tests/e2e" +markers = [ + "enforce_scope: opt a test out of the global _TESTING_FULL_ACCESS bypass so require_scope fails closed as in production", +] diff --git a/src/pocketpaw/api/deps.py b/src/pocketpaw/api/deps.py index 4e14cef7..70600d67 100644 --- a/src/pocketpaw/api/deps.py +++ b/src/pocketpaw/api/deps.py @@ -1,53 +1,80 @@ # Shared FastAPI dependencies for the API layer. # Created: 2026-02-20 +# 2026-04-16: require_scope now fails closed. Master/session/cookie/localhost +# auth must set request.state.full_access = True explicitly — no implicit +# bypass. Closes #888. from __future__ import annotations from fastapi import HTTPException, Request +# Testing escape hatch — set to True by tests/v1/conftest.py so that v1 +# router-only tests (which mount routers without the dashboard middleware +# that normally sets full_access) can exercise route logic without having +# to install middleware in every fixture. Always False in production. +_TESTING_FULL_ACCESS: bool = False + def require_scope(*scopes: str): - """FastAPI dependency that checks API key scopes. + """FastAPI dependency that checks scopes against the authenticated caller. Usage:: @router.put("/settings", dependencies=[Depends(require_scope("settings:write"))]) async def update_settings(...): ... - If the request was authenticated via API key, verifies the key has at least - one of the required scopes. Master token, session token, cookie, and - localhost auth bypass scope checks (they have full access). + The check is fail-closed. A request is accepted only when one of the + following is true: + + * ``request.state.full_access`` is truthy (set by the master token, + session token, cookie, and localhost auth paths in + ``dashboard_auth.py`` — explicit "this caller is fully trusted"). + * ``request.state.api_key`` is a key whose scopes include one of the + required scopes or ``admin``. + * ``request.state.oauth_token`` carries one of the required scopes or + ``admin``. + + If none of those hold, the request is rejected with ``403``. """ async def _check(request: Request) -> None: + # Testing escape hatch (flag is always False in production). + if _TESTING_FULL_ACCESS: + return + + # Explicit full-access marker — set by dashboard_auth when master, + # session, cookie, or localhost auth succeeds. + if getattr(request.state, "full_access", False): + return + + required = set(scopes) + # Check API key scopes api_key = getattr(request.state, "api_key", None) if api_key is not None: key_scopes = set(api_key.scopes) - if "admin" in key_scopes: + if "admin" in key_scopes or key_scopes & required: return - required = set(scopes) - if not key_scopes & required: - raise HTTPException( - status_code=403, - detail=f"API key missing required scope: {' or '.join(sorted(required))}", - ) - return + raise HTTPException( + status_code=403, + detail=f"API key missing required scope: {' or '.join(sorted(required))}", + ) # Check OAuth2 token scopes oauth_token = getattr(request.state, "oauth_token", None) if oauth_token is not None: token_scopes = set(oauth_token.scope.split()) if oauth_token.scope else set() - if "admin" in token_scopes: + if "admin" in token_scopes or token_scopes & required: return - required = set(scopes) - if not token_scopes & required: - raise HTTPException( - status_code=403, - detail=f"OAuth token missing required scope: {' or '.join(sorted(required))}", - ) - return + raise HTTPException( + status_code=403, + detail=f"OAuth token missing required scope: {' or '.join(sorted(required))}", + ) - # Not an API key or OAuth auth — master/session/cookie/localhost have full access + # No full-access marker, no API key, no OAuth token — fail closed. + raise HTTPException( + status_code=403, + detail=f"Missing required scope: {' or '.join(sorted(required))}", + ) return _check diff --git a/src/pocketpaw/dashboard_auth.py b/src/pocketpaw/dashboard_auth.py index e85e20cb..e46876a4 100644 --- a/src/pocketpaw/dashboard_auth.py +++ b/src/pocketpaw/dashboard_auth.py @@ -377,13 +377,18 @@ async def _auth_dispatch(request: Request) -> Response | None: current_token = get_access_token() is_valid = False + # full_access means "bypass scope checks" (issue #888). Set by the + # master/session/cookie/localhost paths — NOT by API key or OAuth auth. + request.state.full_access = False # 1. Check Query Param (master token or session token) if token: if hmac.compare_digest(token, current_token): is_valid = True + request.state.full_access = True elif ":" in token and verify_session_token(token, current_token): is_valid = True + request.state.full_access = True # 2. Check Header elif auth_header: @@ -392,8 +397,10 @@ async def _auth_dispatch(request: Request) -> Response | None: ) if hmac.compare_digest(bearer_value, current_token): is_valid = True + request.state.full_access = True elif ":" in bearer_value and verify_session_token(bearer_value, current_token): is_valid = True + request.state.full_access = True # 3. Check HTTP-only session cookie if not is_valid: @@ -401,8 +408,10 @@ async def _auth_dispatch(request: Request) -> Response | None: if cookie_token: if hmac.compare_digest(cookie_token, current_token): is_valid = True + request.state.full_access = True elif ":" in cookie_token and verify_session_token(cookie_token, current_token): is_valid = True + request.state.full_access = True # 4. Check API key (pp_* prefix) if not is_valid: @@ -465,6 +474,7 @@ async def _auth_dispatch(request: Request) -> Response | None: # 6. Allow genuine localhost (not tunneled proxies) if not is_valid and _is_genuine_localhost(request): is_valid = True + request.state.full_access = True # Allow frontend assets (/, /static/*, /uploads/*) through for SPA bootstrap. if ( diff --git a/src/pocketpaw/tools/policy.py b/src/pocketpaw/tools/policy.py index 9a4e60ad..b8a34841 100644 --- a/src/pocketpaw/tools/policy.py +++ b/src/pocketpaw/tools/policy.py @@ -202,14 +202,13 @@ class ToolPolicy: return result def _resolve(self) -> set[str]: - """Build the final allowed set from profile + explicit allow list.""" - # Start with the profile - try: - profile_set = self.resolve_profile(self.profile) - except ValueError: - logger.warning("Unknown profile '%s', falling back to 'full'", self.profile) - profile_set = set() # full = no restrictions + """Build the final allowed set from profile + explicit allow list. - # Merge in explicit allow list + Raises ValueError when the profile name is not recognised. The + previous silent fallback to ``set()`` (equivalent to the ``full`` + profile) meant a typo in ``tool_profile`` lifted all restrictions — + see issue #889. + """ + profile_set = self.resolve_profile(self.profile) explicit = self._expand_names(self._allow_raw) return profile_set | explicit diff --git a/tests/conftest.py b/tests/conftest.py index d0a0786d..0e1f9045 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,21 @@ def _setup_asyncio_child_watcher(): yield +@pytest.fixture(autouse=True) +def _enable_test_full_access(request, monkeypatch): + """Flip the require_scope testing-bypass on for all tests by default. + + Router-only tests (which mount FastAPI routers without the dashboard + middleware) can't set request.state.full_access on their own — this + fixture lets them exercise route logic without every fixture having + to install middleware. Tests that explicitly verify fail-closed + scope behaviour use the ``enforce_scope`` marker to opt out. + """ + if "enforce_scope" in request.keywords: + return + monkeypatch.setattr("pocketpaw.api.deps._TESTING_FULL_ACCESS", True) + + @pytest.fixture(autouse=True) def _isolate_audit_log(tmp_path): """Prevent tests from writing to the real ~/.pocketpaw/audit.jsonl. diff --git a/tests/test_require_scope_enforcement.py b/tests/test_require_scope_enforcement.py new file mode 100644 index 00000000..0ac76908 --- /dev/null +++ b/tests/test_require_scope_enforcement.py @@ -0,0 +1,131 @@ +# Scope enforcement + tool profile fail-closed tests. +# Added: 2026-04-16 for security sprint cluster B (#888, #889). + +from __future__ import annotations + +import pytest +from fastapi import Depends, FastAPI +from fastapi.testclient import TestClient + +from pocketpaw.api.deps import require_scope + +# Every test in this module needs the real fail-closed behaviour — opt out +# of the _TESTING_FULL_ACCESS bypass that the root conftest sets up. +pytestmark = pytest.mark.enforce_scope + + +class _APIKey: + def __init__(self, scopes: list[str]): + self.scopes = scopes + + +class _OAuthToken: + def __init__(self, scope: str): + self.scope = scope + + +def _build_app_with_state(**state_kwargs): + """FastAPI app that sets request.state from kwargs and has one protected route.""" + app = FastAPI() + + @app.middleware("http") + async def _inject(request, call_next): + for k, v in state_kwargs.items(): + setattr(request.state, k, v) + return await call_next(request) + + @app.get("/protected", dependencies=[Depends(require_scope("memory"))]) + async def protected(): + return {"ok": True} + + return app + + +# --------------------------------------------------------------------------- +# #888 — scope bypass via master/session/cookie auth +# --------------------------------------------------------------------------- + + +class TestRequireScopeNoFullAccessMarker: + """Without an explicit full_access marker, scopeless requests must be rejected. + + Today the silent fallback at the end of require_scope() lets master, + session, cookie, and localhost auth through without any check. + After the fix, they must set request.state.full_access = True explicitly. + """ + + def test_request_with_no_auth_markers_is_rejected(self): + app = _build_app_with_state(api_key=None, oauth_token=None) + resp = TestClient(app).get("/protected") + assert resp.status_code == 403, ( + "require_scope must fail closed when no auth marker is set" + ) + + def test_request_with_full_access_marker_is_allowed(self): + app = _build_app_with_state(api_key=None, oauth_token=None, full_access=True) + resp = TestClient(app).get("/protected") + assert resp.status_code == 200 + + def test_apikey_without_required_scope_is_rejected(self): + app = _build_app_with_state( + api_key=_APIKey(scopes=["chat"]), oauth_token=None + ) + resp = TestClient(app).get("/protected") + assert resp.status_code == 403 + + def test_apikey_with_required_scope_is_allowed(self): + app = _build_app_with_state( + api_key=_APIKey(scopes=["memory"]), oauth_token=None + ) + resp = TestClient(app).get("/protected") + assert resp.status_code == 200 + + def test_apikey_with_admin_scope_is_allowed(self): + app = _build_app_with_state( + api_key=_APIKey(scopes=["admin"]), oauth_token=None + ) + resp = TestClient(app).get("/protected") + assert resp.status_code == 200 + + def test_oauth_without_required_scope_is_rejected(self): + app = _build_app_with_state( + api_key=None, oauth_token=_OAuthToken(scope="chat") + ) + resp = TestClient(app).get("/protected") + assert resp.status_code == 403 + + def test_oauth_with_required_scope_is_allowed(self): + app = _build_app_with_state( + api_key=None, oauth_token=_OAuthToken(scope="memory chat") + ) + resp = TestClient(app).get("/protected") + assert resp.status_code == 200 + + +# --------------------------------------------------------------------------- +# #889 — tool_profile fail-open on unknown name +# --------------------------------------------------------------------------- + + +class TestToolPolicyUnknownProfile: + def test_unknown_profile_raises_at_construction(self): + from pocketpaw.tools.policy import ToolPolicy + + with pytest.raises(ValueError, match="Unknown tool profile"): + ToolPolicy(profile="this-profile-does-not-exist") + + def test_valid_profile_constructs(self): + from pocketpaw.tools.policy import ToolPolicy + + pol = ToolPolicy(profile="minimal") + # minimal allows memory + sessions + explorer + assert pol.is_tool_allowed("remember") is True + # but not shell + assert pol.is_tool_allowed("shell") is False + + def test_full_profile_is_unrestricted(self): + from pocketpaw.tools.policy import ToolPolicy + + pol = ToolPolicy(profile="full") + assert pol.is_tool_allowed("shell") is True + assert pol.is_tool_allowed("any_unknown_tool") is True diff --git a/tests/test_tool_policy.py b/tests/test_tool_policy.py index ae199175..c57eb9b4 100644 --- a/tests/test_tool_policy.py +++ b/tests/test_tool_policy.py @@ -140,14 +140,12 @@ class TestToolPolicyDeny: class TestToolPolicyFallback: - """Test unknown profile fallback.""" + """Unknown profile names fail closed (#889) — previously they silently + fell back to 'full', which lifted every tool restriction on a typo.""" - def test_unknown_profile_falls_back_to_full(self): - """Unknown profile logs a warning and acts like 'full'.""" - policy = ToolPolicy(profile="nonexistent_profile") - # Should allow everything (full fallback) - assert policy.is_tool_allowed("shell") is True - assert policy.is_tool_allowed("browser") is True + def test_unknown_profile_raises(self): + with pytest.raises(ValueError, match="Unknown tool profile"): + ToolPolicy(profile="nonexistent_profile") class TestFilterToolNames: