0. Series Loop (Follow Along Without Public Source Code)
End-to-End Pipeline: Vue frontend → api/routes/chat.py → Guide multi-turn SSE → run_analysis_pipeline (parse → analyze → match → report) → tools/pdf_exporter PDF.
This Article: 9/17 · Integration Loop · SSE / Progress
| Phase | User Visible | Code Entry | Article |
|---|---|---|---|
| Create Session | Welcome Message | POST /api/sessions | 09 |
| Multi-turn Chat | SSE Streaming | chat/stream → run_guide_single_turn | 06, 14 |
| Info Sufficient | Start Analysis | _run_analysis_background | 05, 07 |
| Resume Parsing | Progress 30% | run_resume_parser | 12 |
| Profile/RIASEC | Progress 50% | run_profile_analyzer | 03, 13 |
| Career Matching | Progress 70% | run_career_matcher | 02 |
| Report | Progress 90% | run_reporter | 11 |
| Download PDF | File | GET …/report/pdf | 11, 15 |
| Item | Description |
|---|---|
| Before reading this | Article 07 run_analysis_pipeline |
| After reading this | You can trace the call sequence of chat/stream and _run_analysis_background |
| Next in the loop | Article 10: Writing results to workflow_data |
Full series loop index: SERIES-LOOP.md
1. What Problem Does This Solve
The iCan frontend has two kinds of real-time requirements:
- Guide phase: after the user sends a message, they want to see the AI reply appear word by word (a typewriter effect);
- Analysis phase: once enough information has been collected, the backend runs run_analysis_pipeline (parse → analyze → match → report), and the frontend needs to know "which step it has reached".
Mixing LLM streaming and analysis progress on the same long-lived HTTP connection would entangle their protocols and timeout strategies. The current implementation therefore keeps them apart. SSE carries LLM output. Analysis progress lives in an in-memory dictionary that the frontend polls. WebSocket infrastructure has been built, but progress push has not yet been wired into the main path.
2. Implementation Locations
| Module | Responsibility |
|---|---|
| api/routes/chat.py | Chat CRUD, SSE streaming, run_guide_chat, background analysis tasks |
| workflow.py | run_guide_chat (single-turn guidance), run_analysis_pipeline (four analysis stages) |
| api/ws_manager.py | ConnectionManager: connect / send_progress / send_completed / send_error |
| api/routes/ws.py | WebSocket endpoint /ws/{session_id} |
Route prefix: the chat.py router uses prefix="/api/sessions".
3. Non-Streaming Chat: run_guide_chat
Regular (non-streaming) chat goes through two POST endpoints. Both call workflow.run_guide_chat → agents/guide.run_guide_single_turn internally. This is a single-turn LLM call that does not go through the Guide's inner 5-node subgraph (see Article 6):
| Endpoint | Scenario |
|---|---|
| POST /api/sessions | Create session + first message |
| POST /api/sessions/{session_id}/chat | Subsequent multi-turn messages |
When is_info_sufficient=True, chat.py calls _set_status(session_id, "processing", "parse") and asyncio.create_task(_run_analysis_background(...)). The HTTP response immediately returns stage="processing", and the analysis runs in the background.
Timeout: run_guide_chat is wrapped in asyncio.wait_for(..., timeout=90). On timeout, the endpoint returns a fixed fallback reply instead of an HTTP 500.
4. SSE Streaming: POST /api/sessions/{session_id}/chat/stream
This is the only SSE entry for the typewriter effect during the Guide phase. Returns StreamingResponse(..., media_type="text/event-stream").
4.1 Guide Mode (Report Not Yet Generated)
Flow: Producer-Consumer + asyncio.Queue (not using thread queue.Queue).
Message body: GUIDE_SYSTEM_PROMPT + existing conversation_history + current user message; processed through _inject_no_think (Ollama Qwen3; see Article 8).
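The producer-consumer flow can be sketched like this. It assumes the LLM stream has already been reduced to an async iterator of text tokens; the real code uses `astream`, whose chunk objects are not shown here. It also assumes each SSE message is a `data:` line carrying JSON with a `type` field. The helper names `sse`, `finalize` and the error field `message` are hypothetical; `_stream_worker` and `event_generator` reuse the names from the text.

```python
import asyncio
import json
from typing import AsyncIterator, Callable

_DONE = object()  # sentinel: the producer has finished

def sse(event: dict) -> str:
    """Frame one event as an SSE message: a 'data:' line terminated by a blank line."""
    return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"

async def event_generator(
    llm_tokens: AsyncIterator[str],
    finalize: Callable[[str], dict],
) -> AsyncIterator[str]:
    queue: asyncio.Queue = asyncio.Queue()

    async def _stream_worker() -> None:
        # Producer: forward each LLM token (or a failure) into the queue.
        try:
            async for token in llm_tokens:
                await queue.put(("chunk", token))
        except Exception as exc:
            await queue.put(("error", str(exc)))
        finally:
            await queue.put((_DONE, None))

    # Started as a task, not awaited, so this generator can yield while tokens arrive.
    task = asyncio.create_task(_stream_worker())
    parts: list = []
    try:
        while True:
            kind, payload = await queue.get()
            if kind is _DONE:
                break
            if kind == "error":
                yield sse({"type": "error", "message": payload})
                return
            parts.append(payload)
            yield sse({"type": "chunk", "content": payload})
        # Post-stream work (sufficiency check, DB writes) is delegated to finalize().
        yield sse({"type": "done", **finalize("".join(parts))})
    finally:
        task.cancel()  # stops the producer if the client disconnected early
```

With FastAPI, the generator would be returned as `StreamingResponse(event_generator(...), media_type="text/event-stream")`. The `finally` block is where the worker is cancelled when the consumer exits.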
SSE event types:
| type | Description |
|---|---|
| chunk | Incremental LLM text, in the content field |
| done | Stream has ended; carries stage (guide or processing) and the is_sufficient boolean |
| error | Timeout or exception |
After the stream ends, the server runs a keyword sufficiency check. It uses the same rule as run_guide_single_turn: sufficient when found_keywords >= 6, or when found_keywords >= 4 and len(text) >= 50. The server then writes conversation_history / collected_info to the DB. If the information is sufficient, it calls _set_status(..., "parse") and create_task(_run_analysis_background), and the done event carries stage=processing.
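The keyword rule fits in a few lines. In this sketch the keyword list is hypothetical, since the real list is not shown. It also assumes `text` is the collected user text and that matching is case-insensitive substring search.

```python
# Hypothetical keyword list: the real list used by chat.py and run_guide_single_turn is not shown.
INFO_KEYWORDS = ("major", "grade", "internship", "project", "skill", "interest", "city", "salary")

def is_info_sufficient(text: str, keywords: tuple = INFO_KEYWORDS) -> bool:
    """Keyword rule from the article: >= 6 keywords, or >= 4 keywords with len(text) >= 50."""
    lowered = text.lower()
    found_keywords = sum(1 for kw in keywords if kw in lowered)
    return found_keywords >= 6 or (found_keywords >= 4 and len(text) >= 50)
```

Because the rule is a pure function, the SSE path and the REST path could import one shared copy. That would avoid the two paths drifting apart, at least for the keyword half of the logic.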
4.2 QA Mode (final_report Exists)
If workflow_data.final_report is not empty, chat/stream switches to report Q&A. The system prompt injects a JSON summary of personal_profile and career_matches. Streaming still uses astream + Queue, but the done event carries stage=qa and the exchange is written to qa_history.
4.3 Concurrency Protection
When _workflow_status[session_id].status == "processing", chat/stream returns 409 "Processing in progress". Once a report exists, Guide streaming is no longer available and requests go to the QA branch.
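The routing order above can be expressed as a small pure function. It is hypothetical; chat.py inlines this logic in the endpoint. In FastAPI the 409 branch would typically become `raise HTTPException(status_code=409, detail="Processing in progress")`.

```python
from typing import Optional, Tuple

def choose_stream_mode(
    workflow_status: dict, workflow_data: dict, session_id: str
) -> Tuple[int, Optional[str]]:
    """Decide how chat/stream handles a request, in the order the article describes."""
    if workflow_status.get(session_id, {}).get("status") == "processing":
        return 409, None  # analysis is running: reject with "Processing in progress"
    if workflow_data.get("final_report"):
        return 200, "qa"  # a report exists: answer questions about it
    return 200, "guide"  # otherwise keep collecting information
```

Checking the processing state first matters. A session whose analysis is still running must not start a new Guide turn, even though it has no report yet.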
5. Analysis Progress: In-Memory + Timer + Polling (Not WebSocket Main Path)
Progress status is maintained in chat.py module-level dictionary _workflow_status and asynchronously persisted to workflow_data.status via _save_status_to_db:
The background task _run_analysis_background calls run_analysis_pipeline and also starts _update_progress_periodically:
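The background task and its timer might look like the following sketch. The stage strings, the injected `pipeline` and `set_status` callables, and the error status are hypothetical. The sketch also cancels the timer in `finally` so that a late tick cannot overwrite the final status; the article does not say whether the real code does this.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Stage order follows parse -> analyze -> match -> report; the exact strings are assumed.
STAGES = ("parse", "analyze", "match", "report")

SetStatus = Callable[..., Any]

async def update_progress_periodically(session_id: str, set_status: SetStatus, interval: float = 20.0) -> None:
    """Advance the displayed stage on a fixed timer (simulated progress, not real agent state)."""
    for stage in STAGES[1:]:  # "parse" was already set before the task started
        await asyncio.sleep(interval)
        set_status(session_id, "processing", stage)

async def run_analysis_background(
    session_id: str,
    pipeline: Callable[[str], Awaitable[Dict[str, Any]]],
    set_status: SetStatus,
    workflow_data: Dict[str, Any],
    interval: float = 20.0,
) -> None:
    ticker = asyncio.create_task(update_progress_periodically(session_id, set_status, interval))
    try:
        result = await pipeline(session_id)
        workflow_data.update(result)  # merge final_report, personal_profile, ...
        set_status(session_id, "completed", "completed", reply="Report generated!")
    except Exception as exc:
        set_status(session_id, "error", "error", reply=str(exc))  # hypothetical error status
    finally:
        ticker.cancel()  # stop the timer so it cannot overwrite the final status
```

The timer never looks at the pipeline. This is why the displayed stage is only an estimate.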
Frontend polls GET /api/sessions/{session_id}/status: first reads in-memory _workflow_status, on miss restores from DB workflow_data.status.
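The read path behind GET .../status can be sketched as a memory-first lookup with a DB fallback. In this sketch the loader callable and the default "unknown" entry are hypothetical. Re-caching the DB copy in memory is also an assumption; the article only says the status is restored from workflow_data.status.

```python
from typing import Any, Callable, Dict, Optional

Status = Dict[str, Any]

def get_status(
    session_id: str,
    memory: Dict[str, Status],
    load_from_db: Callable[[str], Optional[Status]],
) -> Status:
    """Serve GET .../status: in-memory first, then the copy persisted in workflow_data.status."""
    entry = memory.get(session_id)
    if entry is not None:
        return entry
    persisted = load_from_db(session_id)  # e.g. after a server restart cleared memory
    if persisted:
        memory[session_id] = persisted  # assumption: re-cache so later polls skip the DB
        return persisted
    return {"status": "unknown", "stage": None, "progress": 0}  # hypothetical default
```

The DB fallback is what lets a poll survive a process restart. Any status written only to memory would otherwise be lost.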
When api/routes/report_gen.py independently uploads a resume to generate a report, it reuses the same _set_status / _update_progress_periodically (imported from chat.py).
6. WebSocket: ws_manager Current Status
6.1 Connection Endpoint
ConnectionManager in api/ws_manager.py maintains connection list per session_id, providing:
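A per-session connection manager with the method names listed above can be sketched as follows. Several details are hypothetical: the message shapes, the `send_completed` signature, and the drop-on-send-failure policy. The sketch only relies on `send_json`, a method that FastAPI/Starlette `WebSocket` objects provide. It mirrors the article's note that `connect` is async without awaiting anything, while `disconnect` is a plain function.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, List, Protocol

class JsonSocket(Protocol):
    async def send_json(self, data: Any) -> None: ...

class ConnectionManager:
    def __init__(self) -> None:
        self.active: Dict[str, List[JsonSocket]] = defaultdict(list)

    async def connect(self, session_id: str, websocket: JsonSocket) -> None:
        # No await here, matching the note that connect is async but does no async IO.
        self.active[session_id].append(websocket)

    def disconnect(self, session_id: str, websocket: JsonSocket) -> None:
        conns = self.active.get(session_id, [])
        if websocket in conns:
            conns.remove(websocket)
        if not conns:
            self.active.pop(session_id, None)

    async def _broadcast(self, session_id: str, message: dict) -> None:
        for ws in list(self.active.get(session_id, [])):
            try:
                await ws.send_json(message)
            except Exception:
                self.disconnect(session_id, ws)  # drop sockets that failed to send

    async def send_progress(self, session_id: str, stage: str, progress: int) -> None:
        await self._broadcast(session_id, {"type": "progress", "stage": stage, "progress": progress})

    async def send_completed(self, session_id: str) -> None:
        await self._broadcast(session_id, {"type": "completed"})

    async def send_error(self, session_id: str, message: str) -> None:
        await self._broadcast(session_id, {"type": "error", "message": message})
```

Iterating over a copy of the list in `_broadcast` makes it safe to remove a broken socket while the loop is still running.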
6.2 Actual Call Sites
Grepping the whole repo: send_progress / send_completed are NOT called by chat.py or _run_analysis_background.
The only production path call is await ws_manager.send_error(session_id, str(e)) in the top-level except of workflow.run_analysis_pipeline.
Conclusion: The WebSocket manager is ready, the current frontend progress should rely on GET .../status polling. To use WS for progress push, you need to add await ws_manager.send_progress(...) at _set_status or in each stage of _run_analysis_background, and add send_completed when finished.
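One way to add the push without changing callers is to keep the status setter synchronous and schedule the WebSocket send as a task. The sketch below shows that approach. The function name `set_status_with_push`, the `ProgressSink` protocol, and the stage map are hypothetical. It writes to memory first, so GET .../status polling keeps working as a fallback.

```python
import asyncio
from typing import Any, Dict, Optional, Protocol, Set

class ProgressSink(Protocol):
    async def send_progress(self, session_id: str, stage: str, progress: int) -> None: ...
    async def send_completed(self, session_id: str) -> None: ...

# Hypothetical stage names; percentages follow the series table.
STAGE_PROGRESS = {"parse": 30, "analyze": 50, "match": 70, "report": 90}
_push_tasks: Set[asyncio.Task] = set()

def set_status_with_push(
    session_id: str,
    status: str,
    stage: str,
    memory: Dict[str, Dict[str, Any]],
    sink: ProgressSink,
    reply: Optional[str] = None,
) -> None:
    """Update the polled in-memory status, then schedule a WebSocket push without blocking."""
    progress = 100 if status == "completed" else STAGE_PROGRESS.get(stage, 0)
    memory[session_id] = {"status": status, "stage": stage, "progress": progress, "reply": reply}
    loop = asyncio.get_running_loop()  # must be called from code running inside an event loop
    if status == "completed":
        coro = sink.send_completed(session_id)
    else:
        coro = sink.send_progress(session_id, stage, progress)
    task = loop.create_task(coro)
    _push_tasks.add(task)
    task.add_done_callback(_push_tasks.discard)
```

Scheduling the push, instead of awaiting it, means a slow or dead client cannot delay the analysis task that reports the status.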
7. Data Flow Overview
After analysis completes, _run_analysis_background merges final_report, personal_profile, etc. into workflow_data, and calls _set_status(..., "completed", "completed", reply="Report generated!").
8. Pitfalls
① WS progress methods are not connected to business logic. send_progress / send_completed are implemented in ws_manager.py, but _run_analysis_background only calls _set_status. Any documentation or frontend code that assumes "connecting to WS will deliver progress events" does not match the source.
② Progress is simulated by a timer, not reported by real Agent callbacks. _update_progress_periodically changes the stage name every 20 seconds, whether or not run_resume_parser and the other agents have actually finished. For long-running steps the display can mislead, e.g. it may show the report stage while the Reporter is still running.
③ SSE and REST each have their own sufficiency check. chat/stream computes keywords locally after the stream ends, while POST /chat goes through run_guide_single_turn. Both apply the same rule. That rule differs from the inner check_sufficiency node (where the LLM judges sufficiency), so do not confuse it with the Article 6 subgraph logic.
④ The producer must be started with asyncio.create_task. If _stream_worker were awaited directly, event_generator could not yield anything until the worker had finished. The queue must be an asyncio.Queue, and the consumer must call task.cancel() on exit; otherwise the worker leaks.
⑤ connect is async, disconnect is sync. ws_manager.connect is an async function (although it performs no async IO), while disconnect is a plain def. Keep the interface consistent when extending it.
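Pitfall ② suggests replacing the timer with explicit stage signals. The sketch below shows that idea. It is hypothetical and not the run_analysis_pipeline source: the stage functions are placeholders rather than the real run_resume_parser, and the convention that each stage returns only the keys it adds is an assumption. In the real system, `on_stage` could be a status setter like `_set_status`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

State = Dict[str, Any]
StageFn = Callable[[State], Awaitable[State]]

async def run_with_stage_callbacks(
    state: State,
    stages: List[Tuple[str, StageFn]],
    on_stage: Callable[[str], None],
) -> State:
    """Run stages in order and report each one as it actually starts."""
    for name, step in stages:
        on_stage(name)  # real progress signal, replacing the 20-second timer guess
        state = {**state, **await step(state)}  # each stage returns only the keys it adds
    return state
```

With this shape, the reported stage can never run ahead of the work, and the timer task is no longer needed.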
9. Summary
- SSE endpoint: POST /api/sessions/{session_id}/chat/stream; events chunk / done / error; after a report has been generated, the same endpoint switches to QA mode.
- Non-streaming: POST /api/sessions and POST .../chat, both through workflow.run_guide_chat.
- Analysis progress: _workflow_status + _set_status + the timer _update_progress_periodically; the frontend polls GET /api/sessions/{session_id}/status.
- WebSocket: /ws/{session_id} accepts connections, but the ws_manager progress/completed methods are not yet wired into the main flow; the only production call is the pipeline's send_error.
- To extend real-time capabilities, prefer adding WS push alongside _set_status, or switch to explicit callbacks inside Agent nodes, rather than continuing to rely on purely timer-based stages.
Next article: SQLAlchemy auto-migration (db/session.py).
Appendix: Key Source Code (Line-by-Line Comments)
The following code is excerpted from the iCan implementation, with a comment above each line, so you can follow along even without the public repository.
Generation command: python3 bin/build-ican-annotated-snippets.py
SSE Endpoint Entry chat_stream
_run_analysis_background
run_guide_chat
Series Navigation
| Article | Topic |
|---|---|
| 1 | System Overview |
| 2 | Five Agent Collaboration |
| 3 | Holland RIASEC |
| 4–7 | State · Routing · Nesting · Fault Tolerance |
| 8–11 | LLM Layer · SSE/WS · DB Migration · PDF |
| 12–14 | JSON Prompt · RIASEC Prompt · Guide Prompt |
| 15–17 | Docker · Middleware · Configuration |