📊 Table of Contents

  1. Why Do We Need Multi-Channel Retrieval Fusion?
  2. Deep Dive into RRF Algorithm Core Principles
  3. Multi-Channel Retrieval Architecture in RAG Systems
  4. Complete RRF Implementation Code (Production-Grade)
  5. Advanced Fusion Strategies and Parameter Tuning
  6. Real-World Cases and A/B Test Data
  7. Performance Optimization and Engineering Practices
  8. Summary and Best Practices Guide

Why Do We Need Multi-Channel Retrieval Fusion?

Limitations of Single Retrieval Methods

Let's illustrate this problem with a real scenario:

User Query: "What are the complications of diabetes? How to prevent them?"

❌ Option 1: Pure Vector Retrieval (Dense Retrieval)

results_dense = vector_search(
    query="What are the complications of diabetes? How to prevent them?",
    embedding_model="bge-m3",
    top_k=10
)

# Returned results:
# 1. "Common complications of diabetes include..." (score: 0.87) ✅
# 2. "How to prevent the onset of diabetes..." (score: 0.82) ⚠️ partially relevant
# 3. "Dietary considerations for diabetes patients..." (score: 0.79) ⚠️ weakly relevant
# 4. "Prevention measures for hypertension..." (score: 0.75) ❌ irrelevant
# 5. "How to use insulin..." (score: 0.73) ❌ irrelevant

Problem: Results are semantically similar to the query but can drift away from its precise topic, and specialized terminology is matched poorly.

❌ Option 2: Pure Keyword Retrieval (BM25/Sparse Retrieval)

results_sparse = keyword_search(
    query="diabetes complications prevention",
    index_type="bm25",
    top_k=10
)

# Returned results:
# 1. "Management of acute diabetes complications" (score: 8.5) ⚠️ only mentions complications, no prevention
# 2. "Detailed explanation of chronic diabetes complications" (score: 7.8) ⚠️ only list of complications
# 3. "5 ways to prevent diabetes complications" (score: 6.2) ✅ perfect match
# 4. "Identifying early symptoms of diabetes" (score: 5.9) ❌ topic shift
# 5. "Care guide for patients with complications" (score: 5.5) ⚠️ partially relevant

Problem: Matching is strictly lexical, so synonyms and semantic variants of the query terms are missed.

❌ Option 3: Graph Retrieval

results_graph = graph_search(
    query_entity="diabetes",
    relation_types=["has_complication", "prevention_method"],
    depth=2,
    top_k=10
)

# Returned results:
# 1. "diabetes → complication → retinopathy" (path_score: 0.92) ✅ structured knowledge
# 2. "diabetes → prevention → blood sugar control" (path_score: 0.88) ✅ knowledge association
# 3. "diabetes → treatment → medication" (path_score: 0.85) ⚠️ biased toward treatment rather than prevention
# Problem: if knowledge graph is incomplete, important documents may be missed

Problem: Result quality depends on the quality of the knowledge graph, and coverage is limited to what the graph contains.

Comparison of Retrieval Methods

graph TB
subgraph Comparison["Radar Chart of Three Retrieval Methods' Capabilities"]
direction TB

Dense["🔵 Dense (Vector)<br/>✅ Semantic understanding<br/>✅ Synonym matching<br/>❌ Poor precise term matching<br/>❌ Blind spot for new terms"]

Sparse["🟢 Sparse (Keyword)<br/>✅ Exact match<br/>✅ Specialized terminology<br/>❌ Missing semantics<br/>❌ Synonym omission"]

Graph["🟡 Graph (Knowledge Graph)<br/>✅ Relationship reasoning<br/>✅ Structured knowledge<br/>❌ Limited coverage<br/>❌ High construction cost"]
end

Query["User Query"] --> |"distribute"| Router{"Router"}

Router --> Dense --> ResultsD["Result Set D"]
Router --> Sparse --> ResultsS["Result Set S"]
Router --> Graph --> ResultsG["Result Set G"]

ResultsD --> Fusion["🔄 RRF Fusion"]
ResultsS --> Fusion
ResultsG --> Fusion

Fusion --> Final["📊 Final Ranked Results<br/>✅ Combined strengths<br/>✅ Complementary weaknesses<br/>✅ Maximized recall"]

Improvement After Fusion

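| Metric | Dense Only | Sparse Only | Graph Only | RRF Fusion | Improvement (vs. Dense Only) |
|---|---|---|---|---|---|
| Recall@10 | 0.72 | 0.68 | 0.55 | 0.91 | +26% |
| P@5 | 0.64 | 0.61 | 0.48 | 0.84 | +31% |
| MRR | 0.71 | 0.66 | 0.52 | 0.88 | +24% |
| NDCG@10 | 0.69 | 0.65 | 0.50 | 0.86 | +25% |
| User Satisfaction | 3.8/5 | 3.6/5 | 3.2/5 | 4.5/5 | +18% |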
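
The table reports standard ranking metrics without defining them. As a rough sketch (the function names and the toy data are illustrative, not taken from the evaluation behind the table), Recall@k, Precision@k, and the reciprocal rank for a single query could be computed as follows; MRR is then the mean of the reciprocal ranks over all queries. The sketch assumes each ranked list contains no duplicate IDs.

```python
from typing import List, Set


def recall_at_k(ranked_ids: List[str], relevant: Set[str], k: int) -> float:
    """Fraction of the relevant documents that appear in the top k."""
    if not relevant:
        return 0.0
    hits = sum(1 for doc_id in ranked_ids[:k] if doc_id in relevant)
    return hits / len(relevant)


def precision_at_k(ranked_ids: List[str], relevant: Set[str], k: int) -> float:
    """Fraction of the top-k slots filled with relevant documents."""
    if k <= 0:
        return 0.0
    hits = sum(1 for doc_id in ranked_ids[:k] if doc_id in relevant)
    return hits / k


def reciprocal_rank(ranked_ids: List[str], relevant: Set[str]) -> float:
    """1 / rank of the first relevant document, or 0.0 if none is found."""
    for rank, doc_id in enumerate(ranked_ids, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0


# Toy data: two of the three relevant documents are retrieved.
ranked = ["d3", "d1", "d7", "d2", "d9"]
relevant = {"d1", "d2", "d4"}

print(f"Recall@5:        {recall_at_k(ranked, relevant, k=5):.3f}")
print(f"P@5:             {precision_at_k(ranked, relevant, k=5):.3f}")
print(f"Reciprocal rank: {reciprocal_rank(ranked, relevant):.3f}")
```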

Deep Dive into RRF Algorithm Core Principles

What is RRF?

RRF (Reciprocal Rank Fusion) is a simple yet powerful multi-list ranking fusion algorithm. Its core idea is:

The higher a document appears in each retrieval result list, the higher its final score.

Let me explain with a real-life analogy:

flowchart LR
subgraph Example["🏆 Life Analogy: University Admission Composite Scoring"]
direction TB

Exam1["College Entrance Exam Rank<br/>Student A: 5th → 1/5 = 0.20"]
Exam2["Competition Award Rank<br/>Student A: 2nd → 1/2 = 0.50"]
Exam3["Comprehensive Quality Rank<br/>Student A: 10th → 1/10 = 0.10"]

Total["Total = 0.20 + 0.50 + 0.10 = 0.80"]

Exam1 --> Total
Exam2 --> Total
Exam3 --> Total
end

subgraph Formula["📐 RRF Mathematical Formula"]
direction TB

F1["score(d) = Σ 1/(k + rank_i(d))"]
F2["where:"]
F3["d: document"]
F4["i: i-th retrieval result list"]
F5["rank_i(d): rank of document d in list i"]
F6["k: smoothing constant (usually 60)"]
end
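
Written out in standard notation, the formula in the diagram reads as follows. The symbol $L$ for the set of result lists is introduced only for this restatement; a document that is missing from list $i$ contributes nothing to the sum.

```latex
\mathrm{score}(d) \;=\; \sum_{\substack{i \in L \\ d \in \mathrm{list}_i}} \frac{1}{k + \mathrm{rank}_i(d)}
```

The admission analogy is the special case $k = 0$. A larger $k$ flattens the gap between adjacent ranks: with $k = 60$, ranks 1 and 2 contribute $1/61$ and $1/62$ rather than $1$ and $1/2$, so no single list can dominate the fused score.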

Detailed Steps of RRF Algorithm

Assume we have three retrieval result lists:

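List 1 (Dense):

| Rank | Doc ID | Original Score |
|---|---|---|
| 1 | Doc_A | 0.92 |
| 2 | Doc_B | 0.87 |
| 3 | Doc_C | 0.83 |
| … | … | … |

List 2 (Sparse/BM25):

| Rank | Doc ID | Original Score |
|---|---|---|
| 1 | Doc_D | 8.5 |
| 2 | Doc_A | 7.8 |
| 3 | Doc_E | 6.2 |
| … | … | … |

List 3 (Graph):

| Rank | Doc ID | Original Score |
|---|---|---|
| 1 | Doc_B | 0.95 |
| 2 | Doc_F | 0.88 |
| 3 | Doc_A | 0.82 |
| … | … | … |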

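RRF Calculation (k=60, counting only the entries shown above):

| Document | List 1 Rank Contribution | List 2 Rank Contribution | List 3 Rank Contribution | RRF Total Score | Final Rank |
|---|---|---|---|---|---|
| Doc_A | 1/(60+1) = 0.0164 | 1/(60+2) = 0.0161 | 1/(60+3) = 0.0159 | 0.0484 | ✅ 1st |
| Doc_B | 1/(60+2) = 0.0161 | Not present | 1/(60+1) = 0.0164 | 0.0325 | 2nd |
| Doc_D | Not present | 1/(60+1) = 0.0164 | Not present | 0.0164 | 3rd |
| Doc_F | Not present | Not present | 1/(60+2) = 0.0161 | 0.0161 | 4th |
| Doc_C | 1/(60+3) = 0.0159 | Not present | Not present | 0.0159 | 5th (tied with Doc_E) |
| … | … | … | … | … | … |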

Key Observations:

  • ✅ Doc_A appears in all three lists; although it is not at the top of each, it has the highest composite score.
  • ✅ Doc_B appears in two lists with high ranks, scoring second.
  • ✅ Documents that appear in only one list (Doc_D, Doc_C) have lower scores.
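
The calculation can be reproduced with a few lines of Python. This is a minimal sketch, not the production engine shown later: the `rrf` helper is a name chosen for this example, and only the three entries listed for each channel are used.

```python
from collections import defaultdict
from typing import Dict, List


def rrf(ranked_lists: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Sum 1 / (k + rank) over every list in which a document appears."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return dict(scores)


# Top-3 entries of each list from the example above.
dense = ["Doc_A", "Doc_B", "Doc_C"]
sparse = ["Doc_D", "Doc_A", "Doc_E"]
graph = ["Doc_B", "Doc_F", "Doc_A"]

scores = rrf([dense, sparse, graph])
ranking = sorted(scores, key=scores.get, reverse=True)

for position, doc_id in enumerate(ranking, start=1):
    print(f"{position}. {doc_id}: {scores[doc_id]:.4f}")
```

Note that the original retrieval scores (0.92, 8.5, and so on) never enter the computation; only positions do, which is why cosine similarities and BM25 scores can be fused without normalization.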

Why Choose RRF Over Other Fusion Methods?

graph TB
subgraph Methods["Comparison of Fusion Methods"]
direction TB

RRF["✅ RRF (Recommended)<br/>• No normalization needed<br/>• Few parameters (only k)<br/>• Robust<br/>• Computationally efficient"]

WeightedAvg["⚠️ Weighted Average<br/>• Requires score normalization<br/>• Hard to tune weights<br/>• Sensitive to outliers"]

BordaCount["⚠️ Borda Count<br/>• Uses only rank info<br/>• Ignores score differences<br/>• Low discrimination"]

MachineLearning["❌ Learning to Rank (LTR)<br/>• Requires labeled data<br/>• High training cost<br/>• Cold start difficulties"]
end

subgraph WhyRRF["Why is RRF Best for RAG?"]
W1["🚀 Fast deployment: no training needed"]
W2["🎯 Stable results: depends only on ranks, not on raw score scales"]
W3["⚙️ Easy parameter tuning: only 1 hyperparameter"]
W4["📊 High interpretability: clear formula"]
end

Methods --> WhyRRF
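
To make the "requires score normalization" point concrete, here is a minimal sketch of weighted-average fusion with min-max normalization, the step that RRF avoids. The function names, the equal 0.5 weights, and the choice of min-max scaling are assumptions made for illustration; other normalization schemes exist.

```python
from typing import Dict, List, Tuple


def min_max_normalize(scores: Dict[str, float]) -> Dict[str, float]:
    """Rescale one channel's scores to [0, 1]; a constant list maps to 1.0."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (s - lo) / (hi - lo) for doc_id, s in scores.items()}


def weighted_average_fusion(
    channels: List[Tuple[Dict[str, float], float]]
) -> Dict[str, float]:
    """Weighted sum of normalized scores; absent documents contribute 0."""
    fused: Dict[str, float] = {}
    for scores, weight in channels:
        for doc_id, s in min_max_normalize(scores).items():
            fused[doc_id] = fused.get(doc_id, 0.0) + weight * s
    return fused


dense = {"Doc_A": 0.92, "Doc_B": 0.87, "Doc_C": 0.83}  # cosine similarity
sparse = {"Doc_D": 8.5, "Doc_A": 7.8, "Doc_E": 6.2}    # BM25, unbounded scale

fused = weighted_average_fusion([(dense, 0.5), (sparse, 0.5)])
for doc_id, score in sorted(fused.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{doc_id}: {score:.3f}")
```

With min-max scaling, the last document of every channel is pushed to exactly 0, and a single unusually high score compresses all the others toward 0. These are the sensitivities the diagram attributes to weighted averaging.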

Related: "RAG Online: Retrieval Optimization - Multi-Recall and Result Fusion" (theory of RRF and weighted fusion after multi-recall).

Multi-Channel Retrieval Architecture in RAG Systems

Complete Architecture Design

flowchart TB
subgraph Input["Input Layer"]
UserQuery["👤 User Query:<br/>'Diabetes complication prevention'"]
end

subgraph Preprocessing["Preprocessing Layer"]
QueryUnderstander["🧠 Query Understander<br/>• Intent recognition<br/>• Entity extraction<br/>• Query expansion"]
QueryExpander["🔍 Query Expansion<br/>• Synonym generation<br/>• Related term recommendation<br/>• Abbreviation expansion"]
end

subgraph RetrievalChannels["Multi-Channel Retrieval"]
direction LR

subgraph Channel1["Channel 1: Dense Vector Retrieval"]
C1_Embed["Embedding Model<br/>(BGE-M3)"]
C1_Milvus["Milvus HNSW Index<br/>(COSINE)"]
C1_Result["Result Set D<br/>(Top-K1)"]

C1_Embed --> C1_Milvus --> C1_Result
end

subgraph Channel2["Channel 2: Sparse Retrieval"]
C2_Tokenizer["Tokenizer<br/>(jieba)"]
C2_Elastic["Elasticsearch<br/>(BM25)"]
C2_Result["Result Set S<br/>(Top-K2)"]

C2_Tokenizer --> C2_Elastic --> C2_Result
end

subgraph Channel3["Channel 3: Graph Retrieval"]
C3_NER["NER Entity Recognition<br/>(spaCy)"]
C3_KG["Knowledge Graph<br/>(Neo4j)"]
C3_Result["Result Set G<br/>(Top-K3)"]

C3_NER --> C3_KG --> C3_Result
end

subgraph Channel4["Channel 4: Table Retrieval"]
C4_Router["Table Router<br/>(4-level granularity)"]
C4_TableMilvus["Milvus<br/>(table vectors)"]
C4_Result["Result Set T<br/>(Top-K4)"]

C4_Router --> C4_TableMilvus --> C4_Result
end
end

subgraph FusionLayer["Fusion Layer"]
RRFFusion["🔄 RRF Fusion<br/>• Multi-channel merge<br/>• Reciprocal rank calculation<br/>• Final ranking"]
Reranker["🎯 Reranker (optional)<br/>• Cross-Encoder<br/>• Fine-grained ranking"]
end

subgraph Output["Output Layer"]
FinalResults["📋 Final Top-N Results<br/>• De-duplication<br/>• Truncation<br/>• Metadata attachment"]
end

Input --> Preprocessing
Preprocessing --> RetrievalChannels

C1_Result --> RRFFusion
C2_Result --> RRFFusion
C3_Result --> RRFFusion
C4_Result --> RRFFusion

RRFFusion --> Reranker
Reranker --> Output

Channel Responsibilities and Configurations

| Channel | Retrieval Method | Use Case | Top-K Config | Typical Latency |
|---|---|---|---|---|
| Dense Vector | Cosine Similarity | Semantic queries, synonyms, paraphrases | 20-50 | 15-30ms |
| Sparse BM25 | Keyword Matching | Specialized terms, exact names, abbreviations | 30-50 | 5-15ms |
| Graph | Graph Path Traversal | Relationship reasoning, entity attributes, structured Q&A | 10-30 | 20-50ms |
| Table | 4-Level Granular Table Retrieval | Tabular data, numeric queries, comparative analysis | 20-40 | 25-45ms |

Complete RRF Implementation Code (Production-Grade)

Core Fusion Engine

# rrf_fusion_engine.py
"""
RRF (Reciprocal Rank Fusion) Fusion Engine - Production-Grade Implementation
Supports intelligent merging and re-ranking of multi-channel retrieval results.
"""

import time
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import logging
import heapq

logger = logging.getLogger(__name__)


class RetrievalChannel(Enum):
    """Retrieval channel enumeration"""
    DENSE = "dense"
    SPARSE = "sparse"
    GRAPH = "graph"
    TABLE = "table"
    KEYWORD = "keyword"
    HYBRID = "hybrid"


@dataclass
class RetrievedDocument:
    """
    Retrieved document.
    Contains original information and metadata.
    """
    doc_id: str
    content: str
    original_score: float  # Original retrieval score
    channel: RetrievalChannel
    rank_in_channel: int  # Rank within this channel
    metadata: Dict = field(default_factory=dict)

    # Populated at runtime
    rrf_score: float = 0.0  # RRF fusion score
    final_rank: int = 0  # Final global rank


@dataclass
class RRFConfig:
    """
    RRF algorithm configuration.
    """
    k: int = 60  # Smoothing constant, typically 60
    enable_weighting: bool = False  # Enable channel weighting
    channel_weights: Dict[RetrievalChannel, float] = field(default_factory=lambda: {
        RetrievalChannel.DENSE: 1.0,
        RetrievalChannel.SPARSE: 1.0,
        RetrievalChannel.GRAPH: 0.8,
        RetrievalChannel.TABLE: 0.9,
    })
    top_n: int = 20  # Number of top-N results to return
    dedup_strategy: str = "doc_id"  # Dedup strategy: doc_id / content_similarity

class RRFFusionEngine:
    """
    RRF Fusion Engine.

    Features:
    1. Accept results from multiple retrieval channels
    2. Execute RRF fusion algorithm
    3. Support optional channel weighting
    4. Deduplicate and truncate results
    """

    def __init__(self, config: Optional[RRFConfig] = None):
        self.config = config or RRFConfig()
        self.stats = {
            'total_fusions': 0,
            'avg_input_lists': 0,
            'avg_output_size': 0,
            'processing_time_ms': 0,
        }

    def fuse(
        self,
        result_lists: Dict[RetrievalChannel, List[RetrievedDocument]],
        query: str = ""
    ) -> List[RetrievedDocument]:
        """
        Execute RRF fusion.

        Parameters:
            result_lists: {channel: [documents]}
            query: original query (for logging)

        Returns:
            List of fused documents (descending by RRF score)
        """
        start_time = time.time()

        logger.info(f"Starting RRF fusion, input from {len(result_lists)} channels")

        # Step 1: Initialize RRF scores for all documents
        rrf_scores: Dict[str, float] = {}
        doc_info_map: Dict[str, RetrievedDocument] = {}

        # Step 2: Iterate through each channel's results
        for channel, documents in result_lists.items():
            if not documents:
                continue

            logger.debug(f"Processing channel {channel.value}, containing {len(documents)} results")

            # Get channel weight
            weight = 1.0
            if self.config.enable_weighting:
                weight = self.config.channel_weights.get(channel, 1.0)

            # Step 3: Accumulate RRF score for each document
            for rank, doc in enumerate(documents, start=1):
                doc_id = doc.doc_id

                # RRF formula: score += weight / (k + rank)
                contribution = weight / (self.config.k + rank)

                if doc_id not in rrf_scores:
                    rrf_scores[doc_id] = 0.0
                    # Keep the first copy seen as the representative document
                    doc_info_map[doc_id] = doc

                rrf_scores[doc_id] += contribution

                # The standard logging module has no trace(); debug() is the lowest built-in level
                logger.debug(
                    f"Document {doc_id} in {channel.value} channel "
                    f"ranked #{rank}, contribution {contribution:.6f}"
                )

        # Step 4: Update document RRF scores
        for doc_id, score in rrf_scores.items():
            if doc_id in doc_info_map:
                doc_info_map[doc_id].rrf_score = score

        # Step 5: Sort descending by RRF score
        fused_results = sorted(
            doc_info_map.values(),
            key=lambda x: x.rrf_score,
            reverse=True
        )

        # Step 6: Assign final ranks and truncate
        for rank, doc in enumerate(fused_results[:self.config.top_n], start=1):
            doc.final_rank = rank

        final_results = fused_results[:self.config.top_n]

        # Update statistics
        elapsed_ms = (time.time() - start_time) * 1000
        self.stats['total_fusions'] += 1
        self.stats['processing_time_ms'] += elapsed_ms
        total_inputs = sum(len(docs) for docs in result_lists.values())
        self.stats['avg_input_lists'] = (
            (self.stats['avg_input_lists'] * (self.stats['total_fusions'] - 1) + len(result_lists))
            / self.stats['total_fusions']
        )
        self.stats['avg_output_size'] = (
            (self.stats['avg_output_size'] * (self.stats['total_fusions'] - 1) + len(final_results))
            / self.stats['total_fusions']
        )

        logger.info(
            f"RRF fusion complete: input {total_inputs} items, "
            f"output {len(final_results)} items, latency {elapsed_ms:.1f}ms"
        )

        return final_results

    def fuse_with_deduplication(
        self,
        result_lists: Dict[RetrievalChannel, List[RetrievedDocument]],
        similarity_threshold: float = 0.85
    ) -> List[RetrievedDocument]:
        """
        RRF fusion with deduplication.

        For documents with highly similar content (e.g., parent-child chunks),
        keep only the highest-scoring one.
        """
        # First, execute standard fusion
        fused = self.fuse(result_lists)

        if self.config.dedup_strategy == "doc_id":
            # Dedup based on doc_id (already naturally deduped)
            return fused

        elif self.config.dedup_strategy == "content_similarity":
            # Dedup based on content similarity
            return self._deduplicate_by_content(fused, similarity_threshold)

        else:
            return fused

    def _deduplicate_by_content(
        self,
        documents: List[RetrievedDocument],
        threshold: float
    ) -> List[RetrievedDocument]:
        """
        Deduplication based on content similarity.
        Uses a simple word-overlap (Jaccard) rate; production environments can use embeddings.
        """
        unique_docs = []
        seen_contents = set()

        for doc in documents:
            content_key = self._normalize_content(doc.content)

            is_duplicate = False
            for seen in seen_contents:
                similarity = self._calculate_text_overlap(content_key, seen)
                if similarity > threshold:
                    is_duplicate = True
                    break

            if not is_duplicate:
                unique_docs.append(doc)
                seen_contents.add(content_key)

        # Reassign ranks
        for rank, doc in enumerate(unique_docs, start=1):
            doc.final_rank = rank

        logger.info(
            f"Content dedup complete: {len(documents)} → {len(unique_docs)} "
            f"(threshold={threshold})"
        )

        return unique_docs

    @staticmethod
    def _normalize_content(content: str) -> str:
        """Normalize text"""
        import re
        content = content.lower().strip()
        content = re.sub(r'\s+', ' ', content)
        return content

    @staticmethod
    def _calculate_text_overlap(text1: str, text2: str) -> float:
        """Calculate simple text overlap rate (Jaccard similarity over words)"""
        words1 = set(text1.split())
        words2 = set(text2.split())

        if not words1 or not words2:
            return 0.0

        intersection = words1 & words2
        union = words1 | words2

        return len(intersection) / len(union) if union else 0.0

    def get_statistics(self) -> Dict:
        """Get statistics"""
        return {
            **self.stats,
            'config': {
                'k': self.config.k,
                'enable_weighting': self.config.enable_weighting,
                'top_n': self.config.top_n,
            }
        }

    def reset_statistics(self):
        """Reset statistics"""
        self.stats = {
            'total_fusions': 0,
            'avg_input_lists': 0,
            'avg_output_size': 0,
            'processing_time_ms': 0,
        }
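
The effect of `enable_weighting` can be seen in isolation with a small standalone sketch. It mirrors the `weight / (k + rank)` contribution used in `fuse()` but works on plain lists of document IDs; the function name `weighted_rrf` and the sample data are illustrative and not part of the engine.

```python
from typing import Dict, List


def weighted_rrf(
    ranked_lists: Dict[str, List[str]],
    weights: Dict[str, float],
    k: int = 60,
) -> Dict[str, float]:
    """RRF where each channel's contribution is scaled by its weight."""
    scores: Dict[str, float] = {}
    for channel, ranking in ranked_lists.items():
        weight = weights.get(channel, 1.0)  # unlisted channels default to 1.0
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    return scores


# Two channels that disagree on the order of the same two documents.
lists = {
    "dense": ["Doc_A", "Doc_B"],
    "graph": ["Doc_B", "Doc_A"],
}

equal = weighted_rrf(lists, weights={})
weighted = weighted_rrf(lists, weights={"dense": 1.0, "graph": 0.8})

print("equal weights:   ", {d: round(s, 5) for d, s in equal.items()})
print("graph down to 0.8:", {d: round(s, 5) for d, s in weighted.items()})
```

With equal weights the two documents tie, because each holds one first place and one second place. Lowering the graph weight to 0.8 breaks the tie in favor of the document that the dense channel ranks first.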

Multi-Channel Retrieval Coordinator

# multi_channel_retriever.py
"""
Multi-channel retrieval coordinator.
Responsible for calling multiple retrieval channels in parallel and collecting results.
"""

import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Callable, Optional
from dataclasses import dataclass
import time

from rrf_fusion_engine import (
    RRFFusionEngine,
    RRFConfig,
    RetrievedDocument,
    RetrievalChannel
)


@dataclass
class RetrievalRequest:
    """Retrieval request"""
    query: str
    query_embedding: Optional[List[float]] = None
    top_k_per_channel: int = 30
    filters: Optional[Dict] = None
    channels_to_use: Optional[List[RetrievalChannel]] = None  # None means all registered channels


@dataclass
class RetrievalResult:
    """Retrieval response"""
    query: str
    fused_results: List[RetrievedDocument]
    per_channel_results: Dict[RetrievalChannel, List[RetrievedDocument]]
    latency_ms: float
    channels_used: List[RetrievalChannel]

class MultiChannelRetriever:
    """
    Multi-channel retrieval coordinator.

    Features:
    1. Execute multiple retrieval channels in parallel
    2. Collect results from each channel
    3. Invoke RRF fusion
    4. Return unified results
    """

    def __init__(
        self,
        fusion_engine: Optional[RRFFusionEngine] = None,
        max_workers: int = 4,
        timeout_per_channel: float = 5.0
    ):
        self.fusion_engine = fusion_engine or RRFFusionEngine()
        self.max_workers = max_workers
        self.timeout_per_channel = timeout_per_channel

        # Registered retrieval functions for each channel
        self._channel_retrievers: Dict[RetrievalChannel, Callable] = {}
        self._channel_configs: Dict[RetrievalChannel, Dict] = {}

    def register_channel(
        self,
        channel: RetrievalChannel,
        retriever_func: Callable,
        config: Optional[Dict] = None
    ):
        """
        Register a retrieval channel.

        Parameters:
            channel: channel type
            retriever_func: retrieval function, signature: func(query, top_k, **kwargs) -> List[RetrievedDocument]
            config: channel-specific configuration
        """
        self._channel_retrievers[channel] = retriever_func
        self._channel_configs[channel] = config or {}
        print(f"✅ Registered retrieval channel: {channel.value}")

    async def retrieve_async(self, request: RetrievalRequest) -> RetrievalResult:
        """
        Asynchronous multi-channel retrieval (recommended for production).
        """
        start_time = time.time()

        # Determine which channels to use (skip any that were never registered)
        requested = request.channels_to_use or list(self._channel_retrievers.keys())
        channels = [c for c in requested if c in self._channel_retrievers]

        print(f"🚀 Starting multi-channel retrieval, using channels: {[c.value for c in channels]}")

        # Execute all channels in parallel on worker threads. Each thread-pool
        # future is wrapped so it can be awaited without blocking the event loop.
        executor = ThreadPoolExecutor(max_workers=self.max_workers)
        try:
            pending = []
            for channel in channels:
                future = executor.submit(
                    self._safe_retrieve,
                    self._channel_retrievers[channel],
                    request.query,
                    request.top_k_per_channel,
                    channel,
                    **self._channel_configs.get(channel, {})
                )
                pending.append(
                    asyncio.wait_for(
                        asyncio.wrap_future(future),
                        timeout=self.timeout_per_channel
                    )
                )

            # A timeout or error in one channel must not fail the others
            outcomes = await asyncio.gather(*pending, return_exceptions=True)
        finally:
            # Do not wait for timed-out workers; they finish in the background
            executor.shutdown(wait=False)

        # Collect results
        per_channel_results: Dict[RetrievalChannel, List[RetrievedDocument]] = {}

        for channel, outcome in zip(channels, outcomes):
            if isinstance(outcome, Exception):
                print(f"❌ Channel {channel.value} failed: {outcome!r}")
                per_channel_results[channel] = []
            else:
                per_channel_results[channel] = outcome
                print(f"✅ Channel {channel.value} completed, returned {len(outcome)} results")

        # Execute RRF fusion
        fused_results = self.fusion_engine.fuse(per_channel_results, request.query)

        # Build return result
        latency_ms = (time.time() - start_time) * 1000

        result = RetrievalResult(
            query=request.query,
            fused_results=fused_results,
            per_channel_results=per_channel_results,
            latency_ms=latency_ms,
            channels_used=list(per_channel_results.keys())
        )

        print(
            f"🎉 Multi-channel retrieval complete: {len(channels)} channels, "
            f"fused returned {len(fused_results)} items, latency {latency_ms:.0f}ms"
        )

        return result

    def retrieve_sync(self, request: RetrievalRequest) -> RetrievalResult:
        """
        Synchronous multi-channel retrieval (simplified version).
        Must not be called from inside a running event loop.
        """
        return asyncio.run(self.retrieve_async(request))

    def _safe_retrieve(
        self,
        retriever_func: Callable,
        query: str,
        top_k: int,
        channel: RetrievalChannel,
        **kwargs
    ) -> List[RetrievedDocument]:
        """
        Safely execute retrieval (with exception handling).
        """
        try:
            results = retriever_func(query, top_k=top_k, **kwargs)

            # Ensure correct format
            formatted_results = []
            for rank, item in enumerate(results, start=1):
                if isinstance(item, RetrievedDocument):
                    item.rank_in_channel = rank
                    formatted_results.append(item)
                else:
                    # Try to convert dict format
                    formatted_results.append(RetrievedDocument(
                        doc_id=item.get('doc_id', ''),
                        content=item.get('content', ''),
                        original_score=item.get('score', 0),
                        channel=channel,
                        rank_in_channel=rank,
                        metadata=item.get('metadata', {})
                    ))

            return formatted_results

        except Exception as e:
            print(f"❌ Retrieval exception ({channel.value}): {e}")
            return []
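
The coordinator's core pattern, running blocking retrievers concurrently with a per-channel timeout, can be shown in a standalone sketch. The two fake retrievers, their sleep durations, and the 0.2-second timeout are invented for the demonstration; `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time
from typing import Callable, Dict, List


def fast_retriever(query: str) -> List[str]:
    time.sleep(0.05)  # simulate a quick index lookup
    return ["Doc_A", "Doc_B"]


def slow_retriever(query: str) -> List[str]:
    time.sleep(0.5)  # simulate a channel that misses its deadline
    return ["Doc_C"]


async def run_channel(
    func: Callable[[str], List[str]], query: str, timeout: float
) -> List[str]:
    """Run one blocking retriever in a worker thread; degrade to [] on timeout."""
    try:
        return await asyncio.wait_for(asyncio.to_thread(func, query), timeout)
    except asyncio.TimeoutError:
        return []


async def retrieve_all(query: str, timeout: float = 0.2) -> Dict[str, List[str]]:
    channels = {"fast": fast_retriever, "slow": slow_retriever}
    # gather() preserves argument order, so results line up with channel names
    results = await asyncio.gather(
        *(run_channel(func, query, timeout) for func in channels.values())
    )
    return dict(zip(channels.keys(), results))


results = asyncio.run(retrieve_all("diabetes complications"))
print(results)
```

The slow channel degrades to an empty list instead of delaying the whole request, and RRF then simply fuses whatever lists did arrive. A timed-out worker thread is not killed; it keeps running in the background until its function returns, so retrievers should also enforce their own client-side timeouts.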

Advanced Fusion Strategies and Parameter Tuning

Dynamic Weight Adjustment

# adaptive_rrf.py
"""
Adaptive RRF Fusion
Dynamically adjust channel weights based on query characteristics.
"""

from typing import Dict, List
from rrf_fusion_engine import (
    RRFFusionEngine, 
    RRFConfig, 
    RetrievedDocument, 
    RetrievalChannel
)
import re


class AdaptiveRRFEngine(RRFFusionEngine):
    """
    Adaptive RRF Engine.
    
    Features:
    1. Automatically adjust channel weights based on query type
    2. Support learning from historical feedback
    3. Configurable rule engine
    """
    
    # Mapping from query patterns to weight adjustments
    QUERY_PATTERNS = {
        # Term-dense queries (codes, acronyms, dosages) → boost Sparse weight.
        # The acronym branch is kept case-sensitive via (?-i:...); otherwise the
        # re.IGNORECASE flag used in analyze_query would match any 2+ letter word.
        r'[a-zA-Z]{2,}\d+|(?-i:[A-Z]{2,})|\d+\s*(mg|ml|kg|cm)': {
            RetrievalChannel.SPARSE: 1.5,
            RetrievalChannel.DENSE: 0.8,
        },
        # Natural language questions → boost Dense weight
        r'(what|how|why|which|when|where)\b.*\?$': {
            RetrievalChannel.DENSE: 1.3,
            RetrievalChannel.SPARSE: 1.0,
        },
        # Numeric comparison queries → boost Table weight
        r'(how many|highest|lowest|maximum|minimum|exceed|below|more than|less than)': {
            RetrievalChannel.TABLE: 1.4,
            RetrievalChannel.DENSE: 1.0,
        },
        # Relationship reasoning queries → boost Graph weight
        r'(relationship|related|belongs|contains|part of|subclass|parent class)': {
            RetrievalChannel.GRAPH: 1.5,
            RetrievalChannel.DENSE: 0.9,
        }
    }
    
    def __init__(self, config: RRFConfig = None):
        super().__init__(config)
        self.enable_adaptive = True
        self.learning_rate = 0.05  # Learning rate for online learning
        self.feedback_history: List[Dict] = []
    
    def analyze_query(self, query: str) -> Dict[RetrievalChannel, float]:
        """
        Analyze query characteristics and return suggested channel weights.
        """
        base_weights = dict(self.config.channel_weights)
        
        if not self.enable_adaptive:
            return base_weights
        
        adjustments = {}
        
        for pattern, weight_adjustments in self.QUERY_PATTERNS.items():
            if re.search(pattern, query, re.IGNORECASE):
                for channel, factor in weight_adjustments.items():
                    if channel not in adjustments:
                        adjustments[channel] = 1.0
                    adjustments[channel] *= factor
        
        # Apply adjustments
        adjusted_weights = {}
        for channel, base_weight in base_weights.items():
            adjustment = adjustments.get(channel, 1.0)
            adjusted_weights[channel] = base_weight * adjustment
        
        # Normalize (keep total weight relatively stable)
        total = sum(adjusted_weights.values())
        if total > 0:
            scale = len(base_weights) / total
            adjusted_weights = {
                ch: w * scale 
                for ch, w in adjusted_weights.items()
            }
        
        return adjusted_weights
    
    def fuse_with_adaptive_weights(
        self,
        result_lists: Dict[RetrievalChannel, List[RetrievedDocument]],
        query: str = ""
    ) -> List[RetrievedDocument]:
        """
        Fuse with adaptive weights.
        """
        # Analyze query and get dynamic weights
        dynamic_weights = self.analyze_query(query)
        
        print(f"📊 Adaptive weights: { {ch.value: f'{w:.2f}' for ch, w in dynamic_weights.items()} }")
        
        # Temporarily modify configuration
        original_weights = self.config.channel_weights.copy()
        self.config.channel_weights = dynamic