Production RAG Infrastructure on AWS
A reusable retrieval layer built on AWS OpenSearch and Amazon Bedrock embeddings that grounds multiple LLM pipelines in system-specific documentation.
Problem
LLM workflows for compliance and documentation need grounded retrieval to avoid hallucinations and provide relevant, system-specific answers.
Solution
Built a shared RAG backend that indexes chunked documents with Bedrock-generated embeddings into OpenSearch and exposes namespace-scoped retrieval to downstream pipelines.
Impact
- Enabled grounded retrieval across multiple AI workflows
- Centralized retrieval infrastructure for compliance and documentation pipelines
- Improved reliability of generated outputs by grounding them in semantic search results
Architecture
1. Documents are embedded using Titan embeddings from Bedrock
2. Vectors and metadata are stored in OpenSearch
3. A backend abstraction handles upserts and filtered retrieval
4. Pipelines query the service using namespace and document-type filters
Capabilities
- Semantic indexing and retrieval
- Namespace-scoped search
- Document chunk ingestion
- Shared backend for multiple pipelines
- AWS-authenticated service integration
Technical Deep Dive
Architecture internals and annotated code from the production system.
Architecture Overview
A ports-and-adapters (hexagonal) architecture with four distinct abstraction layers. Any pipeline — IRP generation, SCRM, CONMON, policy drafting — plugs into the same retrieval infrastructure through well-defined interfaces. The retrieval layer is deliberately decoupled so that backend changes (switching embedding providers, adding a new vector store) never require pipeline code changes.
Key Architectural Decisions
Protocol Over ABC for Backend Interface
IndexBackend uses Python Protocol (structural typing), not ABC (nominal typing). Any class with matching method signatures satisfies the interface without inheriting from it. Three implementations conform: OpenSearchIndexBackend, AwsIndexBackend (S3 + OpenAI embeddings), and the OpenAI Vector Store path — all without a shared base class.
DriveFileRecord as Universal Data Contract
Every document flowing through the system becomes a DriveFileRecord with drive_file_id, name, mime_type, modified_time, md5, and content bytes. This is the normalization point — upstream pipelines don't need to know whether the document will end up in OpenSearch, S3, or OpenAI's vector store.
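The fields above can be captured in a minimal sketch of the record; the field names come from the text, while the dataclass shape and example values are assumptions for illustration:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class DriveFileRecord:
    """Normalized document record; every backend consumes this shape."""
    drive_file_id: str
    name: str
    mime_type: str
    modified_time: str  # e.g. an RFC 3339 timestamp from Drive
    md5: str
    content: bytes

# Hypothetical example values:
record = DriveFileRecord(
    drive_file_id="1abc",
    name="irp.docx",
    mime_type="application/vnd.openxmlformats-officedocument"
              ".wordprocessingml.document",
    modified_time="2024-06-01T12:00:00Z",
    md5="d41d8cd98f00b204e9800998ecf8427e",
    content=b"...",
)
```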
Separated retrieval_query from prompt
BackendExecutor.process() takes retrieval_query as a separate parameter from prompt. The text sent to OpenSearch for kNN search can be optimized independently from the text sent to the LLM. retrieval_query_builder.py extracts {{placeholders}} from markdown templates and builds targeted semantic queries, while the LLM receives the full template with instructions.
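A minimal sketch of what placeholder extraction and query building might look like; `extract_placeholders` and `build_retrieval_query` are hypothetical helper names, not the actual functions in retrieval_query_builder.py:

```python
import re

def extract_placeholders(template):
    # Find {{name}} markers; preserve order, drop duplicates.
    seen = {}
    for match in re.finditer(r"\{\{\s*([\w.-]+)\s*\}\}", template):
        seen.setdefault(match.group(1))
    return list(seen)

def build_retrieval_query(section, placeholders):
    # Tight semantic query: section context plus the values to find,
    # without the LLM-facing boilerplate instructions.
    return f"{section}. Find values for {', '.join(placeholders)}"

template = "Our SIEM is {{siem_tool}} and scans run via {{infra_scanner}}."
ph = extract_placeholders(template)
query = build_retrieval_query("IRP section: tooling", ph)
```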
Namespace Isolation for Multi-Pipeline Reuse
A single OpenSearch domain and index stores documents for all compliance pipelines. The namespace field (keyword type) is added as a term filter on every kNN query. IRP generator only sees IRP-namespace documents, SCRM only sees SCRM documents. Same index, same embeddings, same infrastructure — different logical partitions.
RAG Factory as Dependency Injection Point
rag_factory.py's build_opensearch_bedrock_rag() wires OpenSearchClient + BedrockEmbeddingsClient + BedrockLLMClient into a single RAG instance. The factory reads all infra config from environment variables, so the same code deploys to dev (basic auth, t3.small) and prod (SigV4, encrypted domain) without code changes.
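The env-driven wiring can be sketched as follows; the environment variable names and defaults here are assumptions for illustration, not the actual configuration keys:

```python
import os

def build_opensearch_bedrock_rag(env=os.environ):
    # Sketch: read all infra config from the environment so the
    # same code runs against dev (basic auth) and prod (SigV4)
    # without code changes. Variable names are hypothetical.
    return {
        "endpoint": env.get("OPENSEARCH_ENDPOINT", "http://localhost:9200"),
        "auth_mode": env.get("OPENSEARCH_AUTH", "basic"),  # "basic" or "sigv4"
        "embed_model": env.get(
            "BEDROCK_EMBED_MODEL", "amazon.titan-embed-text-v2:0"
        ),
    }

dev_cfg = build_opensearch_bedrock_rag({})
prod_cfg = build_opensearch_bedrock_rag({"OPENSEARCH_AUTH": "sigv4"})
```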
FedRAMP Layer 1 Deliberately Excluded from RAG
The enrichment pipeline intentionally does not use RAG retrieval. It trusts structured metadata, not retrieved documents, precisely to avoid the hallucination risk that RAG introduces. As the README states: 'Raw KSI prose must not be the sole LLM input. The LLM receives a structured package of source fields.'
Fixed-Size Word-Window Over Semantic Chunking
The source documents are compliance policy prose — incident response plans, supply chain risk management docs, security procedures. These are dense, self-contained paragraphs where any 300-word window captures a complete thought. Smaller uniform chunks (300 words, ~1,200 chars) produce tight embedding vectors that score sharply on kNN. Larger semantic chunks (2,000+ words) dilute the embedding — a 3-page section about incident severity classification would weakly match many queries. With the over-fetch strategy (fetch_mult=5), 5 sharp-match small chunks outperform 1 diluted-match large chunk.
No Overlap to Avoid Embedding Cost Explosion
Chunks don't share context at their boundaries. Overlap-based chunking (common in LangChain recipes — 200-word chunks with 50-word overlap) would increase Titan Embed v2 API calls by ~25% with no measurable retrieval quality gain for prose documents. On a t3.small.search OpenSearch domain, the storage and compute savings matter.
Predictable Chunk IDs for Freshness-Aware Deduplication
index_drive_file() checks source_modified_at and skips unchanged documents. When a document changes, it deletes all existing chunks and re-indexes. Fixed-size chunking produces stable chunk_id = f"{drive_id}:{idx}" identifiers. Semantic chunking would make chunk boundaries shift unpredictably between document versions, breaking chunk-ID-based deduplication.
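The stable-ID property follows directly from the `{drive_id}:{idx}` formula quoted above; a one-liner shows why re-indexing the same document always regenerates the same ID sequence:

```python
def chunk_ids(drive_id, num_chunks):
    # Stable IDs: the same document always yields the same sequence,
    # so a re-index can delete by drive_id and overwrite deterministically.
    return [f"{drive_id}:{idx}" for idx in range(num_chunks)]

ids = chunk_ids("1abc", 3)  # ["1abc:0", "1abc:1", "1abc:2"]
```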
Hybrid Search (kNN + BM25) as Opt-In Extension
Pure kNN works well for policy prose, but queries containing control IDs (AC-2(4)), tool names (sshd_config), or CLI flags need lexical precision that vector similarity alone misses. Hybrid search fuses normalized kNN and BM25 scores with configurable weights. Documents appearing in only one result set get 0.0 for the missing signal — this naturally penalizes results that score well on only one dimension.
Backwards Compatibility via Keyword-Only Defaults
Every hybrid parameter is keyword-only with a default value: use_hybrid=False (existing behavior unchanged), vector_weight=0.7, keyword_weight=0.3. No existing callers use positional args that would break. Zero test files directly call any of the modified methods. Opting in is a single flag change — opting out requires no code changes at all.
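The compatibility guarantee reduces to a signature shape, sketched here with a stub body (the real method does far more):

```python
def retrieve(query, *, use_hybrid=False, vector_weight=0.7, keyword_weight=0.3):
    # Keyword-only params with defaults: an existing positional caller
    # like retrieve("...") keeps pure-kNN behavior unchanged, and new
    # callers opt in with a single flag.
    mode = "hybrid" if use_hybrid else "knn"
    return mode, vector_weight, keyword_weight

retrieve("incident response roles")                   # unchanged kNN path
retrieve("AC-2(4) account monitoring", use_hybrid=True)  # opt-in hybrid
```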
LLM Reranker as Late Interaction Stage
After bi-encoder retrieval returns 20 candidate hits, an LLM reranker (Claude Haiku at temperature=0) scores each chunk 0-10 against the actual question. This catches what vector similarity misses: chunks that share vocabulary but are from the wrong context (a different policy's 'advisory board'), the wrong control (KSI-CMT-02 vs. KSI-CMT-01), or irrelevant lexical overlap ('membership' in an org chart). Only the top-scoring chunks survive into context assembly.
Reranking as Quality Optimization, Not Correctness Gate
If the reranker LLM call fails or returns unparseable JSON, the function silently returns the original hits truncated to top_n. The pipeline never breaks. Reranking improves answer quality but isn't required for correct operation — this makes it safe to opt into without risk of pipeline failures from a downstream model error.
Metadata Filtering via Dynamic Mapping — No Migration Required
A single metadata_filters: Dict[str, str] parameter flows from any entry point down to OpenSearch's bool.filter clause. Each key-value pair becomes a term filter on doc.metadata.<key> — binary include/exclude applied before scoring. OpenSearch's dynamic: true setting auto-creates keyword fields for any new string written under doc.metadata, so no PUT _mapping call is needed. The first document indexed with project_id creates the field; the first query filtering on it uses the auto-created keyword index.
Three Levels of Isolation — Company, Namespace, Metadata
Company (doc.company, always applied) provides tenant isolation. Namespace (doc.namespace, always applied) provides document collection isolation (IRP vs. SCRM vs. CONMON). Metadata (doc.metadata.*, opt-in via metadata_filters) provides fine-grained filtering by project, doc type, classification, or version. Existing documents without a metadata field are correctly excluded from project-scoped queries — a term filter on a missing field returns no match.
Latency Budget — LLM Dominates, Everything Else Under 400ms
A full retrieve-and-answer cycle takes ~5-8s. The LLM answer generation dominates at ~75% of wall time (~4-6s). Everything upstream — query embedding (~200ms via Bedrock), kNN search (~100ms via OpenSearch), filtering, context assembly — totals under 400ms. Six optimizations keep the non-LLM stages fast: aggressive boto3 timeouts, client singleton with connection pool reuse, credential caching with 600s TTL, over-fetch with local score cutoff, single-text embedding for the query path, and cross-region inference profile routing.
Deliberate Non-Optimizations
Response streaming (invoke_model_with_response_stream) is excluded from the IAM policy, even though it would cut perceived time-to-first-token from ~5s to ~500ms; simplicity won that tradeoff. OpenSearch connection pool tuning uses opensearch-py defaults because single-digit QPS on a t3.small domain isn't connection-bound. Formal latency instrumentation exists for ingest (files/s, bytes/s via RunMetrics) but not for the query path, a known gap rather than an oversight.
Vector Staleness Management — Per-File Compare-and-Swap
Compliance documentation changes constantly. When a source document changes, every embedding derived from the old version becomes stale — returning irrelevant context to the LLM. The ingestion pipeline uses a timestamp-driven freshness check against Google Drive's modified_time. Each file goes through compare (query OpenSearch for stored source_modified_at, skip if unchanged at ~5ms cost), purge (scoped delete_by_query for that drive_id with conflicts=proceed and refresh=true), and re-index (extract, chunk, embed, bulk-index with new timestamp).
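The compare step can be sketched as a timestamp check; `should_reindex` is a hypothetical helper (the real check queries OpenSearch for the stored source_modified_at):

```python
from datetime import datetime
from typing import Optional

def should_reindex(stored_modified_at: Optional[str],
                   drive_modified_time: str) -> bool:
    # Compare step of the compare/purge/re-index cycle (illustrative).
    if stored_modified_at is None:
        return True  # never indexed before
    stored = datetime.fromisoformat(stored_modified_at)
    current = datetime.fromisoformat(drive_modified_time)
    return current > stored  # only re-embed if Drive has a newer version
```

Only when this returns True does the pipeline pay for the purge (delete_by_query) and the re-embed; the unchanged case costs a single ~5ms lookup.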
No Downtime During Re-Indexing
The delete-then-index sequence creates a brief window (1-5 seconds) in which one file has zero chunks. Searches that don't reference that file are unaffected; searches that do simply draw context from other relevant documents. Since the index serves document generation pipelines (not real-time user-facing search), this sub-5-second gap per file is invisible in practice. The chunk ID scheme ({drive_id}:{index}) means delete_by_query wipes all old chunks by drive_id regardless of how many chunks the old version produced.
Cross-Backend Consistency via IndexBackend Protocol
The same IndexBackend Protocol governs both OpenSearch and S3 backends. OpenSearch compares timestamps (catches all edits). S3 compares md5 + modified_time (cheaper but misses metadata-only edits). Both backends support skip_if_existing and replace_if_newer flags, giving callers control over the tradeoff between thoroughness and cost.
Hard Context Budgets to Prevent Lost-in-the-Middle
The context assembly layer enforces a 12,000-character total ceiling and a 4,000-character cap per individual chunk. With 300-word chunks averaging ~1,200 characters, the model receives 5-8 focused chunks per query — well below the 20+ document threshold where lost-in-the-middle effects become significant in the literature. Oversized chunks are truncated rather than allowed to consume the entire budget.
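The two caps can be sketched as a small budget loop; build_context is named in the pipeline diagram later in this document, but this implementation is an assumption (the separator characters are not counted against the budget here):

```python
def build_context(chunks, per_chunk_cap=4_000, total_cap=12_000):
    # Cap each chunk, then stop once the total ceiling is reached.
    parts, used = [], 0
    for chunk in chunks:
        piece = chunk[:per_chunk_cap]          # per-chunk truncation
        if used + len(piece) > total_cap:
            piece = piece[: total_cap - used]  # partial fit at the end
        if piece:
            parts.append(piece)
            used += len(piece)
        if used >= total_cap:
            break
    return "\n\n".join(parts)
```

With ~1,200-character chunks this admits 5-8 full chunks; a single oversized chunk is truncated to 4,000 characters rather than consuming the whole budget.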
Separated Retrieval Query and LLM Prompt
The text sent to OpenSearch for vector/keyword matching is a tight semantic query optimized for retrieval precision (e.g., 'IRP section: roles and responsibilities. Find values for SIEM, infrastructure scanner'). The text sent to the LLM is the full task instruction optimized for generation quality. Without this separation, boilerplate instructions would dilute the embedding vector and degrade retrieval results.
Tiered Placeholder Resolution Before RAG
A four-tier resolution hierarchy resolves template placeholders before they reach the RAG layer: static client profile (instant lookup), in-memory small documents (no search needed), RAG retrieval (OpenSearch), and explicit {{UNKNOWN}} markers for unresolvable values. Known values like company name or SIEM tool name are resolved from configuration — they never enter the context window, preserving the budget for values that genuinely require document retrieval.
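The tier order can be sketched as a fall-through lookup; `resolve_placeholder` and its parameters are hypothetical names for illustration, not the actual resolver API:

```python
def resolve_placeholder(name, profile, small_docs, rag_lookup):
    # Four-tier resolution (sketch): cheap sources first, RAG last.
    if name in profile:                     # Tier 1: static client profile
        return profile[name], "profile"
    if name in small_docs:                  # Tier 2: in-memory small documents
        return small_docs[name], "memory"
    hit = rag_lookup(name)                  # Tier 3: RAG retrieval (OpenSearch)
    if hit is not None:
        return hit, "rag"
    return "{{UNKNOWN}}", "unresolved"      # Tier 4: explicit marker

value, tier = resolve_placeholder(
    "siem_tool", {"siem_tool": "Splunk"}, {}, lambda n: None
)
```

Because Tiers 1-2 short-circuit before `rag_lookup`, known values never consume retrieval latency or context budget.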
Code Showcase 1
Protocol Interface — IndexBackend
The contract that all storage backends must satisfy. Uses Python Protocol for structural typing — any class with matching method signatures conforms without inheritance.
from typing import Protocol

class IndexBackend(Protocol):
    def ensure_index(
        self, *, company, name, if_exists, namespace
    ) -> str: ...

    def upsert_file(
        self, *, company, index_id, file, replace_if_newer
    ) -> None: ...

# Three implementations conform:
# - OpenSearchIndexBackend (services/opensearch_index_backend.py)
# - AwsIndexBackend (services/aws_index_backend.py)
# - OpenAI Vector Store path (pipelines/google_to_openai)

| Property | Detail |
|---|---|
| Pattern | Ports-and-adapters — the Protocol is the port, each backend is an adapter |
| Structural Typing | Protocol, not ABC — implementations don't inherit, they just match the signature |
| Three Backends | OpenSearch (kNN + Bedrock embeddings), AWS S3 (OpenAI embeddings), OpenAI Vector Store |
Code Showcase 2
Retrieval Dispatcher — ensure_retrieval()
The single entry point for all backend-agnostic document ingestion. A pipeline calls ensure_retrieval() with a backend_type and namespace and doesn't care about index creation, SigV4 auth, chunking strategies, or embedding dimensions.
def ensure_retrieval(
    *, app, backend_type, company,
    drive_path, namespace, ...
):
    if backend_type == "openai":
        ensure_vs_and_docs(...)
    elif backend_type == "opensearch":
        ensure_os_index_and_docs(...)
    elif backend_type in ("bedrock", "aws"):
        ensure_aws_index_and_docs(...)

| Property | Detail |
|---|---|
| Backend Routing | Dispatches to correct ingestion path based on backend_type — OpenAI, OpenSearch, or Bedrock/AWS |
| Index Sanitization | Handles OpenSearch naming rules (lowercase, no special chars) internally |
| Caller Simplicity | Pipeline just specifies backend_type and namespace — all infra complexity is hidden |
Code Showcase 3
Namespace Isolation — kNN Filter
The architectural keystone that makes the retrieval layer reusable across pipelines. A single OpenSearch index stores documents for all compliance workflows, partitioned by namespace.
@dataclass
class OpenSearchBedrockRAG:
    """
    Intentionally generic so IRP, SCRM, CONMON, etc.
    can all reuse it by just changing `namespace`.
    """
    company: str
    namespace: str = "irp"

# opensearch.py:search_knn
if namespace:
    must_filters.append(
        {"term": {"doc.namespace": namespace}}
    )

| Property | Detail |
|---|---|
| Single Index | One OpenSearch domain, one index, one embedding pipeline — no per-pipeline infrastructure duplication |
| Logical Partitions | Namespace term filter on every kNN query isolates IRP, SCRM, CONMON documents |
| Swap One Field | Changing namespace='irp' to namespace='scrm' switches the entire retrieval context |
Code Showcase 4
Source Priority System
RAG is one data source among many, with a defined priority in the multi-source evidence merge. Live API responses win over RAG-retrieved documents, which win over manual exports — the retrieval layer gracefully degrades.
class SourceAdapter(ABC):
    def priority(self) -> int:
        """
        1 = live API
        2 = custom/service
        3 = rag
        4 = export
        5 = manual
        """

# When the compliance engine merges results,
# a live Vanta API response (priority 1) wins over
# a RAG-retrieved document (priority 3), which wins
# over a manual CSV export (priority 5).

| Property | Detail |
|---|---|
| Graceful Degradation | If a live API has the evidence, RAG results are deprioritized automatically — no manual override needed |
| Explicit Ranking | 5-tier priority system makes data source precedence auditable, not implicit |
| Extensible | New source adapters slot in at any priority tier without changing existing merge logic |
Code Showcase 5
Pipeline Plug-In Points
Every compliance pipeline connects to the same retrieval infrastructure through ensure_retrieval() and BackendExecutor.process(). The FedRAMP Layer 1 pipeline is the notable exception — it deliberately avoids RAG to prevent hallucination from retrieved prose.
| Pipeline | What It Does | What It Calls |
|---|---|---|
| IRP Generator | Drafts incident response plan | ensure_retrieval(namespace="irp") → BackendExecutor.process() |
| SCRM Generator | Drafts supply chain risk plan | ensure_retrieval(namespace="scrm") → BackendExecutor.process() |
| CONMON Generator | Continuous monitoring docs | ensure_retrieval(namespace="conmon") → BackendExecutor.process() |
| FedRAMP Layer 1 | Requirement enrichment | Uses structured JSON input, not RAG — deliberately excluded |
| Custom Tests | Evidence extraction prompts | Can query any namespace for supporting documentation |
| Compliance Engine | Multi-source evidence merge | RAG adapter at priority=3 alongside live APIs and exports |

| Property | Detail |
|---|---|
| Same Interface | Every RAG-enabled pipeline calls ensure_retrieval() + BackendExecutor.process() — no per-pipeline retrieval code |
| Namespace Swap | Switching from IRP to SCRM to CONMON is a single parameter change |
| Deliberate Exclusion | FedRAMP Layer 1 intentionally skips RAG — structured metadata over retrieved prose to avoid hallucination risk |
Code Showcase 6
Stage 1: Word-Based Fixed-Size Chunker
_chunk_text() is a straightforward fixed-size word-window chunker. It splits on whitespace, accumulates words into a buffer, and emits a chunk when the buffer hits the limit. No overlap, no sentence detection, no heading-aware splitting. Called with max_tokens=300 in practice.
def _chunk_text(self, text, max_tokens=500):
    words = text.split()
    chunks, current = [], []
    for w in words:
        current.append(w)
        if len(current) >= max_tokens:
            chunks.append(" ".join(current))
            current = []
    if current:  # flush the trailing partial window
        chunks.append(" ".join(current))
    return chunks

| Property | Detail |
|---|---|
| Strategy | Fixed-size word-window — not semantic, not recursive, not overlap-based |
| Window Size | Default 500 words, called with max_tokens=300 in production (~1,200 chars per chunk) |
| Why No Overlap | Overlap would increase Titan Embed v2 API calls by ~25% with no retrieval quality gain for prose |
| Why No Semantic Split | Compliance prose degrades gracefully with fixed-size chunking — unlike code or structured data |
Code Showcase 7
Stage 2: Boundary-Aware Safety Splitter
_split_for_embed() enforces the embedding model's hard character limit after the word chunker runs. Default max_chars is 12,000 (from EMBED_MAX_CHARS env var), tuned to keep Titan Embed v2 well under its 8,192-token limit (~4 chars/token heuristic).
def _split_for_embed(self, text, max_chars):
    # try splitting on a boundary
    cut = text.rfind("\n", start, end)      # prefer newline break
    if cut == -1:
        cut = text.rfind(" ", start, end)   # fallback to space break
    if cut == -1 or cut <= start + (max_chars * 0.5):
        cut = end                           # hard cut if no good boundary

# The two stages chain together in index_drive_file():
chunks = self._chunk_text(text, max_tokens=300)
max_chars = int(os.getenv("EMBED_MAX_CHARS", "12000"))
safe_chunks = []
for c in chunks:
    safe_chunks.extend(
        self._split_for_embed(c, max_chars=max_chars)
    )

| Property | Detail |
|---|---|
| Split Priority | Try newline break first, then space break, then hard cut only if no boundary in the back 50% of the window |
| Safety Net | Ensures no chunk exceeds Titan Embed v2's token limit regardless of word-chunker output |
| Two-Stage Chain | Stage 1 (word window) sizes content, Stage 2 (character safety) enforces embedding model limits |
Code Showcase 8
Full Ingestion Pipeline Flow
The complete path from Google Drive document to OpenSearch kNN index. Includes freshness-aware deduplication, multi-format text extraction with binary guardrails, two-stage chunking, batch embedding, and bulk upsert.
Google Drive document (PDF, DOCX, Google Doc)
|
v
GoogleDriveIngestor.iter_files()
-> exports Google-native docs to DOCX/XLSX/PPTX
-> downloads binary files directly
-> yields DriveFileRecord(content=bytes)
|
v
OpenSearchIndexBackend.index_drive_file()
|
+- freshness check: get_latest_source_modified()
| +- skip if unchanged (IndexResult: skipped_unchanged)
|
+- _extract_text(record)
| +- text/plain, text/markdown -> direct decode
| +- PDF, DOCX, PPTX, XLSX -> MarkItDown -> markdown
| +- DOCX fallback -> python-docx paragraph + table extraction
| +- binary guardrail: reject if "%PDF-" or "/FlateDecode" in output
|
+- _chunk_text(text, max_tokens=300) <- Stage 1: word-window
|
+- _split_for_embed(chunk, max_chars=12000) <- Stage 2: safety split
|
+- embed_fn(chunks) <- Titan Embed v2, batch=200
|
+- delete_docs_for_drive_file() <- wipe stale chunks
|
+- os_client.index_chunks(docs) <- bulk upsert with retry
|
v
OpenSearch kNN index
mapping: doc.embedding (knn_vector, dim=1024)
filters: doc.company (keyword), doc.namespace (keyword)
metadata: doc.path, doc.chunk_id, doc.source_modified_at

| Property | Detail |
|---|---|
| Freshness Check | Compares source_modified_at before re-indexing — unchanged documents are skipped entirely |
| Multi-Format Extraction | MarkItDown handles PDF, DOCX, PPTX, XLSX; binary guardrail catches corrupt or unconvertible files |
| Two-Stage Chunking | 300-word windows (Stage 1) then 12,000-char safety splits (Stage 2) before embedding |
| Batch Embedding | Titan Embed v2 via Bedrock in batches of 200, producing 1024-dim vectors |
| Atomic Re-Index | On document change: delete all stale chunks first, then bulk upsert new chunks |
Code Showcase 9
Hybrid Search — Score Fusion
When use_hybrid=True, kNN and BM25 results are independently normalized to [0, 1], then fused with configurable weights. Documents appearing in only one result set get 0.0 for the missing signal, naturally penalizing single-dimension matches. The scoring breakdown is attached to each hit for provenance.
# Score fusion algorithm
# For each document appearing in either kNN or BM25 results:
# 1. Normalize kNN scores
knn_norm = (score - min) / (max - min) # -> [0, 1]
# 2. Normalize BM25 scores
bm25_norm = (score - min) / (max - min) # -> [0, 1]
# 3. Fuse with configurable weights
final_score = (
    knn_norm * vector_weight
    + bm25_norm * keyword_weight
)
# 4. Sort by final_score descending, return top k
# Each hit includes scoring breakdown:
# hit["_score_detail"] = {
#     "knn_normalized": 0.8234,
#     "bm25_normalized": 0.9512,
#     "vector_weight": 0.5,
#     "keyword_weight": 0.5,
# }

| Property | Detail |
|---|---|
| Min-Max Normalization | Both kNN and BM25 scores normalized to [0, 1] before fusion — different score scales don't bias the result |
| Missing Signal = 0.0 | Documents in only one result set get 0.0 for the other, penalizing single-dimension matches |
| Score Provenance | Every hit carries _score_detail with normalized scores and weights for debugging and auditability |
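The fusion algorithm above can be made concrete in a few lines; `fuse_scores` is a hypothetical name, but the logic (min-max normalization, 0.0 for a missing signal, weighted sum) follows the description:

```python
def fuse_scores(knn, bm25, vector_weight=0.7, keyword_weight=0.3):
    """knn / bm25: dict of doc_id -> raw score from each retriever."""
    def norm(scores):
        # Min-max normalize one result set to [0, 1].
        if not scores:
            return {}
        lo, hi = min(scores.values()), max(scores.values())
        if hi == lo:
            return {d: 1.0 for d in scores}
        return {d: (s - lo) / (hi - lo) for d, s in scores.items()}

    knn_n, bm25_n = norm(knn), norm(bm25)
    fused = {
        d: knn_n.get(d, 0.0) * vector_weight + bm25_n.get(d, 0.0) * keyword_weight
        for d in set(knn) | set(bm25)  # union: docs from either result set
    }
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)

# "a" appears only in kNN, "c" only in BM25, "b" in both:
ranked = fuse_scores({"a": 0.9, "b": 0.5}, {"b": 12.0, "c": 3.0})
```

Note how "a" and "c" each pick up 0.0 for the signal they lack, which is the penalty for single-dimension matches described above.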
Code Showcase 10
Hybrid Search — Usage and Recommended Weights
Hybrid search is opt-in at both the BackendExecutor level (for pipelines) and the RAG layer directly. Weight recommendations vary by query type — semantic similarity dominates for policy prose, while lexical precision matters for control IDs and configuration artifacts.
# Via BackendExecutor (any pipeline):
text, meta = executor.process(
    prompt=prompt,
    instructions=instructions,
    retrieval_query=query,
    use_hybrid=True,
    vector_weight=0.5,
    keyword_weight=0.5,
)
# meta["retrieval"]["hybrid"] == True

# Directly from RAG layer:
# Pure kNN (default, unchanged behavior)
hits = rag.retrieve(
    "incident response roles and responsibilities"
)

# Hybrid (kNN + BM25)
hits = rag.retrieve(
    "KSI-CMT-01 CM-3 change advisory board",
    use_hybrid=True,
    vector_weight=0.5,
    keyword_weight=0.5,
)

| Property | Detail |
|---|---|
| Policy prose (IRP, SCRM, CONMON) | vector=0.7, keyword=0.3 — semantic similarity dominates; keywords break ties |
| Control/KSI evidence lookup | vector=0.5, keyword=0.5 — control IDs like AC-2(4) and tool names need lexical precision |
| Configuration artifact search | vector=0.3, keyword=0.7 — sshd_config, SSLProtocol, exact CLI flags need lexical match |
| Backwards Compatibility | Default use_hybrid=False — all existing callers get unchanged pure kNN behavior with zero code changes |
Code Showcase 11
Full Retrieval Pipeline — 3-Stage Flow
The complete retrieve() call chain: bi-encoder retrieval (kNN or hybrid) fetches 20 candidates, the LLM reranker scores and prunes to the top 5, then context assembly caps and concatenates hits for the final LLM answer generation.
retrieve(question, k=20, rerank=True, rerank_top_n=5)
|
+------------------+------------------+
| STAGE 1: Bi-Encoder Retrieval |
| |
| question --> Titan Embed v2 --> vec |
| | |
| OpenSearch <-- kNN or hybrid <+ |
| | |
| +-> 20 candidate hits (scored |
| by cosine +/- BM25) |
+------------------+------------------+
|
+------------------+------------------+
| STAGE 2: LLM Reranker |
| |
| For each of 20 hits: |
| send first 800 chars + question |
| to Claude (Haiku) at temp=0 |
| |
| Claude scores each chunk 0-10: |
| 10 = directly answers question |
| 7-9 = highly relevant detail |
| 4-6 = partially relevant |
| 1-3 = mostly irrelevant |
| 0 = completely irrelevant |
| |
| Sort by score desc -> return top 5 |
+------------------+------------------+
|
+------------------+------------------+
| STAGE 3: Context Assembly + Answer |
| |
| build_context(5 hits) |
| cap each hit at 4,000 chars |
| cap total at 12,000 chars |
| |
| llm_client.chat() -> final answer |
+-------------------------------------+

| Property | Detail |
|---|---|
| Stage 1 | Bi-encoder retrieval via Titan Embed v2 — kNN or hybrid (kNN + BM25) returning 20 candidates |
| Stage 2 | LLM reranker (Claude Haiku, temp=0) scores each chunk 0-10, prunes to top 5 |
| Stage 3 | Context assembly caps hits (4K chars each, 12K total) then generates the final answer |
| Default Flow | k=20 -> rerank -> top 5 (or k//2 with floor of 3) -> context -> answer |
Code Showcase 12
LLM Reranker — rerank_hits()
The 5-step reranking function at rag_bedrock_opensearch.py:35-118. Builds chunk previews, scores via a single Bedrock call with strict JSON output contract, normalizes and clamps scores, sorts with tiebreaker on original retrieval order, and falls back gracefully on failure.
def rerank_hits(*, question, hits, llm_client,
                top_n, preview_chars=800, model_id):
    # Step 1: Build chunk previews (lines 56-63)
    # First 800 chars of content + source path per hit
    previews = [
        {"index": i, "text": hit["content"][:preview_chars],
         "source": hit["path"]}
        for i, hit in enumerate(hits)
    ]

    # Step 2: LLM relevance scoring (lines 71-78)
    # One Bedrock call, temperature=0.0, strict JSON contract:
    # [{"index": 0, "score": 8}, {"index": 1, "score": 3}, ...]

    # Step 3: Score normalization + clamping (lines 93-98)
    # Each score clamped to [0, 10] — prevents hallucinated
    # scores of 15 or -1 from distorting the ranking

    # Step 4: Sort and prune (lines 101-113)
    # Sort by rerank score desc, original order as tiebreaker
    # Only top_n survive; each gets _rerank_score attached

    # Step 5: Graceful fallback (lines 115-118)
    # If LLM call fails or returns unparseable JSON,
    # silently return original hits truncated to top_n
    # Reranking is a quality optimization, not a gate

| Property | Detail |
|---|---|
| Preview Limit | Only first 800 chars per chunk sent to reranker — controls cost without sacrificing judgment quality on compliance prose |
| Score Clamping | Scores clamped to [0, 10] — prevents LLM hallucination of out-of-range scores from distorting ranking |
| Tiebreaker | Original retrieval order breaks ties — preserves kNN/hybrid ranking signal when rerank scores are equal |
| Graceful Fallback | On any failure, returns original hits truncated to top_n — pipeline never breaks from a reranker error |
Code Showcase 13
Reranking — Call Chain and Cost
Reranking propagates through every public entry point via keyword arguments. At ~$0.0005 per reranked retrieval using Haiku, you could run 2,000 reranked retrievals for $1. The latency add of 1-2 seconds is acceptable for document generation pipelines.
# Full call chain — rerank propagates through all entry points:
BackendExecutor.process(rerank=True, rerank_top_n=5)
# -> opensearch_backend_process() (pops rerank kwargs)
# -> generate_from_prompt(rerank=...) (passes through)
# -> retrieve(rerank=...) (calls rerank_hits())
answer_question(rerank=True, rerank_top_n=5)
# -> retrieve(rerank=...) (same path)
retrieve(rerank=True, rerank_top_n=5)
# -> rerank_hits() directly
# Configuration via dataclass fields:
rag = OpenSearchBedrockRAG(
    os_client=os_client,
    embed_client=embed_client,
    llm_client=llm_client,  # Sonnet for answers
    company="searchstax",
    namespace="irp",
    rerank_model_id="anthropic.claude-3-5-haiku-20241022-v1:0",
    rerank_preview_chars=800,
)

| Property | Detail |
|---|---|
| Reranker Input Cost | ~20 chunks x 800 chars / 4 = ~4,000 tokens at ~$0.0004 (Haiku) |
| Reranker Output Cost | ~20 score objects = ~200 tokens at ~$0.0001 (Haiku) |
| Added Latency | 1 Bedrock round-trip = ~1-2 seconds — acceptable for document generation, less so for interactive Q&A |
| Default top_n | k // 2 with floor of 3 — k=20 produces 10 after reranking, k=8 produces 4, or override explicitly |
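The default top_n rule in the table above reduces to one expression; `default_top_n` is a hypothetical helper name for illustration:

```python
def default_top_n(k: int) -> int:
    # k // 2 with a floor of 3, per the default described above.
    return max(3, k // 2)

default_top_n(20)  # 10
default_top_n(8)   # 4
default_top_n(4)   # 3 (floor kicks in)
```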
Code Showcase 14
Consolidated Filter Builder — _build_filters()
The filter-building logic that was duplicated across search_knn and search_hybrid is consolidated into one static method. Company and namespace filters are always applied; metadata_filters are opt-in and expand to term filters on doc.metadata.<key>.
@staticmethod
def _build_filters(company, namespace, metadata_filters=None):
    filters = []
    if company:
        filters.append({"term": {"doc.company": company}})
    if namespace:
        filters.append({"term": {"doc.namespace": namespace}})
    for key, value in (metadata_filters or {}).items():
        filters.append(
            {"term": {f"doc.metadata.{key}": value}}
        )
    return filters

# Both search_knn() and search_hybrid() now delegate
# to _build_filters() — no duplicated filter logic

| Property | Detail |
|---|---|
| Consolidation | Filter logic extracted from two search methods into one static method — single source of truth |
| Always-On Filters | Company and namespace are always applied; metadata_filters are additive and opt-in |
| Dynamic Key Expansion | Each metadata key-value pair becomes a term filter on doc.metadata.<key> — no hardcoded field names |
Code Showcase 15
Metadata Filtering — End-to-End Usage
Documents are tagged at indexing time via extra_metadata on the backend dataclass. At query time, metadata_filters flows from any entry point (BackendExecutor, retrieve, answer_question) down to OpenSearch's bool.filter clause. Filters are applied before scoring — non-matching documents never appear in results.
# At indexing time — tag documents with arbitrary metadata:
backend = OpenSearchIndexBackend(
    os_client=os_client,
    embed_fn=embed_fn,
    company="searchstax",
    namespace="irp",
    extra_metadata={
        "project_id": "fedramp-2024",
        "doc_type": "policy",
        "classification": "cui",
    },
)
# Every chunk now has:
#   doc.metadata.project_id = "fedramp-2024"
#   doc.metadata.doc_type = "policy"
#   doc.metadata.classification = "cui"

# At query time — filter to matching documents:
hits = rag.retrieve(
    "incident response roles and responsibilities",
    metadata_filters={"project_id": "fedramp-2024"},
)

# Combine with hybrid search + reranking:
hits = rag.retrieve(
    "KSI-CMT-01 change advisory board",
    use_hybrid=True,
    rerank=True,
    metadata_filters={
        "project_id": "fedramp-2024",
        "doc_type": "policy",
    },
)

# Through BackendExecutor:
text, meta = executor.process(
    prompt=prompt,
    instructions=instructions,
    metadata_filters={"project_id": "A"},
)
# meta["retrieval"]["metadata_filters"] == {"project_id": "A"}

| Property | Detail |
|---|---|
| Indexing | extra_metadata dict on the backend dataclass — merged into every chunk's metadata alongside mime_type, drive_id, drive_name |
| Querying | metadata_filters dict at any entry point — each key-value becomes a term filter applied before scoring |
| Composable | Combines freely with namespace, hybrid search, and reranking — all features are independent and additive |
| Provenance | BackendExecutor records metadata_filters in returned meta dict for downstream traceability |
Code Showcase 16
Generated OpenSearch Query — With Metadata Filter
The actual query sent to OpenSearch when metadata_filters are provided. Filter clauses are applied before kNN scoring — documents without the specified metadata field are excluded from the candidate set entirely, regardless of vector similarity.
{
  "query": {
    "bool": {
      "filter": [
        {"term": {"doc.company": "searchstax"}},
        {"term": {"doc.namespace": "irp"}},
        {"term": {"doc.metadata.project_id": "A"}}
      ],
      "must": [
        {
          "knn": {
            "doc.embedding": {
              "vector": ["..."],
              "k": 40
            }
          }
        }
      ]
    }
  }
}
| Property | Detail |
|---|---|
| Pre-Score Filtering | Filter clauses execute before kNN scoring — non-matching documents are excluded from the candidate set entirely |
| No Migration | OpenSearch dynamic mapping auto-creates keyword fields for new metadata keys on first ingest |
| Missing Field Behavior | Documents indexed before extra_metadata was set don't have the field — term filter returns no match, correctly excluding them |
Code Showcase 17
Three Levels of Isolation
The retrieval layer now supports three composable levels of document isolation — from tenant-level down to fine-grained metadata attributes — all using the same OpenSearch term filter mechanism.
Level      Mechanism                         Use Case
─────────────────────────────────────────────────────────────────
Company    doc.company (always applied)      Tenant isolation:
                                             SearchStax vs. Acme
Namespace  doc.namespace (always applied)    Document collection:
                                             IRP vs. SCRM vs. CONMON
Metadata   doc.metadata.* (opt-in via        Fine-grained:
           metadata_filters)                 project, doc type,
                                             classification, version
| Property | Detail |
|---|---|
| Company | Always applied — tenant isolation between organizations sharing the same OpenSearch domain |
| Namespace | Always applied — logical partition per compliance pipeline (IRP, SCRM, CONMON) |
| Metadata | Opt-in via metadata_filters — project_id, doc_type, classification, version, or any arbitrary key |
| Composable | All three levels stack — a query can filter by company + namespace + multiple metadata fields simultaneously |
Code Showcase 18
Latency Critical Path — Four Stages, Three Network Hops
The critical path for a single retrieval: embed query (~200ms, Bedrock) -> kNN search (~100ms, OpenSearch) -> build context (~0ms, local) -> LLM answer (~4-6s, Bedrock). Six optimizations keep the non-LLM stages fast.
# Critical path: embed -> search -> context -> answer
#               ~200ms     ~100ms    ~0ms      ~4-6s
#             (Bedrock) (OpenSearch) (local)  (Bedrock)

# 1. Aggressive boto3 timeout (bedrock_llm.py:42-46)
Config(
    read_timeout=120,
    connect_timeout=10,  # fail fast on unreachable
    retries={"max_attempts": 3, "mode": "standard"},
)

# 2. Client singleton with connection pool reuse
#    OpenSearchClient, BedrockEmbeddingsClient, BedrockLLMClient
#    constructed once, injected into RAG dataclass.
#    First call: ~200-500ms (credential + TLS handshake)
#    Subsequent: reuse warm connection

# 3. Credential caching with TTL (ttl_cache.py)
#    Google Drive SA keys + Vanta tokens cached 600s
#    Without: +100ms Secrets Manager round-trip per call

# 4. Over-fetch + local score cutoff (opensearch.py:229-287)
#    Fetch k * fetch_mult (5x), threshold in Python
#    Avoids unreliable server-side min_score

# 5. Single-text embedding for query path
#    One question -> one InvokeModel call, no batch overhead
#    Batch path (EMBED_BATCH_SIZE=200) reserved for ingest

# 6. Cross-region inference profile routing
modelId = os.getenv(
    "BEDROCK_EMBEDDINGS_INFERENCE_PROFILE_ID"
) or model_id
# Routes to lowest-latency region, saves 50-100ms
| Property | Detail |
|---|---|
| Connect Timeout | 10s connect_timeout fails fast on unreachable Bedrock — retries immediately rather than blocking for 120s |
| Connection Reuse | Boto3 clients maintain internal HTTP pools; first call pays TLS cost, subsequent calls reuse warm connections |
| Credential Cache | 600s TTL on Secrets Manager lookups — eliminates ~100ms per call for Google Drive SA keys and Vanta tokens |
| Over-Fetch | Fetch 5x results, threshold locally — trades larger payload for guaranteed filtering without a second query |
| Cross-Region Routing | Bedrock inference profile auto-routes to lowest-latency region, saving 50-100ms under load |
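The credential cache from the table can be sketched as a small TTL decorator. This is a minimal illustration under stated assumptions: the real ttl_cache.py implementation is not shown in this document, so the decorator shape and name here are illustrative.

```python
import time
from functools import wraps

def ttl_cache(ttl_seconds: float = 600.0):
    """Cache a fetcher's result for ttl_seconds per argument tuple.

    Avoids repeating the ~100ms Secrets Manager round-trip on
    every call for values like SA keys and API tokens.
    """
    def decorator(fn):
        cache: dict = {}

        @wraps(fn)
        def wrapper(*args):
            now = time.monotonic()
            entry = cache.get(args)
            if entry is not None and now - entry[0] < ttl_seconds:
                return entry[1]  # still fresh: serve cached value
            value = fn(*args)
            cache[args] = (now, value)
            return value
        return wrapper
    return decorator
```

A 600-second TTL matches the document's stated cache window for Google Drive SA keys and Vanta tokens.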
Code Showcase 19
Latency by Configuration Mode
Wall time varies by retrieval mode. The hybrid BM25 pass adds ~80ms (negligible). The rerank LLM call adds ~1.5s (Haiku) — the only mode where pre-LLM latency becomes noticeable.
Mode                  Extra Hops                        Estimated Total
────────────────────────────────────────────────────────────────────────────
Pure kNN (default)    embed + search + LLM              ~5-8s
Hybrid                embed + 2 searches + fusion       ~5.5-8.5s
(use_hybrid=True)     + LLM
Hybrid + Rerank       embed + 2 searches + rerank LLM   ~7-11s
                      + answer LLM

Optimization Not Taken      Reason
────────────────────────────────────────────────────────────────────────────
Response streaming          Excluded from IAM policy — deliberate
(invoke_model_with_         simplicity tradeoff. Would drop perceived
response_stream)            TTFT from ~5s to ~500ms.
OpenSearch connection       opensearch-py defaults used. Single-digit
pool tuning                 QPS on t3.small isn't connection-bound.
Formal query-path           RunMetrics tracks ingest perf (files/s,
latency instrumentation     bytes/s) but no perf_counter on retrieve().
| Property | Detail |
|---|---|
| Pure kNN | ~5-8s total — embed (200ms) + search (100ms) + LLM answer (4-6s) |
| Hybrid | ~5.5-8.5s — adds ~80ms for BM25 pass and score fusion (negligible) |
| Hybrid + Rerank | ~7-11s — adds ~1.5s for Haiku reranker; the only mode where pre-LLM latency is noticeable |
| LLM Dominance | Answer generation is ~75% of wall time in every mode — non-LLM stages total under 400ms |
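The "optimization not taken" table notes that retrieve() has no perf_counter instrumentation. If it were added, a minimal per-stage timer could look like this (hypothetical sketch, not part of the production system):

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(stage: str, timings: dict):
    """Record wall time for one stage of the retrieval path.

    Usage: wrap each hop (embed, search, rerank, answer) and
    inspect the timings dict after the call completes.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[stage] = time.perf_counter() - start
```

Recording per-stage timings would confirm the LLM-dominance claim above directly from production traffic instead of estimates.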
Code Showcase 20
Vector Staleness — Compare-and-Swap Flow
The three-step freshness check that runs for every file on every pipeline execution. 90%+ of files are skipped_unchanged — the freshness check is a single lightweight OpenSearch query per file. Scanning 50 unchanged files adds ~250ms total.
# For each file in the Google Drive folder:

# Step 1: COMPARE — query OpenSearch for stored timestamp
latest = get_latest_source_modified(
    drive_id=record.drive_file_id
)
if latest >= record.modified_time:
    return IndexResult(status="skipped_unchanged")  # ~5ms

# Step 2: PURGE — scoped delete of all old chunks
delete_docs_for_drive_file(
    drive_id=record.drive_file_id,
    conflicts="proceed",  # concurrent searches never blocked
    refresh=True,         # cleared state immediately visible
)

# Step 3: RE-INDEX — extract, chunk, embed, bulk-index
text = _extract_text(record)
chunks = _chunk_text(text, max_tokens=300)
safe = [_split_for_embed(c, max_chars) for c in chunks]
vectors = embed_fn(safe)  # Titan Embed v2, batch=200
os_client.index_chunks(
    docs=[
        {
            "chunk_id": f"{record.drive_file_id}:{i}",
            "source_modified_at": record.modified_time,
            # ... content, embedding, metadata
        }
        for i, vec in enumerate(vectors)
    ]
)

# Typical pipeline run log:
#   files=47 indexed=3 skipped_unchanged=42
#   skipped_no_text=2 total_time=38.12s
| Property | Detail |
|---|---|
| Compare Cost | ~5ms per file — single OpenSearch query for the latest source_modified_at by drive_id |
| Skip Rate | 90%+ of files are skipped_unchanged in a typical re-run; scanning 50 files adds ~250ms total |
| Purge Safety | conflicts=proceed ensures concurrent searches are never blocked; refresh=true makes cleared state immediately visible |
| Chunk ID Scheme | {drive_id}:{index} — delete_by_query wipes all old chunks by drive_id regardless of how many the old version produced |
| Zero Downtime | 1-5 second gap per file while re-indexing; searches for other documents are unaffected |
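The ~5ms compare step needs only the newest `source_modified_at` for one file, which a size-1 sorted query delivers cheaply. A sketch of that query body follows; the exact field paths (`doc.drive_id`, `doc.source_modified_at`) are assumptions based on the chunk schema shown above.

```python
def latest_modified_query(drive_id: str) -> dict:
    """Fetch only the newest source_modified_at for one file.

    size=1 with a descending sort returns a single hit, and the
    trimmed _source keeps the response payload tiny.
    """
    return {
        "size": 1,
        "query": {"term": {"doc.drive_id": drive_id}},
        "sort": [{"doc.source_modified_at": {"order": "desc"}}],
        "_source": ["doc.source_modified_at"],
    }
```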
Code Showcase 21
Cross-Backend Freshness Strategies
Both backends implement freshness checking through the IndexBackend Protocol, but with different strategies suited to their storage characteristics. Callers control the tradeoff between thoroughness and cost via flags.
Backend     Compare Strategy        Catches               Flags
──────────────────────────────────────────────────────────────────────
OpenSearch  source_modified_at      All edits (content    replace_if_newer
            timestamp comparison    + metadata changes)   skip_if_existing
S3 (AWS)    md5 + modified_time     Content edits only    replace_if_newer
            hash comparison         (misses metadata-     skip_if_existing
                                    only changes)

Both backends conform to the same IndexBackend Protocol:

class IndexBackend(Protocol):
    def upsert_file(
        self, *, company, index_id,
        file: DriveFileRecord,
        replace_if_newer: bool = True,
    ) -> None: ...
| Property | Detail |
|---|---|
| OpenSearch | Timestamp comparison — catches all edits including metadata-only changes; higher per-check cost (~5ms) |
| S3 | MD5 + timestamp — cheaper comparison but misses metadata-only edits; sufficient for content-driven workflows |
| Protocol Conformance | Both backends satisfy the same IndexBackend Protocol — callers don't know which freshness strategy runs |
| Caller Control | replace_if_newer and skip_if_existing flags let pipelines choose thoroughness vs. cost per use case |
Code Showcase 22
Context Assembly — build_context()
The context assembly function that enforces hard budgets, relevance ordering, and structured document headers. Each chunk carries metadata (index, score, chunk ID, source path) that serves as document boundary markers, citation anchors, and provenance signals.
def build_context(hits, max_total=12000, max_per_hit=4000):
    """Assemble retrieved chunks into bounded, structured context."""
    context_parts = []
    total_chars = 0
    # Hits arrive in relevance order (kNN score or rerank score)
    for i, hit in enumerate(hits):
        content = hit["content"][:max_per_hit]  # cap per chunk
        if total_chars + len(content) > max_total:
            break  # budget exhausted — least important dropped
        # Structured header: boundary marker + citation anchor
        header = (
            f"[Document {i+1}] "
            f"Score: {hit['_score']:.3f} | "
            f"Chunk: {hit['chunk_id']} | "
            f"Source: {hit['path']}"
        )
        context_parts.append(f"{header}\n{content}")
        total_chars += len(content)
    return "\n\n---\n\n".join(context_parts)

# Result: 5-8 focused chunks, ~12,000 chars total
# Highest-scoring chunk at position 1 (highest attention)
# Least important material truncated by budget (correct failure mode)
| Property | Detail |
|---|---|
| Total Budget | 12,000-char ceiling — keeps the model under the threshold where lost-in-the-middle effects emerge |
| Per-Chunk Cap | 4,000-char limit per hit — oversized chunks (extracted tables) are truncated, not allowed to consume the budget |
| Relevance Ordering | Highest-scoring chunk at position 1 (highest model attention); least important at the bottom and potentially dropped |
| Structured Headers | Document index, score, chunk_id, source path — boundary markers prevent conflation, enable inline citation |
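To make the budget behavior concrete, here is the same assembly logic exercised with tiny limits and fabricated hits (the scores, chunk IDs, and paths are made up for illustration):

```python
def build_context(hits, max_total=12000, max_per_hit=4000):
    """Assemble retrieved chunks into bounded, structured context."""
    context_parts = []
    total_chars = 0
    for i, hit in enumerate(hits):
        content = hit["content"][:max_per_hit]
        if total_chars + len(content) > max_total:
            break  # budget exhausted: lowest-ranked hits dropped
        header = (
            f"[Document {i+1}] Score: {hit['_score']:.3f} | "
            f"Chunk: {hit['chunk_id']} | Source: {hit['path']}"
        )
        context_parts.append(f"{header}\n{content}")
        total_chars += len(content)
    return "\n\n---\n\n".join(context_parts)

# Fabricated hits: three 50-char chunks against a 120-char budget
hits = [
    {"content": "A" * 50, "_score": 0.9, "chunk_id": "f:0", "path": "irp.docx"},
    {"content": "B" * 50, "_score": 0.8, "chunk_id": "f:1", "path": "irp.docx"},
    {"content": "C" * 50, "_score": 0.7, "chunk_id": "f:2", "path": "irp.docx"},
]
ctx = build_context(hits, max_total=120, max_per_hit=60)
# Only the two highest-scoring chunks fit; the third is dropped
```

Shrinking `max_total` shows the intended failure mode: the lowest-ranked material disappears first while the top hit always survives.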
Code Showcase 23
Prompt Construction — System/User Split
Behavioral constraints go in the system prompt. Retrieved evidence and the question go in the user prompt with a clear separator and an explicit label. This visual boundary helps the model distinguish task instructions from source evidence.
SYSTEM PROMPT:

You are a compliance documentation assistant.
Answer ONLY using the provided context.
Cite sources inline using [Document N] references.
If the context does not contain the answer, say so.

---

USER PROMPT:

Context (the ONLY allowed facts):

[Document 1] Score: 0.923 | Chunk: abc123:3 | Source: IRP_v2.docx
The incident response team shall acknowledge P1 incidents
within 15 minutes of detection. The SIEM platform generates
automated alerts...

---

[Document 2] Score: 0.891 | Chunk: def456:7 | Source: IRP_v2.docx
Escalation procedures require notification of the CISO
within 30 minutes for incidents classified as...

---

[Document 3] Score: 0.834 | Chunk: ghi789:1 | Source: roles.docx
The Security Operations Center (SOC) maintains 24/7...

---

Question: Describe the incident response acknowledgment SLA
and escalation procedures.
| Property | Detail |
|---|---|
| System/User Boundary | Behavioral rules in system prompt, evidence + question in user prompt — model can't confuse instructions with source material |
| Explicit Label | 'Context (the ONLY allowed facts)' — anchors the model to retrieved evidence, discourages fabrication |
| Document Boundaries | --- separators + [Document N] headers prevent the model from conflating content across different source files |
| Citation Anchors | Chunk IDs in headers enable inline citation — the model can reference specific chunks in its answer |
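The split above can be sketched as a small prompt builder. A hypothetical helper, assumed to sit between build_context() and the LLM call; the wording mirrors the prompts shown in the showcase.

```python
def build_prompts(context: str, question: str) -> tuple[str, str]:
    """Split behavioral rules (system) from evidence + question (user)."""
    system = (
        "You are a compliance documentation assistant.\n"
        "Answer ONLY using the provided context.\n"
        "Cite sources inline using [Document N] references.\n"
        "If the context does not contain the answer, say so."
    )
    user = (
        f"Context (the ONLY allowed facts):\n{context}"
        f"\n\n---\n\nQuestion: {question}"
    )
    return system, user
```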
Code Showcase 24
Tiered Placeholder Resolution
Template placeholders are resolved through a four-tier hierarchy before reaching the RAG layer. Known values never consume context window budget — only genuinely unknown values trigger document retrieval.
Tier  Source                     Example                     Cost
──────────────────────────────────────────────────────────────────────
1     Static client profile      {{COMPANY_NAME}}            ~0ms
      (config/profile.json)      → "SearchStax"              (instant)
2     In-memory small docs       {{SIEM_TOOL}}               ~0ms
      (pre-loaded at startup)    → "Splunk Enterprise"       (cached)
3     RAG retrieval              {{INCIDENT_SLA}}            ~300ms
      (OpenSearch kNN)           → retrieved from IRP docs   (search)
4     Explicit unknown marker    {{RETENTION_PERIOD}}        ~0ms
      (unresolvable)             → "{{UNKNOWN}}"             (flagged)

Resolution order: try tier 1, then 2, then 3, then 4.
Known values are resolved BEFORE the prompt is sent to RAG —
they never enter the context window, preserving budget for
values that genuinely require document retrieval.
| Property | Detail |
|---|---|
| Tier 1: Profile | Company name, contact info, tool names — resolved from static config at ~0ms |
| Tier 2: Cached Docs | Small reference documents pre-loaded at startup — no search overhead |
| Tier 3: RAG | Only values that genuinely require document retrieval consume context budget (~300ms) |
| Tier 4: Unknown | Unresolvable placeholders become explicit {{UNKNOWN}} markers — visible to the reviewer, not hallucinated |
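The four-tier fall-through can be sketched as a single resolver. A minimal sketch under stated assumptions: the function name, the dict-based tier sources, and the sample values in the usage are illustrative, while the ordering and the {{UNKNOWN}} marker follow the table.

```python
from typing import Callable, Optional

def resolve_placeholder(
    name: str,
    profile: dict[str, str],
    cached_docs: dict[str, str],
    rag_lookup: Callable[[str], Optional[str]],
) -> str:
    """Try tier 1 (profile), 2 (cached docs), 3 (RAG), then 4 (unknown).

    Known values resolve before any prompt is sent to RAG, so
    they never consume context-window budget.
    """
    if name in profile:           # tier 1: static client profile
        return profile[name]
    if name in cached_docs:       # tier 2: pre-loaded small docs
        return cached_docs[name]
    hit = rag_lookup(name)        # tier 3: OpenSearch kNN retrieval
    if hit is not None:
        return hit
    return "{{UNKNOWN}}"          # tier 4: explicit marker for review
```

Leaving tier 4 as a visible marker, rather than letting the LLM fill the gap, keeps unresolved values reviewable instead of hallucinated.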