asyncdefget_session_with_messages(db: AsyncSession, session_id: int): """Load session along with all its messages""" result = await db.execute( select(ChatSession).where(ChatSession.id == session_id) ) session = result.scalar_one_or_none()
if session: # Load all messages for this session (sorted by time) msg_result = await db.execute( select(ChatMessage) .where(ChatMessage.session_id == session_id) .order_by(ChatMessage.created_at) ) session.messages = list(msg_result.scalars().all())
return session
asyncdefdelete_session(db: AsyncSession, session_id: int) -> bool: """Delete session and all associated messages""" from sqlalchemy import delete
# Delete messages first (foreign key constraint) await db.execute( delete(ChatMessage).where(ChatMessage.session_id == session_id) ) # Then delete session result = await db.execute( delete(ChatSession).where(ChatSession.id == session_id) ) return result.rowcount > 0
asyncdefchat(db: AsyncSession, session_id: int, user_message: str) -> ChatMessage: # Step 1: Load recent message history (limit to last 10 rounds) history_result = await db.execute( select(ChatMessage) .where(ChatMessage.session_id == session_id) .order_by(ChatMessage.created_at) ) history = list(history_result.scalars().all())
# ⚡ Key: Take only the last 10 messages (~5 conversation rounds) recent_history = history[-10:]
# Step 2: Build conversation context array conversation = [] for msg in recent_history: conversation.append({ "role": msg.role, "content": msg.content })
# Append current user message conversation.append({"role": "user", "content": user_message})
🎯 Why 10 Messages Instead of All?
Strategy
Token Consumption
Context Completeness
Use Case
Full History
⚠️ May exceed limit (>80K tokens)
✅ Complete
Short conversations (<20 rounds)
Last N items (current)
✅ Controllable (~5K tokens)
✅ Sufficient
Long conversations (50+ rounds)
Summary Compression
✅ Very low
❌ May lose details
Very long conversations (100+ rounds)
Our choice: history[-10:] balances cost and effectiveness.
💡 Advanced Optimization: Sliding Window + Summary
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
defbuild_smart_context(history, max_tokens=6000): """ Smart context assembly strategy: - Always keep the first 2 rounds (establish initial context) - Always keep the last 8 rounds (maintain coherence) - Replace middle rounds with summary (if too long) """ iflen(history) <= 10: return history[-10:]
# Keep head and tail, summarize middle head = history[:4] # First 2 rounds tail = history[-6:] # Last 3 rounds middle_summary = summarize_messages(history[4:-6])
# Get semantic model semantic_model = await get_semantic_model(db, session.datasource_id)
# Build System Prompt system_prompt = f"""You are a professional data analysis assistant. You help users query and analyze data from databases. You have access to the following database structure:
When the user asks data-related questions, please reply in the following format: 1. Briefly explain what data you are going to query 2. Wrap the generated SQL query statement in ```sql ``` code blocks 3. Provide valuable analysis and insights whenever possible
If the user's question is not data-related, reply normally."""
# Assemble complete conversation history conversation_text = "" for msg in conversation: role = msg["role"] content = msg["content"] conversation_text += f"\n{role}: {content}"
Role Definition, Semantic Model Injection, Output Format Constraints
2.2 Actual Conversation Example
Round 1:
1 2 3 4 5 6 7 8 9 10 11 12
👤 User: Query the total order amount for the last 7 days
🤖 Assistant: I will query the total order amount for the last 7 days (2026-05-15 to 2026-05-22).
```sql SELECT DATE(created_at) as date, SUM(total_amount) as daily_total FROM orders WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL 7 DAY) GROUP BY DATE(created_at) ORDER BY date DESC;
📊 Analysis Findings:
Total orders in the last 7 days: ¥1,234,567
Peak on May 18 (¥234,567), possibly due to promotional events
Overall upward trend with an average daily growth of 12%
1 2 3
**Round 2** (leveraging context):
👤 User: Break down the above data further by region
🤖 Assistant: (understands “above” = order data) Okay, I will break down the order sales of the last 7 days by region.
1 2 3 4 5 6 7 8
SELECT r.name as region, SUM(o.total_amount) as total_sales FROM orders o JOIN users u ON o.user_id = u.id JOIN regions r ON u.region_id = r.id WHERE o.created_at >= DATE_SUB(CURDATE(), INTERVAL7DAY) GROUPBY r.id, r.name ORDERBY total_sales DESC;
📊 Regional Ranking:
East China: ¥456,789 (37%)
South Region: ¥345,678 (28%)
North Region: ¥234,567 (19%) …
1 2 3
**Round 3** (generating report):
👤 User: Help me generate an analysis report
🤖 Assistant: Generating a professional analysis report based on our previous conversation data…
📊 Sales Analysis Report (2026-05-15 ~ 2026-05-22)
I. Key Findings
✅ Weekly total sales exceeded 1.2 million, up 15% MoM ✅ East China contributed the most, accounting for 37% of total …
(See “Report Generation” section below for details)
Plain text reply (e.g., chitchat, explanatory questions)
Display text only
SQL validation fails
Do not execute, log WARNING
Prompt “Generated SQL poses a risk”
SQL execution exception
Catch exception, return error message
Display “Query error: table does not exist”
Empty query result
Return empty list normally
Display “No data available”
📊 IV. Intelligent Chart Recommendation Algorithm (_suggest_chart)
4.1 Design Philosophy
Core principle: Automatically select the most appropriate chart type based on SQL structural characteristics and statistical features of returned data.
for col in columns: values = [row.get(col) for row in data if row.get(col) isnotNone] if values andall(isinstance(v, (int, float)) for v in values): numeric_cols.append(col) else: string_cols.append(col)
# ===== Rule Matching =====
# Rule 1: Time trend → Line chart if has_date and numeric_cols: return { "type": "line_chart", "x_field": string_cols[0] if string_cols else columns[0], "y_fields": numeric_cols, }
# Rule 2: Group-by aggregation + few categories → Pie chart (suitable for proportion display) elif has_group_by and numeric_cols: iflen(data) <= 8: # Use pie chart when categories ≤ 8 return { "type": "pie_chart", "label_field": string_cols[0] if string_cols else columns[0], "value_field": numeric_cols[0], }
# Rule 3: Group-by aggregation + many categories → Bar chart (suitable for comparison) else: return { "type": "bar_chart", "x_field": string_cols[0] if string_cols else columns[0], "y_fields": numeric_cols, }
# Rule 4: Single row, single numeric value → Metric card (KPI display) elif numeric_cols andlen(data) == 1: return { "type": "metric_card", "metrics": {col: data[0].get(col) for col in numeric_cols}, }
-- Input SQL SELECT c.name as category, SUM(oi.amount) as total FROM order_items oi JOIN products p ON oi.product_id = p.id JOIN categories c ON p.category_id = c.id GROUPBY c.id;
## I. 🎯 Key Findings 1. **Weekly total sales exceeded 1.2 million**, up 15% MoM, hitting a six-month high 2. **East China region performed exceptionally**, contributing 37% of revenue, largely due to the "618 Preheating" campaign 3. **Electronics category continues to lead**, accounting for 42% of total sales, with phone accessories growing the fastest (+28%)
## II. 📈 Key Metrics Analysis | Metric | This Week Value | MoM Change | Trend | |--------|-----------------|------------|-------| | Total Sales | ¥1,234,567 | ↑15% | 📈 Strong growth | | Order Volume | 4,567 orders | ↑8% | 📈 Steady rise | | Average Order Value | ¥270.3 | ↑6.5% | 📈 Continuous improvement | ...
## III. 💡 Action Recommendations 1. **Increase investment in East China**: Consider adding inventory and marketing budget to this region 2. **Replicate successful practices**: Analyze best-selling products in East China and promote them to other regions 3. **Monitor low-growth categories**: Books & Stationery has been declining for 3 consecutive weeks; investigate reasons ...
Args: session_id: Session ID (used to retrieve historical query results) metrics: Metrics to focus on style: Report detail level
Returns: Report text in Markdown format """ # Step 1: Get the last 5 messages with query results msg_result = await db.execute( select(ChatMessage) .where( ChatMessage.session_id == session_id, ChatMessage.query_result.isnot(None) # Only those with results ) .order_by(ChatMessage.created_at.desc()) .limit(5) ) messages = list(msg_result.scalars().all())
# Step 2: Aggregate query results data = {} for msg in messages: if msg.query_result: try: data[msg.sql_query or"query"] = json.loads(msg.query_result) except json.JSONDecodeError: pass
asyncdefgenerate_report_script(self, data: dict, metrics: list[str]) -> str: system_prompt = """You are a professional data report writing expert. Based on the query results and specified metrics, generate a professional business report in Chinese Markdown format.
The report must include the following sections: 1. **Key Findings** (Top 3-5 Insights): Most important conclusions supported by numbers 2. **Detailed Metric Analysis**: In-depth interpretation of each metric, including YoY/MoM comparisons 3. **Trends and Insights**: Analysis of underlying reasons and future predictions 4. **Recommendations and Action Plan**: Concrete, actionable suggestions based on data
Writing requirements: - Use professional yet understandable language - All conclusions must be supported by data (cite specific values) - Use Markdown formatting (headings, tables, lists, bold) - Use emojis appropriately to enhance readability (📈 📉 🎯 💡) - Total length between 1000-2000 characters"""
Limit context window size (history[-10:]) to avoid token overflow
Inject full semantic model into every prompt (ensure field name accuracy)
Use regex to extract SQL (compatible with multiple code block formats)
Implement validate_sql() with two-layer security validation
_suggest_chart() based on dual judgment of SQL features + data types
Use explicit templates and constraints for report generation
Serialize all query results as JSON and store in database
Frontend dynamically selects chart components based on chart_config field
🚀 Next Steps
The next article will dive deep into the two-stage separation architecture of the data dashboard — the most innovative design of this entire project! Why doesn’t dashboard preview need to call the LLM? How to achieve “parse at design time, execute at runtime”? What is the philosophy behind configuration-driven design?
Stay tuned! 🚀
Related code files:
app/services/chat_engine.py - Core logic of the chat engine
app/models/chat.py - Session and message data models