# Scheduler & Task Architecture
This document provides visual diagrams of the scheduler and task architecture.
## TaskScope Classification

```mermaid
graph TB
    subgraph TaskScope["TaskScope (SPOT: shared.default.task_types)"]
        direction TB
        subgraph BACKUP["backup_infrastructure"]
            B1[SYSTEM_BACKUP]
            B2[SYSTEM_RESTORE]
        end
        subgraph SYSTEM["system"]
            S1[HOUSEKEEPING]
            S2[RUNTIME_CAPABILITIES_FETCH]
        end
        subgraph PROJECT["project"]
            P1[CRON]
            P2[STANDARD]
            P3[DATABASE_GENERATE_MODEL]
            P4["... all other tasks"]
        end
    end
    subgraph Storage["Storage Decisions"]
        BACKUP --> |schema| MAINT[maintenance]
        BACKUP --> |bucket| BACKUP_BUCKET[backup bucket]
        BACKUP --> |in_backup| NO1[No]
        SYSTEM --> |schema| BACKEND1[backend]
        SYSTEM --> |bucket| LOG_BUCKET1[log bucket]
        SYSTEM --> |in_backup| YES1[Yes]
        PROJECT --> |schema| BACKEND2[backend]
        PROJECT --> |bucket| LOG_BUCKET2[log bucket]
        PROJECT --> |in_backup| YES2[Yes]
    end
    subgraph Requirements["Requirements"]
        BACKUP --> |project_id| NULL1[NULL]
        SYSTEM --> |project_id| NULL2[NULL]
        PROJECT --> |project_id| REQUIRED[Required UUID]
    end
```
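The classification above can be sketched as a small lookup table. This is a minimal sketch: `TaskScope`, `scope_for`, and `STORAGE` are illustrative names, and the real single point of truth is `shared.default.task_types`, which this snippet does not import.

```python
from enum import Enum

class TaskScope(str, Enum):
    BACKUP_INFRASTRUCTURE = "backup_infrastructure"
    SYSTEM = "system"
    PROJECT = "project"

# Task types explicitly pinned to a non-project scope (from the diagram).
_SCOPE_OVERRIDES = {
    "SYSTEM_BACKUP": TaskScope.BACKUP_INFRASTRUCTURE,
    "SYSTEM_RESTORE": TaskScope.BACKUP_INFRASTRUCTURE,
    "HOUSEKEEPING": TaskScope.SYSTEM,
    "RUNTIME_CAPABILITIES_FETCH": TaskScope.SYSTEM,
}

def scope_for(task_type: str) -> TaskScope:
    """All other task types (CRON, STANDARD, ...) default to project scope."""
    return _SCOPE_OVERRIDES.get(task_type, TaskScope.PROJECT)

# Storage decisions keyed by scope, mirroring the diagram above.
STORAGE = {
    TaskScope.BACKUP_INFRASTRUCTURE: {"schema": "maintenance", "bucket": "backup", "in_backup": False},
    TaskScope.SYSTEM: {"schema": "backend", "bucket": "log", "in_backup": True},
    TaskScope.PROJECT: {"schema": "backend", "bucket": "log", "in_backup": True},
}
```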
## Schedule Lifecycle

```mermaid
sequenceDiagram
    participant UI as UI/API
    participant Router as Router Layer
    participant Service as SchedulingService
    participant DAO as ScheduleDao
    participant DB as Postgres<br/>schedule table
    participant Scheduler as PostgresScheduler
    participant Celery as Celery/RabbitMQ
    participant Worker as Worker

    Note over UI,Worker: Schedule Creation Flow
    UI->>Router: POST /schedules or /backup
    Router->>Service: create_schedule()
    Service->>Service: _validate_task_type()
    Service->>Service: _validate_crontab()
    Service->>Service: _validate_schedule_scope()
    Service->>Service: _validate_task_args()
    alt system/backup_infrastructure
        Service->>DAO: upsert_schedule_singleton()
        Note over DAO: Upsert (one per task_type)
    else PROJECT
        Service->>DAO: create_schedule()
        Note over DAO: Insert (multiple allowed)
    end
    DAO->>DB: INSERT/UPSERT
    DB-->>DAO: Schedule row
    DAO-->>Service: Schedule
    Service-->>Router: ScheduleSnapshot
    Router-->>UI: 201 Created

    Note over UI,Worker: Schedule Dispatch Flow (every tick)
    loop Every tick interval
        Scheduler->>DB: SELECT enabled schedules
        DB-->>Scheduler: Schedule rows + project names
        loop For each schedule
            Scheduler->>Scheduler: Parse & validate
            alt Invalid schedule
                Scheduler->>DB: UPDATE enabled=false
                Note over Scheduler: Auto-disable + log
            else Valid & due
                Scheduler->>Scheduler: Build TaskArgs from ScheduleTaskArgs
                Scheduler->>DB: UPDATE last_run_at, total_run_count
                Scheduler->>Celery: send_task(task_type, task_args)
                Celery->>Worker: Dispatch task
            end
        end
    end
```
## Component Architecture

```mermaid
graph TB
    subgraph Server["Server (FastAPI)"]
        direction TB
        JR[schedule_routers.py]
        BR[backup_routers.py]
        HR[housekeeping_routers.py]
        SS[SchedulingService]
        JS[ScheduleService]
        SD[ScheduleDao]
        JR --> JS
        BR --> SS
        HR --> SS
        JS --> SD
        SS --> SD
    end
    subgraph Database["Postgres"]
        direction TB
        SCHED[(schedule table<br/>backend schema)]
        PROJ[(project table<br/>backend schema)]
        WT[(task table<br/>worker schema)]
        ST[(task table<br/>maintenance schema)]
        SCHED -.->|FK| PROJ
    end
    subgraph Scheduler["Scheduler Process"]
        PS[PostgresScheduler]
        SCHED_DB[scheduler/db.py<br/>SQLAlchemy Core]
        PS --> SCHED_DB
        SCHED_DB --> SCHED
    end
    subgraph Worker["Celery Worker"]
        TASKS[Task Handlers]
        MON[Monitor<br/>task.py]
        MON --> WT
        MON --> ST
    end
    subgraph MessageBroker["RabbitMQ"]
        Q[Task Queues]
    end
    SD --> SCHED
    PS --> Q
    Q --> TASKS
    TASKS -.->|events| MON
```
## Schedule Table Schema
| Column | Type | Nullable | Description |
|---|---|---|---|
| id | UUID | NO | Primary key |
| scope | String | NO | backup_infrastructure, system, or project |
| project_id | String | YES | FK to project.identifier (CASCADE delete) |
| task_type | String | NO | Task type identifier |
| crontab | String | NO | 5-field cron format |
| task_args | JSONB | NO | Serialized ScheduleTaskArgs |
| enabled | Boolean | NO | Default true |
| last_run_at | DateTime | YES | Last execution time |
| total_run_count | Integer | NO | Default 0 |
| tc_creation_src | String | YES | Source identifier (e.g., "rdbeat_importer") |
| tc_creation | DateTime | YES | Creation timestamp |
| tc_update_src | String | YES | Source identifier (e.g., "scheduler") |
| tc_update | DateTime | YES | Last update timestamp |
## Check Constraints

- `ck_schedule_scope_enum`: `scope` must be one of `backup_infrastructure`, `system`, `project`
- `ck_schedule_scope_project_id`: `project` scope requires `project_id`; `system`/`backup_infrastructure` scopes require a NULL `project_id`
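A minimal sketch of the table and both check constraints, using SQLite in place of Postgres. Types are simplified and the real DDL lives in migrations; only the constraint logic is the point here.

```python
import sqlite3

DDL = """
CREATE TABLE schedule (
    id              TEXT PRIMARY KEY,
    scope           TEXT NOT NULL,
    project_id      TEXT,
    task_type       TEXT NOT NULL,
    crontab         TEXT NOT NULL,
    task_args       TEXT NOT NULL,
    enabled         INTEGER NOT NULL DEFAULT 1,
    last_run_at     TEXT,
    total_run_count INTEGER NOT NULL DEFAULT 0,
    CONSTRAINT ck_schedule_scope_enum CHECK (
        scope IN ('backup_infrastructure', 'system', 'project')
    ),
    CONSTRAINT ck_schedule_scope_project_id CHECK (
        (scope = 'project' AND project_id IS NOT NULL)
        OR (scope != 'project' AND project_id IS NULL)
    )
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(DDL)

# A system schedule carrying a project_id violates ck_schedule_scope_project_id.
try:
    conn.execute(
        "INSERT INTO schedule (id, scope, project_id, task_type, crontab, task_args)"
        " VALUES ('1', 'system', 'p1', 'HOUSEKEEPING', '0 * * * *', '{}')"
    )
except sqlite3.IntegrityError as exc:
    print("rejected:", exc)
```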
## Scheduler Database Configuration
The scheduler uses a dedicated SQLAlchemy engine with these settings:
| Setting | Value | Purpose |
|---|---|---|
| `pool_size` | 2 | Connection pool size (single-instance scheduler) |
| `max_overflow` | 3 | Additional connections under load |
| `pool_pre_ping` | true | Detect stale connections |
| `lock_timeout` | 3000ms | Prevent long lock waits |
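As a sketch, these settings map onto a synchronous engine roughly as follows. This assumes SQLAlchemy with the psycopg2 driver; the DSN and the `lock_timeout` pass-through via the Postgres `options` connection parameter are illustrative, not the project's actual configuration code.

```python
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://user:pass@db/backend",      # placeholder DSN
    pool_size=2,                                        # single-instance scheduler
    max_overflow=3,                                     # extra connections under load
    pool_pre_ping=True,                                 # detect stale connections
    connect_args={"options": "-c lock_timeout=3000"},   # milliseconds
)
```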
## Batched Updates

The scheduler batches all `last_run_at` updates into a single `UPDATE ... WHERE id IN (...)` statement per tick to reduce database round-trips.
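A minimal sketch of the per-tick batching, using SQLite and a simplified table shape: one statement updates every schedule that fired, instead of one statement per schedule.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE schedule (id TEXT PRIMARY KEY, last_run_at TEXT, "
    "total_run_count INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany("INSERT INTO schedule (id) VALUES (?)", [("a",), ("b",), ("c",)])

due_ids = ["a", "c"]                           # schedules that fired this tick
now = datetime.now(timezone.utc).isoformat()
placeholders = ", ".join("?" for _ in due_ids)
# One round-trip for the whole batch.
conn.execute(
    f"UPDATE schedule SET last_run_at = ?, total_run_count = total_run_count + 1 "
    f"WHERE id IN ({placeholders})",
    [now, *due_ids],
)
```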
## Startup Verification

On startup, the scheduler verifies the `schedule` table schema by querying the `scope` column. If migrations have not run, it fails fast with:

```
RuntimeError: Schedule table schema is missing or outdated; run migrations before starting the scheduler.
```
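A sketch of the fail-fast probe, again using SQLite in place of Postgres; `verify_schedule_schema` is an illustrative name, not the real function.

```python
import sqlite3

def verify_schedule_schema(conn) -> None:
    # Probe the scope column; any error means migrations have not run.
    try:
        conn.execute("SELECT scope FROM schedule LIMIT 1")
    except Exception as exc:
        raise RuntimeError(
            "Schedule table schema is missing or outdated; "
            "run migrations before starting the scheduler."
        ) from exc

conn = sqlite3.connect(":memory:")
try:
    verify_schedule_schema(conn)      # table absent -> fails fast
except RuntimeError as exc:
    print(exc)

conn.execute("CREATE TABLE schedule (id TEXT PRIMARY KEY, scope TEXT NOT NULL)")
verify_schedule_schema(conn)          # passes once the schema exists
```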
## Schedule Uniqueness

```mermaid
graph LR
    subgraph Uniqueness["Schedule Uniqueness Rules"]
        direction TB
        subgraph SYS["system / backup_infrastructure"]
            SYS_KEY["Unique Key: (task_type) per scope"]
            SYS_ENF["Enforced: DB partial unique index"]
            SYS_METHOD["Method: upsert_schedule_singleton()"]
        end
        subgraph PROJ["project"]
            PROJ_KEY["No uniqueness constraint"]
            PROJ_REASON["Reason: User can schedule<br/>same task at different crons"]
            PROJ_METHOD["Method: create_project_schedule()"]
        end
    end
```
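The singleton rule can be demonstrated with a partial unique index: many project schedules per task type are allowed, but only one row per task type for the singleton scopes. Index and column names here are illustrative, with SQLite standing in for Postgres.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE schedule (id INTEGER PRIMARY KEY, scope TEXT, task_type TEXT, crontab TEXT)"
)
# Partial unique index: only constrains the singleton scopes.
conn.execute(
    "CREATE UNIQUE INDEX uq_schedule_singleton ON schedule (scope, task_type) "
    "WHERE scope IN ('system', 'backup_infrastructure')"
)

# Two project schedules for the same task type at different crons: allowed.
conn.execute("INSERT INTO schedule (scope, task_type, crontab) VALUES ('project', 'CRON', '0 1 * * *')")
conn.execute("INSERT INTO schedule (scope, task_type, crontab) VALUES ('project', 'CRON', '0 2 * * *')")

# A second system HOUSEKEEPING row violates the partial index.
conn.execute("INSERT INTO schedule (scope, task_type, crontab) VALUES ('system', 'HOUSEKEEPING', '0 3 * * *')")
try:
    conn.execute("INSERT INTO schedule (scope, task_type, crontab) VALUES ('system', 'HOUSEKEEPING', '0 4 * * *')")
except sqlite3.IntegrityError:
    print("singleton enforced")
```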
## Audit Trail

The `tc_creation_src` and `tc_update_src` columns track which component modified a schedule:
| Source | Used By |
|---|---|
| `"scheduler"` | PostgresScheduler when updating `last_run_at` |
| `"rdbeat_importer"` | One-time Redis migration script |
| `"<user_email>"` | REST API schedule operations (requester email) |
## Why Two Table Definitions?

- `src/scheduler/db.py` uses SQLAlchemy Core: synchronous, minimal dependencies for the Celery beat process.
- `src/server/models/schedule.py` uses SQLAlchemy ORM: async, full ORM features for FastAPI.
Both define the same table but serve different runtime contexts.
## Data Flow: ScheduleTaskArgs vs TaskArgs

```mermaid
graph LR
    subgraph Creation["Schedule Creation"]
        TA1[TaskArgs<br/>user_email<br/>project_id<br/>project_name<br/>other<br/>crontab]
        TA1 -->|from_task_args| STA[ScheduleTaskArgs<br/>user_email<br/>other]
        STA -->|stored in| DB1[(schedule.task_args)]
        PID[project_id] -->|stored in| DB2[(schedule.project_id)]
        CRON[crontab] -->|stored in| DB3[(schedule.crontab)]
    end
    subgraph Dispatch["Schedule Dispatch"]
        DB4[(schedule row)]
        DB4 -->|read| STA2[ScheduleTaskArgs]
        DB4 -->|read| PID2[project_id]
        DB4 -->|read| CRON2[crontab]
        DB4 -->|join| PNAME[project.name]
        STA2 --> REBUILD[Rebuild TaskArgs]
        PID2 --> REBUILD
        CRON2 --> REBUILD
        PNAME --> REBUILD
        REBUILD --> TA2[TaskArgs<br/>user_email<br/>project_id<br/>project_name<br/>other<br/>crontab]
        TA2 -->|to_transfer_list| CELERY[Celery dispatch]
    end
```
## Monitor Task Routing

```mermaid
flowchart TD
    EVENT[Celery Task Event] --> RESOLVE{Resolve TaskType}
    RESOLVE -->|task_name or routing_key| TT[TaskType]
    TT --> SCOPE{Check Scope}
    SCOPE -->|backup_infrastructure| ST[(task table<br/>maintenance schema)]
    SCOPE -->|system or project| WT[(task table<br/>worker schema)]
    subgraph Normalization["project_id Normalization"]
        WT --> NORM{_normalize_project_id}
        NORM -->|system| NULL1[Store NULL]
        NORM -->|project + valid UUID| KEEP[Keep project_id]
        NORM -->|project + invalid| NULL2[Store NULL + warn]
    end
```
## Legacy Redis Import (One-Time)

- `src/scheduler/migrate.py` imports schedules from legacy Redis (RDBeat) into the Postgres `schedule` table.
- The scheduler triggers this import on startup (one-time migration behavior) before the first tick.
- WHY: keeps legacy import behavior isolated while the main scheduler enforces scope-based rules.