Files
DocsGPT/application/storage/db/bootstrap.py
Alex 81b6ee5daa Pg 4 (#2390)
* feat: postgres tests

* feat: mongo cutoff

* feat: mongo cutoff

* feat: adjust docs and compose files

* fix: mini code mongo removals

* fix: tests and k8s mongo stuff

* feat: test fixes

* fix: ruff

* fix: vale

* Potential fix for pull request finding 'CodeQL / Clear-text logging of sensitive information'

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>

* fix: mini suggestions

* vale lint fix 2

* fix: codeql columns thing

* fix: test mongo

* fix: tests coverage

* feat: better tests 4

* feat: more tests

* feat: decent coverage

* fix: ruff fixes

* fix: remove mongo mock

* feat: enhance workflow engine and API routes; add document retrieval and source handling

* feat: e2e tests

* fix: mcp, mongo and more

* fix: mini codeql warning

* fix: agent chunk view

* fix: mini issues

* fix: more pg fixes

* feat: postgres prep on start

* feat: qa tests

* fix: mini improvements

* fix: tests

---------

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Co-authored-by: Siddhant Rai <siddhant.rai.5686@gmail.com>
2026-04-18 13:13:57 +01:00

321 lines
12 KiB
Python

"""Self-bootstrapping database setup for the DocsGPT user-data Postgres DB.
On app startup the Flask factory (and Celery worker init) can call
:func:`ensure_database_ready` to:
1. Create the target database if it's missing (dev-friendly; requires the
configured role to have ``CREATEDB`` privilege).
2. Apply every pending Alembic migration up to ``head``.
Both steps are gated by settings that default ON for dev convenience and
can be turned off in prod (``AUTO_CREATE_DB`` / ``AUTO_MIGRATE``) where
schema is managed out-of-band by a deploy pipeline.
All heavy imports (alembic, psycopg, sqlalchemy.exc sub-symbols) are
deferred to inside the function so merely importing this module has no
side effects and is cheap for test collection.
"""
from __future__ import annotations
import logging
from typing import Optional
def ensure_database_ready(
uri: Optional[str],
*,
create_db: bool,
migrate: bool,
logger: Optional[logging.Logger] = None,
) -> None:
"""Make sure the target Postgres DB exists and is migrated to ``head``.
This is idempotent and safe to call once per process. Each step is
independently gated so prod deployments that manage schema externally
can disable the migrate step while still allowing the process to boot
against an already-provisioned database.
Args:
uri: SQLAlchemy URI for the user-data Postgres database. If
``None`` or empty, the function logs and returns — the app
supports running without a configured URI for certain dev
flows that don't touch user data.
create_db: If ``True``, auto-create the database when it's
missing. Requires the configured role to have ``CREATEDB``.
migrate: If ``True``, run ``alembic upgrade head`` after the
database is reachable.
logger: Optional logger to use. Defaults to this module's logger.
Raises:
Exception: Any failure in an explicitly-enabled step is re-raised
so the app fails fast rather than booting into a broken state.
Missing-role / auth errors surface cleanly without a
mis-directed auto-create attempt.
"""
log = logger or logging.getLogger(__name__)
if not uri:
log.info(
"ensure_database_ready: POSTGRES_URI is not set; "
"skipping database bootstrap."
)
return
if create_db:
_ensure_database_exists(uri, log)
if migrate:
_run_migrations(log)
def _ensure_database_exists(uri: str, log: logging.Logger) -> None:
"""Create the target database if a connection reveals it's missing.
We probe with a lightweight ``connect().close()``. If Postgres
reports ``InvalidCatalogName`` (SQLSTATE ``3D000``), we reconnect to
the server's ``postgres`` maintenance DB and issue ``CREATE DATABASE``
in AUTOCOMMIT mode (required — CREATE DATABASE can't run in a
transaction). Any other connection failure (bad host, auth failure,
missing role) is re-raised untouched so the operator sees the true
cause instead of a mis-directed auto-create attempt.
"""
# Lazy imports keep module import side-effect free.
from sqlalchemy import create_engine
from sqlalchemy.engine import make_url
from sqlalchemy.exc import OperationalError
url = make_url(uri)
target_db = url.database
if not target_db:
raise RuntimeError(
f"POSTGRES_URI is missing a database name: {uri!r}. "
"Expected something like "
"'postgresql+psycopg://user:pass@host:5432/docsgpt'."
)
probe_engine = create_engine(uri, pool_pre_ping=False)
try:
try:
conn = probe_engine.connect()
except OperationalError as exc:
if _is_missing_database(exc):
log.info(
"ensure_database_ready: database %r is missing; "
"creating it...",
target_db,
)
_create_database(url, target_db, log)
log.info("ensure_database_ready: database %r ready.", target_db)
return
# Not a missing-DB error — surface it as-is. This is the path
# for bad host/auth/role-missing, and auto-creating would be
# actively wrong there.
log.error(
"ensure_database_ready: cannot connect to Postgres for "
"database %r: %s",
target_db,
exc,
)
raise
else:
conn.close()
log.info("ensure_database_ready: database %r ready.", target_db)
finally:
probe_engine.dispose()
def _create_database(url, target_db: str, log: logging.Logger) -> None:
"""Issue ``CREATE DATABASE`` against the server's ``postgres`` DB.
Uses AUTOCOMMIT (required by Postgres — ``CREATE DATABASE`` cannot run
inside a transaction). The database identifier is quoted via
``psycopg.sql.Identifier`` so unusual names (hyphens, reserved words)
are handled correctly.
Args:
url: Parsed SQLAlchemy URL for the target DB; we reuse
host/port/credentials and swap the database to ``postgres``.
target_db: The target database name to create.
log: Logger for INFO/ERROR breadcrumbs.
"""
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError, ProgrammingError
# psycopg is imported lazily — its error classes are the canonical
# cause markers Postgres hands us back.
import psycopg
from psycopg import sql as pg_sql
maintenance_url = url.set(database="postgres")
maintenance_engine = create_engine(
maintenance_url,
isolation_level="AUTOCOMMIT",
pool_pre_ping=False,
)
try:
with maintenance_engine.connect() as conn:
# Use psycopg's Identifier to quote the DB name safely. The
# SQL object renders as a literal ``CREATE DATABASE "<name>"``
# which SQLAlchemy passes through to psycopg verbatim.
stmt = pg_sql.SQL("CREATE DATABASE {}").format(
pg_sql.Identifier(target_db)
)
raw = conn.connection.dbapi_connection # psycopg connection
with raw.cursor() as cur:
try:
cur.execute(stmt)
except psycopg.errors.DuplicateDatabase:
# Another worker won the race — benign.
log.info(
"ensure_database_ready: database %r already "
"created by a concurrent worker; continuing.",
target_db,
)
except psycopg.errors.InsufficientPrivilege as exc:
log.error(
"ensure_database_ready: role lacks CREATEDB "
"privilege to create %r. Either GRANT CREATEDB "
"to the role, create the database manually, or "
"set AUTO_CREATE_DB=False and provision it "
"out-of-band. See docs/Deploying/Postgres-"
"Migration for guidance. Underlying error: %s",
target_db,
exc,
)
raise
except (OperationalError, ProgrammingError) as exc:
log.error(
"ensure_database_ready: failed to create database %r: %s. "
"See docs/Deploying/Postgres-Migration for manual setup.",
target_db,
exc,
)
raise
finally:
maintenance_engine.dispose()
def _is_missing_database(exc: Exception) -> bool:
"""Return True if ``exc`` indicates the target database doesn't exist.
We check three signals in the cause chain:
1. ``psycopg.errors.InvalidCatalogName`` — the canonical class for
SQLSTATE ``3D000`` when raised during a query.
2. ``pgcode`` / ``diag.sqlstate`` equal to ``3D000`` — defensive, for
driver versions that surface the code on a generic class.
3. The canonical server message phrasing ``database "..." does not
exist`` — **required** for connection-time failures, because
psycopg 3's ``OperationalError`` raised by ``connect()`` does NOT
populate ``sqlstate`` (the connection never completed the protocol
handshake, so the attributes stay ``None``). The server's error
message itself is stable across Postgres versions, so this is a
reliable fallback for the only case that matters: DB missing at
boot.
"""
try:
import psycopg
invalid_catalog = psycopg.errors.InvalidCatalogName
except Exception: # noqa: BLE001 — defensive; never break on import
invalid_catalog = None
seen: set[int] = set()
cursor: Optional[BaseException] = exc
while cursor is not None and id(cursor) not in seen:
seen.add(id(cursor))
if invalid_catalog is not None and isinstance(cursor, invalid_catalog):
return True
pgcode = getattr(cursor, "pgcode", None) or getattr(
getattr(cursor, "diag", None), "sqlstate", None
)
if pgcode == "3D000":
return True
msg = str(cursor)
if 'database "' in msg and "does not exist" in msg:
return True
cursor = cursor.__cause__ or cursor.__context__
return False
def _run_migrations(log: logging.Logger) -> None:
"""Run ``alembic upgrade head`` against ``POSTGRES_URI``.
Alembic serializes concurrent workers via its ``alembic_version``
table, so no extra application-level locking is needed. Failures are
logged and re-raised so the app fails fast.
"""
from pathlib import Path
# Lazy imports — alembic pulls in a fair amount of code.
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import create_engine
# Mirror the discovery path used by scripts/db/init_postgres.py so
# both entry points resolve the same alembic.ini regardless of cwd.
alembic_ini = Path(__file__).resolve().parents[2] / "alembic.ini"
if not alembic_ini.exists():
raise RuntimeError(f"alembic.ini not found at {alembic_ini}")
cfg = Config(str(alembic_ini))
cfg.set_main_option("script_location", str(alembic_ini.parent / "alembic"))
# Cheap pre-check: if we're already at head, say so explicitly.
try:
script = ScriptDirectory.from_config(cfg)
head_rev = script.get_current_head()
url = cfg.get_main_option("sqlalchemy.url")
# env.py populates sqlalchemy.url from settings.POSTGRES_URI when
# it's imported, but our Config instance hasn't loaded env.py
# yet. Fall back to reading settings directly for the precheck.
if not url:
from application.core.settings import settings as _settings
url = _settings.POSTGRES_URI
current_rev: Optional[str] = None
if url:
precheck_engine = create_engine(url, pool_pre_ping=False)
try:
with precheck_engine.connect() as conn:
ctx = MigrationContext.configure(conn)
current_rev = ctx.get_current_revision()
finally:
precheck_engine.dispose()
if current_rev is not None and current_rev == head_rev:
log.info(
"ensure_database_ready: migrations already at head (%s); "
"nothing to do.",
head_rev,
)
return
log.info(
"ensure_database_ready: applying Alembic migrations "
"(current=%s, target=%s)...",
current_rev,
head_rev,
)
except Exception as exc: # noqa: BLE001 — precheck is best-effort
# If the precheck itself fails we still want to try the upgrade;
# alembic will give a more actionable error if something's off.
log.info(
"ensure_database_ready: revision precheck failed (%s); "
"proceeding with upgrade anyway.",
exc,
)
try:
command.upgrade(cfg, "head")
except Exception as exc: # noqa: BLE001 — surface everything
log.error(
"ensure_database_ready: alembic upgrade failed: %s. "
"Check migration logs and DB connectivity; the app will not "
"boot until this is resolved (or AUTO_MIGRATE is disabled).",
exc,
)
raise
log.info("ensure_database_ready: migrations applied.")