Files
DocsGPT/application/api/user/tasks.py

353 lines
10 KiB
Python

from datetime import timedelta
from application.api.user.idempotency import with_idempotency
from application.celery_init import celery
from application.worker import (
agent_webhook_worker,
attachment_worker,
ingest_worker,
mcp_oauth,
remote_worker,
sync,
sync_worker,
)
# Decorator options shared by the long-running, side-effecting tasks in this
# module. ``acks_late`` is already the celeryconfig default; it is repeated
# here so each task's durability settings can be grepped right next to its
# body. Auto-retrying on any ``Exception`` is paired with a bounded
# ``max_retries`` so that a poison message cannot loop forever.
# NOTE(review): with ``retry_backoff`` enabled, Celery's autoretry appears to
# compute the retry delay itself, so the explicit ``countdown`` may never take
# effect — confirm which delay is actually intended.
DURABLE_TASK = {
    "bind": True,
    "acks_late": True,
    "autoretry_for": (Exception,),
    "retry_kwargs": {"max_retries": 3, "countdown": 60},
    "retry_backoff": True,
}
# Maps a task name to the ``operation`` tag carried by the poison-path
# ``source.ingest.failed`` event. Every upload-style task shares the
# "upload" tag; only the reingest task reports "reingest".
_INGEST_POISON_OPERATION = {
    **dict.fromkeys(
        ("ingest", "ingest_remote", "ingest_connector_task"),
        "upload",
    ),
    "reingest_source_task": "reingest",
}
def _emit_ingest_poison_event(task_name, bound):
    """Publish a terminal ``source.ingest.failed`` event for a poisoned task.

    Used as the ``on_poison`` callback of the ingest tasks. The poison guard
    returns before the worker runs, so the worker's own failure event never
    fires; without this event the upload toast keeps spinning on "training".

    Args:
        task_name: Name of the task whose guard tripped; selects the
            ``operation`` tag (falls back to "upload" for unknown names).
        bound: Mapping read via ``.get`` for ``user``, ``source_id`` and
            ``filename`` (presumably the task's bound call arguments —
            confirm against ``with_idempotency``).

    Returns:
        None. Nothing is published when ``user`` or ``source_id`` is missing
        or falsy.
    """
    user, source_id = bound.get("user"), bound.get("source_id")
    if not (user and source_id):
        return

    # Imported lazily, only once we know an event will actually be sent.
    from application.events.publisher import publish_user_event

    source_key = str(source_id)
    payload = {
        "source_id": source_key,
        "filename": bound.get("filename") or "",
        "operation": _INGEST_POISON_OPERATION.get(task_name, "upload"),
        "error": "Ingestion stopped after repeated failures.",
    }
    publish_user_event(
        user,
        "source.ingest.failed",
        payload,
        scope={"kind": "source", "id": source_key},
    )
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="ingest", on_poison=_emit_ingest_poison_event)
def ingest(
    self,
    directory,
    formats,
    job_name,
    user,
    file_path,
    filename,
    file_name_map=None,
    idempotency_key=None,
    source_id=None,
):
    """Durable Celery task that hands an ingest job to ``ingest_worker``.

    Note that the worker receives ``user`` *after* ``file_path`` and
    ``filename``, which differs from this task's own parameter order.

    Returns:
        Whatever ``ingest_worker`` returns.
    """
    # Positional order expected by the worker call (``user`` goes last).
    worker_args = (directory, formats, job_name, file_path, filename, user)
    return ingest_worker(
        self,
        *worker_args,
        file_name_map=file_name_map,
        idempotency_key=idempotency_key,
        source_id=source_id,
    )
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="ingest_remote", on_poison=_emit_ingest_poison_event)
def ingest_remote(
    self,
    source_data,
    job_name,
    user,
    loader,
    idempotency_key=None,
    source_id=None,
):
    """Durable Celery task that forwards a remote ingest to ``remote_worker``.

    All arguments are passed through unchanged; ``idempotency_key`` and
    ``source_id`` go as keywords.

    Returns:
        Whatever ``remote_worker`` returns.
    """
    return remote_worker(
        self,
        source_data,
        job_name,
        user,
        loader,
        idempotency_key=idempotency_key,
        source_id=source_id,
    )
@celery.task(**DURABLE_TASK)
@with_idempotency(
    task_name="reingest_source_task",
    on_poison=_emit_ingest_poison_event,
)
def reingest_source_task(self, source_id, user, idempotency_key=None):
    """Durable Celery task that re-ingests a source via ``reingest_source_worker``.

    ``idempotency_key`` is accepted on the task signature but is not forwarded
    to the worker.

    Returns:
        Whatever ``reingest_source_worker`` returns.
    """
    # Resolved at call time rather than at module import.
    from application.worker import reingest_source_worker

    return reingest_source_worker(self, source_id, user)
# Beat-driven dispatch tasks opt out of late acknowledgement
# (``acks_late=False``). Redelivering a beat tick after a SIGKILL would only
# be harmless if the dispatch itself were idempotent, so these tasks are
# ACKed early: the broker then never replays a dispatch that has already
# enqueued downstream work.
@celery.task(bind=True, acks_late=False)
def schedule_syncs(self, frequency):
    """Dispatch source syncs for ``frequency`` by delegating to ``sync_worker``.

    Returns:
        Whatever ``sync_worker`` returns.
    """
    return sync_worker(self, frequency)
@celery.task(bind=True)
def sync_source(
    self,
    source_data,
    job_name,
    user,
    loader,
    sync_frequency,
    retriever,
    doc_id,
):
    """Celery task that syncs a single source by delegating to ``sync``.

    Every argument is forwarded positionally, in the same order as this
    task's own signature.

    Returns:
        Whatever ``sync`` returns.
    """
    forwarded = (
        source_data,
        job_name,
        user,
        loader,
        sync_frequency,
        retriever,
        doc_id,
    )
    return sync(self, *forwarded)
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="store_attachment")
def store_attachment(self, file_info, user, idempotency_key=None):
    """Durable Celery task that stores an attachment via ``attachment_worker``.

    ``idempotency_key`` is accepted on the task signature but is not forwarded
    to the worker.

    Returns:
        Whatever ``attachment_worker`` returns.
    """
    return attachment_worker(self, file_info, user)
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="process_agent_webhook")
def process_agent_webhook(self, agent_id, payload, idempotency_key=None):
    """Durable Celery task that handles a webhook via ``agent_webhook_worker``.

    ``idempotency_key`` is accepted on the task signature but is not forwarded
    to the worker.

    Returns:
        Whatever ``agent_webhook_worker`` returns.
    """
    return agent_webhook_worker(self, agent_id, payload)
@celery.task(**DURABLE_TASK)
@with_idempotency(
    task_name="ingest_connector_task",
    on_poison=_emit_ingest_poison_event,
)
def ingest_connector_task(
    self,
    job_name,
    user,
    source_type,
    session_token=None,
    file_ids=None,
    folder_ids=None,
    recursive=True,
    retriever="classic",
    operation_mode="upload",
    doc_id=None,
    sync_frequency="never",
    idempotency_key=None,
    source_id=None,
):
    """Durable Celery task that runs a connector ingest via ``ingest_connector``.

    ``job_name``, ``user`` and ``source_type`` are forwarded positionally;
    every optional parameter is forwarded as a keyword under its own name.

    Returns:
        Whatever ``ingest_connector`` returns.
    """
    # Resolved at call time rather than at module import.
    from application.worker import ingest_connector

    connector_options = {
        "session_token": session_token,
        "file_ids": file_ids,
        "folder_ids": folder_ids,
        "recursive": recursive,
        "retriever": retriever,
        "operation_mode": operation_mode,
        "doc_id": doc_id,
        "sync_frequency": sync_frequency,
        "idempotency_key": idempotency_key,
        "source_id": source_id,
    }
    return ingest_connector(
        self,
        job_name,
        user,
        source_type,
        **connector_options,
    )
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register all periodic (beat) entries on ``sender``.

    Connected to ``celery.on_after_configure``. The three source-sync
    dispatches are added first (unnamed), followed by the named housekeeping
    entries, always in the same order.

    Args:
        sender: Object exposing ``add_periodic_task`` (the Celery app that
            emitted the signal).
        **kwargs: Extra signal arguments; unused.
    """
    # Source-sync dispatch cadences; "monthly" is approximated as 30 days.
    sync_cadences = (
        (timedelta(days=1), "daily"),
        (timedelta(weeks=1), "weekly"),
        (timedelta(days=30), "monthly"),
    )
    for interval, frequency in sync_cadences:
        sender.add_periodic_task(interval, schedule_syncs.s(frequency))

    named_entries = (
        # Replaces Mongo's TTL index on pending_tool_state.expires_at.
        (
            timedelta(seconds=60),
            cleanup_pending_tool_state,
            "cleanup-pending-tool-state",
        ),
        # Pure housekeeping for ``task_dedup`` / ``webhook_dedup``: the upsert
        # paths already handle stale rows, so the cadence only bounds table
        # size. Hourly is plenty for typical traffic.
        (
            timedelta(hours=1),
            cleanup_idempotency_dedup,
            "cleanup-idempotency-dedup",
        ),
        (timedelta(seconds=30), reconciliation_task, "reconciliation"),
        (timedelta(hours=7), version_check_task, "version-check"),
        # Bounds ``message_events`` growth: every streamed SSE chunk writes
        # one row, so retained chats accumulate hundreds of rows per message.
        # Reconnect-replay only matters for streams the user could plausibly
        # still be waiting on, so the retention window (noted as 14 days;
        # TODO confirm against settings) is generous.
        (
            timedelta(hours=24),
            cleanup_message_events,
            "cleanup-message-events",
        ),
    )
    for interval, task, entry_name in named_entries:
        sender.add_periodic_task(interval, task.s(), name=entry_name)
@celery.task(bind=True)
def mcp_oauth_task(self, config, user):
    """Celery task that runs the MCP OAuth flow by delegating to ``mcp_oauth``.

    Returns:
        Whatever ``mcp_oauth`` returns.
    """
    return mcp_oauth(self, config, user)
@celery.task(bind=True, acks_late=False)
def cleanup_pending_tool_state(self):
    """Revert stale ``resuming`` rows, then delete TTL-expired rows.

    Both steps run inside a single ``engine.begin()`` block. When
    ``settings.POSTGRES_URI`` is not set the task does nothing and reports
    why.

    Returns:
        dict: ``{"deleted": ..., "reverted": ...}`` carrying the two
        repository results, plus a ``"skipped"`` reason when Postgres is not
        configured.
    """
    from application.core.settings import settings

    if not settings.POSTGRES_URI:
        return {"deleted": 0, "reverted": 0, "skipped": "POSTGRES_URI not set"}

    # Storage modules are imported only once Postgres is known to be set up.
    from application.storage.db.engine import get_engine
    from application.storage.db.repositories.pending_tool_state import (
        PendingToolStateRepository,
    )

    with get_engine().begin() as conn:
        repository = PendingToolStateRepository(conn)
        # Revert first, then purge expired rows — same order as the summary.
        reverted_count = repository.revert_stale_resuming(grace_seconds=600)
        deleted_count = repository.cleanup_expired()
    return {"deleted": deleted_count, "reverted": reverted_count}
@celery.task(bind=True, acks_late=False)
def cleanup_idempotency_dedup(self):
    """Delete TTL-expired rows from ``task_dedup`` and ``webhook_dedup``.

    This is pure housekeeping: the upsert paths already ignore stale rows
    (TTL-aware ``ON CONFLICT DO UPDATE``), so the task only bounds table
    growth and keeps SELECT planning tight on large deployments.

    Returns:
        The result of ``IdempotencyRepository.cleanup_expired()``, or a dict
        of zero counts with a ``"skipped"`` reason when
        ``settings.POSTGRES_URI`` is not set.
    """
    from application.core.settings import settings

    if not settings.POSTGRES_URI:
        skipped_report = {
            "task_dedup_deleted": 0,
            "webhook_dedup_deleted": 0,
            "skipped": "POSTGRES_URI not set",
        }
        return skipped_report

    # Storage modules are imported only once Postgres is known to be set up.
    from application.storage.db.engine import get_engine
    from application.storage.db.repositories.idempotency import (
        IdempotencyRepository,
    )

    with get_engine().begin() as conn:
        cleanup_report = IdempotencyRepository(conn).cleanup_expired()
    return cleanup_report
@celery.task(bind=True, acks_late=False)
def reconciliation_task(self):
    """Sweep stuck durability rows, escalating them to terminal status + alert.

    Returns:
        Whatever ``run_reconciliation`` returns.
    """
    # Resolved at call time rather than at module import.
    from application.api.user.reconciliation import run_reconciliation

    outcome = run_reconciliation()
    return outcome
@celery.task(bind=True, acks_late=False)
def cleanup_message_events(self):
    """Delete ``message_events`` rows older than the retention window.

    Streamed answer responses write one journal row per SSE yield, so
    unbounded growth would dominate Postgres for any deployment that retains
    conversations. The reconnect-replay path only needs rows for in-flight
    streams. The window is read from
    ``settings.MESSAGE_EVENTS_RETENTION_DAYS``.

    Returns:
        dict: ``{"deleted": ..., "ttl_days": ...}`` on a normal run, or
        ``{"deleted": 0, "skipped": ...}`` when ``settings.POSTGRES_URI`` is
        not set.
    """
    from application.core.settings import settings

    if not settings.POSTGRES_URI:
        return {"deleted": 0, "skipped": "POSTGRES_URI not set"}

    # Storage modules are imported only once Postgres is known to be set up.
    from application.storage.db.engine import get_engine
    from application.storage.db.repositories.message_events import (
        MessageEventsRepository,
    )

    retention_days = settings.MESSAGE_EVENTS_RETENTION_DAYS
    with get_engine().begin() as conn:
        events_repo = MessageEventsRepository(conn)
        removed = events_repo.cleanup_older_than(retention_days)
    return {"deleted": removed, "ttl_days": retention_days}
@celery.task(bind=True, acks_late=False)
def version_check_task(self):
    """Run the periodic anonymous version check.

    Complements the ``worker_ready`` boot trigger so that long-running
    deployments (beyond the 6h cache TTL) still refresh advisories.
    ``run_check`` is fail-silent and coordinates across replicas through a
    Redis lock plus cache (see ``application.updates.version_check``).

    Returns:
        None; the result of ``run_check`` is discarded.
    """
    from application.updates import version_check

    version_check.run_check()