0. Series Loop (Follow Along Without Public Source Code)

End-to-End Pipeline: Vue frontend → api/routes/chat.py → Guide multi-turn SSE → run_analysis_pipeline (parse → analyze → match → report) → tools/pdf_exporter PDF.
This Article: 9/17 · Integration Loop · SSE / Progress

Phase | User-Visible | Code Entry | Article
Create Session | Welcome message | POST /api/sessions | 09
Multi-turn Chat | SSE streaming | chat/stream → run_guide_single_turn | 06, 14
Info Sufficient | Analysis starts | _run_analysis_background | 05, 07
Resume Parsing | Progress 30% | run_resume_parser | 12
Profile/RIASEC | Progress 50% | run_profile_analyzer | 03, 13
Career Matching | Progress 70% | run_career_matcher | 02
Report | Progress 90% | run_reporter | 11
Download PDF | File | GET …/report/pdf | 11, 15
Item | Description
Before reading this | Article 07 (run_analysis_pipeline)
After reading this | You can trace the call sequence of chat/stream and _run_analysis_background
Next link | Article 10: writing results to workflow_data

Full series loop index: SERIES-LOOP.md

1. What Problem Does This Solve

The iCan frontend has two types of real-time requirements:

  1. Guide Phase: After the user sends a message, they want to see the AI reply word by word (typewriter effect);
  2. Analysis Phase: Once enough information has been collected, the backend runs run_analysis_pipeline (parse → analyze → match → report), and the frontend needs to know "which step it has reached".

If LLM streaming and analysis progress shared the same long-lived HTTP connection, their protocols and timeout strategies would become entangled. The current implementation therefore keeps them apart. SSE carries LLM output, and an in-memory dictionary plus polling carries analysis progress. The WebSocket infrastructure is built, but progress push is not yet wired into the main path.


2. Implementation Locations

Module | Responsibility
api/routes/chat.py | Chat CRUD, SSE streaming, calls to run_guide_chat, background analysis tasks
workflow.py | run_guide_chat (single-turn guidance), run_analysis_pipeline (four analysis stages)
api/ws_manager.py | ConnectionManager: connect / send_progress / send_completed / send_error
api/routes/ws.py | WebSocket endpoint /ws/{session_id}

Route prefix: the chat.py router is declared with prefix="/api/sessions".


[Figure: SSE streaming and WebSocket progress]


3. Non-Streaming Chat: run_guide_chat

Non-streaming chat goes through two POST endpoints. Both call workflow.run_guide_chat → agents/guide.run_guide_single_turn, which makes a single LLM call and does not run the Guide's inner 5-node subgraph (see Article 6):

Endpoint | Scenario
POST /api/sessions | Create session + first message
POST /api/sessions/{session_id}/chat | Subsequent turns

When is_info_sufficient=True, chat.py calls _set_status(session_id, "processing", "parse") and asyncio.create_task(_run_analysis_background(...)). The HTTP response immediately returns stage="processing", and the analysis runs in the background.

Timeout: asyncio.wait_for(..., timeout=90) wraps run_guide_chat. On timeout, the endpoint returns a fixed fallback reply instead of failing with HTTP 500.
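The next sketch shows both behaviours above: the timeout fallback and the immediate "processing" response. It assumes stand-in coroutines. fake_guide_chat and fake_analysis are hypothetical placeholders for run_guide_chat and _run_analysis_background. FALLBACK_REPLY is an assumed text, because the article does not quote the real fixed reply. In the sketch, asyncio.wait_for turns a slow LLM call into the fixed reply, and asyncio.create_task lets the handler respond before the analysis finishes.

```python
import asyncio

# Hypothetical fallback text; the real fixed reply in chat.py is not quoted in this article.
FALLBACK_REPLY = "The assistant is taking too long, please try again."
# Strong references keep fire-and-forget tasks from being garbage-collected mid-run.
_background_tasks: set = set()


async def fake_guide_chat(message: str, delay: float) -> dict:
    """Hypothetical stand-in for workflow.run_guide_chat: sleeps to mimic LLM latency."""
    await asyncio.sleep(delay)
    return {"reply": f"echo: {message}", "is_info_sufficient": True}


async def fake_analysis(session_id: str, status: dict) -> None:
    """Hypothetical stand-in for _run_analysis_background."""
    await asyncio.sleep(0.05)
    status[session_id] = "completed"


async def handle_chat(session_id: str, message: str, status: dict,
                      delay: float, timeout: float = 90.0) -> dict:
    try:
        result = await asyncio.wait_for(fake_guide_chat(message, delay), timeout=timeout)
    except asyncio.TimeoutError:
        # Degrade to a fixed reply instead of letting the request fail with HTTP 500
        return {"reply": FALLBACK_REPLY, "stage": "guide"}

    if result["is_info_sufficient"]:
        status[session_id] = "processing"  # plays the role of _set_status(..., "processing", "parse")
        task = asyncio.create_task(fake_analysis(session_id, status))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        # Return at once; the analysis keeps running on the event loop
        return {"reply": result["reply"], "stage": "processing"}
    return {"reply": result["reply"], "stage": "guide"}


async def demo() -> None:
    status: dict = {}
    fast = await handle_chat("s1", "hello", status, delay=0.01)
    print(fast["stage"], status["s1"])      # processing processing
    await asyncio.sleep(0.1)
    print(status["s1"])                      # completed
    slow = await handle_chat("s2", "hello", status, delay=1.0, timeout=0.05)
    print(slow["reply"] == FALLBACK_REPLY)   # True


asyncio.run(demo())
```

The sketch keeps each task in _background_tasks. This follows the asyncio documentation's advice for fire-and-forget tasks. The article does not say whether chat.py does the same.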


4. SSE Streaming: POST /api/sessions/{session_id}/chat/stream

This is the only SSE entry point for the Guide-phase typewriter effect. It returns StreamingResponse(..., media_type="text/event-stream").

4.1 Guide Mode (Report Not Yet Generated)

Pattern: producer-consumer over an asyncio.Queue (not the thread-based queue.Queue).

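# api/routes/chat.py: event_generator core (excerpt)
# Runs inside async def event_generator(); asyncio, json, model, processed and
# chunk_queue (an asyncio.Queue) are defined earlier in the real function
async def _stream_worker():
    # Producer: push each non-empty LLM delta into the queue
    try:
        async for chunk in model.astream(processed):
            if chunk.content:
                await chunk_queue.put(chunk.content)
    finally:
        # Sentinel that ends the consumer loop below; the original excerpt does not
        # show where it is enqueued, so this placement is assumed
        await chunk_queue.put(None)

# Start the producer without awaiting it, so the loop below can yield right away
task = asyncio.create_task(_stream_worker())

try:
    while True:
        # Consumer: wait up to 120 s for the next chunk (a timeout becomes an error event)
        chunk = await asyncio.wait_for(chunk_queue.get(), timeout=120)
        if chunk is None:
            break
        yield f"data: {json.dumps({'type': 'chunk', 'content': chunk})}\n\n"
finally:
    # Cancel the worker after the stream ends, including on early exit
    if not task.done():
        task.cancel()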

Message list: GUIDE_SYSTEM_PROMPT + the existing conversation_history + the current user message, passed through _inject_no_think (for Ollama Qwen3; see Article 8).

SSE event types:

type | Description
chunk | Incremental LLM text, in the content field
done | Stream finished; carries stage (guide or processing) and the is_sufficient boolean
error | Timeout or exception
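The following sketch makes the wire format concrete. It serializes events and parses them back the way a client would. It assumes data-only events, each ended by a blank line, exactly as the excerpt above emits them, and LF-only line endings. format_sse, parse_sse and collect_reply are hypothetical helpers. The real client is the Vue frontend, not Python.

```python
from __future__ import annotations

import json


def format_sse(event: dict) -> str:
    """Serialize one event like the excerpt: a single data line, then a blank line."""
    return f"data: {json.dumps(event)}\n\n"


def parse_sse(raw: str) -> list[dict]:
    """Split a text/event-stream body (LF line endings assumed) into JSON payloads."""
    events = []
    for block in raw.split("\n\n"):              # a blank line terminates each event
        data_lines = []
        for line in block.split("\n"):
            if line.startswith("data:"):
                value = line[len("data:"):]
                if value.startswith(" "):        # the SSE format strips one optional leading space
                    value = value[1:]
                data_lines.append(value)
        if data_lines:                          # skip empty blocks and comment-only blocks
            events.append(json.loads("\n".join(data_lines)))
    return events


def collect_reply(events: list[dict]) -> tuple[str, dict | None]:
    """Concatenate chunk contents until the first done/error event, which is returned too."""
    parts: list[str] = []
    for event in events:
        if event["type"] == "chunk":
            parts.append(event["content"])
        elif event["type"] in ("done", "error"):
            return "".join(parts), event
    return "".join(parts), None                 # stream cut off without a terminal event
```

The client should treat the done or error event, not the connection close, as the end of the reply. This is because done carries the stage that decides whether to start polling /status.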

After the stream ends, the server runs a keyword-based sufficiency check. It uses the same rule as run_guide_single_turn: found_keywords >= 6, or found_keywords >= 4 with a text length >= 50. The server then writes conversation_history / collected_info to the DB. If the information is sufficient, it calls _set_status(..., "parse") and create_task(_run_analysis_background), and the done event carries stage=processing.
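The threshold rule can be written out as follows. This is only a sketch. INFO_KEYWORDS is a hypothetical list, because the article does not show the real keywords. The sketch also assumes that matching is a plain substring test and that the measured text is a single string of user input.

```python
# Hypothetical keyword list: the real list used by the Guide is not shown in this article.
INFO_KEYWORDS = ("major", "grade", "skill", "internship",
                 "project", "interest", "city", "salary")


def is_info_sufficient(user_text: str, keywords: tuple = INFO_KEYWORDS) -> bool:
    """Keyword rule from the text: >= 6 hits, or >= 4 hits with at least 50 characters."""
    found_keywords = sum(1 for kw in keywords if kw in user_text)  # plain substring match (assumed)
    return found_keywords >= 6 or (found_keywords >= 4 and len(user_text) >= 50)
```

Under this rule, four hits in a short message are not enough. The same four hits in a longer message (50 characters or more) pass.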

4.2 QA Mode (final_report Exists)

If workflow_data.final_report is non-empty, chat/stream switches to report Q&A, and the system prompt injects a JSON summary of personal_profile and career_matches. Streaming still uses astream + Queue, but the done event carries stage=qa and the exchange is written to qa_history.

4.3 Concurrency Protection

When _workflow_status[session_id]["status"] == "processing", chat/stream returns 409 "Processing in progress, please wait". When a report exists, Guide streaming is unavailable and the request goes to the QA branch.
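The order of the checks in 4.2 and 4.3 can be summarized in one function. select_stream_mode is a hypothetical helper. In chat.py the checks are written inline, and the busy case raises HTTPException(status_code=409).

```python
def select_stream_mode(workflow_status: dict, session_id: str, workflow_data: dict) -> str:
    """Return "busy" (HTTP 409), "qa" or "guide" for a chat/stream request."""
    # 1. A running analysis blocks streaming entirely
    if workflow_status.get(session_id, {}).get("status") == "processing":
        return "busy"
    # 2. A non-empty final_report means report Q&A
    if workflow_data.get("final_report"):
        return "qa"
    # 3. Otherwise keep guiding the user
    return "guide"
```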


5. Analysis Progress: In-Memory State + Timer + Polling (WebSocket Is Not on the Main Path)

Progress status lives in the module-level dictionary _workflow_status in chat.py and is persisted asynchronously to workflow_data.status via _save_status_to_db:

STAGE_PROGRESS = {"guide": 0.1, "parse": 0.3, "analyze": 0.5,
                  "match": 0.7, "report": 0.9, "completed": 1.0}

def _set_status(session_id, status, stage, reply=None):
    data = {
        "status": status,
        "stage": stage,
        "progress": STAGE_PROGRESS.get(stage, 0.0),  # stages missing from the table (e.g. "failed") map to 0.0
        "message": STAGE_LABELS.get(stage, stage),   # display label, falls back to the raw stage name
        "reply": reply,
    }
    _workflow_status[session_id] = data
    # ensure_future → _save_status_to_db (persistence is scheduled, not awaited)

The background task _run_analysis_background calls run_analysis_pipeline and also starts _update_progress_periodically:

async def _update_progress_periodically(session_id: str):
    for stage in ["parse", "analyze", "match", "report"]:
        await asyncio.sleep(20)  # Switch stage every 20 seconds
        if _workflow_status.get(session_id, {}).get("status") in ("completed", "failed"):
            return
        _set_status(session_id, "processing", stage)

The frontend polls GET /api/sessions/{session_id}/status. The endpoint reads the in-memory _workflow_status first. On a miss (for example after a process restart), it restores the status from workflow_data.status in the DB.

The resume-upload report flow in api/routes/report_gen.py reuses the same _set_status / _update_progress_periodically, imported from chat.py.


6. WebSocket: Current State of ws_manager

6.1 Connection Endpoint

# api/routes/ws.py
@router.websocket("/ws/{session_id}")
async def websocket_endpoint(ws: WebSocket, session_id: str):
    await ws.accept()
    await ws_manager.connect(session_id, ws)
    try:
        while True:
            await ws.receive_text()  # Keep alive, client can send heartbeat
    except WebSocketDisconnect:
        ws_manager.disconnect(session_id, ws)

ConnectionManager in api/ws_manager.py keeps a list of connections per session_id and provides:

async def send_progress(self, session_id, stage, progress, message):
    data = json.dumps({"type": "progress", "stage": stage,
                       "progress": progress, "message": message})
    # Sends to every socket in active[session_id]; a failed send only logs a warning

async def send_completed(self, session_id, reply=""):
    data = json.dumps({"type": "completed", "reply": reply})
    # ... (sending code elided in this excerpt)

async def send_error(self, session_id, error: str):
    data = json.dumps({"type": "error", "error": error})
    # ... (sending code elided in this excerpt)

6.2 Actual Call Sites

A repo-wide grep shows that send_progress / send_completed are never called from chat.py or _run_analysis_background.
The only production call site is await ws_manager.send_error(session_id, str(e)) in the top-level except block of workflow.run_analysis_pipeline.

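Conclusion: the WebSocket manager is ready, but for now the frontend has to rely on GET .../status polling for progress. To push progress over WS, add ws_manager.send_progress(...) calls in _set_status or at each stage of _run_analysis_background, and call send_completed when the analysis finishes. _set_status is a plain def, so it cannot await. It would have to schedule the push instead, for example with asyncio.ensure_future, as it already does for _save_status_to_db.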
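The sketch below shows one way to wire push into the status path, under several assumptions. FakeSocket imitates the send_text method of a WebSocket. MiniManager is a hypothetical reduction of ConnectionManager with only connect / disconnect / send_progress / send_completed. set_status_and_push is a hypothetical async wrapper, used here because the synchronous _set_status cannot await the push itself.

```python
import asyncio
import json
import logging

logger = logging.getLogger("ws_sketch")


class FakeSocket:
    """Stand-in for a WebSocket: records decoded messages instead of sending them."""
    def __init__(self, fail: bool = False):
        self.sent: list = []
        self.fail = fail

    async def send_text(self, text: str) -> None:
        if self.fail:
            raise RuntimeError("connection closed")
        self.sent.append(json.loads(text))


class MiniManager:
    def __init__(self):
        self.active: dict = {}

    async def connect(self, session_id, ws):      # async, like ws_manager.connect
        self.active.setdefault(session_id, []).append(ws)

    def disconnect(self, session_id, ws):          # sync, like ws_manager.disconnect
        conns = self.active.get(session_id, [])
        if ws in conns:
            conns.remove(ws)

    async def _broadcast(self, session_id, payload: dict) -> None:
        data = json.dumps(payload)
        for ws in list(self.active.get(session_id, [])):
            try:
                await ws.send_text(data)
            except Exception as exc:               # one dead client must not stop the others
                logger.warning("push to %s failed: %s", session_id, exc)

    async def send_progress(self, session_id, stage, progress, message):
        await self._broadcast(session_id, {"type": "progress", "stage": stage,
                                           "progress": progress, "message": message})

    async def send_completed(self, session_id, reply=""):
        await self._broadcast(session_id, {"type": "completed", "reply": reply})


STAGE_PROGRESS = {"parse": 0.3, "analyze": 0.5, "match": 0.7, "report": 0.9, "completed": 1.0}
status_store: dict = {}
manager = MiniManager()


async def set_status_and_push(session_id: str, status: str, stage: str) -> None:
    progress = STAGE_PROGRESS.get(stage, 0.0)
    # Polling path stays intact, so clients without a WS still see the status
    status_store[session_id] = {"status": status, "stage": stage, "progress": progress}
    # Push path: completion gets its own event type
    if status == "completed":
        await manager.send_completed(session_id, reply="Report generated!")
    else:
        await manager.send_progress(session_id, stage, progress, stage)
```

Polling keeps working as a fallback in this design, so a client that connects to the WS late loses nothing. It can read the current status once and then follow the pushes.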


7. Data Flow Overview

User Message
├─ POST /chat ────────── run_guide_chat ──► One-shot JSON reply
└─ POST /chat/stream ─── model.astream  ──► SSE chunk/done
       │
       └─ is_sufficient ──► create_task(_run_analysis_background)
                            │
                            ├─ run_analysis_pipeline (workflow.py)
                            ├─ _update_progress_periodically (timer changes stage)
                            └─ _set_status → memory + DB
                               ▲
Frontend GET /status ──────────┘

WebSocket /ws/{id} ── Connected, progress push not wired (only pipeline error send_error)

After the analysis completes, _run_analysis_background merges final_report, structured_profile, personal_profile and career_matches into the existing workflow_data. It then calls _set_status(..., "completed", "completed", reply="Report generated!").
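The merge is a shallow dict update, which is why keys written earlier in the session survive. The sketch below mirrors the field list in the appendix excerpt. merge_workflow is a hypothetical name; chat.py performs the merge inline.

```python
def merge_workflow(old_workflow: dict, workflow_result: dict) -> dict:
    # Shallow copy: keys the pipeline does not return (e.g. conversation_history) are kept
    merged = dict(old_workflow)
    merged.update({
        "final_report": workflow_result.get("final_report", ""),
        "structured_profile": workflow_result.get("structured_profile", {}),
        "personal_profile": workflow_result.get("personal_profile", {}),
        "career_matches": workflow_result.get("career_matches", []),
    })
    return merged
```

The four listed keys are always overwritten, even when missing from the result. An absent personal_profile becomes {} and replaces any earlier value.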


8. Pitfalls

① WS progress methods are not connected to business logic
send_progress / send_completed are implemented in ws_manager.py, but _run_analysis_background only calls _set_status. Any documentation or frontend code that assumes "connecting to the WS delivers progress events" contradicts the source code.

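② Progress is simulated by a timer, not reported by real Agent callbacks
_update_progress_periodically advances the stage every 20 seconds, whether or not run_resume_parser and the other Agents have actually finished. _run_analysis_background already sets parse, so the first tick (20 s) sets parse again. After that, analyze, match and report follow at 40, 60 and 80 s. As a result, a slow resume parse can already show report at 80 s. And once the timer reaches report, the UI stays at 90% until the pipeline finishes, however long the Reporter takes.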

③ SSE and REST each have their own sufficiency logic
chat/stream runs the keyword check locally after the stream ends, while POST /chat relies on run_guide_single_turn. Both apply the same rule. Both also differ from the subgraph's check_sufficiency, where the LLM itself judges sufficiency, so do not confuse them with the Article 6 subgraph logic.

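④ The producer must be started with asyncio.create_task
If _stream_worker were awaited directly, event_generator could not yield anything until the whole LLM response had arrived, which defeats the typewriter effect. The queue must be an asyncio.Queue. A thread-based queue.Queue has a blocking get() that would stall the event loop. When the consumer exits, call task.cancel(); otherwise the worker leaks.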

⑤ connect is async, disconnect is sync
ws_manager.connect is declared async (although it performs no async I/O), while disconnect is a plain def. Callers must therefore await the former but not the latter. Keep the interface consistent when extending it.
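Pitfall ④ can be tried out with the self-contained version of the producer-consumer pattern below. fake_astream is a hypothetical stand-in for model.astream. The done and error payloads are simplified, since the real done event also carries stage and is_sufficient, and the error field name is an assumption.

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_astream(tokens: list, delay: float) -> AsyncIterator[str]:
    """Hypothetical stand-in for model.astream(...): yields text deltas slowly."""
    for token in tokens:
        await asyncio.sleep(delay)
        yield token


def sse(payload: dict) -> str:
    return f"data: {json.dumps(payload)}\n\n"


async def event_generator(tokens: list, timeout: float = 120.0, delay: float = 0.01):
    queue: asyncio.Queue = asyncio.Queue()

    async def worker() -> None:
        try:
            async for piece in fake_astream(tokens, delay):
                if piece:                          # skip empty deltas, like `if chunk.content`
                    await queue.put(piece)
        finally:
            await queue.put(None)                  # sentinel: no more chunks

    task = asyncio.create_task(worker())           # producer runs while we yield below
    try:
        while True:
            piece = await asyncio.wait_for(queue.get(), timeout=timeout)
            if piece is None:
                break
            yield sse({"type": "chunk", "content": piece})
        yield sse({"type": "done"})                # simplified done payload
    except asyncio.TimeoutError:
        yield sse({"type": "error", "error": "timeout"})  # field name assumed
    finally:
        if not task.done():
            task.cancel()                          # consumer left early: stop the producer
```

Closing the generator early, for example with aclose() when a client disconnects, runs the finally block and cancels the worker. This is the leak that pitfall ④ warns about.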


9. Summary

  • SSE endpoint: POST /api/sessions/{session_id}/chat/stream; events chunk / done / error; after report generation, the same endpoint switches to QA mode.
  • Non-streaming: POST /api/sessions and POST .../chat, both through workflow.run_guide_chat.
  • Analysis progress: _workflow_status + _set_status + timer _update_progress_periodically; frontend polls GET /api/sessions/{session_id}/status.
  • WebSocket: /ws/{session_id} accepts connections, but the ws_manager progress/completion methods are not yet wired into the main flow; the only call is send_error on pipeline failure.
  • To extend real-time capabilities, prefer adding WS push alongside _set_status, or switch to explicit callbacks inside Agent nodes, rather than continuing to rely on pure timer-based stages.
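The callback option from the last bullet reports a stage when it actually starts, rather than on a timer. A sketch follows. run_pipeline_with_progress, on_stage and make_stage are hypothetical names, and the article does not describe such a hook in run_analysis_pipeline.

```python
import asyncio
from typing import Awaitable, Callable

Stage = Callable[[dict], Awaitable[dict]]


async def run_pipeline_with_progress(state: dict,
                                     stages: list[tuple[str, Stage]],
                                     on_stage: Callable[[str], None]) -> dict:
    """Run stages in order and report each one right before it actually starts."""
    for name, stage_fn in stages:
        on_stage(name)                                 # e.g. _set_status(session_id, "processing", name)
        state = {**state, **(await stage_fn(state))}   # merge the stage's output into the state
    on_stage("completed")
    return state


def make_stage(output_key: str, delay: float) -> Stage:
    """Hypothetical stage factory standing in for run_resume_parser, run_reporter, etc."""
    async def stage_fn(state: dict) -> dict:
        await asyncio.sleep(delay)                     # simulated Agent latency
        return {output_key: True}
    return stage_fn
```

Unlike the 20-second timer, the stage shown to pollers can never run ahead of the real work. The cost is that a callback has to be threaded through the pipeline.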

Next article: SQLAlchemy auto-migration (db/session.py).


Appendix: Key Source Code (Line-by-Line Comments)

The following code is excerpted from the iCan implementation, with an English comment above the key lines, so you can follow along even without the public repository.
Generation command: python3 bin/build-ican-annotated-snippets.py

SSE Endpoint Entry chat_stream

# ========== SSE endpoint entry chat_stream ==========
# Source file: api/routes/chat.py lines 257-275

# Async handler: the LLM and DB calls it makes are I/O-bound
@router.post("/{session_id}/chat/stream")
async def chat_stream(session_id: str, request: ChatRequest, user_id: str = Depends(get_current_user)):
    try:
        # Load the session through the repository layer
        session_data = repository.get_session(session_id)
        if not session_data:
            # 404: unknown session
            raise HTTPException(status_code=404, detail=f"Session does not exist: {session_id}")

        # 403: the authenticated user does not own this session
        if user_id and session_data.get("user_id") != user_id:
            raise HTTPException(status_code=403, detail="No permission to access this session")

        # 409: a background analysis is already running for this session (see 4.3)
        current_status = _workflow_status.get(session_id, {}).get("status")
        if current_status == "processing":
            raise HTTPException(status_code=409, detail="Processing in progress, please wait")

        # workflow_data is a JSON field holding history, intermediate results and final_report
        workflow_data = session_data.get("workflow_data") or {}
        # A non-empty final_report switches the endpoint to QA mode (see 4.2)
        final_report = workflow_data.get("final_report", "")
        # Prefer personal_profile, fall back to structured_profile
        personal_profile = workflow_data.get("personal_profile") or workflow_data.get("structured_profile") or {}
        career_matches = workflow_data.get("career_matches") or []
        # ... (remainder of chat_stream omitted from this excerpt)

_run_analysis_background

# ========== _run_analysis_background ==========
# Source file: api/routes/chat.py lines 495-561

async def _run_analysis_background(session_id: str, conversation_history: list, collected_info: dict, user_id: str):
    try:
        # Mark the session as processing; progress starts at the "parse" stage (0.3)
        _set_status(session_id, "processing", "parse")

        # Timer task that advances the displayed stage every 20 s (see Section 5)
        progress_task = asyncio.create_task(_update_progress_periodically(session_id))

        from ican.workflow import run_analysis_pipeline
        # Main HTTP analysis chain: parse → analyze → match → report (skips the top-level guide loop)
        workflow_result = await run_analysis_pipeline(
            conversation_history=conversation_history,  # list of {role, content} turns
            collected_info=collected_info,
            session_id=session_id,
            user_id=user_id,
        )

        # The pipeline finished: stop the timer and wait until its cancellation completes
        progress_task.cancel()
        try:
            await progress_task
        except asyncio.CancelledError:
            pass

        final_report = workflow_result.get("final_report", "")

        # Persist the report text; a DB failure here is logged but does not fail the analysis
        try:
            repository.save_report(
                session_id=session_id,
                report_type="comprehensive",
                report_content=final_report,
                user_id=user_id,
            )
        except Exception as db_err:
            logger.warning("[_run_analysis_background] Exception saving report to database: %s", db_err)

        # Merge the results into the existing workflow_data so earlier keys survive
        session_data = repository.get_session(session_id) or {}
        old_workflow = session_data.get("workflow_data", {}) if isinstance(session_data, dict) else {}
        merged_workflow = dict(old_workflow)
        merged_workflow.update({
            "final_report": final_report,
            "structured_profile": workflow_result.get("structured_profile", {}),
            "personal_profile": workflow_result.get("personal_profile", {}),
            "career_matches": workflow_result.get("career_matches", []),
        })

        # Write the merged JSON and mark the session completed in SQLite (repository layer)
        repository.save_session(
            session_id=session_id,
            user_id=user_id,
            status="completed",
            current_stage="completed",
            workflow_data=merged_workflow,
        )

        # Also store a full career report record, only when the report is non-empty
        if final_report:
            repository.save_career_report(
                session_id=session_id,
                report_data={"report_type": "full", "full_report": {"content": final_report}},
            )

        # Save the user profile when the pipeline produced one
        personal_profile = workflow_result.get("personal_profile", {})
        if personal_profile:
            repository.save_user_profile(session_id=session_id, profile_data=personal_profile)

        # Flip the in-memory status to completed (progress 1.0) so polling clients see it
        _set_status(session_id, "completed", "completed", reply="Report generated!")
        logger.info("[_run_analysis_background] Analysis complete: session_id=%s, report length=%d", session_id, len(final_report))

    except Exception as e:
        # progress_task is not cancelled on this path; it returns by itself on its next
        # tick, when it sees status == "failed"
        logger.error("[_run_analysis_background] Analysis exception: session_id=%s, error=%s", session_id, e, exc_info=True)
        _set_status(session_id, "failed", "failed", reply=f"Report generation failed: {str(e)}")
        repository.update_session_status(session_id, "failed", "failed")

run_guide_chat

# ========== run_guide_chat ==========
# Source file: workflow.py lines 558-600

async def run_guide_chat(conversation_history: list, user_message: str) -> dict:
    """Run a single guidance chat turn (does not trigger the analysis pipeline).

    Calls the Guide Agent in single-turn mode to return an AI reply quickly and,
    based on the conversation history and the new user message, determines
    whether the collected information is sufficient.

    Args:
        conversation_history (list): existing conversation history
        user_message (str): latest user message

    Returns:
        dict: reply, is_info_sufficient, collected_info, conversation_history (updated)
    """
    try:
        logger.info("[run_guide_chat] Starting single-turn chat, history length=%d, message length=%d", len(conversation_history), len(user_message))

        from ican.agents.guide import run_guide_single_turn
        # Single LLM reply; the Guide's inner 5-node subgraph is not executed
        result = await run_guide_single_turn(conversation_history, user_message)

        # Copy the history so the caller's list is not mutated, then append this turn
        updated_history = list(conversation_history)
        updated_history.append({"role": "user", "content": user_message})
        if result.get("reply"):
            updated_history.append({"role": "assistant", "content": result["reply"]})

        # Plain dict consumed by the chat.py route handlers
        return {
            "reply": result.get("reply", ""),
            # True when the Guide judges the user info sufficient to start analysis
            "is_info_sufficient": result.get("is_info_sufficient", False),
            "collected_info": result.get("collected_info", {}),
            "conversation_history": updated_history,
        }

    except Exception as e:
        logger.error("[run_guide_chat] Single-turn chat exception: %s", e, exc_info=True)
        # Fallback reply: the user turn is kept in history, no assistant turn is added
        return {
            "reply": "Sorry, something went wrong. Could you say that again?",
            "is_info_sufficient": False,
            "collected_info": {},
            "conversation_history": conversation_history + [
                {"role": "user", "content": user_message},
            ],
        }

Series Navigation

Article | Topic
1 | System Overview
2 | Five Agent Collaboration
3 | Holland RIASEC
4–7 | State · Routing · Nesting · Fault Tolerance
8–11 | LLM Layer · SSE/WS · DB Migration · PDF
12–14 | JSON Prompt · RIASEC Prompt · Guide Prompt
15–17 | Docker · Middleware · Configuration
