0. Series Loop (Read Along Without Public Source Code)

End-to-End Pipeline: Vue frontend → api/routes/chat.py → Guide multi-turn SSE → run_analysis_pipeline (parsing → analysis → matching → report) → tools/pdf_exporter PDF.
This Article: 10/17 · Persistence Ring · DB & Migration

Stage User Visible Code Entry Article
Create Session Welcome Message POST /api/sessions 09
Multi-turn Dialogue SSE Streaming chat/stream → run_guide_single_turn 06, 14
Info Sufficient Start Analysis _run_analysis_background 05, 07
Resume Parsing Progress 30% run_resume_parser 12
Profile / RIASEC Progress 50% run_profile_analyzer 03, 13
Career Matching Progress 70% run_career_matcher 02
Report Progress 90% run_reporter 11
Download PDF File GET …/report/pdf 11, 15
Description
Before reading this Session saving from Article 09
After reading this Understand init_db + _auto_migrate startup sequence
Next ring Article 11: From final_report to PDF (Article 11)

Full series loop index: SERIES-LOOP.md

1. What Problem Are We Solving?

iCan uses SQLite (default sqlite:///./ican.db) to store sessions and reports. During iteration, workflow_data (JSON, storing conversation history and intermediate analysis results) was added to the Session table.
If we only use create_all(): new tables can be created, but missing columns in existing tables won’t be automatically added. After local upgrades, it’s common to get a “column does not exist” error.

In the MVP stage, we don’t want the full Alembic setup, so we added _auto_migrate in db/session.py: on startup, compare ORM with actual table structure, if columns are missing, ALTER TABLE ADD COLUMN.


2. Call Chain: lifespan → init_db → _auto_migrate

The entry point is in main.py‘s FastAPI lifespan:

1
2
3
4
5
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db() # create_all + _auto_migrate
yield
dispose_engine()

init_db() (db/session.py) has a fixed order:

  1. Base.metadata.create_all(bind=engine, checkfirst=True) — creates non-existent tables;
  2. _auto_migrate(engine) — adds columns to existing tables.

This runs every time the container/process starts, no manual migration commands needed.


SQLAlchemy Auto Migration


3. _auto_migrate Implementation (Consistent with Source Code)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _auto_migrate(engine):
from ican.db.models import Base
from sqlalchemy import inspect, text

inspector = inspect(engine)
for table_name, table_obj in Base.metadata.tables.items():
if not inspector.has_table(table_name):
continue
existing_columns = {col["name"] for col in inspector.get_columns(table_name)}
for column in table_obj.columns:
if column.name not in existing_columns:
col_type = column.type.compile(engine.dialect)
nullable = "NULL" if column.nullable else "NOT NULL"
default = ""
if column.default is not None:
default = f" DEFAULT {column.default.arg}"
elif column.server_default is not None:
default = f" DEFAULT {column.server_default.arg}"
sql = f"ALTER TABLE {table_name} ADD COLUMN {column.name} {col_type} {nullable}{default}"
with engine.connect() as conn:
conn.execute(text(sql))
conn.commit()

Logic: only adds columns, does not delete, change types, or rename.


4. Typical Scenario: sessions.workflow_data

db/models.py Session definition (excerpt):

1
2
3
4
5
6
7
8
9
class Session(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "sessions"
user_id: Mapped[str] = mapped_column(String(36), nullable=False, default="anonymous")
status: Mapped[str] = mapped_column(String(20), nullable=False, default="active")
current_stage: Mapped[str] = mapped_column(String(50), nullable=False, default="guide")
workflow_data: Mapped[dict | None] = mapped_column(
JSON, nullable=True, default=None,
comment="工作流完整数据: conversation_history、collected_info 等",
)

If the old database only has the first three columns, a log similar to this appears on startup:

1
[_auto_migrate] 自动迁移: ALTER TABLE sessions ADD COLUMN workflow_data JSON NULL

workflow.run_analysis_pipeline writes quick reports into SessionRepository.save_session(..., workflow_data={...}) when Ollama is unavailable — it depends on this column existing.

The same file also has tables like messages, user_profiles, career_reports, etc. New fields also go through this logic.


5. Engine and Session: Relationship with Migration

_get_engine() is a lazy singleton that reads settings.DB_URL:

  • SQLite: check_same_thread=False;
  • PostgreSQL etc.: pool_size=5, pool_recycle=3600.

Migration runs after the engine is created and before business requests; it’s independent of the get_db_session() context manager (commit/rollback).


6. Capability Boundaries (Must Be Clear)

Operation _auto_migrate Alembic
New Table ✅ create_all
Add Column
Delete Column
Change Column Type
Rollback
Multi-environment Version Chain

Adding a NOT NULL column without a DEFAULT: On tables with existing data, ALTER ADD may fail or leave empty rows violating constraints — iCan’s current new columns are mostly nullable=True or have defaults, intentionally avoiding this.

SQLite and JSON: SQLAlchemy JSON stores as text on SQLite; switching to PostgreSQL cross-dialect requires re-validating the type compilation result.


7. Pitfalls

① create_all cannot replace migration
Running only create_all without _auto_migrate means old user databases will always miss columns. Ensure lifespan calls init_db(), not just create_all alone.

② Production multi-instance simultaneous startup
Multiple workers running ALTER TABLE on the same column concurrently may result in a “duplicate column” error for the second one — the current code does not catch this; in production use Alembic or run migration on a single instance before startup.

③ Type changes require manual work
For example, changing String(20) to String(50)_auto_migrate won’t handle it; you need to write SQL manually or use an Alembic revision.

workflow_data size
Packing the entire conversation and analysis results into JSON can cause a single row to bloat over time; this is a product-level issue, not a migration one, but DBA perspective should be aware.


8. Summary

  • Migration entry: main.lifespaninit_db()create_all + _auto_migrate.
  • Implementation file: db/session.py; model definition: db/models.py.
  • Suitable for MVP quick column addition; for delete/change columns, rollback, team collaboration, switch to Alembic.
  • Business-critical column: sessions.workflow_data supports persistence for the analysis pipeline and Ollama fallback reports.

Next article: How tools/pdf_exporter.py turns Reporter’s Markdown into a Chinese PDF.


Appendix: Key Source Code (Line-by-Line Annotations)

The following code is from the iCan implementation, with Chinese comments above each line, allowing you to follow along even without the public repo.
Generation command: python3 bin/build-ican-annotated-snippets.py

lifespan calls init_db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# ========== lifespan calls init_db ==========
# Source file: main.py Lines 24-49

# L24: Decorator
@asynccontextmanager
# L25: Async function lifespan: can be awaited, suitable for IO-based LLM/DB calls
async def lifespan(app: FastAPI):
# L27: 【Doc】Application lifecycle management: initialize database on startup, clean up resources on shutdown
# L29: 【Doc】Function description:
# L30: 【Doc】Execute database initialization (create all tables) when the FastAPI application starts,
# L31: 【Doc】and release database engine resources when the application shuts down. Uses asynccontextmanager
# L32: 【Doc】to ensure proper acquisition and release of resources.
# L34: 【Doc】Input parameters:
# L35: 【Doc】app (FastAPI): FastAPI application instance
# L37: 【Doc】Output parameters:
# L38: 【Doc】None (context manager, yield gives control to the application runtime)
# L40: 【Doc】Notes:
# L41: 【Doc】- Startup phase: calls init_db() to initialize database table structure
# L42: 【Doc】- Runtime phase: application processes requests normally
# L43: 【Doc】- Shutdown phase: calls dispose_engine() to release database connection pool
# (L26-44 are function/module docstrings, converted to comments for readability)
# L45: Start try block, except catches any exceptions
try:
# L46: Log to help online debugging of node parameters/outputs
logger.info("[lifespan] 应用启动,开始初始化资源")
# L47: Startup phase: initialize database
# L48: Create tables + auto migrate missing columns
init_db()
# L49: Log to help online debugging of node parameters/outputs
logger.info("[lifespan] 数据库初始化完成,应用已就绪")

init_db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# ========== init_db ==========
# Source file: db/session.py Lines 197-228

# L197: Synchronous function init_db: routing decision or factory method
def init_db() -> None:
# L199: 【Doc】Initialize the database, create all tables.
# L201: 【Doc】Function description:
# L202: 【Doc】Based on ORM model definitions, create all non-existent tables in the database.
# L203: 【Doc】Uses checkfirst=True to avoid recreating existing tables.
# L205: 【Doc】Input parameters:
# L206: 【Doc】None
# L208: 【Doc】Output parameters:
# L209: 【Doc】None
# (L198-210 are function/module docstrings, converted to comments for readability)
# L211: Start try block, except catches any exceptions
try:
# L212: Log to help online debugging of node parameters/outputs
logger.info("[init_db] 开始执行,入参: 无")

# L214: Import all models to ensure they are registered in Base.metadata
# L215: Import dependency module
from ican.db.models import Base # noqa: F401

# L217: Assignment: update local variable or state field
engine = _get_engine()

# L219: Log to help online debugging of node parameters/outputs
logger.info("[init_db] 开始创建数据库表...")
# L220: Assignment: update local variable or state field
Base.metadata.create_all(bind=engine, checkfirst=True)

# L222: Compare ORM with database, ALTER TABLE to add missing columns
_auto_migrate(engine)

# L224: Log to help online debugging of node parameters/outputs
logger.info("[init_db] 执行完成,数据库表初始化成功")

# L226: Catch exception to prevent entire graph/request from crashing
except Exception as e:
# L227: Log to help online debugging of node parameters/outputs
logger.error(f"[init_db] 初始化数据库异常: {e}")
# L228: Re-raise exception, to be handled by caller or LangGraph
raise

_auto_migrate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# ========== _auto_migrate ==========
# Source file: db/session.py Lines 158-194

# L158: Synchronous function _auto_migrate: routing decision or factory method
def _auto_migrate(engine):
# L160: 【Doc】Automatically check and add missing columns in database tables.
# L162: 【Doc】Function description:
# L163: 【Doc】Compare ORM model definitions with actual database table structure,
# L164: 【Doc】automatically ALTER TABLE to add missing columns.
# L165: 【Doc】Only handles new columns, does not handle column type changes or deletions.
# L167: 【Doc】Input parameters:
# L168: 【Doc】engine: SQLAlchemy database engine
# L170: 【Doc】Output parameters:
# L171: 【Doc】None
# (L159-172 are function/module docstrings, converted to comments for readability)
# L173: Import dependency module
from ican.db.models import Base
# L174: Import dependency module
from sqlalchemy import inspect, text

# L176: Assignment: update local variable or state field
inspector = inspect(engine)
# L177: Loop
for table_name, table_obj in Base.metadata.tables.items():
# L178: Conditional branch
if not inspector.has_table(table_name):
# L179: Execute this statement (details in description above)
continue
# L180: Assignment: update local variable or state field
existing_columns = {col["name"] for col in inspector.get_columns(table_name)}
# L181: Loop
for column in table_obj.columns:
# L182: Conditional branch
if column.name not in existing_columns:
# L183: Assignment: update local variable or state field
col_type = column.type.compile(engine.dialect)
# L184: Assignment: update local variable or state field
nullable = "NULL" if column.nullable else "NOT NULL"
# L185: Assignment: update local variable or state field
default = ""
# L186: Conditional branch
if column.default is not None:
# L187: Assignment: update local variable or state field
default = f" DEFAULT {column.default.arg}"
# L188: Conditional branch
elif column.server_default is not None:
# L189: Assignment: update local variable or state field
default = f" DEFAULT {column.server_default.arg}"
# L190: Assignment: update local variable or state field
sql = f"ALTER TABLE {table_name} ADD COLUMN {column.name} {col_type} {nullable}{default}"
# L191: Execute this statement (details in description above)
with engine.connect() as conn:
# L192: Execute this statement (details in description above)
conn.execute(text(sql))
# L193: Execute this statement (details in description above)
conn.commit()
# L194: Compare ORM with database, ALTER TABLE to add missing columns
logger.info(f"[_auto_migrate] 自动迁移: {sql}")

Session.workflow_data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# ========== Session.workflow_data ==========
# Source file: db/models.py Lines 170-215

# L170: Define class (configuration or ORM model)
class Session(UUIDPrimaryKeyMixin, TimestampMixin, Base):
# L172: 【Doc】Session table model.
# L174: 【Doc】Function description:
# L175: 【Doc】Records information for each user consultation session, including session status, current stage, etc.
# L176: 【Doc】A session corresponds to one complete career planning consultation workflow.
# L178: 【Doc】Input parameters:
# L179: 【Doc】user_id (str): User identifier
# L180: 【Doc】status (str): Session status
# L181: 【Doc】current_stage (str): Current stage
# L183: 【Doc】Output parameters:
# L184: 【Doc】Session ORM instance
# (L171-185 are function/module docstrings, converted to comments for readability)

# L187: Assignment: update local variable or state field
__tablename__ = "sessions"

# L189: Assignment: update local variable or state field
user_id: Mapped[str] = mapped_column(
# L190: Execute this statement (details in description above)
String(36),
# L191: Assignment: update local variable or state field
nullable=False,
# L192: Assignment: update local variable or state field
default="anonymous",
# L193: Assignment: update local variable or state field
comment="用户标识",
# L194: Execute this statement (details in description above)
)

# L196: Assignment: update local variable or state field
status: Mapped[str] = mapped_column(
# L197: Execute this statement (details in description above)
String(20),
# L198: Assignment: update local variable or state field
nullable=False,
# L199: Assignment: update local variable or state field
default="active",
# L200: Assignment: update local variable or state field
comment="会话状态: active, completed, abandoned",
# L201: Execute this statement (details in description above)
)

# L203: Assignment: update local variable or state field
current_stage: Mapped[str] = mapped_column(
# L204: Execute this statement (details in description above)
String(50),
# L205: Assignment: update local variable or state field
nullable=False,
# L206: Assignment: update local variable or state field
default="guide",
# L207: Assignment: update local variable or state field
comment="当前阶段: guide, parse, analyze, match, plan, report",
# L208: Execute this statement (details in description above)
)

# L210: JSON field: stores conversation history, intermediate results, final_report, etc.
workflow_data: Mapped[dict | None] = mapped_column(
# L211: Execute this statement (details in description above)
JSON,
# L212: Assignment: update local variable or state field
nullable=True,
# L213: Assignment: update local variable or state field
default=None,
# L214: Multi-turn conversation list, each element is {role, content}
comment="工作流完整数据 (JSON): 包含 conversation_history、collected_info 等全部中间结果",
# L215: Execute this statement (details in description above)
)

Series Navigation

Article Topic
1 System Overview
2 Five-Agent Collaboration
3 Holland RIASEC
4-7 State · Routing · Nesting · Fault Tolerance
8-11 LLM Layer · SSE/WS · DB Migration · PDF
12-14 JSON Prompt · RIASEC Prompt · Guide Prompt
15-17 Docker · Middleware · Configuration

← Back to iCan Special