📊 Table of Contents

  1. Why Does a RAG System Need a Triple‑Storage Architecture?
  2. Triple‑Storage Responsibility Division and Design Philosophy
  3. Dual‑Write Consistency Guarantee Mechanism
  4. Complete Implementation Code (Production‑Grade)
  5. Data Synchronization and Disaster Recovery
  6. Performance Optimization and Scaling Strategies
  7. Monitoring, Operations, and Best Practices
  8. Summary and Architecture Evolution Roadmap

Why Does a RAG System Need a Triple‑Storage Architecture?

Fatal Flaws of a Single‑Store Design

Let’s look at what goes wrong when all data lives in a single store.

❌ Option 1: Only MySQL (Relational Database)

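# Suppose all data is stored in MySQL
# ('糖尿病' = "diabetes"; MATCH ... AGAINST requires a FULLTEXT index on content,
#  and Chinese text additionally needs MySQL's ngram full-text parser)
query = "SELECT content FROM documents WHERE MATCH(content) AGAINST('糖尿病' IN NATURAL LANGUAGE MODE)"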

# Problem 1: Full‑text search is slow! 10M records, query takes > 5s
# Problem 2: No semantic similarity search
# Problem 3: Large fields (e.g. original PDF, images) make backups difficult
# Problem 4: Cannot leverage vector indexes for acceleration

Consequence: QPS < 10, terrible user experience

❌ Option 2: Only Milvus (Vector Database)

# Suppose all data is stuffed into Milvus dynamic fields
data = {
    "doc_id": "doc_001",
    "vector": [0.1, 0.2, ...],  # 1024 dimensions (elided)
    "content": "Here is the full document content...",  # may be several KB to MB
    "metadata": "{...}",  # JSON string
    "original_file": base64_encoded_pdf,  # an entire binary file, base64-encoded!
}

milvus.insert(collection_name="rag_vectors", data=[data])

# Problem 1: Milvus is not suited to storing large files (loaded data sits in memory, so memory explodes)
# Problem 2: Scalar filtering on dynamic fields is inefficient (harder to index than declared scalar fields)
# Problem 3: Data migration and backup are complicated
# Problem 4: Cannot perform complex analyses with SQL

Consequence: Memory usage is 5–10 times normal, high cost
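A back-of-envelope estimate shows why inline payloads inflate memory. This sketch assumes float32 vectors (4 bytes per value), an illustrative 20 KiB average payload per row, and ignores index overhead (an HNSW graph adds more on top), so treat the numbers as orders of magnitude rather than measurements. The function names are hypothetical.

```python
# Back-of-envelope sizing: raw float32 vectors vs. vectors plus an inline payload.
# Assumptions (illustrative only): float32 values, no index overhead,
# and an average inline payload size chosen by the caller.


def raw_vector_bytes(num_vectors: int, dim: int, bytes_per_value: int = 4) -> int:
    """Bytes needed for the vectors alone."""
    return num_vectors * dim * bytes_per_value


def inflation_ratio(dim: int, payload_bytes_per_row: int, bytes_per_value: int = 4) -> float:
    """How many times larger each row gets when text/file payloads are stored inline."""
    vector_bytes = dim * bytes_per_value
    return (vector_bytes + payload_bytes_per_row) / vector_bytes


if __name__ == "__main__":
    vectors_only = raw_vector_bytes(10_000_000, 1024)
    print(f"10M x 1024-d float32 vectors: {vectors_only / 1e9:.1f} GB")
    # Assumed average inline payload of 20 KiB per row (content + metadata)
    print(f"Row inflation with a 20 KiB payload: {inflation_ratio(1024, 20 * 1024):.1f}x")
```

With these assumptions, 10M vectors alone take 10,000,000 × 1024 × 4 B ≈ 41 GB, and a 20 KiB payload turns each 4 KiB row into a 24 KiB row (6×), which lands inside the 5–10× range quoted above.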

❌ Option 3: Only MinIO (Object Storage)

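# All data stored as files
import io
import json

payload = json.dumps(doc).encode("utf-8")
minio.put_object("bucket", f"docs/{doc_id}.json", io.BytesIO(payload), length=len(payload))

# Problem 1: No metadata index – only full scans possible
# Problem 2: Cannot perform vector or keyword retrieval
# Problem 3: No partial updates or transactions: concurrent writers to one object simply overwrite each other (last writer wins)
# Problem 4: Data versioning is messy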

Consequence: Cannot support online retrieval, offline batch only.

Advantages of Triple‑Storage Collaboration

graph TB
subgraph WhyTriple["Why Triple Storage?"]
direction TB

subgraph MySQL_Role["🐬 MySQL Role"]
M1["✅ Structured metadata<br/>doc_id, title, source, tags..."]
M2["✅ Business logic queries<br/>filter by time/type/status"]
M3["✅ Transactional operations<br/>ACID guarantees"]
M4["✅ Complex relational analyses<br/>JOIN, GROUP BY"]
end

subgraph Milvus_Role["🔷 Milvus Role"]
V1["✅ Vector similarity search<br/>semantic search core"]
V2["✅ High‑performance ANN indexes<br/>HNSW/IVF_PQ"]
V3["✅ Unified multimodal<br/>text/image/table"]
V4["✅ Near‑real‑time search<br/>millisecond response"]
end

subgraph MinIO_Role["🪣 MinIO Role"]
O1["✅ Original file storage<br/>PDF/Word/Image/Video"]
O2["✅ Large capacity, low cost<br/>petabyte‑scale"]
O3["✅ Versioning & snapshots<br/>data rollback capability"]
O4["✅ CDN origin<br/>static resource delivery"]
end
end

Synergy["🔄 Synergy effect:<br/>1+1+1 > 3"]

MySQL_Role --> Synergy
Milvus_Role --> Synergy
MinIO_Role --> Synergy

Feature Comparison

| Feature | MySQL | Milvus | MinIO |
|---|---|---|---|
| Data types | Structured metadata | Vectors + light scalars | Files / binary large objects |
| Query capability | SQL (strong) | Vector similarity (strong) | Simple CRUD (weak) |
| Transaction support | ✅ ACID | ❌ No transactions (tunable consistency levels) | ❌ Atomic single‑object operations only |
| Scale | Hundreds of millions of rows | Tens of millions to billions of vectors | PB‑level files |
| Latency | ms (with indexes) | ms (ANN) | 10–100 ms (network) |
| Cost model | CPU / memory | Memory / SSD | Storage space |
| Typical scenario | Metadata management | Semantic retrieval | Original document archiving |
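To make the division of labour in the comparison concrete, here is a minimal sketch of how one parsed document could be split into a metadata row for MySQL, vector rows for Milvus, and the raw file for MinIO. `ParsedDocument` and `route_to_stores` are hypothetical names, not part of any library or of the coordinator code later in this article.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ParsedDocument:
    """Hypothetical output of the extraction + embedding pipeline."""
    doc_id: str
    title: str
    source_type: str                 # pdf / word / image / webpage / table
    business_tag: str
    raw_bytes: bytes                 # the original file
    chunk_vectors: List[List[float]] = field(default_factory=list)


def route_to_stores(doc: ParsedDocument, oss_key: str) -> Dict[str, object]:
    """Split one document into the payload each store is responsible for."""
    mysql_row = {                    # structured metadata -> MySQL
        "doc_id": doc.doc_id,
        "title": doc.title,
        "source_type": doc.source_type,
        "business_tag": doc.business_tag,
        "file_size": len(doc.raw_bytes),
        "oss_key": oss_key,          # a pointer to the file, not the file itself
        "chunk_count": len(doc.chunk_vectors),
    }
    # Vectors + light scalars -> Milvus (one row per chunk vector;
    # how rows are keyed is left to the collection design)
    milvus_rows = [
        {"doc_id": doc.doc_id, "vector": vec,
         "business_tag": doc.business_tag, "oss_id": oss_key}
        for vec in doc.chunk_vectors
    ]
    minio_object: Tuple[str, bytes] = (oss_key, doc.raw_bytes)  # original file -> MinIO
    return {"mysql": mysql_row, "milvus": milvus_rows, "minio": minio_object}
```

Note that the large bytes only travel to MinIO; MySQL and Milvus keep a key that points back to the object.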

See also: 《RAG Offline Component: Metadata Enhancement & Knowledge Graph Fusion Preprocessing》 – design rationale for triple storage with metadata and lineage.

Triple‑Storage Responsibility Division and Design Philosophy

Architecture Overview

flowchart TB
subgraph InputLayer["Input Layer"]
Upload["📤 File upload<br/>(PDF/Word/Image)"]
API["🔗 API call<br/>(REST/gRPC)"]
BatchImport["📦 Batch import<br/>(ETL tasks)"]
end

subgraph ProcessingLayer["Processing Layer"]
Extractor["🔍 Content extractor<br/>(PyMuPDF/PaddleOCR)"]
Chunker["✂️ Chunker<br/>(5 strategies)"]
Embedder["🎯 Embedder<br/>(BGE-M3)"]
end

subgraph StorageLayer["Triple Storage Layer"]
direction LR

subgraph MySQL_DB["🐬 MySQL 8.0"]
direction TB
DocsTable["documents table<br/>(document metadata)"]
ChunksTable["chunks table<br/>(chunk records)"]
TasksTable["processing_tasks table<br/>(task status)"]

DocsTable --> ChunksTable
end

subgraph Milvus_Store["🔷 Milvus 2.x"]
direction TB
DenseIndex["Dense Vectors<br/>(COSINE/HNSW)"]
SparseIndex["Sparse Weights<br/>(BM25-like)"]
MetadataFilter["Scalar Indexes<br/>(modal_type/business_tag)"]
end

subgraph MinIO_Bucket["🪣 MinIO"]
direction TB
RawFiles["raw-files/<br/>original files"]
ProcessedData["processed-data/<br/>processed content"]
Models["models/<br/>model files"]
Backups["backups/<br/>periodic snapshots"]
end
end

subgraph ServiceLayer["Service Layer"]
DualWriteService["✍️ Dual‑write coordinator<br/>(consistency guarantee)"]
SyncService["🔄 Sync service<br/>(compensation repair)"]
CacheLayer["💾 Redis caching layer<br/>(hot data)"]
end

InputLayer --> ProcessingLayer
ProcessingLayer --> ServiceLayer
ServiceLayer --> StorageLayer

DualWriteService --> |"Transactional writes"| MySQL_DB
DualWriteService --> |"Async writes"| Milvus_Store
DualWriteService --> |"Parallel uploads"| MinIO_Bucket
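The Chunker box mentions five strategies without naming them. As an illustration only, here is one common strategy, fixed-size windows with overlap, which may or may not be among the article's five. It counts characters for simplicity (production chunkers usually count tokens), and `sliding_window_chunks` is a hypothetical name.

```python
from typing import List


def sliding_window_chunks(text: str, size: int, overlap: int) -> List[str]:
    """Fixed-size character windows; consecutive chunks share `overlap` characters."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("require size > 0 and 0 <= overlap < size")
    if not text:
        return []
    step = size - overlap
    chunks = []
    start = 0
    while True:
        chunks.append(text[start:start + size])
        if start + size >= len(text):  # this window already reaches the end of the text
            break
        start += step
    return chunks
```

The overlap keeps a sentence that straddles a boundary visible in both neighbouring chunks, at the cost of some duplicated storage and embedding work.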

Data Model Design

MySQL Schema (Metadata Storage)

-- ============================================
-- RAG System - MySQL Database Schema Design
-- ============================================

-- 1. Main Document Table
CREATE TABLE documents (
doc_id VARCHAR(64) PRIMARY KEY COMMENT 'Business document ID',
title VARCHAR(512) NOT NULL COMMENT 'Document title',
source_type ENUM('pdf', 'word', 'image', 'webpage', 'table') NOT NULL COMMENT 'Source type',
source_url VARCHAR(1024) COMMENT 'Original URL or path',

-- Business attributes
business_tag VARCHAR(128) DEFAULT '' COMMENT 'Business tag (medical/legal/finance)',
domain VARCHAR(64) DEFAULT '' COMMENT 'Domain classification',
language VARCHAR(16) DEFAULT 'zh' COMMENT 'Language',

-- File info
file_size BIGINT DEFAULT 0 COMMENT 'File size (bytes)',
page_count INT DEFAULT 0 COMMENT 'Page count (for PDF)',
oss_key VARCHAR(256) COMMENT 'MinIO object key',

-- Processing status
status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
chunk_count INT DEFAULT 0 COMMENT 'Number of chunks',

-- Timestamps
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
deleted_at TIMESTAMP NULL,

INDEX idx_business_tag (business_tag),
INDEX idx_status (status),
INDEX idx_created_at (created_at),
INDEX idx_source_type (source_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='RAG document main table';


-- 2. Chunks Table
CREATE TABLE chunks (
chunk_id VARCHAR(64) PRIMARY KEY COMMENT 'Unique chunk ID',
doc_id VARCHAR(64) NOT NULL COMMENT 'Parent document ID',

-- Chunk info
chunk_index INT NOT NULL COMMENT 'Index within the document',
chunk_strategy VARCHAR(32) NOT NULL COMMENT 'Chunking strategy used',
vector_level VARCHAR(16) DEFAULT 'text' COMMENT 'Vectorisation granularity (text/table/row/col/cell)',

-- Content info
content TEXT COMMENT 'Chunk text content (first 500 chars for preview)',
content_hash VARCHAR(64) COMMENT 'Content hash (for deduplication)',
char_count INT DEFAULT 0 COMMENT 'Character count',

-- Association info
parent_chunk_id VARCHAR(64) COMMENT 'Parent chunk ID (for parent‑child chunking)',
associate_id VARCHAR(64) COMMENT 'Association ID (for structured data like tables)',

-- Embedding status
embedding_status ENUM('pending', 'embedded', 'failed') DEFAULT 'pending',
milvus_synced BOOLEAN DEFAULT FALSE COMMENT 'Whether synced to Milvus',

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

UNIQUE KEY uk_doc_index (doc_id, chunk_index),
INDEX idx_doc_id (doc_id),
INDEX idx_vector_level (vector_level),
INDEX idx_embedding_status (embedding_status),
INDEX idx_content_hash (content_hash),

FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='RAG document chunks table';


-- 3. Task Queue Table (for async processing)
CREATE TABLE processing_tasks (
task_id VARCHAR(64) PRIMARY KEY,
task_type ENUM('extract', 'chunk', 'embed', 'sync') NOT NULL,
doc_id VARCHAR(64) NOT NULL,

status ENUM('queued', 'running', 'completed', 'failed') DEFAULT 'queued',
priority INT DEFAULT 0 COMMENT 'Priority (higher number = higher priority)',
retry_count INT DEFAULT 0 COMMENT 'Retry count',

error_message TEXT COMMENT 'Error message',
result_json JSON COMMENT 'Execution result',

started_at TIMESTAMP NULL,
completed_at TIMESTAMP NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

INDEX idx_status_priority (status, priority),
INDEX idx_doc_id (doc_id),
INDEX idx_task_type (task_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='RAG processing tasks table';


-- 4. Operation Log Table (audit trail)
CREATE TABLE operation_logs (
log_id BIGINT AUTO_INCREMENT PRIMARY KEY,
operation_type VARCHAR(32) NOT NULL COMMENT 'INSERT/UPDATE/DELETE/SYNC',
target_table VARCHAR(64) NOT NULL COMMENT 'Target table name',
target_id VARCHAR(64) NOT NULL COMMENT 'Target record ID',

payload JSON COMMENT 'Pre‑operation data snapshot',
operator VARCHAR(64) DEFAULT 'system' COMMENT 'Operator / system',

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

INDEX idx_target (target_table, target_id),
INDEX idx_operation_type (operation_type),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='RAG operation log table';
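The `chunks` table stores a 500-character preview plus a `content_hash` for deduplication. The schema does not name the hash function; this sketch assumes SHA-256, whose hex digest is exactly 64 characters and therefore fits `VARCHAR(64)`. The `chunk_id` scheme and helper names are hypothetical.

```python
import hashlib
from typing import Dict, List

PREVIEW_CHARS = 500  # matches the "first 500 chars for preview" comment on chunks.content


def build_chunk_row(doc_id: str, chunk_index: int, text: str, strategy: str) -> Dict:
    """Build a row for the chunks table (SHA-256 is an assumed hash choice)."""
    return {
        "chunk_id": f"{doc_id}_{chunk_index:04d}",  # hypothetical ID scheme
        "doc_id": doc_id,
        "chunk_index": chunk_index,
        "chunk_strategy": strategy,
        "content": text[:PREVIEW_CHARS],             # preview only
        "content_hash": hashlib.sha256(text.encode("utf-8")).hexdigest(),  # hash of the full text
        "char_count": len(text),
    }


def dedupe(rows: List[Dict]) -> List[Dict]:
    """Keep the first row per content_hash (in-memory analogue of a lookup via idx_content_hash)."""
    seen, unique = set(), []
    for row in rows:
        if row["content_hash"] not in seen:
            seen.add(row["content_hash"])
            unique.append(row)
    return unique
```

Hashing the full text rather than the preview matters: two chunks that share their first 500 characters but differ later must not be treated as duplicates.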

Milvus Collection Design (detailed in Part 5)

Collection: rag_vectors
Fields:
- doc_id: VARCHAR(64) [PK] # primary key: must be unique per row (one row per vector)
- vector: FLOAT_VECTOR(1024) # BGE-M3 dimension
- modal_type: VARCHAR(16) # text/image/table/video
- business_tag: VARCHAR(128)
- vector_level: VARCHAR(16) # text/table/row/col/cell (same values as chunks.vector_level)
- associate_id: VARCHAR(64)
- oss_id: VARCHAR(64)

Indexes:
- vector: HNSW(M=16, efConstruction=200, metric=COSINE)
- modal_type: Trie
- business_tag: Trie
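Milvus scalar filters are boolean expression strings, and the Trie indexes on `modal_type` and `business_tag` exist to speed up exactly such filters during search. The helper below is a hypothetical sketch that composes one; quoting values with `json.dumps` is an assumption that works for ordinary tag strings.

```python
import json
from typing import Iterable, Optional


def build_filter(modal_type: Optional[str] = None,
                 business_tags: Optional[Iterable[str]] = None) -> str:
    """Compose a Milvus boolean filter expression over rag_vectors' scalar fields."""
    clauses = []
    if modal_type:
        # json.dumps yields a double-quoted literal with quotes/backslashes escaped
        clauses.append(f"modal_type == {json.dumps(modal_type, ensure_ascii=False)}")
    if business_tags:
        tags = ", ".join(json.dumps(t, ensure_ascii=False) for t in business_tags)
        clauses.append(f"business_tag in [{tags}]")
    return " and ".join(clauses)
```

For example, `build_filter("text", ["medical", "legal"])` yields `modal_type == "text" and business_tag in ["medical", "legal"]`. In recent pymilvus releases, such a string would be passed as the `filter` argument of `MilvusClient.search`; an empty string means no filtering.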

MinIO Bucket Layout

Bucket: rag-knowledge-base
├── raw-files/                  # Original files
│   ├── pdf/
│   │   └── {YYYY}/{MM}/{DD}/{doc_id}.pdf
│   ├── word/
│   ├── image/
│   └── webpage/
│
├── processed-data/             # Processed data
│   ├── extracted-text/         # Extracted plain text
│   │   └── {doc_id}.txt
│   ├── tables/                 # Parsed tables
│   │   └── {doc_id}/
│   │       └── table_{n}.json
│   └── images/                 # Extracted images
│       └── {doc_id}/
│           └── img_{n}.png
│
├── models/                     # Model files
│   ├── bge-m3/
│   └── paddleocr/
│
├── backups/                    # Periodic backups
│   ├── mysql/
│   │   └── dump_{timestamp}.sql
│   └── metadata/
│       └── manifest_{timestamp}.json
│
└── temp/                       # Temporary files (periodically cleaned)
    └── upload_{uuid}/
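Building object keys in one place keeps every writer consistent with this layout. A minimal sketch follows; the extension mapping is an assumption (the layout only shows `.pdf`), the date-partitioned path is assumed to apply to every source type, and both function names are hypothetical.

```python
from datetime import date

# Assumed file extensions per source type; the layout above only shows .pdf
EXTENSIONS = {"pdf": "pdf", "word": "docx", "image": "png", "webpage": "html"}


def raw_file_key(source_type: str, doc_id: str, upload_date: date) -> str:
    """raw-files/{type}/{YYYY}/{MM}/{DD}/{doc_id}.{ext}; KeyError for unknown types."""
    ext = EXTENSIONS[source_type]
    return f"raw-files/{source_type}/{upload_date:%Y/%m/%d}/{doc_id}.{ext}"


def table_key(doc_id: str, n: int) -> str:
    """processed-data/tables/{doc_id}/table_{n}.json"""
    return f"processed-data/tables/{doc_id}/table_{n}.json"
```

Date-partitioned prefixes keep listings narrow: fetching one day's PDFs becomes a prefix listing on `raw-files/pdf/2024/05/01/` instead of a scan of the whole bucket.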

Dual‑Write Consistency Guarantee Mechanism

Consistency Challenges

sequenceDiagram
participant App as Application
participant DWS as Dual‑Write Service
participant MySQL as MySQL
participant Milvus as Milvus
participant MinIO as MinIO

Note over App,MinIO: Scenario: writing a new document

App->>DWS: 1. Upload document + metadata

par Parallel writes
DWS->>MySQL: 2a. INSERT document (transaction begin)
DWS->>MinIO: 2b. PUT object (file upload)
DWS->>Milvus: 2c. INSERT vectors (batch)
end

alt All succeed
MySQL-->>DWS: ✅ COMMIT
DWS-->>App: Return success
else MySQL succeeds, Milvus fails
MySQL-->>DWS: ✅ COMMIT
Milvus-->>DWS: ❌ Insert failed
DWS->>DWS: Mark as pending sync, record in compensation queue
DWS-->>App: Return "processing" (eventual consistency)
else MinIO fails
MinIO-->>DWS: ❌ Upload failed
DWS->>MySQL: ROLLBACK
DWS-->>App: Return failure
end
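The branches of this diagram can be condensed into a small decision function: given which of the three parallel writes succeeded, what does the coordinator answer and what cleanup follows? This is a hypothetical sketch; the MySQL-failure branch and the Milvus cleanup actions are assumptions, since the diagram does not show them.

```python
from typing import List, Tuple


def resolve_parallel_write(minio_ok: bool, mysql_ok: bool, milvus_ok: bool) -> Tuple[str, List[str]]:
    """Return (response to the app, follow-up actions) for the parallel-write scenario."""
    if not minio_ok:
        # Branch "MinIO fails": roll back metadata so no row points at a missing file
        actions = ["rollback_mysql"] if mysql_ok else []
        if milvus_ok:
            actions.append("delete_milvus_vectors")  # assumed cleanup, not in the diagram
        return "failure", actions
    if not mysql_ok:
        # Not covered by the diagram; assumed: without metadata the write counts as failed
        actions = ["delete_minio_object"]
        if milvus_ok:
            actions.append("delete_milvus_vectors")
        return "failure", actions
    if not milvus_ok:
        # Branch "MySQL succeeds, Milvus fails": keep the data, repair vectors later
        return "processing", ["enqueue_milvus_compensation"]
    return "success", []
```

The asymmetry is deliberate: a missing original file or metadata row is fatal, while missing vectors only delay searchability and can be rebuilt from the data that did land.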

Consistency Level Definitions

| Level | Name | Guarantee | Use case | Implementation cost |
|---|---|---|---|---|
| Strong | Atomic | All succeed or all fail | Financial transactions, critical data | High (distributed transactions) |
| Eventual | Eventual | Temporary inconsistency allowed; stores converge | Knowledge base updates, content publishing | Medium (message queue + compensation) |
| Causal | Causal | Causally related operations are observed in order | Users immediately see their own submissions | Low (session binding) |

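Our choice: eventual consistency overall, with strong consistency on the critical path. The MinIO upload and the MySQL insert either both succeed or the upload is rolled back, while the Milvus write may lag and is repaired through the compensation queue.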

Dual‑Write Coordinator Implementation

# dual_write_service.py
"""
Dual‑Write Coordination Service – Production‑Grade Implementation
Ensures data consistency across MySQL, Milvus, and MinIO
"""

import io
import logging
import threading
import time
import traceback
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class WriteOperation(Enum):
    """Write operation types"""
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"
    SYNC = "sync"


class ConsistencyLevel(Enum):
    """Consistency levels"""
    STRONG = "strong"
    EVENTUAL = "eventual"
    CAUSAL = "causal"


@dataclass
class WriteResult:
    """Result of a single write operation"""
    success: bool
    operation: WriteOperation
    target: str  # mysql/milvus/minio
    data_id: str
    error: Optional[str] = None
    latency_ms: float = 0


@dataclass
class DualWriteResult:
    """Aggregated result of a dual‑write operation"""
    operation_id: str
    overall_success: bool
    consistency_achieved: bool
    results: List[WriteResult] = field(default_factory=list)
    compensation_needed: bool = False
    total_latency_ms: float = 0


class DualWriteCoordinator:
    """
    Dual‑Write Coordinator

    Core features:
    1. Orchestrate write order across multiple stores
    2. Handle partial failure scenarios
    3. Trigger compensation mechanisms
    4. Guarantee eventual consistency
    """

    def __init__(
        self,
        mysql_client,  # assumed: DB-API connection (e.g. PyMySQL) with cursor()/commit()/rollback()
        milvus_client,  # assumed: pymilvus MilvusClient
        minio_client,  # assumed: minio.Minio
        consistency_level: ConsistencyLevel = ConsistencyLevel.EVENTUAL,
        max_retries: int = 3,
        retry_delay_base: float = 1.0,
        enable_compensation_queue: bool = True
    ):
        self.mysql = mysql_client
        self.milvus = milvus_client
        self.minio = minio_client
        self.consistency_level = consistency_level
        self.max_retries = max_retries
        self.retry_delay_base = retry_delay_base
        self.enable_compensation_queue = enable_compensation_queue

        # Compensation queue (for async repair). It lives in memory here, so pending
        # items are lost on restart; persist it (e.g. in processing_tasks) in production.
        self.compensation_queue: List[Dict] = []
        self._queue_lock = threading.Lock()

        # Statistics
        self.stats = {
            'total_operations': 0,
            'successful_operations': 0,
            'compensations_triggered': 0,
            'avg_latency_ms': 0,
        }

    def execute_dual_write(
        self,
        operation: WriteOperation,
        mysql_data: Optional[Dict] = None,
        milvus_data: Optional[List[Dict]] = None,
        minio_file_path: Optional[str] = None,
        minio_file_content: Optional[bytes] = None,
        transaction_context: Optional[Dict] = None,
        enqueue_on_failure: bool = True
    ) -> DualWriteResult:
        """
        Execute a dual‑write operation

        Parameters:
            operation: operation type
            mysql_data: dictionary of data to write to MySQL
            milvus_data: list of data to write to Milvus
            minio_file_path: MinIO object path
            minio_file_content: file content as bytes
            transaction_context: transaction context (for logging)
            enqueue_on_failure: add failed targets to the compensation queue
                (set to False when replaying an item taken from that queue)
        """
        op_id = str(uuid.uuid4())[:8]
        start_time = time.time()

        logger.info(f"[{op_id}] Starting dual‑write operation: {operation.value}")

        results: List[WriteResult] = []
        all_success = True
        compensation_needed = False

        try:
            if operation == WriteOperation.INSERT:
                results = self._execute_insert(
                    op_id, mysql_data, milvus_data,
                    minio_file_path, minio_file_content
                )
            elif operation == WriteOperation.UPDATE:
                results = self._execute_update(
                    op_id, mysql_data, milvus_data, minio_file_path, minio_file_content
                )
            elif operation == WriteOperation.DELETE:
                results = self._execute_delete(op_id, mysql_data, milvus_data, minio_file_path)
            else:
                # Without this branch an unsupported operation would report success
                raise ValueError(f"Unsupported operation: {operation.value}")

            # Check results
            failed_targets = [r for r in results if not r.success]
            all_success = len(failed_targets) == 0

            if not all_success:
                compensation_needed = True
                logger.warning(
                    f"[{op_id}] Partial write failure: "
                    f"{[f'{r.target}({r.error})' for r in failed_targets]}"
                )

                if self.enable_compensation_queue and enqueue_on_failure:
                    self._enqueue_compensation(
                        op_id, operation, mysql_data, milvus_data,
                        minio_file_path, minio_file_content, failed_targets
                    )

        except Exception as e:
            logger.error(f"[{op_id}] Dual‑write exception: {e}\n{traceback.format_exc()}")
            all_success = False
            compensation_needed = True

            results.append(WriteResult(
                success=False,
                operation=operation,
                target="coordinator",
                data_id=op_id,
                error=str(e)
            ))

        # Update statistics (avg_latency_ms is an incremental running mean)
        elapsed_ms = (time.time() - start_time) * 1000
        self.stats['total_operations'] += 1
        if all_success:
            self.stats['successful_operations'] += 1
        self.stats['avg_latency_ms'] += (
            elapsed_ms - self.stats['avg_latency_ms']
        ) / self.stats['total_operations']

        result = DualWriteResult(
            operation_id=op_id,
            overall_success=all_success,
            # Below STRONG, a partial failure is acceptable because compensation converges it
            consistency_achieved=all_success or self.consistency_level != ConsistencyLevel.STRONG,
            results=results,
            compensation_needed=compensation_needed,
            total_latency_ms=elapsed_ms
        )

        logger.info(
            f"[{op_id}] Dual‑write completed: success={all_success}, "
            f"latency={elapsed_ms:.0f}ms, targets={len(results)}"
        )

        return result

    def _execute_insert(
        self,
        op_id: str,
        mysql_data: Optional[Dict],
        milvus_data: Optional[List[Dict]],
        minio_path: Optional[str],
        minio_content: Optional[bytes]
    ) -> List[WriteResult]:
        """
        Execute insert operation (recommended order: MinIO → MySQL → Milvus)
        """
        results = []

        # Step 1: Upload file to MinIO first (ensure original data is safe)
        if minio_path and minio_content:
            result = self._safe_write(
                op_id, "minio", WriteOperation.INSERT,
                lambda: self._upload_to_minio(minio_path, minio_content),
                data_id=minio_path
            )
            results.append(result)

            if not result.success:
                # If MinIO fails, abort the whole flow
                return results

        # Step 2: Write to MySQL (transactional, serves as source of truth)
        if mysql_data:
            result = self._safe_write(
                op_id, "mysql", WriteOperation.INSERT,
                lambda: self._insert_to_mysql(mysql_data),
                data_id=mysql_data.get('doc_id', '')
            )
            results.append(result)

            if not result.success:
                # If MySQL fails, roll back the MinIO upload so no orphan file remains
                if minio_path and minio_content:
                    try:
                        self.minio.remove_object('rag-knowledge-base', minio_path)
                    except Exception as e:
                        logger.warning(f"[{op_id}] MinIO rollback failed for {minio_path}: {e}")
                return results

        # Step 3: Finally write to Milvus (can be compensated asynchronously)
        if milvus_data:
            result = self._safe_write(
                op_id, "milvus", WriteOperation.INSERT,
                lambda: self._insert_to_milvus(milvus_data),
                data_id=mysql_data.get('doc_id', '') if mysql_data else ''
            )
            results.append(result)
            # Milvus failure does not block the flow; the compensation queue will repair it

        return results

    def _execute_update(self, op_id, mysql_data, milvus_data, minio_path, minio_content):
        """Update flow (placeholder: not part of this listing)"""
        raise NotImplementedError("update flow is not implemented in this listing")

    def _execute_delete(self, op_id, mysql_data, milvus_data, minio_path):
        """Delete flow (placeholder: not part of this listing)"""
        raise NotImplementedError("delete flow is not implemented in this listing")

    def _safe_write(
        self,
        op_id: str,
        target: str,
        operation: WriteOperation,
        write_func,
        data_id: str,
        retry: int = 0
    ) -> WriteResult:
        """
        Safely execute a write operation (with retries)
        """
        start = time.time()

        try:
            write_func()
            latency = (time.time() - start) * 1000

            return WriteResult(
                success=True,
                operation=operation,
                target=target,
                data_id=data_id,
                latency_ms=latency
            )

        except Exception as e:
            latency = (time.time() - start) * 1000

            if retry < self.max_retries:
                delay = self.retry_delay_base * (2 ** retry)  # exponential backoff
                logger.warning(
                    f"[{op_id}] {target} write failed (attempt {retry+1}), "
                    f"retrying in {delay}s: {e}"
                )
                time.sleep(delay)
                return self._safe_write(
                    op_id, target, operation, write_func,
                    data_id, retry + 1
                )
            else:
                logger.error(f"[{op_id}] {target} write ultimately failed: {e}")
                return WriteResult(
                    success=False,
                    operation=operation,
                    target=target,
                    data_id=data_id,
                    error=str(e),
                    latency_ms=latency
                )

    def _upload_to_minio(self, path: str, content: bytes):
        """Upload file to MinIO (minio.Minio.put_object expects a stream plus its length)"""
        self.minio.put_object(
            bucket_name="rag-knowledge-base",
            object_name=path,
            data=io.BytesIO(content),
            length=len(content)
        )

    def _insert_to_mysql(self, data: Dict):
        """Insert data into MySQL in its own transaction"""
        # Use parameterised query to prevent SQL injection
        sql = """
            INSERT INTO documents
            (doc_id, title, source_type, business_tag, oss_key, status)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        params = (
            data['doc_id'],
            data['title'],
            data['source_type'],
            data.get('business_tag', ''),
            data.get('oss_key', ''),
            'completed'
        )
        try:
            with self.mysql.cursor() as cursor:
                cursor.execute(sql, params)
            self.mysql.commit()
        except Exception:
            self.mysql.rollback()  # leave no half-applied transaction on the connection
            raise

    def _insert_to_milvus(self, data_list: List[Dict]):
        """Batch insert data into Milvus"""
        if data_list:
            self.milvus.insert(
                collection_name="rag_vectors",
                data=data_list
            )

    def _enqueue_compensation(
        self,
        op_id: str,
        operation: WriteOperation,
        mysql_data: Optional[Dict],
        milvus_data: Optional[List[Dict]],
        minio_path: Optional[str],
        minio_content: Optional[bytes],
        failed_targets: List[WriteResult]
    ):
        """Add failed operation to the compensation queue"""
        with self._queue_lock:
            compensation_item = {
                'operation_id': op_id,
                'operation': operation,
                'mysql_data': mysql_data,
                'milvus_data': milvus_data,
                'minio_path': minio_path,
                'minio_content': minio_content,
                'failed_targets': [t.target for t in failed_targets],
                'created_at': time.time(),
                'retry_count': 0
            }
            self.compensation_queue.append(compensation_item)
            self.stats['compensations_triggered'] += 1

        logger.info(f"[{op_id}] Added to compensation queue, current queue length: {len(self.compensation_queue)}")

    def process_compensation_queue(self, max_items: int = 100):
        """
        Process failed operations in the compensation queue
        Should be called by a background scheduled task
        """
        with self._queue_lock:
            items_to_process = self.compensation_queue[:max_items]
            self.compensation_queue = self.compensation_queue[max_items:]

        processed = 0
        success_count = 0

        for item in items_to_process:
            if item['retry_count'] >= self.max_retries:
                logger.error(
                    f"Compensation operation reached max retries, giving up: {item['operation_id']}"
                )
                continue

            item['retry_count'] += 1

            try:
                if item['failed_targets'] == ['milvus']:
                    # MinIO and MySQL already succeeded: replay only the Milvus write.
                    # Replaying the full flow would hit a duplicate key in MySQL, and the
                    # resulting "rollback" would delete the already-stored MinIO file.
                    succeeded = self._safe_write(
                        item['operation_id'], "milvus", item['operation'],
                        lambda data=item['milvus_data']: self._insert_to_milvus(data),
                        data_id=(item['mysql_data'] or {}).get('doc_id', '')
                    ).success
                else:
                    # Earlier steps failed and were rolled back, so replay the whole flow;
                    # enqueue_on_failure=False keeps the replay from enqueuing a duplicate item
                    succeeded = self.execute_dual_write(
                        operation=item['operation'],
                        mysql_data=item['mysql_data'],
                        milvus_data=item['milvus_data'],
                        minio_file_path=item['minio_path'],
                        minio_file_content=item['minio_content'],
                        enqueue_on_failure=False
                    ).overall_success

                if succeeded:
                    success_count += 1
                    logger.info(f"Compensation succeeded: {item['operation_id']}")
                else:
                    # Re‑add to the end of the queue
                    with self._queue_lock:
                        self.compensation_queue.append(item)

            except Exception as e:
                logger.error(f"Compensation execution exception: {item['operation_id']}, {e}")
                with self._queue_lock:
                    self.compensation_queue.append(item)

            processed += 1

        logger.info(
            f"Compensation queue processing complete: processed={processed}, succeeded={success_count}, "
            f"remaining queue length={len(self.compensation_queue)}"
        )

        return {'processed': processed, 'success': success_count}

    def get_statistics(self) -> Dict:
        """Get statistics"""
        return {
            **self.stats,
            'compensation_queue_size': len(self.compensation_queue),
            'success_rate': (
                self.stats['successful_operations'] / max(1, self.stats['total_operations']) * 100
            )
        }
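The coordinator above keeps its compensation queue in a Python list, so pending repairs disappear if the process restarts. One option is to persist them in a table shaped like `processing_tasks`. This is a minimal sketch, assuming a subset of that table's columns; `sqlite3` stands in for MySQL only so the example runs anywhere, and `enqueue`, `claim_next`, and `finish` are hypothetical names.

```python
import sqlite3
from typing import Optional, Tuple

# sqlite3 stands in for MySQL; the columns mirror a subset of processing_tasks
SCHEMA = """
CREATE TABLE processing_tasks (
    task_id TEXT PRIMARY KEY,
    task_type TEXT NOT NULL,
    doc_id TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'queued',
    priority INTEGER NOT NULL DEFAULT 0,
    retry_count INTEGER NOT NULL DEFAULT 0,
    error_message TEXT
)
"""


def enqueue(conn: sqlite3.Connection, task_id: str, doc_id: str, priority: int = 0) -> None:
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "INSERT INTO processing_tasks (task_id, task_type, doc_id, priority) "
            "VALUES (?, 'sync', ?, ?)",
            (task_id, doc_id, priority),
        )


def claim_next(conn: sqlite3.Connection) -> Optional[Tuple[str, str]]:
    """Pick the highest-priority queued task (oldest first on ties) and mark it running."""
    with conn:
        row = conn.execute(
            "SELECT task_id, doc_id FROM processing_tasks WHERE status = 'queued' "
            "ORDER BY priority DESC, rowid LIMIT 1"
        ).fetchone()
        if row:
            conn.execute("UPDATE processing_tasks SET status = 'running' WHERE task_id = ?", (row[0],))
        return row


def finish(conn: sqlite3.Connection, task_id: str, ok: bool,
           error: str = "", max_retries: int = 3) -> str:
    """Mark success, requeue for another attempt, or give up after max_retries failures."""
    with conn:
        if ok:
            conn.execute("UPDATE processing_tasks SET status = 'completed' WHERE task_id = ?", (task_id,))
            return "completed"
        (retries,) = conn.execute(
            "SELECT retry_count FROM processing_tasks WHERE task_id = ?", (task_id,)
        ).fetchone()
        retries += 1
        status = "failed" if retries >= max_retries else "queued"
        conn.execute(
            "UPDATE processing_tasks SET status = ?, retry_count = ?, error_message = ? WHERE task_id = ?",
            (status, retries, error, task_id),
        )
        return status
```

With several workers on MySQL 8.0, the claim step would typically run `SELECT ... FOR UPDATE SKIP LOCKED` inside a transaction so that two workers never grab the same task.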

See also: 《RAG in Production: Deployment & Performance Monitoring Practices》 – operational essentials for the data layer in production.

Data Synchronization and Disaster Recovery

Sync Detection Mechanism
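At its core, orphan detection (feature 2 of the checker below) reduces to set differences over the IDs found in each store. A minimal sketch, assuming the three ID collections have already been fetched (for example, MinIO document IDs parsed from object keys); `find_orphans` is a hypothetical name, and how each ID set is gathered or sampled is left out.

```python
from typing import Dict, Iterable, Set


def find_orphans(mysql_ids: Iterable[str], milvus_ids: Iterable[str],
                 minio_doc_ids: Iterable[str]) -> Dict[str, Set[str]]:
    """Compare document IDs across the three stores with plain set differences."""
    mysql, milvus, minio = set(mysql_ids), set(milvus_ids), set(minio_doc_ids)
    return {
        "missing_in_milvus": mysql - milvus,  # metadata without vectors: re-embed / resync
        "missing_in_minio": mysql - minio,    # metadata pointing at a missing file
        "orphan_vectors": milvus - mysql,     # vectors without metadata: delete candidates
        "orphan_files": minio - mysql,        # files without metadata: cleanup candidates
    }
```

MySQL acts as the reference set here because the coordinator treats it as the source of truth; anything present elsewhere but absent from MySQL is a cleanup candidate rather than data to restore.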

# data_consistency_checker.py
"""
Data Consistency Checker
Periodically compares data consistency across MySQL, Milvus, and MinIO
"""

import time
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
import hashlib
import logging

logger = logging.getLogger(__name__)


@dataclass
class ConsistencyReport:
    """Consistency report"""
    check_time: str
    total_records_mysql: int
    total_records_milvus: int
    total_files_minio: int
    
    inconsistencies: List[Dict]
    summary: Dict
    
    is_healthy: bool
    recommendations: List[str]


class DataConsistencyChecker:
    """
    Data consistency checker
    
    Features:
    1. Regularly sample and check that data volumes across three stores match
    2. Detect orphaned records (present in one store but absent in others)
    3. Generate repair suggestions
    4. Support auto‑repair (optional)
    """
    
    def __init__(
        self,
        mysql_client,
        milvus_client,
        minio_client,
        sample_ratio: float = 0.01  # sampling ratio 1%
    ):
        self.mysql = mysql_client
        self.milvus = milvus_client
        self.minio = minio_client
        self.sample_ratio = sample_ratio
    
    def run_full_check(self) -> ConsistencyReport:
        """
        Perform a full consistency check
        """
        from datetime import datetime
        
        start_time = time.time()
        report = ConsistencyReport(
            check_time=datetime.now().isoformat(),
            total_records_mysql=0,
            total_records_milvus=0,
            total_files_minio=0,
            inconsistencies=[],
            summary={},
            is_healthy=True,
            recommendations=[]
        )
        
        logger.info("🔍 Starting data consistency check...")
        
        # 1. Count data in each store
        try:
            mysql_count = self._count_mysql_documents()
            report.total_records_mysql = mysql_count
        except Exception as e