Scheduler & Task Architecture

This document provides diagrams of the scheduler and task architecture.

TaskScope Classification

graph TB
    subgraph TaskScope["TaskScope (SPOT: shared.default.task_types)"]
        direction TB

        subgraph BACKUP["backup_infrastructure"]
            B1[SYSTEM_BACKUP]
            B2[SYSTEM_RESTORE]
        end

        subgraph SYSTEM["system"]
            S1[HOUSEKEEPING]
            S2[RUNTIME_CAPABILITIES_FETCH]
        end

        subgraph PROJECT["project"]
            P1[CRON]
            P2[STANDARD]
            P3[DATABASE_GENERATE_MODEL]
            P4["... all other tasks"]
        end
    end

    subgraph Storage["Storage Decisions"]
        BACKUP --> |schema| MAINT[maintenance]
        BACKUP --> |bucket| BACKUP_BUCKET[backup bucket]
        BACKUP --> |in_backup| NO1[No]

        SYSTEM --> |schema| BACKEND1[backend]
        SYSTEM --> |bucket| LOG_BUCKET1[log bucket]
        SYSTEM --> |in_backup| YES1[Yes]

        PROJECT --> |schema| BACKEND2[backend]
        PROJECT --> |bucket| LOG_BUCKET2[log bucket]
        PROJECT --> |in_backup| YES2[Yes]
    end

    subgraph Requirements["Requirements"]
        BACKUP --> |project_id| NULL1[NULL]
        SYSTEM --> |project_id| NULL2[NULL]
        PROJECT --> |project_id| REQUIRED[Required UUID]
    end
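The storage and requirement decisions above can be sketched as a simple lookup table. This is an illustrative sketch, not code from the repository; the names `STORAGE_BY_SCOPE` and `storage_for` are assumptions:

```python
# Hypothetical sketch of the scope -> storage mapping shown in the diagram.
# STORAGE_BY_SCOPE and storage_for are illustrative names, not from the codebase.
STORAGE_BY_SCOPE = {
    "backup_infrastructure": {
        "schema": "maintenance", "bucket": "backup",
        "in_backup": False, "project_id_required": False,
    },
    "system": {
        "schema": "backend", "bucket": "log",
        "in_backup": True, "project_id_required": False,
    },
    "project": {
        "schema": "backend", "bucket": "log",
        "in_backup": True, "project_id_required": True,
    },
}

def storage_for(scope: str) -> dict:
    """Return the storage decisions for a TaskScope value."""
    try:
        return STORAGE_BY_SCOPE[scope]
    except KeyError:
        raise ValueError(f"unknown scope: {scope!r}") from None
```

Note that backup_infrastructure tasks are deliberately excluded from backups (`in_backup: False`) so backup metadata never depends on the data it protects.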

Schedule Lifecycle

sequenceDiagram
    participant UI as UI/API
    participant Router as Router Layer
    participant Service as SchedulingService
    participant DAO as ScheduleDao
    participant DB as Postgres<br/>schedule table
    participant Scheduler as PostgresScheduler
    participant Celery as Celery/RabbitMQ
    participant Worker as Worker

    Note over UI,Worker: Schedule Creation Flow

    UI->>Router: POST /schedules or /backup
    Router->>Service: create_schedule()
    Service->>Service: _validate_task_type()
    Service->>Service: _validate_crontab()
    Service->>Service: _validate_schedule_scope()
    Service->>Service: _validate_task_args()

    alt system/backup_infrastructure
        Service->>DAO: upsert_schedule_singleton()
        Note over DAO: Upsert (one per task_type)
    else PROJECT
        Service->>DAO: create_schedule()
        Note over DAO: Insert (multiple allowed)
    end

    DAO->>DB: INSERT/UPSERT
    DB-->>DAO: Schedule row
    DAO-->>Service: Schedule
    Service-->>Router: ScheduleSnapshot
    Router-->>UI: 201 Created

    Note over UI,Worker: Schedule Dispatch Flow (every tick)

    loop Every tick interval
        Scheduler->>DB: SELECT enabled schedules
        DB-->>Scheduler: Schedule rows + project names

        loop For each schedule
            Scheduler->>Scheduler: Parse & validate

            alt Invalid schedule
                Scheduler->>DB: UPDATE enabled=false
                Note over Scheduler: Auto-disable + log
            else Valid & due
                Scheduler->>Scheduler: Build TaskArgs from ScheduleTaskArgs
                Scheduler->>DB: UPDATE last_run_at, total_run_count
                Scheduler->>Celery: send_task(task_type, task_args)
                Celery->>Worker: Dispatch task
            end
        end
    end
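The `_validate_crontab()` step above checks the 5-field cron format before a schedule is persisted. A minimal sketch of such a shape check, assuming only that the stored format is the standard 5-field crontab (this validates field count and characters, not full cron semantics):

```python
import re

# Minimal sketch of a 5-field crontab shape check, analogous in spirit to
# _validate_crontab. It accepts digits, *, /, comma, and ranges per field;
# full semantic validation (field bounds, step values) is out of scope here.
_CRON_FIELD = re.compile(r"^[\d*/,\-]+$")

def is_valid_crontab(expr: str) -> bool:
    fields = expr.split()
    if len(fields) != 5:
        return False
    return all(_CRON_FIELD.match(f) for f in fields)
```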

Component Architecture

graph TB
    subgraph Server["Server (FastAPI)"]
        direction TB
        JR[schedule_routers.py]
        BR[backup_routers.py]
        HR[housekeeping_routers.py]

        SS[SchedulingService]
        JS[ScheduleService]

        SD[ScheduleDao]

        JR --> JS
        BR --> SS
        HR --> SS
        JS --> SD
        SS --> SD
    end

    subgraph Database["Postgres"]
        direction TB
        SCHED[(schedule table<br/>backend schema)]
        PROJ[(project table<br/>backend schema)]
        WT[(task table<br/>worker schema)]
        ST[(task table<br/>maintenance schema)]

        SCHED -.->|FK| PROJ
    end

    subgraph Scheduler["Scheduler Process"]
        PS[PostgresScheduler]
        SCHED_DB[scheduler/db.py<br/>SQLAlchemy Core]

        PS --> SCHED_DB
        SCHED_DB --> SCHED
    end

    subgraph Worker["Celery Worker"]
        TASKS[Task Handlers]
        MON[Monitor<br/>task.py]

        MON --> WT
        MON --> ST
    end

    subgraph MessageBroker["RabbitMQ"]
        Q[Task Queues]
    end

    SD --> SCHED
    PS --> Q
    Q --> TASKS
    TASKS -.->|events| MON

Schedule Table Schema

Column           Type      Nullable  Description
id               UUID      NO        Primary key
scope            String    NO        backup_infrastructure, system, or project
project_id       String    YES       FK to project.identifier (CASCADE delete)
task_type        String    NO        Task type identifier
crontab          String    NO        5-field cron format
task_args        JSONB     NO        Serialized ScheduleTaskArgs
enabled          Boolean   NO        Default true
last_run_at      DateTime  YES       Last execution time
total_run_count  Integer   NO        Default 0
tc_creation_src  String    YES       Source identifier (e.g., "rdbeat_importer")
tc_creation      DateTime  YES       Creation timestamp
tc_update_src    String    YES       Source identifier (e.g., "scheduler")
tc_update        DateTime  YES       Last update timestamp

Check Constraints

  • ck_schedule_scope_enum: scope must be one of backup_infrastructure, system, project
  • ck_schedule_scope_project_id: project scope requires project_id; system/backup scopes require NULL project_id

Scheduler Database Configuration

The scheduler uses a dedicated SQLAlchemy engine with these settings:

Setting        Value   Purpose
pool_size      2       Connection pool size (single-instance scheduler)
max_overflow   3       Additional connections under load
pool_pre_ping  true    Detect stale connections
lock_timeout   3000ms  Prevent long lock waits

Batched Updates

The scheduler batches all last_run_at updates into a single UPDATE ... WHERE id IN (...) statement per tick to reduce database round-trips.
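A sketch of how such a batched statement might be assembled, assuming named placeholders in the `:name` style; the helper name and exact SQL shape are illustrative, not taken from the scheduler code:

```python
from datetime import datetime, timezone

def build_batched_update(schedule_ids: list[str], now: datetime) -> tuple[str, dict]:
    """Sketch: one parameterized UPDATE ... WHERE id IN (...) per tick,
    instead of one UPDATE per schedule row."""
    placeholders = ", ".join(f":id_{i}" for i in range(len(schedule_ids)))
    sql = (
        "UPDATE schedule "
        "SET last_run_at = :now, total_run_count = total_run_count + 1 "
        f"WHERE id IN ({placeholders})"
    )
    params = {"now": now, **{f"id_{i}": sid for i, sid in enumerate(schedule_ids)}}
    return sql, params
```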

Startup Verification

On startup, the scheduler verifies the schedule table schema by querying the scope column. If migrations have not run, it fails fast with:

RuntimeError: Schedule table schema is missing or outdated; run migrations before starting the scheduler.

Schedule Uniqueness

graph LR
    subgraph Uniqueness["Schedule Uniqueness Rules"]
        direction TB

        subgraph SYS["system / backup_infrastructure"]
            SYS_KEY["Unique Key: (task_type) per scope"]
            SYS_ENF["Enforced: DB partial unique index"]
            SYS_METHOD["Method: upsert_schedule_singleton()"]
        end

        subgraph PROJ["project"]
            PROJ_KEY["No uniqueness constraint"]
            PROJ_REASON["Reason: User can schedule<br/>same task at different crons"]
            PROJ_METHOD["Method: create_project_schedule()"]
        end
    end

Audit Trail

The tc_creation_src and tc_update_src columns track which component modified a schedule:

Source             Used By
"scheduler"        PostgresScheduler when updating last_run_at
"rdbeat_importer"  One-time Redis migration script
"<user_email>"     REST API schedule operations (requester email)

Why Two Table Definitions?

  • src/scheduler/db.py uses SQLAlchemy Core - synchronous, minimal dependencies for the Celery beat process.
  • src/server/models/schedule.py uses SQLAlchemy ORM - async, full ORM features for FastAPI.

Both define the same table but serve different runtime contexts.

Data Flow: ScheduleTaskArgs vs TaskArgs

graph LR
    subgraph Creation["Schedule Creation"]
        TA1[TaskArgs<br/>user_email<br/>project_id<br/>project_name<br/>other<br/>crontab]

        TA1 -->|from_task_args| STA[ScheduleTaskArgs<br/>user_email<br/>other]

        STA -->|stored in| DB1[(schedule.task_args)]
        PID[project_id] -->|stored in| DB2[(schedule.project_id)]
        CRON[crontab] -->|stored in| DB3[(schedule.crontab)]
    end

    subgraph Dispatch["Schedule Dispatch"]
        DB4[(schedule row)]
        DB4 -->|read| STA2[ScheduleTaskArgs]
        DB4 -->|read| PID2[project_id]
        DB4 -->|read| CRON2[crontab]
        DB4 -->|join| PNAME[project.name]

        STA2 --> REBUILD[Rebuild TaskArgs]
        PID2 --> REBUILD
        CRON2 --> REBUILD
        PNAME --> REBUILD

        REBUILD --> TA2[TaskArgs<br/>user_email<br/>project_id<br/>project_name<br/>other<br/>crontab]

        TA2 -->|to_transfer_list| CELERY[Celery dispatch]
    end

Monitor Task Routing

flowchart TD
    EVENT[Celery Task Event] --> RESOLVE{Resolve TaskType}

    RESOLVE -->|task_name or routing_key| TT[TaskType]

    TT --> SCOPE{Check Scope}

    SCOPE -->|backup_infrastructure| ST[(task table<br/>maintenance schema)]
    SCOPE -->|system or project| WT[(task table<br/>worker schema)]

    subgraph Normalization["project_id Normalization"]
        WT --> NORM{_normalize_project_id}

        NORM -->|system| NULL1[Store NULL]
        NORM -->|project + valid UUID| KEEP[Keep project_id]
        NORM -->|project + invalid| NULL2[Store NULL + warn]
    end

Legacy Redis Import (One-Time)

  • src/scheduler/migrate.py imports schedules from legacy Redis (RDBeat) into the Postgres schedule table.
  • The scheduler triggers this import on startup (one-time migration behavior) before the first tick.
  • WHY: Keeps legacy import behavior isolated while the main scheduler enforces scope-based rules.