📊 Table of Contents

  1. Why Meticulously Design a Milvus Collection?
  2. Collection Schema Best Practices
  3. Deep Dive into HNSW Index Principles
  4. Complete HNSW Parameter Tuning Guide
  5. Production Performance Optimization Strategies
  6. Monitoring and Operations Practices
  7. Common Issues and Troubleshooting
  8. Summary and Performance Benchmarks

Why Meticulously Design a Milvus Collection?

The Cost of a Bad Design

Let’s look at a real‑world example:

Scenario: A medical knowledge base RAG system containing 5 million document chunks.

Initial Design Problems:

# ❌ Poor design
schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=1024),
    FieldSchema("content", DataType.VARCHAR, max_length=65535),  # stores full text!
    FieldSchema("metadata", DataType.JSON),  # all metadata mixed together
])

index_params = {
    "index_type": "IVF_FLAT",  # inappropriate index
    "metric_type": "L2",
    "params": {"nlist": 1024}
}

Consequences:

| Issue | Impact | Quantified Data |
|---|---|---|
| Full text in a single field | Memory explosion | Single record 50KB+, total memory 250GB+ |
| JSON mixed metadata | Unable to filter | Full table scan required, latency >2s |
| IVF_FLAT index | Slow retrieval | QPS < 50, P99 latency 3s |
| No partitioning design | Hard to scale | Linear performance degradation after data growth |

After Optimization:

  • ✅ Memory reduced by 85% (250GB → 37GB)
  • ✅ QPS improved 20 times (50 → 1000+)
  • ✅ P99 latency reduced by 95% (3000ms → 150ms)
  • ✅ Supports horizontal scaling to hundreds of millions of vectors

Collection Schema Best Practices

Core Design Principles

graph TB
subgraph DesignPrinciples["Collection Design Core Principles"]
direction TB

P1["📏 Principle 1: Streamline Fields<br/>Store only necessary info, avoid redundancy"]
P2["🗂️ Principle 2: Use Appropriate Types<br/>Minimum sufficient data type"]
P3["🔍 Principle 3: Reasonable Indexing<br/>Build scalar indexes on query fields"]
P4["⚖️ Principle 4: Plan Partitions<br/>Pre‑shard by business dimensions"]
P5["🚫 Principle 5: Avoid Anti‑Patterns<br/>Don’t store large text in a vector DB"]
end

subgraph AntiPatterns["❌ Common Anti‑Patterns"]
A1["Storing full text in VARCHAR"]
A2["Using auto_id without business meaning"]
A3["Putting all metadata into JSON"]
A4["Ignoring scalar field indexes"]
A5["Single Collection for all types"]
end

DesignPrinciples --> |"Follow"| GoodDesign["✅ High‑Performance Architecture"]
AntiPatterns --> |"Avoid"| BadDesign["❌ Performance Bottleneck"]

Production‑Grade Schema Design Example

Based on our RAG project, here is a proven production‑grade schema:

# production_collection_schema.py
"""
Milvus production‑grade Collection Schema design
Optimized for RAG multi‑modal scenarios
"""

from pymilvus import (
    CollectionSchema,
    FieldSchema,
    DataType,
    utility
)


class RAGCollectionDesigner:
    """RAG system Collection designer"""

    @staticmethod
    def design_rag_vectors_collection() -> tuple:
        """
        Design the rag_vectors Collection

        Design philosophy:
        - Use business IDs as primary keys for easy association and deduplication
        - Independent vector field supporting multiple metric types
        - Scalar fields support efficient filtering
        - Reserve dynamic fields for future requirements
        """

        fields = [
            # ===== Primary key field =====
            FieldSchema(
                name="doc_id",
                dtype=DataType.VARCHAR,
                max_length=64,
                is_primary=True,
                description="Business document ID, format: {source}_{hash}"
            ),

            # ===== Vector field =====
            FieldSchema(
                name="vector",
                dtype=DataType.FLOAT_VECTOR,
                dim=1024,  # BGE-M3 output dimension
                description="Dense embedding vector"
            ),

            # ===== Core filter fields (high‑frequency queries) =====
            FieldSchema(
                name="modal_type",
                dtype=DataType.VARCHAR,
                max_length=16,
                description="Modal type: text/image/table/video"
            ),

            FieldSchema(
                name="business_tag",
                dtype=DataType.VARCHAR,
                max_length=128,
                description="Business tag: medical/legal/finance etc."
            ),

            FieldSchema(
                name="vector_level",
                dtype=DataType.VARCHAR,
                max_length=16,
                description="Vectorization granularity: table/row/col/cell"
            ),

            # ===== Association fields =====
            FieldSchema(
                name="associate_id",
                dtype=DataType.VARCHAR,
                max_length=64,
                description="Parent/child or associated document ID"
            ),

            FieldSchema(
                name="oss_id",
                dtype=DataType.VARCHAR,
                max_length=64,
                description="MinIO object storage ID"
            ),

            # ===== Timestamp fields (for TTL and data management) =====
            FieldSchema(
                name="created_at",
                dtype=DataType.INT64,
                description="Creation timestamp (milliseconds)"
            ),

            FieldSchema(
                name="updated_at",
                dtype=DataType.INT64,
                description="Update timestamp (milliseconds)"
            ),

            # ===== Statistical fields (for analysis and sorting) =====
            FieldSchema(
                name="chunk_size",
                dtype=DataType.INT32,
                description="Original text length"
            ),

            FieldSchema(
                name="relevance_score",
                dtype=DataType.FLOAT,
                description="Quality or relevance score"
            )
        ]

        schema = CollectionSchema(
            fields=fields,
            enable_dynamic_field=True,  # allow dynamic fields
            description="RAG multi‑modal vector collection, unified storage for text/table/image"
        )

        return schema, "rag_vectors"

    @staticmethod
    def design_index_params() -> dict:
        """
        Design index parameters
        Provide different index configurations for different scenarios
        """

        index_configs = {
            # ===== HNSW index (recommended for production) =====
            "hnsw_production": {
                "field_name": "vector",
                "index_type": "HNSW",
                "metric_type": "COSINE",  # BGE-M3 recommends COSINE
                "params": {
                    "M": 16,  # number of connections, affects recall and performance
                    "efConstruction": 200  # search width during construction
                }
            },

            # ===== IVF_PQ index (very large dataset, memory‑constrained) =====
            "ivf_pq_large_scale": {
                "field_name": "vector",
                "index_type": "IVF_PQ",
                "metric_type": "COSINE",
                "params": {
                    "nlist": 16384,  # number of cluster centers
                    "m": 8,  # number of PQ subspaces (dim must be divisible by m)
                    "nbits": 8  # quantization bits per subspace
                }
            },

            # ===== FLAT index (small dataset, exact search) =====
            "flat_small_scale": {
                "field_name": "vector",
                "index_type": "FLAT",
                "metric_type": "COSINE",
                "params": {}
            }
        }

        return index_configs

    @staticmethod
    def design_scalar_indexes() -> list:
        """
        Design scalar field indexes
        Used to accelerate filter operations
        """
        scalar_indexes = [
            # Modal type (low cardinality, suitable for a Trie index)
            {
                "field_name": "modal_type",
                "index_type": "Trie"  # or "INVERTED"
            },

            # Business tag (medium cardinality)
            {
                "field_name": "business_tag",
                "index_type": "Trie"
            },

            # Vector granularity (low cardinality)
            {
                "field_name": "vector_level",
                "index_type": "Trie"
            },

            # Creation time (range queries)
            {
                "field_name": "created_at",
                "index_type": "STL_SORT"  # sorted index for numeric fields
            }
        ]

        return scalar_indexes


# Usage example
if __name__ == "__main__":
    designer = RAGCollectionDesigner()

    # Get schema design
    schema, collection_name = designer.design_rag_vectors_collection()
    print(f"Collection name: {collection_name}")
    print(f"Number of fields: {len(schema.fields)}")

    # Get index configuration
    index_params = designer.design_index_params()
    print(f"\nAvailable index configurations: {list(index_params.keys())}")

    # Recommended configuration
    recommended = index_params['hnsw_production']
    print(f"\nRecommended production configuration: {recommended}")
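
The scalar indexes above only pay off if queries actually filter on those fields. As a minimal sketch, the helper below builds a Milvus boolean filter expression from the schema's `modal_type`, `business_tag`, and `created_at` fields. The function name `build_filter` and its arguments are hypothetical, and it assumes the values come from trusted code (no escaping of quotes is attempted).

```python
def build_filter(modal_type=None, business_tags=None, created_after_ms=None):
    """Compose a filter expression string from optional scalar conditions.

    Hypothetical helper for illustration; values are assumed to be trusted
    (quotes inside tag names are not escaped).
    """
    clauses = []
    if modal_type is not None:
        clauses.append(f'modal_type == "{modal_type}"')
    if business_tags:
        quoted = ", ".join(f'"{tag}"' for tag in business_tags)
        clauses.append(f"business_tag in [{quoted}]")
    if created_after_ms is not None:
        clauses.append(f"created_at >= {int(created_after_ms)}")
    # An empty string means "no filter"
    return " and ".join(clauses)


expr = build_filter(
    modal_type="text",
    business_tags=["medical", "legal"],
    created_after_ms=1716247200000,
)
print(expr)
```

The resulting string would typically be passed as the `filter` argument of a search call, so the indexed scalar fields narrow the candidate set before or during the vector search.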

Field Type Selection Guide

| Business Scenario | Recommended Type | Max Length | Memory Usage | Example |
|---|---|---|---|---|
| Document primary key | VARCHAR | 64 | 64B | pdf_abc123_hash |
| Short tag | VARCHAR | 16‑128 | 16‑128B | text, medical |
| Timestamp | INT64 | 8B | 8B | 1716247200000 |
| Length statistic | INT32 | 4B | 4B | 512 |
| Score (float) | FLOAT | 4B | 4B | 0.95 |
| Boolean flag | BOOL | 1B | 1B | True/False |

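💡 Key Insight: Field size directly impacts memory usage. For a Collection with 5 million records, a VARCHAR field that is actually filled to 65535 bytes occupies about 5,000,000 × 65,535 B ≈ 328GB. Keeping only a 64‑byte ID in Milvus and storing the full text externally removes nearly all of that, which is where the 300GB+ saving comes from.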
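
The arithmetic behind this kind of estimate can be sketched in a few lines. The function below is a rough, hypothetical estimator of raw field data only (it ignores index structures and Milvus's internal overhead). The per-field average sizes are illustrative assumptions, with the 50,000-byte `content` value taken from the "50KB+" record size in the earlier example.

```python
# Fixed-width scalar sizes in bytes, matching the type table above
FIXED_WIDTH_BYTES = {"INT64": 8, "INT32": 4, "FLOAT": 4, "BOOL": 1}


def estimate_raw_bytes(num_rows, dim, varchar_avg_bytes, fixed_fields):
    """Approximate raw data size: vectors + VARCHAR payloads + fixed-width scalars.

    varchar_avg_bytes: dict of field name -> assumed average bytes actually stored
    fixed_fields: list of type names found in FIXED_WIDTH_BYTES
    """
    vector_bytes = dim * 4  # FLOAT_VECTOR stores one float32 per dimension
    varchar_bytes = sum(varchar_avg_bytes.values())
    fixed_bytes = sum(FIXED_WIDTH_BYTES[t] for t in fixed_fields)
    return num_rows * (vector_bytes + varchar_bytes + fixed_bytes)


GIB = 1024 ** 3
NUM_ROWS = 5_000_000

# Poor design: full text stored inline (assumed ~50,000 bytes per record)
bad = estimate_raw_bytes(NUM_ROWS, 1024, {"content": 50_000}, ["INT64"])

# Streamlined design: short IDs and tags only (assumed average sizes)
good = estimate_raw_bytes(
    NUM_ROWS,
    1024,
    {"doc_id": 64, "modal_type": 16, "business_tag": 32,
     "vector_level": 16, "associate_id": 64, "oss_id": 64},
    ["INT64", "INT64", "INT32", "FLOAT"],
)

print(f"inline full text: {bad / GIB:.1f} GiB, streamlined: {good / GIB:.1f} GiB")
```

In the streamlined case the 1024-dimensional vector itself (4,096 bytes per row) dominates, so further savings have to come from the index type rather than from the scalar fields.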



Deep Dive into HNSW Index Principles

What is HNSW?

HNSW (Hierarchical Navigable Small World) is a graph‑based approximate nearest neighbor (ANN) search algorithm and one of the most widely used. A real‑life analogy helps:

graph TB
subgraph Analogy["🌍 Life Analogy: Global Airline Network"]
direction TB

Layer1["Top layer: International routes<br/>✈️ connecting major cities<br/>📍 few hub airports"]
Layer2["Middle layer: Domestic routes<br/>🚄 connecting provincial capitals<br/>📍 medium airports"]
Layer3["Bottom layer: Regional transport<br/>🚗 connecting nearby towns<br/>📍 many small airports"]

Travel["A traveler wants to go from A to B"]

Travel --> Step1["1️⃣ Start at a hub airport<br/>(enter at the top layer)"]
Step1 --> Step2["2️⃣ Fly the long‑haul route closest to B<br/>(greedy search on the sparse layer)"]
Step2 --> Step3["3️⃣ Switch to domestic, then regional routes<br/>(descend layer by layer)"]
Step3 --> Step4["4️⃣ Reach destination<br/>(fine‑grained search on the bottom layer)"]
end

subgraph HNSW_Tech["💻 HNSW Technical Implementation"]
direction TB

Ln["Layer n (top): Sparsest layer<br/>Long‑distance jumps<br/>Quickly locate general region"]
L1["Layer 1: Intermediate layer<br/>Medium‑distance connections<br/>Narrow down search scope"]
L0["Layer 0 (bottom): Densest layer, contains every vector<br/>Short‑distance connections<br/>Precisely find nearest neighbors"]

Search["Vector query"]

Search --> S1["Entry point: start from top layer"]
S1 --> S2["Greedy search: find nearest neighbor at each layer"]
S2 --> S3["Descend layer by layer: become more precise downward"]
S3 --> S4["Return Top‑K results"]
end
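
The layered descent in the diagram can be illustrated with a toy example. The sketch below is not Milvus's implementation: it uses hand-built layers over nine 1-D points, keeps a single candidate per layer (real HNSW keeps an `ef`-sized candidate list on the bottom layer), and all names are hypothetical.

```python
def greedy_step(layer, dist, start):
    """Within one layer, keep moving to the closest neighbor until none is closer."""
    current = start
    while True:
        best = min(layer.get(current, []), key=dist, default=current)
        if dist(best) >= dist(current):
            return current
        current = best


def layered_search(layers, points, query, entry):
    """Search layers ordered from top (sparse) to bottom (dense)."""
    def dist(node):
        return abs(points[node] - query)

    current = entry
    for layer in layers:
        # The result of one layer becomes the entry point of the next
        current = greedy_step(layer, dist, current)
    return current


# Nine points on a line: node i sits at position float(i)
points = {i: float(i) for i in range(9)}

top = {0: [8], 8: [0]}                      # two hubs, one long link
middle = {0: [4], 4: [0, 8], 8: [4]}        # three nodes, medium links
bottom = {i: [j for j in (i - 1, i + 1) if 0 <= j <= 8] for i in range(9)}

result = layered_search([top, middle, bottom], points, query=5.2, entry=0)
print(result)
```

Starting from node 0, the search jumps to node 8 on the top layer, moves to node 4 on the middle layer, and only then walks the dense bottom layer to the nearest point, which is the behavior the upper layers exist to enable.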

Core HNSW Parameters

flowchart LR
subgraph Parameters["Three Core HNSW Parameters"]
direction TB

M["M (Number of connections)<br/>Max connections per node<br/>Range: 4‑64"]

efConstruction["efConstruction<br/>Search width during construction<br/>Range: 40‑512"]

efSearch["ef (search parameter)<br/>Candidate queue size during query<br/>Range: topK ‑ topK*10"]
end

subgraph Tradeoffs["Performance Trade‑off Triangle"]
direction LR

Recall["Recall<br/>(Accuracy)"]
Speed["Query Speed<br/>(Latency)"]
Memory["Memory Usage<br/>(Memory)"]

Recall --- Speed
Speed --- Memory
Memory --- Recall
end

M --> |"↑ improves recall<br/>↓ reduces speed<br/>↑ increases memory"| Tradeoffs
efConstruction --> |"↑ improves index quality<br/>↓ slows building<br/>↑ increases memory"| Tradeoffs
efSearch --> |"↑ improves recall<br/>↓ reduces speed<br/>does not affect memory"| Tradeoffs

Parameter Details:

1. M (Max Connections)

  • Effect: Controls how many other nodes each node connects to in the graph.
  • Range: Typically 4‑64, recommended 16.
  • Impact:
    • ✅ Larger M → denser graph → higher recall
    • ❌ Larger M → more memory → slower build and query

2. efConstruction

  • Effect: Search width (candidate queue size) at each layer during index construction.
  • Range: Typically 40‑512, recommended 200.
  • Impact:
    • ✅ Larger efConstruction → better index quality → higher recall
    • ❌ Larger efConstruction → longer build time (one‑time cost)

3. ef (efSearch)

  • Effect: Dynamic search parameter during query (adjustable at runtime).
  • Range: topK to topK*10, recommended 128.
  • Impact:
    • ✅ Larger ef → more thorough search → higher recall
    • ❌ Larger ef → higher query latency (can be adjusted in real time)
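
One way to see how M shapes the layer hierarchy is the level-sampling rule from the original HNSW paper, where each inserted node draws its top level as floor(-ln(u) · mL) with mL = 1/ln(M). The sketch below simulates that rule. Whether a given Milvus version uses exactly this normalization is an assumption here, and the function names are hypothetical.

```python
import math
import random


def sample_level(M, rng):
    """Draw a node's top level using the HNSW paper's rule with mL = 1 / ln(M)."""
    m_l = 1.0 / math.log(M)
    u = 1.0 - rng.random()  # uniform in (0, 1], avoids log(0)
    return int(-math.log(u) * m_l)


def level_histogram(M, n, seed=0):
    """Count how many of n simulated nodes land on each top level."""
    rng = random.Random(seed)
    counts = {}
    for _ in range(n):
        level = sample_level(M, rng)
        counts[level] = counts.get(level, 0) + 1
    return counts


N = 100_000
hist = level_histogram(16, N)
fraction_above_bottom = 1 - hist[0] / N
print(f"M=16: share of nodes above layer 0 = {fraction_above_bottom:.3f}")
```

Under this rule roughly 1/M of the nodes reach layer 1 or higher, so a larger M produces fewer, sparser upper layers while making each layer more densely connected.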

Complete HNSW Parameter Tuning Guide

Scenario‑Based Parameter Recommendations

# hnsw_tuning_guide.py
"""
HNSW Parameter Tuning Guide
Provides optimal parameter configurations for different business scenarios
"""


class HNSWTuningGuide:
    """HNSW parameter tuning expert"""

    SCENARIOS = {
        "real_time_search": {
            "name": "Real‑time Search",
            "description": "E‑commerce recommendation, ad retrieval – requires extremely low latency",
            "characteristics": ["Latency sensitive <10ms", "High QPS >5000", "Acceptable slight accuracy loss"],
            "recommended_params": {
                "M": 8,
                "efConstruction": 100,
                "ef": 64,
                "expected_recall": 0.90,
                "expected_latency_ms": "< 5ms"
            }
        },

        "accuracy_critical": {
            "name": "High Accuracy",
            "description": "Medical diagnosis, legal retrieval – requires high recall",
            "characteristics": ["Recall >98%", "Latency tolerant <100ms", "Moderate QPS 100‑1000"],
            "recommended_params": {
                "M": 32,
                "efConstruction": 400,
                "ef": 256,
                "expected_recall": 0.98,
                "expected_latency_ms": "20‑50ms"
            }
        },

        "balanced_rag": {
            "name": "Balanced RAG (Recommended)",
            "description": "Enterprise knowledge base, customer Q&A – balance precision and speed",
            "characteristics": ["Recall >95%", "Latency <50ms", "QPS 500‑2000"],
            "recommended_params": {
                "M": 16,
                "efConstruction": 200,
                "ef": 128,
                "expected_recall": 0.96,
                "expected_latency_ms": "10‑30ms"
            }
        },

        "memory_constrained": {
            "name": "Memory‑Constrained",
            "description": "Edge devices, low‑cost servers – limited memory",
            "characteristics": ["Memory <16GB", "Large dataset >10 million", "Acceptable lower accuracy"],
            "recommended_params": {
                "M": 12,
                "efConstruction": 150,
                "ef": 80,
                "expected_recall": 0.92,
                "expected_latency_ms": "15‑40ms"
            }
        },

        "large_scale_batch": {
            "name": "Large‑Scale Batch",
            "description": "Offline analysis, batch embedding – throughput first",
            "characteristics": ["Batch queries", "Throughput priority", "Latency insensitive"],
            "recommended_params": {
                "M": 20,
                "efConstruction": 256,
                "ef": 160,
                "expected_recall": 0.94,
                "expected_latency_ms": "30‑80ms"
            }
        }
    }

    @classmethod
    def get_recommendation(cls, scenario: str = "balanced_rag") -> dict:
        """Get recommended configuration for a given scenario"""
        if scenario not in cls.SCENARIOS:
            available = ", ".join(cls.SCENARIOS.keys())
            raise ValueError(f"Unknown scenario: {scenario}. Available: {available}")

        return cls.SCENARIOS[scenario]

    @classmethod
    def print_all_scenarios(cls):
        """Print all scenario configurations"""
        for key, config in cls.SCENARIOS.items():
            print(f"\n{'='*60}")
            print(f"📋 Scenario: {config['name']}")
            print(f"Description: {config['description']}")
            print("Characteristics:")
            for char in config['characteristics']:
                print(f" • {char}")
            print("\nRecommended parameters:")
            params = config['recommended_params']
            for param, value in params.items():
                print(f" {param}: {value}")


# Usage example
if __name__ == "__main__":
    guide = HNSWTuningGuide()

    # Print all scenarios
    guide.print_all_scenarios()

    # Get RAG scenario recommendation
    rag_config = guide.get_recommendation("balanced_rag")
    print("\n✅ RAG scenario recommended configuration:")
    print(f"M = {rag_config['recommended_params']['M']}")
    print(f"efConstruction = {rag_config['recommended_params']['efConstruction']}")
    print(f"ef = {rag_config['recommended_params']['ef']}")
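
The recommended values mix build-time and query-time settings: `M` and `efConstruction` belong in the index definition, while `ef` is passed with each search. A minimal sketch of that split follows. The helper name `split_hnsw_params` is hypothetical, and clamping `ef` to at least `top_k` reflects the assumption that the server rejects smaller values, which may differ between Milvus versions.

```python
def split_hnsw_params(recommended, top_k=10, metric_type="COSINE"):
    """Turn one scenario's recommended_params into (index_params, search_params).

    Hypothetical helper: the expected_* entries are documentation only and are dropped.
    """
    index_params = {
        "index_type": "HNSW",
        "metric_type": metric_type,
        "params": {
            "M": recommended["M"],
            "efConstruction": recommended["efConstruction"],
        },
    }
    search_params = {
        "metric_type": metric_type,
        # Assumption: ef smaller than top_k is not useful (and may be rejected)
        "params": {"ef": max(recommended["ef"], top_k)},
    }
    return index_params, search_params


# Same values as the "balanced_rag" scenario above
balanced_rag = {
    "M": 16,
    "efConstruction": 200,
    "ef": 128,
    "expected_recall": 0.96,
    "expected_latency_ms": "10‑30ms",
}

index_params, search_params = split_hnsw_params(balanced_rag, top_k=10)
print(index_params)
print(search_params)
```

Because `ef` lives only in the search parameters, it can be changed per request without rebuilding the index, whereas changing `M` or `efConstruction` requires a rebuild.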

Parameter Tuning Experiment Framework

# hnsw_parameter_search.py
"""
HNSW parameter grid search and auto‑tuning
Helps find the optimal parameter combination
"""

import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


@dataclass
class TuningResult:
    """Tuning result data class"""
    M: int
    efConstruction: int
    ef: int
    recall: float
    latency_ms: float  # P99 latency
    memory_mb: float
    qps: float
    score: float  # composite score


class HNSWParameterOptimizer:
    """HNSW parameter auto‑optimizer"""

    def __init__(
        self,
        milvus_client,
        collection_name: str,
        test_data: Dict,
        target_recall: float = 0.95,
        max_latency_ms: float = 50.0,
        weight_recall: float = 0.4,
        weight_speed: float = 0.3,
        weight_memory: float = 0.3
    ):
        self.client = milvus_client
        self.collection_name = collection_name
        self.test_data = test_data
        self.target_recall = target_recall
        self.max_latency_ms = max_latency_ms
        self.weight_recall = weight_recall
        self.weight_speed = weight_speed
        self.weight_memory = weight_memory

        self.results = []
        # (M, efConstruction) of the index currently built, if any
        self._built_index: Optional[Tuple[int, int]] = None

    def grid_search(
        self,
        M_range: Optional[List[int]] = None,
        efConstruction_range: Optional[List[int]] = None,
        ef_range: Optional[List[int]] = None
    ) -> pd.DataFrame:
        """
        Grid search for optimal parameters

        Parameters:
            M_range: search range for M
            efConstruction_range: search range for efConstruction
            ef_range: search range for ef
        """
        if M_range is None:
            M_range = [8, 12, 16, 20, 24, 32]
        if efConstruction_range is None:
            efConstruction_range = [100, 150, 200, 256, 300, 400]
        if ef_range is None:
            ef_range = [64, 80, 100, 128, 160, 200, 256]

        total_combinations = len(M_range) * len(efConstruction_range) * len(ef_range)
        print(f"Starting grid search, total {total_combinations} parameter combinations...")

        completed = 0
        for M in M_range:
            for efC in efConstruction_range:
                for ef in ef_range:
                    result = self._evaluate_parameters(M, efC, ef)
                    self.results.append(result)

                    completed += 1
                    if completed % 10 == 0:
                        print(f"Progress: {completed}/{total_combinations} ({completed/total_combinations*100:.1f}%)")

        results_df = pd.DataFrame([vars(r) for r in self.results])

        # Sort by composite score descending
        results_df = results_df.sort_values('score', ascending=False)

        return results_df

    def _evaluate_parameters(self, M: int, efConstruction: int, ef: int) -> TuningResult:
        """
        Evaluate a parameter combination's performance
        """
        try:
            # M and efConstruction are build-time parameters, ef is search-time only,
            # so the index is rebuilt only when the build-time pair changes
            # (in production, you might need to delete and recreate the collection)
            if self._built_index != (M, efConstruction):
                self._built_index = None
                self._rebuild_index(M, efConstruction)
                self._built_index = (M, efConstruction)

            # Measure recall
            recall = self._measure_recall(ef)

            # Measure latency
            latency, qps = self._measure_latency(ef)

            # Estimate memory (can be obtained via Milvus API)
            memory_mb = self._estimate_memory_usage(M, efConstruction)

            # Calculate composite score
            score = self._calculate_score(recall, latency, memory_mb)

            result = TuningResult(
                M=M,
                efConstruction=efConstruction,
                ef=ef,
                recall=recall,
                latency_ms=latency,
                memory_mb=memory_mb,
                qps=qps,
                score=score
            )

            return result

        except Exception as e:
            print(f"Error evaluating parameters M={M}, efC={efConstruction}, ef={ef}: {e}")
            return TuningResult(
                M=M, efConstruction=efConstruction, ef=ef,
                recall=0, latency_ms=999999, memory_mb=999999,
                qps=0, score=-1
            )

    def _rebuild_index(self, M: int, efConstruction: int):
        """Rebuild HNSW index"""
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_type="HNSW",
            metric_type="COSINE",
            params={"M": M, "efConstruction": efConstruction}
        )

        # A loaded collection must be released before its index can be dropped
        self.client.release_collection(collection_name=self.collection_name)

        # Drop old index and rebuild (drop_index takes the index name, so look it up)
        for index_name in self.client.list_indexes(
            collection_name=self.collection_name,
            field_name="vector"
        ):
            self.client.drop_index(
                collection_name=self.collection_name,
                index_name=index_name
            )
        self.client.create_index(
            collection_name=self.collection_name,
            index_params=index_params,
            sync=True  # wait for index build completion
        )

        # Reload so the collection can be searched again
        self.client.load_collection(collection_name=self.collection_name)

    def _measure_recall(self, ef: int, num_queries: int = 100) -> float:
        """Measure recall (compared with brute‑force search)"""
        queries = self.test_data['queries'][:num_queries]
        ground_truth = self.test_data['ground_truth'][:num_queries]

        correct = 0
        total = 0

        for query_vec, true_neighbors in zip(queries, ground_truth):
            # Search with current ef parameter
            results = self.client.search(
                collection_name=self.collection_name,
                data=[query_vec],
                limit=10,
                search_params={
                    "metric_type": "COSINE",
                    "params": {"ef": ef}
                }
            )

            retrieved_ids = {hit['id'] for hit in results[0]}
            true_ids = set(true_neighbors[:10])

            correct += len(retrieved_ids & true_ids)
            total += len(true_ids)

        recall = correct / total if total > 0 else 0
        return recall

    def _measure_latency(self, ef: int, num_queries: int = 1000) -> Tuple[float, float]:
        """Measure query latency (P99, in ms) and QPS"""
        queries = self.test_data['queries'][:num_queries]

        latencies = []
        start_total = time.perf_counter()

        for query_vec in queries:
            start = time.perf_counter()
            self.client.search(
                collection_name=self.collection_name,
                data=[query_vec],
                limit=10,
                search_params={
                    "metric_type": "COSINE",
                    "params": {"ef": ef}
                }
            )
            latency = (time.perf_counter() - start) * 1000  # ms
            latencies.append(latency)

        total_time = time.perf_counter() - start_total

        p99_latency = np.percentile(latencies, 99)
        # Use the number of queries actually run, which may be fewer than num_queries
        qps = len(queries) / total_time

        return p99_latency, qps  # Return P99 latency as metric

    def _estimate_memory_usage(self, M: int, efConstruction: int) -> float:
        """Estimate HNSW index memory usage in MB"""
        # Approximate formula: memory ≈ num_vectors * (dim * 4 + M * (4 + 8))
        # Simplified estimate; in practice, obtain via Milvus metrics.
        # efConstruction affects build time, not this estimate.
        num_vectors = self.test_data.get('num_vectors', 1000000)
        dim = 1024

        base_memory = num_vectors * dim * 4 / 1024 / 1024  # vector data
        graph_memory = num_vectors * M * 12 / 1024 / 1024  # graph structure

        total_memory = base_memory + graph_memory
        return total_memory

    def _calculate_score(self, recall: float, latency_ms: float, memory_mb: float) -> float:
        """
        Calculate composite score (weighted normalization)
        """
        # Normalize each metric to 0‑1
        norm_recall = min(recall, 1.0)  # higher recall is better

        # Lower latency is better (smooth with sigmoid)
        norm_speed = 1.0 / (1.0 + np.exp((latency_ms - self.max_latency_ms) / 10))

        # Lower memory is better (assume upper limit 10GB)
        norm_memory = 1.0 - min(memory_mb / 10240, 1.0)

        # Weighted composite score
        score = (
            self.weight_recall * norm_recall +
            self.weight_speed * norm_speed +
            self.weight_memory * norm_memory
        )

        return score

    def visualize_results(self, results_df: pd.DataFrame, save_path: str = 'tuning_results.png'):
        """Visualize tuning results"""
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # Plot 1: Recall vs Latency
        ax1 = axes[0, 0]
        scatter = ax1.scatter(
            results_df['latency_ms'],
            results_df['recall'],
            c=results_df['score'],
            cmap='viridis',
            s=50,
            alpha=0.7
        )
        ax1.set_xlabel('P99 Latency (ms)')
        ax1.set_ylabel('Recall')
        ax1.set_title('Recall vs Latency (color=Score)')
        plt.colorbar(scatter, ax=ax1, label='Score')

        # Plot 2: Parameter impact on Score
        ax2 = axes[0, 1]
        top_10 = results_df.head(10)
        x = np.arange(len(top_10))
        width = 0.25

        ax2.bar(x - width, top_10['M'], width, label='M')
        ax2.bar(x, top_10['efConstruction'], width, label='efConstruction')
        ax2.bar(x + width, top_10['ef'] / 10, width, label='ef/10')

        ax2.set_xlabel('Top Configurations')
        ax2.set_ylabel('Parameter Value')
        ax2.set_title('Top 10 Configurations Parameters')
        ax2.legend()
        ax2.set_xticks(x)
        ax2.set_xticklabels([f'#{i+1}' for i in range(len(top_10))], rotation=45)

        # Plot 3: Memory vs Recall
        ax3 = axes[1, 0]
        ax3.scatter(results_df['memory_mb'], results_df['recall'],
                    c=results_df['score'], cmap='plasma', s=50, alpha=0.7)
        ax3.set_xlabel('Memory Usage (MB)')
        ax3.set_ylabel('Recall')
        ax3.set_title('Memory vs Recall (color=Score)')

        # Plot 4: QPS distribution
        ax4 = axes[1, 1]
        ax4.hist(results_df['qps'], bins=20, edgecolor='black', alpha=0.7)
        ax4.axvline(results_df['qps'].mean(), color='red', linestyle='--', label=f'Mean: {results_df["qps"].mean():.0f}')
        ax4.set_xlabel('QPS')
        ax4.set_ylabel('Frequency')
        ax4.set_title('QPS Distribution')
        ax4.legend()

        plt.tight_layout()
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        plt.close()

        print(f"✅ Visualization saved to: {save_path}")

    def get_best_configuration(self, results_df: pd.DataFrame) -> Dict:
        """Get the optimal configuration"""
        best_row = results_df.iloc[0]

        config = {
            'M': int(best_row['M']),
            'efConstruction': int(best_row['efConstruction']),
            'ef': int(best_row['ef']),
            'expected_performance': {
                'recall': best_row['recall'],
                'p99_latency_ms': best_row['latency_ms'],
                'memory_mb': best_row['memory_mb'],
                'qps': best_row['qps'],
                'composite_score': best_row['score']
            }
        }

        return config


# Usage example
if __name__ == "__main__":
    # Initialize the optimizer (needs an actual Milvus client and test data)
    """
    from pymilvus import MilvusClient

    client = MilvusClient(uri="http://localhost:19530")

    # Prepare test data
    test_data = {
        'queries': [...], # list of query vectors
        'ground_truth': [[...], ...], # true nearest neighbors
        'num_vectors': 1000000 # total number of vectors in Collection
    }

    optimizer = HNSWParameterOptimizer(
        milvus_client=client,
        collection_name="rag_vectors",
        test_data=test_data,
        target_recall=0.95,
        max_latency_ms=50.0
    )

    # Execute grid search
    results_df = optimizer.grid_search(
        M_range=[12, 16, 20],
        efConstruction_range=[150, 200, 256],
        ef_range=[100, 128, 160]
    )

    # Visualize results
    optimizer.visualize_results(results_df)

    # Get optimal configuration
    best_config = optimizer.get_best_configuration(results_df)
    print("\n=== Optimal Configuration ===")
    print(f"M = {best_config['M']}")
    print(f"efConstruction = {best_config['efConstruction']}")
    print(f"ef = {best_config['ef']}")
    print(f"Expected performance: {best_config['expected_performance']}")
    """
    pass
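
The optimizer expects `test_data['ground_truth']` to hold the true nearest neighbors for each query, but the framework does not show where those come from. One common approach, sketched here with pure Python and hypothetical helper names, is an exact brute-force cosine search over the corpus, followed by a recall@k comparison against the approximate results. For millions of vectors a vectorized or FLAT-index search would be more practical; this version only illustrates the logic.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def brute_force_top_k(query, corpus, k):
    """Exact top-k IDs by cosine similarity; corpus maps id -> vector."""
    ranked = sorted(corpus, key=lambda doc_id: cosine(query, corpus[doc_id]), reverse=True)
    return ranked[:k]


def recall_at_k(retrieved_ids, true_ids, k):
    """Fraction of the true top-k that appears in the retrieved top-k."""
    true_set = set(true_ids[:k])
    if not true_set:
        return 0.0
    return len(set(retrieved_ids[:k]) & true_set) / len(true_set)


# Tiny illustrative corpus (IDs and vectors are made up)
corpus = {
    "a": [1.0, 0.0],
    "b": [0.9, 0.1],
    "c": [0.0, 1.0],
    "d": [-1.0, 0.0],
}
query = [1.0, 0.05]

truth = brute_force_top_k(query, corpus, k=2)

# Pretend an approximate index returned "a" and "c"
approx = ["a", "c"]
print(truth, recall_at_k(approx, truth, k=2))
```

Computing the ground truth once and reusing it across all parameter combinations keeps the recall numbers comparable between runs.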

Production Performance Optimization Strategies

Partition Design Strategy

graph TB
subgraph PartitionStrategy["Partition Design Strategy"]
direction TB

subgraph ByBusiness["By Business Domain"]
B1["partition_medical<br/>🏥 Medical documents"]
B2["partition_legal<br/>⚖️ Legal documents"]
B3["partition_finance<br/>💰 Finance documents"]
B4["partition_tech<br/>💻 Technical documents"]
end

subgraph ByTime["By Time"]
T1["partition_2024_Q1"]
T2["partition_2024_Q2"]
T3["partition_2024_Q3"]
T4["partition_2024_Q4"]
end

subgraph ByModalType["By Modal Type"]
M1["partition_text<br/>📝 Text vectors"]
M2["partition_table<br/>📊 Table vectors"]
M3["partition_image<br/>🖼️ Image vectors"]
end
end

Query["User Query"] --> Router["Router Layer"]
Router --> |"business_tag=medical"| ByBusiness
Router --> |"created_at >= 2024-Q1"| ByTime
Router --> |"modal_type=text"| ByModalType
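
For the time-based scheme in the diagram, the router needs to map a `created_at` value (milliseconds, as in the schema) to a quarter partition name, and a time range to the list of partitions to search. The following is a minimal sketch under the assumption that timestamps are interpreted in UTC. The function names are hypothetical, and the `partition_{year}_Q{n}` naming simply mirrors the diagram.

```python
from datetime import datetime, timezone


def _quarter_of(ts_ms):
    """Return (year, quarter) for a millisecond timestamp, interpreted in UTC."""
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return dt.year, (dt.month - 1) // 3 + 1


def quarter_partition(created_at_ms):
    """Partition name a record with this timestamp would be written to."""
    year, quarter = _quarter_of(created_at_ms)
    return f"partition_{year}_Q{quarter}"


def quarters_between(start_ms, end_ms):
    """All quarter partitions overlapping [start_ms, end_ms], in order."""
    year, quarter = _quarter_of(start_ms)
    end_key = _quarter_of(end_ms)
    names = []
    while (year, quarter) <= end_key:
        names.append(f"partition_{year}_Q{quarter}")
        quarter += 1
        if quarter > 4:
            year, quarter = year + 1, 1
    return names


write_target = quarter_partition(1716247200000)
search_targets = quarters_between(1704067200000, 1716247200000)
print(write_target, search_targets)
```

A list like `search_targets` would then be passed as the partition names of a search, so that only the relevant quarters are scanned.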
# partition_manager.py
"""
Milvus partition manager
Implements automatic partition creation and data routing
"""

from typing import Optional, List
from datetime import datetime
from pymilvus import MilvusClient


class PartitionManager:
    """Partition manager"""
    
    def __init__(self, client: MilvusClient, collection_name: str):
        self.client = client
        self.collection_name = collection_name
        self._partition_cache = {}
    
    def get_or_create_partition(
        self,
        partition_key: str,
        partition_name_template: str = "partition_{key}"
    ) -> str:
        """
        Get or create a partition
        
        Parameters:
            partition_key: partition key value
            partition_name_template: partition name template
        """
        partition_name = partition_name_template.format(key=partition_key)
        
        # Check cache
        if partition_name in self