Production RAG Infrastructure on AWS
A reusable retrieval layer running on AWS OpenSearch and Amazon Bedrock embeddings. It grounds every downstream LLM pipeline in the actual documentation of the system it's answering about.
Problem
LLM workflows for compliance and documentation need grounded retrieval. Without it the model hallucinates, and the answers aren't specific to the system you're asking about.
Solution
Built a shared RAG backend that chunks documents, generates Bedrock embeddings, indexes them into OpenSearch, and hands back namespace-scoped retrieval to whichever downstream pipeline needs it.
Impact
- Grounded retrieval across every AI workflow that plugs in
- One retrieval layer to maintain instead of one per pipeline
- Every generated output is now backed by semantic search results, which makes the pipelines far more reliable
Architecture
1. Documents get embedded with Titan embeddings from Bedrock
2. Vectors and metadata land in OpenSearch
3. A backend abstraction handles upserts and filtered retrieval
4. Pipelines query the service with namespace and document-type filters
Capabilities
- Semantic indexing and retrieval
- Namespace-scoped search
- Document chunk ingestion
- Shared backend for multiple pipelines
- AWS-authenticated service integration
Technical Deep Dive
Architecture internals and annotated code from the production system.
Architecture Overview
A ports-and-adapters (hexagonal) architecture with four distinct abstraction layers. Any pipeline, whether it's IRP generation, SCRM, CONMON, or policy drafting, plugs into the same retrieval infrastructure through a well-defined interface. The retrieval layer is deliberately decoupled, so switching embedding providers or adding a new vector store never requires pipeline code changes.
Key Architectural Decisions
Protocol Over ABC for the Backend Interface
IndexBackend uses Python's Protocol (structural typing) rather than ABC (nominal typing). Any class with matching method signatures satisfies the interface without inheriting from it. Three implementations already conform: OpenSearchIndexBackend, AwsIndexBackend (S3 plus OpenAI embeddings), and the OpenAI Vector Store path. No shared base class in sight.
DriveFileRecord as Universal Data Contract
Every document in the system turns into a DriveFileRecord first: drive_file_id, name, mime_type, modified_time, md5, and content bytes. That's the normalization point. Upstream pipelines have no idea whether the document will end up in OpenSearch, S3, or OpenAI's vector store. They don't need to.
Separated retrieval_query from prompt
BackendExecutor.process() takes retrieval_query as a parameter separate from prompt. The text that goes to OpenSearch for kNN matching can be tuned independently from the text going to the LLM. retrieval_query_builder.py extracts {{placeholders}} from markdown templates and builds targeted semantic queries, while the LLM gets the full template with instructions.
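A minimal sketch of the placeholder-extraction idea, not the actual contents of retrieval_query_builder.py. The helper names `extract_placeholders` and `build_retrieval_query`, the regex, and the query wording are all assumptions made for illustration.

```python
import re

# Matches {{NAME}} placeholders; the exact pattern used in production is an assumption.
PLACEHOLDER_RE = re.compile(r"\{\{\s*([A-Za-z0-9_]+)\s*\}\}")


def extract_placeholders(template: str) -> list[str]:
    """Return placeholder names in first-seen order, without duplicates."""
    seen: dict[str, None] = {}
    for name in PLACEHOLDER_RE.findall(template):
        seen.setdefault(name, None)
    return list(seen)


def build_retrieval_query(section_title: str, template: str) -> str:
    """Build a short semantic query from the placeholders (hypothetical helper)."""
    names = [n.replace("_", " ").lower() for n in extract_placeholders(template)]
    if not names:
        return section_title
    return f"{section_title}. Find values for {', '.join(names)}"


template = "Alerts flow into {{SIEM_TOOL}}. Scans run via {{INFRA_SCANNER}} and {{SIEM_TOOL}}."
query = build_retrieval_query("IRP section: roles and responsibilities", template)
```

Only the short `query` string would be embedded for retrieval; the full template and its instructions would still go to the LLM unchanged.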
Namespace Isolation for Multi Pipeline Reuse
One OpenSearch domain, one index, documents for every compliance pipeline. The namespace field is a term filter on every kNN query. IRP generator only sees IRP documents. SCRM only sees SCRM documents. Same index, same embeddings, same infrastructure, different logical partitions.
RAG Factory as Dependency Injection Point
rag_factory.py's build_opensearch_bedrock_rag() wires OpenSearchClient, BedrockEmbeddingsClient, and BedrockLLMClient into one RAG instance. The factory reads all infrastructure config from environment variables. Same code deploys to dev (basic auth, t3.small) and prod (SigV4, encrypted domain) without a line changing.
FedRAMP Layer 1 Deliberately Excluded from RAG
The enrichment pipeline deliberately skips RAG. It trusts structured metadata, not retrieved documents, precisely because RAG introduces hallucination risk. The README says it directly: raw KSI prose cannot be the sole LLM input, so the LLM receives a structured package of source fields instead.
Fixed Size Word Window Over Semantic Chunking
The source documents are compliance policy prose. IRPs, SCRM docs, security procedures. Dense, self-contained paragraphs where a 300 word window captures a complete thought. Small uniform chunks produce tight embedding vectors that score sharply on kNN. Larger semantic chunks dilute the embedding, and a 3 page section on incident severity will weakly match everything. With the over-fetch strategy, 5 sharp small chunks beat 1 diluted large chunk every time.
No Overlap to Avoid Embedding Cost Explosion
Chunks don't share context at the boundaries. Overlap chunking (LangChain's usual recipe, 200 words with 50 word overlap) would add roughly a quarter to a third more Titan Embed v2 API calls, since each window advances by only 150 words, with no measurable retrieval gain on prose. On a t3.small.search domain, storage and compute savings actually matter.
Predictable Chunk IDs for Freshness Aware Dedup
index_drive_file() checks source_modified_at and skips unchanged documents. When a document changes, the pipeline deletes every existing chunk and re-indexes from scratch. Fixed size chunking gives me stable chunk_id = f"{drive_id}:{idx}" identifiers. Semantic chunking would shift chunk boundaries between versions, which breaks chunk-ID based dedup.
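A small sketch of why positional IDs stay predictable across versions. The helper name `chunk_ids` and the sample drive ID are hypothetical; only the `{drive_id}:{idx}` format comes from the text above.

```python
def chunk_ids(drive_id: str, chunks: list[str]) -> list[str]:
    """Assign positional IDs of the form '<drive_id>:<idx>' to each chunk."""
    return [f"{drive_id}:{idx}" for idx, _ in enumerate(chunks)]


old_ids = chunk_ids("file123", ["a", "b", "c"])  # previous version: three chunks
new_ids = chunk_ids("file123", ["a2", "b2"])     # edited version: two chunks

# IDs that no longer exist after the edit. Purging by drive_id removes them
# without needing to know how many chunks the old version produced.
orphaned = sorted(set(old_ids) - set(new_ids))
```

Because every ID shares the drive_id prefix, a single scoped delete covers the whole old version, including chunks past the end of the new one.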
Hybrid Search (kNN + BM25) as Opt In Extension
Pure kNN works fine for policy prose. Queries with control IDs like AC-2(4), tool names like sshd_config, or CLI flags need lexical precision that vector similarity misses. Hybrid search fuses normalized kNN and BM25 scores with configurable weights. Documents appearing in only one result set get 0.0 on the other, so single-dimension matches get penalized automatically.
Backwards Compatibility via Keyword Only Defaults
Every hybrid parameter is keyword-only with a default: use_hybrid=False (existing behavior unchanged), vector_weight=0.7, keyword_weight=0.3. No positional args got added that could break existing callers. Zero test files touch the modified methods. Opting in is one flag. Opting out is no code change at all.
LLM Reranker as Late Interaction Stage
After bi-encoder retrieval hands me 20 candidates, a reranker (Claude Haiku at temperature=0) scores each chunk 0 to 10 against the actual question. That catches what vector similarity misses: chunks that share vocabulary but come from the wrong context (a different policy's "advisory board"), the wrong control (KSI-CMT-02 instead of KSI-CMT-01), or irrelevant lexical overlap ("membership" in an org chart). Only the top scorers make it into context.
Reranking as Quality Optimization, Not Correctness Gate
If the reranker LLM call fails or returns unparseable JSON, the function silently falls back to the original hits truncated to top_n. The pipeline never breaks on a reranker error. Reranking improves answer quality. It isn't a correctness gate. So opting in is safe even if the downstream model hiccups.
Metadata Filtering via Dynamic Mapping, No Migration
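One metadata_filters dict flows from any entry point down to OpenSearch's bool.filter clause. Each key-value pair becomes a term filter on doc.metadata.<key>. Filters are a binary include or exclude and never contribute to the score. With dynamic: true, OpenSearch auto-creates a field for any new string written under doc.metadata, so there's no PUT _mapping call. Exact term matching needs that field to behave as a keyword. OpenSearch's default dynamic mapping creates a text field with a .keyword subfield, so this presumably relies on a dynamic template that maps new strings to keyword. The first document indexed with project_id creates the field. The first query that filters on it uses the auto-created field.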
Three Levels of Isolation: Company, Namespace, Metadata
Three isolation levels, all built on the same term filter mechanism. Company (doc.company, always applied) gives tenant isolation. Namespace (doc.namespace, always applied) gives collection isolation (IRP vs SCRM vs CONMON). Metadata (doc.metadata.*, opt in via metadata_filters) gives fine-grained filters on project, doc type, classification, or version. Documents without a metadata field are correctly excluded from project-scoped queries. A term filter on a missing field returns no match.
Latency Budget: LLM Dominates, Everything Else Under 400ms
A full retrieve-and-answer cycle takes roughly 5 to 8 seconds. The LLM eats about 75% of that (4 to 6s). Everything else (query embedding at ~200ms on Bedrock, kNN search at ~100ms on OpenSearch, filtering, and context assembly) totals under 400ms combined. Six optimizations keep the non-LLM stages fast: aggressive boto3 timeouts, client singletons that reuse connection pools, credential caching with a 600s TTL, over-fetch with a local score cutoff, single-text embedding on the query path, and cross-region inference profile routing.
Deliberate Non-Optimizations
Response streaming (invoke_model_with_response_stream) is excluded from the IAM policy on purpose, as a simplicity tradeoff, even though enabling it would drop perceived TTFT from roughly 5s to 500ms. OpenSearch connection pool tuning uses opensearch-py defaults because single-digit QPS on a t3.small domain isn't connection-bound. Formal latency instrumentation exists for ingest (files per second, bytes per second through RunMetrics), but not for the query path. A known gap, not an oversight.
Vector Staleness Management, Per File Compare and Swap
Compliance documentation changes constantly. When a source document changes, every embedding derived from the old version is stale, which means the LLM is getting outdated context. The ingestion pipeline runs a timestamp check against Google Drive's modified_time for every file, in three steps: compare (query OpenSearch for the stored source_modified_at and skip if unchanged, roughly 5ms), purge (scoped delete_by_query by drive_id with conflicts=proceed and refresh=true), and re-index (extract, chunk, embed, bulk-index with a fresh timestamp).
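The compare, purge, and re-index cycle can be sketched against an in-memory stand-in for the index. `FakeIndex`, `sync_file`, and the `"indexed"` result string are hypothetical; only the three steps and the `skipped_unchanged` result name come from this write-up.

```python
from dataclasses import dataclass, field


@dataclass
class FakeIndex:
    """In-memory stand-in for the OpenSearch index (illustrative only)."""
    docs: dict[str, dict] = field(default_factory=dict)  # chunk_id -> document

    def latest_source_modified(self, drive_id: str):
        stamps = [d["source_modified_at"] for d in self.docs.values()
                  if d["drive_id"] == drive_id]
        return max(stamps) if stamps else None

    def delete_by_drive_id(self, drive_id: str) -> int:
        stale = [cid for cid, d in self.docs.items() if d["drive_id"] == drive_id]
        for cid in stale:
            del self.docs[cid]
        return len(stale)


def sync_file(index: FakeIndex, drive_id: str, modified_time: str, chunks: list[str]) -> str:
    # Compare: skip when the stored timestamp matches the source.
    if index.latest_source_modified(drive_id) == modified_time:
        return "skipped_unchanged"
    # Purge: drop every chunk of the old version, however many there were.
    index.delete_by_drive_id(drive_id)
    # Re-index: write fresh chunks stamped with the new timestamp.
    for idx, text in enumerate(chunks):
        index.docs[f"{drive_id}:{idx}"] = {
            "drive_id": drive_id,
            "content": text,
            "source_modified_at": modified_time,
        }
    return "indexed"


index = FakeIndex()
first = sync_file(index, "f1", "2024-01-01T00:00:00Z", ["a", "b", "c"])
second = sync_file(index, "f1", "2024-01-01T00:00:00Z", ["a", "b", "c"])
third = sync_file(index, "f1", "2024-02-01T00:00:00Z", ["a2"])
```

The third call leaves a single chunk behind even though the earlier version had three, which is the property the purge step is there to guarantee.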
No Downtime During Re-Indexing
The delete-then-index sequence creates a 1 to 5 second window where one file has zero chunks. Searches that don't reference that file are unaffected. Searches that do reference it still pull context from other relevant documents. The index serves document generation pipelines, not real-time user-facing search, so the gap is invisible in practice. The chunk ID scheme ({drive_id}:{index}) means delete_by_query wipes every old chunk by drive_id no matter how many chunks the old version produced.
Cross Backend Consistency via IndexBackend Protocol
Same IndexBackend Protocol, different strategies. OpenSearch compares timestamps, which catches every edit. S3 compares md5 plus modified_time, cheaper but misses metadata-only edits. Both backends support skip_if_existing and replace_if_newer flags, so the caller picks the tradeoff between thoroughness and cost.
Hard Context Budgets to Prevent Lost in the Middle
The context assembly layer caps total output at 12,000 characters and individual chunks at 4,000. With 300 word chunks averaging roughly 1,200 characters, the model gets 5 to 8 focused chunks per query. That's well below the 20-plus document threshold where the lost-in-the-middle effect actually shows up in the literature. Oversized chunks get truncated rather than allowed to eat the whole budget.
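A hedged sketch of how the two caps could interact. The function name mirrors build_context from the flow described later, but the body, the separator, and the choice to count the separator against the budget are assumptions.

```python
def build_context(hits: list[dict], max_chunk_chars: int = 4000,
                  max_total_chars: int = 12000) -> str:
    """Concatenate hit contents under a per-chunk cap and a total cap."""
    sep = "\n\n"
    parts: list[str] = []
    used = 0
    for hit in hits:
        overhead = len(sep) if parts else 0
        remaining = max_total_chars - used - overhead
        if remaining <= 0:
            break
        # Truncate oversized chunks so one hit cannot consume the whole budget.
        text = hit["content"][: min(max_chunk_chars, remaining)]
        parts.append(text)
        used += overhead + len(text)
    return sep.join(parts)


hits = [{"content": c * 5000} for c in "xyzw"]
context = build_context(hits)
```

With four 5,000-character hits, the first two are cut to 4,000 characters each, the third gets whatever budget is left, and the fourth is dropped.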
Separated Retrieval Query and LLM Prompt
The text going to OpenSearch for vector or keyword matching is a tight semantic query, optimized for retrieval precision (something like "IRP section: roles and responsibilities. Find values for SIEM, infrastructure scanner"). The text going to the LLM is the full task instruction, optimized for generation quality. Without that separation, boilerplate instructions dilute the embedding vector and make retrieval worse.
Tiered Placeholder Resolution Before RAG
Placeholders resolve through four tiers, so RAG is called only when the cheaper tiers come up empty: static client profile (instant lookup), in-memory small docs (preloaded at startup, no search), RAG retrieval (OpenSearch), and explicit {{UNKNOWN}} markers for values no layer can resolve. Known values like company name or SIEM tool name come from configuration. They never hit the context window, which saves the budget for values that actually need document retrieval.
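A minimal sketch of the tier order, assuming each tier can be modeled as a lookup. `resolve_placeholder`, `fake_rag_lookup`, and the sample values are hypothetical; only the tier order and the {{UNKNOWN}} marker come from the text.

```python
UNKNOWN = "{{UNKNOWN}}"


def resolve_placeholder(name, client_profile, small_docs, rag_lookup):
    """Resolve one placeholder through the tiers, cheapest first."""
    # Tier 1: static client profile.
    if name in client_profile:
        return client_profile[name], "profile"
    # Tier 2: small documents preloaded in memory.
    if name in small_docs:
        return small_docs[name], "memory"
    # Tier 3: RAG retrieval, only for values the cheaper tiers could not supply.
    value = rag_lookup(name)
    if value:
        return value, "rag"
    # Tier 4: explicit marker instead of a guessed value.
    return UNKNOWN, "unknown"


rag_calls = []


def fake_rag_lookup(name):
    """Stand-in for the OpenSearch-backed retrieval call."""
    rag_calls.append(name)
    return {"ESCALATION_CONTACT": "security-oncall"}.get(name)


profile = {"COMPANY_NAME": "Acme", "SIEM_TOOL": "ExampleSIEM"}
small_docs = {"SEVERITY_LEVELS": "SEV1 to SEV4"}
resolved = {
    name: resolve_placeholder(name, profile, small_docs, fake_rag_lookup)
    for name in ["COMPANY_NAME", "SEVERITY_LEVELS", "ESCALATION_CONTACT", "BACKUP_VENDOR"]
}
```

Only the last two names ever reach the retrieval stand-in, which is the budget-saving behavior the tiers are meant to provide.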
Code Showcase 1
Protocol Interface, IndexBackend
The contract every storage backend has to satisfy. Python Protocol, structural typing. Any class with matching method signatures conforms. No inheritance required.

    from typing import Protocol

    class IndexBackend(Protocol):
        def ensure_index(
            self, *, company, name, if_exists, namespace
        ) -> str: ...

        def upsert_file(
            self, *, company, index_id, file, replace_if_newer
        ) -> None: ...

    # Three implementations conform:
    # - OpenSearchIndexBackend (services/opensearch_index_backend.py)
    # - AwsIndexBackend (services/aws_index_backend.py)
    # - OpenAI Vector Store path (pipelines/google_to_openai)

| Property | Detail |
|---|---|
| Pattern | Ports-and-adapters. The Protocol is the port, each backend is an adapter |
| Structural Typing | Protocol, not ABC. Implementations don't inherit, they just match the signature |
| Three Backends | OpenSearch (kNN + Bedrock embeddings), AWS S3 (OpenAI embeddings), OpenAI Vector Store |
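
To make the structural-typing point concrete, here is a self-contained sketch in which a class satisfies the Protocol without inheriting from it. `InMemoryBackend`, `ingest`, the type annotations, the `"reuse"` value, and the `@runtime_checkable` decorator are assumptions added for the demo; the source only shows the two method signatures.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable  # added here only so isinstance() works in the demo
class IndexBackend(Protocol):
    def ensure_index(self, *, company: str, name: str,
                     if_exists: str, namespace: str) -> str: ...

    def upsert_file(self, *, company: str, index_id: str,
                    file: object, replace_if_newer: bool) -> None: ...


class InMemoryBackend:  # note: no IndexBackend base class
    def __init__(self) -> None:
        self.files: dict[str, list[object]] = {}

    def ensure_index(self, *, company, name, if_exists, namespace) -> str:
        index_id = f"{company}-{namespace}-{name}".lower()
        self.files.setdefault(index_id, [])
        return index_id

    def upsert_file(self, *, company, index_id, file, replace_if_newer) -> None:
        self.files[index_id].append(file)


def ingest(backend: IndexBackend, file: object) -> str:
    """Caller code depends only on the Protocol, never on a concrete backend."""
    index_id = backend.ensure_index(
        company="acme", name="docs", if_exists="reuse", namespace="irp"
    )
    backend.upsert_file(
        company="acme", index_id=index_id, file=file, replace_if_newer=True
    )
    return index_id


backend = InMemoryBackend()
index_id = ingest(backend, b"policy text")
```

A static type checker would accept `InMemoryBackend` wherever `IndexBackend` is expected, and the runtime check passes because both methods are present.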
Code Showcase 2
Retrieval Dispatcher, ensure_retrieval()
The single entry point for backend agnostic ingestion. A pipeline calls ensure_retrieval() with a backend_type and a namespace. It doesn't care about index creation, SigV4 auth, chunking strategy, or embedding dimensions. The dispatcher figures it out.

    def ensure_retrieval(
        *, app, backend_type, company,
        drive_path, namespace,
        # ... remaining parameters elided
    ):
        if backend_type == "openai":
            ensure_vs_and_docs(...)
        elif backend_type == "opensearch":
            ensure_os_index_and_docs(...)
        elif backend_type in ("bedrock", "aws"):
            ensure_aws_index_and_docs(...)

| Property | Detail |
|---|---|
| Backend Routing | Dispatches to correct ingestion path based on backend_type. OpenAI, OpenSearch, or Bedrock/AWS |
| Index Sanitization | Handles OpenSearch naming rules (lowercase, no special chars) internally |
| Caller Simplicity | Pipeline just specifies backend_type and namespace. All infra complexity is hidden |
Code Showcase 3
Namespace Isolation via kNN Filter
The architectural keystone that makes the retrieval layer reusable across pipelines. One OpenSearch index stores documents for every compliance workflow, partitioned by namespace.

    from dataclasses import dataclass

    @dataclass
    class OpenSearchBedrockRAG:
        """
        Intentionally generic so IRP, SCRM, CONMON, etc.
        can all reuse it by just changing `namespace`.
        """
        company: str
        namespace: str = "irp"

    # opensearch.py:search_knn (excerpt)
    if namespace:
        must_filters.append(
            {"term": {"doc.namespace": namespace}}
        )

| Property | Detail |
|---|---|
| Single Index | One OpenSearch domain, one index, one embedding pipeline. No per-pipeline infrastructure duplication |
| Logical Partitions | Namespace term filter on every kNN query isolates IRP, SCRM, CONMON documents |
| Swap One Field | Changing namespace='irp' to namespace='scrm' switches the entire retrieval context |
Code Showcase 4
Source Priority System
RAG is one data source among several, and it has an explicit priority in the multi-source merge. Live API responses beat RAG documents, which beat manual exports. Retrieval degrades gracefully when a higher priority source has the answer.

    from abc import ABC

    class SourceAdapter(ABC):
        def priority(self) -> int:
            """
            1 = live API
            2 = custom/service
            3 = rag
            4 = export
            5 = manual
            """

    # When the compliance engine merges results,
    # a live Vanta API response (priority 1) wins over
    # a RAG-retrieved document (priority 3), which wins
    # over a manual CSV export (priority 5).

| Property | Detail |
|---|---|
| Graceful Degradation | If a live API has the evidence, RAG results are deprioritized automatically. No manual override needed |
| Explicit Ranking | 5-tier priority system makes data source precedence auditable, not implicit |
| Extensible | New source adapters slot in at any priority tier without changing existing merge logic |
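
A hedged sketch of what a priority-based merge could look like. The compliance engine's real merge logic is not shown in this write-up, so `Evidence`, `merge_by_priority`, and the sample items are all hypothetical; only the rule that a lower priority number wins comes from the docstring above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Evidence:
    key: str       # which requirement or field this evidence answers
    value: str
    source: str
    priority: int  # 1 = live API ... 5 = manual, as in the docstring above


def merge_by_priority(items):
    """Keep, per key, the evidence with the lowest priority number."""
    best = {}
    for item in items:
        current = best.get(item.key)
        if current is None or item.priority < current.priority:
            best[item.key] = item
    return best


items = [
    Evidence("mfa_enabled", "true (CSV export)", "export", 4),
    Evidence("mfa_enabled", "true (policy doc)", "rag", 3),
    Evidence("mfa_enabled", "true (live check)", "live_api", 1),
    Evidence("backup_policy", "daily snapshots", "rag", 3),
]
merged = merge_by_priority(items)
```

RAG evidence still wins for `backup_policy` because nothing with a higher priority answered it, which is the graceful-degradation behavior described in the table.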
Code Showcase 5
Pipeline Plug In Points
Every compliance pipeline talks to the same retrieval infrastructure through ensure_retrieval() and BackendExecutor.process(). FedRAMP Layer 1 is the notable exception. It deliberately skips RAG to avoid hallucinating on retrieved prose.

| Pipeline | What It Does | What It Calls |
|---|---|---|
| IRP Generator | Drafts incident response plan | ensure_retrieval(namespace="irp") → BackendExecutor.process() |
| SCRM Generator | Drafts supply chain risk plan | ensure_retrieval(namespace="scrm") → BackendExecutor.process() |
| CONMON Generator | Continuous monitoring docs | ensure_retrieval(namespace="conmon") → BackendExecutor.process() |
| FedRAMP Layer 1 | Requirement enrichment | Uses structured JSON input, not RAG. Deliberately excluded |
| Custom Tests | Evidence extraction prompts | Can query any namespace for supporting documentation |
| Compliance Engine | Multi-source evidence merge | RAG adapter at priority=3 alongside live APIs and exports |

| Property | Detail |
|---|---|
| Same Interface | Every RAG-enabled pipeline calls ensure_retrieval() + BackendExecutor.process(). No per-pipeline retrieval code |
| Namespace Swap | Switching from IRP to SCRM to CONMON is a single parameter change |
| Deliberate Exclusion | FedRAMP Layer 1 intentionally skips RAG. Structured metadata over retrieved prose to avoid hallucination risk |
Code Showcase 6
Stage 1: Word Based Fixed Size Chunker
_chunk_text() is a fixed size word window chunker. Split on whitespace, accumulate words into a buffer, emit a chunk when the buffer hits the limit. No overlap, no sentence detection, no heading-aware splitting. Called with max_tokens=300 in production.
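
    def _chunk_text(self, text, max_tokens=500):
        # "Tokens" here are whitespace-separated words, not model tokens.
        words = text.split()
        chunks = []
        current = []
        for w in words:
            current.append(w)
            if len(current) >= max_tokens:
                chunks.append(" ".join(current))
                current = []
        if current:
            # Flush the final partial window.
            chunks.append(" ".join(current))
        return chunks

| Property | Detail |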
|---|---|
| Strategy | Fixed-size word-window. Not semantic, not recursive, not overlap-based |
| Window Size | Default 500 words, called with max_tokens=300 in production (~1,200 chars per chunk) |
| Why No Overlap | Overlap would increase Titan Embed v2 API calls by ~25% with no retrieval quality gain for prose |
| Why No Semantic Split | Compliance prose degrades gracefully with fixed-size chunking. Unlike code or structured data |
Code Showcase 7
Stage 2: Boundary Aware Safety Splitter
_split_for_embed() enforces the embedding model's hard character limit after the word chunker runs. Default max_chars is 12,000 (via EMBED_MAX_CHARS env var), tuned so Titan Embed v2 stays well under its 8,192 token ceiling (about 4 chars per token).

    import os

    def _split_for_embed(self, text, max_chars):
        # Excerpt from the split loop; start and end bound the current window.
        # try splitting on a boundary
        cut = text.rfind("\n", start, end)  # prefer newline break
        if cut == -1:
            cut = text.rfind(" ", start, end)  # fallback to space break
        if cut == -1 or cut <= start + (max_chars * 0.5):
            cut = end  # hard cut if no good boundary

    # The two stages chain together in index_drive_file():
    chunks = self._chunk_text(text, max_tokens=300)
    max_chars = int(os.getenv("EMBED_MAX_CHARS", "12000"))
    safe_chunks = []
    for c in chunks:
        safe_chunks.extend(
            self._split_for_embed(c, max_chars=max_chars)
        )

| Property | Detail |
|---|---|
| Split Priority | Try newline break first, then space break, then hard cut only if no boundary in the back 50% of the window |
| Safety Net | Ensures no chunk exceeds Titan Embed v2's token limit regardless of word-chunker output |
| Two-Stage Chain | Stage 1 (word window) sizes content, Stage 2 (character safety) enforces embedding model limits |
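
The excerpt above only shows the cut selection. A complete standalone version might look like the sketch below, which wraps the same newline, space, hard-cut priority in a loop. The loop structure, the stripping of boundary whitespace, and the function name `split_for_embed` are assumptions, not the production code.

```python
def split_for_embed(text: str, max_chars: int) -> list[str]:
    """Split text into pieces of at most max_chars, preferring natural boundaries."""
    parts = []
    start = 0
    while start < len(text):
        end = min(start + max_chars, len(text))
        if end == len(text):
            cut = end  # the remainder already fits in one piece
        else:
            cut = text.rfind("\n", start, end)      # prefer a newline break
            if cut == -1:
                cut = text.rfind(" ", start, end)   # fall back to a space break
            if cut == -1 or cut <= start + max_chars * 0.5:
                cut = end                           # hard cut if no good boundary
        piece = text[start:cut].strip()
        if piece:
            parts.append(piece)
        start = cut
    return parts


by_space = split_for_embed("aaaa bbbb cccc dddd", max_chars=10)
hard_cut = split_for_embed("x" * 25, max_chars=10)
```

The first call breaks on the last space inside the window, and the second falls back to hard cuts because the text has no boundaries at all.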
Code Showcase 8
Full Ingestion Pipeline Flow
Full path from a Google Drive document to an indexed chunk in OpenSearch. Freshness aware dedup, multi-format text extraction with binary guardrails, two stage chunking, batched embedding, and bulk upsert.

    Google Drive document (PDF, DOCX, Google Doc)
      |
      v
    GoogleDriveIngestor.iter_files()
      -> exports Google-native docs to DOCX/XLSX/PPTX
      -> downloads binary files directly
      -> yields DriveFileRecord(content=bytes)
      |
      v
    OpenSearchIndexBackend.index_drive_file()
      |
      +- freshness check: get_latest_source_modified()
      |    +- skip if unchanged (IndexResult: skipped_unchanged)
      |
      +- _extract_text(record)
      |    +- text/plain, text/markdown -> direct decode
      |    +- PDF, DOCX, PPTX, XLSX -> MarkItDown -> markdown
      |    +- DOCX fallback -> python-docx paragraph + table extraction
      |    +- binary guardrail: reject if "%PDF-" or "/FlateDecode" in output
      |
      +- _chunk_text(text, max_tokens=300)            <- Stage 1: word-window
      |
      +- _split_for_embed(chunk, max_chars=12000)     <- Stage 2: safety split
      |
      +- embed_fn(chunks)                             <- Titan Embed v2, batch=200
      |
      +- delete_docs_for_drive_file()                 <- wipe stale chunks
      |
      +- os_client.index_chunks(docs)                 <- bulk upsert with retry
      |
      v
    OpenSearch kNN index
      mapping:  doc.embedding (knn_vector, dim=1024)
      filters:  doc.company (keyword), doc.namespace (keyword)
      metadata: doc.path, doc.chunk_id, doc.source_modified_at

| Property | Detail |
|---|---|
| Freshness Check | Compares source_modified_at before re-indexing. Unchanged documents are skipped entirely |
| Multi-Format Extraction | MarkItDown handles PDF, DOCX, PPTX, XLSX; binary guardrail catches corrupt or unconvertible files |
| Two-Stage Chunking | 300-word windows (Stage 1) then 12,000-char safety splits (Stage 2) before embedding |
| Batch Embedding | Titan Embed v2 via Bedrock in batches of 200, producing 1024-dim vectors |
| Full Re-Index | On document change: delete all stale chunks first, then bulk upsert new chunks. Not atomic, since the file briefly has zero chunks |
Code Showcase 9
Hybrid Search, Score Fusion
When use_hybrid=True, kNN and BM25 results get normalized to [0, 1] independently, then fused with configurable weights. Documents in only one result set get 0.0 on the missing signal, which naturally penalizes single dimension matches. Every hit carries the scoring breakdown for provenance.

    # Score fusion algorithm
    # For each document appearing in either kNN or BM25 results:

    # 1. Normalize kNN scores
    knn_norm = (score - min) / (max - min)   # -> [0, 1]

    # 2. Normalize BM25 scores
    bm25_norm = (score - min) / (max - min)  # -> [0, 1]

    # 3. Fuse with configurable weights
    final_score = (
        knn_norm * vector_weight
        + bm25_norm * keyword_weight
    )

    # 4. Sort by final_score descending, return top k

    # Each hit includes scoring breakdown:
    # hit["_score_detail"] = {
    #     "knn_normalized": 0.8234,
    #     "bm25_normalized": 0.9512,
    #     "vector_weight": 0.5,
    #     "keyword_weight": 0.5,
    # }

| Property | Detail |
|---|---|
| Min-Max Normalization | Both kNN and BM25 scores normalized to [0, 1] before fusion. Different score scales don't bias the result |
| Missing Signal = 0.0 | Documents in only one result set get 0.0 for the other, penalizing single-dimension matches |
| Score Provenance | Every hit carries _score_detail with normalized scores and weights for debugging and auditability |
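
The pseudocode above can be turned into a small runnable sketch. The function names `min_max` and `fuse`, the ID-based tiebreak, and the handling of the case where every score in a set is equal (mapped to 1.0 to avoid dividing by zero) are assumptions; the source does not say how that edge case is treated.

```python
def min_max(scores: dict[str, float]) -> dict[str, float]:
    """Scale scores to [0, 1]; a constant set maps to 1.0 to avoid dividing by zero."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (s - lo) / (hi - lo) for doc_id, s in scores.items()}


def fuse(knn, bm25, vector_weight=0.7, keyword_weight=0.3, k=5):
    """Weighted fusion of two independently normalized score sets."""
    knn_norm, bm25_norm = min_max(knn), min_max(bm25)
    fused = []
    for doc_id in set(knn) | set(bm25):
        v = knn_norm.get(doc_id, 0.0)   # missing signal scores 0.0
        b = bm25_norm.get(doc_id, 0.0)
        fused.append((doc_id, v * vector_weight + b * keyword_weight))
    # Highest score first; document ID breaks ties deterministically.
    fused.sort(key=lambda pair: (-pair[1], pair[0]))
    return fused[:k]


knn_scores = {"a": 1.0, "b": 0.5, "c": 0.0}
bm25_scores = {"b": 12.0, "d": 4.0}
ranked = fuse(knn_scores, bm25_scores)
```

Document `b` appears in both result sets and lands close behind the top pure-vector hit `a`, while `d`, which only appears in the BM25 set with the lowest score, contributes nothing.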
Code Showcase 10
Hybrid Search, Usage and Recommended Weights
Hybrid search is opt in both at the BackendExecutor level (for pipelines) and at the RAG layer directly. Weight recommendations move with query type. Semantic similarity wins for policy prose. Lexical precision wins for control IDs and configuration artifacts.

    # Via BackendExecutor (any pipeline):
    text, meta = executor.process(
        prompt=prompt,
        instructions=instructions,
        retrieval_query=query,
        use_hybrid=True,
        vector_weight=0.5,
        keyword_weight=0.5,
    )
    # meta["retrieval"]["hybrid"] == True

    # Directly from RAG layer:
    # Pure kNN (default, unchanged behavior)
    hits = rag.retrieve(
        "incident response roles and responsibilities"
    )

    # Hybrid (kNN + BM25)
    hits = rag.retrieve(
        "KSI-CMT-01 CM-3 change advisory board",
        use_hybrid=True,
        vector_weight=0.5,
        keyword_weight=0.5,
    )

| Property | Detail |
|---|---|
| Policy prose (IRP, SCRM, CONMON) | vector=0.7, keyword=0.3. Semantic similarity dominates; keywords break ties |
| Control/KSI evidence lookup | vector=0.5, keyword=0.5. Control IDs like AC-2(4) and tool names need lexical precision |
| Configuration artifact search | vector=0.3, keyword=0.7. sshd_config, SSLProtocol, exact CLI flags need lexical match |
| Backwards Compatibility | Default use_hybrid=False. All existing callers get unchanged pure kNN behavior with zero code changes |
Code Showcase 11
Full Retrieval Pipeline, 3 Stage Flow
Full retrieve() call chain. Bi-encoder retrieval (kNN or hybrid) fetches 20 candidates. The LLM reranker scores and trims to the top 5. Context assembly caps and concatenates the hits. Final answer generation runs on that bounded context.

    retrieve(question, k=20, rerank=True, rerank_top_n=5)
                       |
    +------------------+------------------+
    | STAGE 1: Bi-Encoder Retrieval       |
    |                                     |
    | question --> Titan Embed v2 --> vec |
    |                                  |  |
    |    OpenSearch <-- kNN or hybrid <+  |
    |    |                                |
    |    +-> 20 candidate hits (scored    |
    |        by cosine +/- BM25)          |
    +------------------+------------------+
                       |
    +------------------+------------------+
    | STAGE 2: LLM Reranker               |
    |                                     |
    | For each of 20 hits:                |
    |   send first 800 chars + question   |
    |   to Claude (Haiku) at temp=0       |
    |                                     |
    | Claude scores each chunk 0-10:      |
    |   10  = directly answers question   |
    |   7-9 = highly relevant detail      |
    |   4-6 = partially relevant          |
    |   1-3 = mostly irrelevant           |
    |   0   = completely irrelevant       |
    |                                     |
    | Sort by score desc -> return top 5  |
    +------------------+------------------+
                       |
    +------------------+------------------+
    | STAGE 3: Context Assembly + Answer  |
    |                                     |
    | build_context(5 hits)               |
    |   cap each hit at 4,000 chars       |
    |   cap total at 12,000 chars         |
    |                                     |
    | llm_client.chat() -> final answer   |
    +-------------------------------------+

| Property | Detail |
|---|---|
| Stage 1 | Bi-encoder retrieval via Titan Embed v2. kNN or hybrid (kNN + BM25) returning 20 candidates |
| Stage 2 | LLM reranker (Claude Haiku, temp=0) scores each chunk 0-10, prunes to top 5 |
| Stage 3 | Context assembly caps hits (4K chars each, 12K total) then generates the final answer |
| Default Flow | k=20 -> rerank -> top 5 when rerank_top_n=5 is passed (otherwise k // 2 with a floor of 3) -> context -> answer |
Code Showcase 12
LLM Reranker, rerank_hits()
The reranker function at rag_bedrock_opensearch.py:35-118 works in five steps: build chunk previews, score them in one Bedrock call with a strict JSON contract, normalize and clamp the scores, sort with the original retrieval order as tiebreaker, and fall back silently on failure.

    def rerank_hits(*, question, hits, llm_client,
                    top_n, preview_chars=800, model_id):
        # Step 1: Build chunk previews (lines 56-63)
        # First 800 chars of content + source path per hit
        previews = [
            {"index": i, "text": hit["content"][:preview_chars],
             "source": hit["path"]}
            for i, hit in enumerate(hits)
        ]

        # Step 2: LLM relevance scoring (lines 71-78)
        # One Bedrock call, temperature=0.0, strict JSON contract:
        # [{"index": 0, "score": 8}, {"index": 1, "score": 3}, ...]

        # Step 3: Score normalization + clamping (lines 93-98)
        # Each score clamped to [0, 10]; prevents hallucinated
        # scores of 15 or -1 from distorting the ranking

        # Step 4: Sort and prune (lines 101-113)
        # Sort by rerank score desc, original order as tiebreaker
        # Only top_n survive; each gets _rerank_score attached

        # Step 5: Graceful fallback (lines 115-118)
        # If LLM call fails or returns unparseable JSON,
        # silently return original hits truncated to top_n
        # Reranking is a quality optimization, not a gate

| Property | Detail |
|---|---|
| Preview Limit | Only first 800 chars per chunk sent to reranker. Controls cost without sacrificing judgment quality on compliance prose |
| Score Clamping | Scores clamped to [0, 10]. Prevents LLM hallucination of out-of-range scores from distorting ranking |
| Tiebreaker | Original retrieval order breaks ties. Preserves kNN/hybrid ranking signal when rerank scores are equal |
| Graceful Fallback | On any failure, returns original hits truncated to top_n. Pipeline never breaks from a reranker error |
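
Steps 3 to 5 can be illustrated without a Bedrock call by feeding the parser a raw response string. This is a sketch under assumptions: `apply_rerank_scores` is a hypothetical helper, and treating a chunk the model omitted as score 0.0 is a guess rather than documented behavior.

```python
import json


def apply_rerank_scores(hits: list[dict], raw_response: str, top_n: int) -> list[dict]:
    """Parse reranker JSON, clamp scores to [0, 10], and keep the top_n hits."""
    try:
        scored = json.loads(raw_response)
        scores = {int(item["index"]): float(item["score"]) for item in scored}
    except (ValueError, TypeError, KeyError):
        # Fallback: keep retrieval order when the response cannot be used.
        return hits[:top_n]
    ranked = []
    for i, hit in enumerate(hits):
        score = min(10.0, max(0.0, scores.get(i, 0.0)))  # clamp out-of-range scores
        ranked.append({**hit, "_rerank_score": score})
    # list.sort() is stable, so equal scores keep their original retrieval order.
    ranked.sort(key=lambda h: -h["_rerank_score"])
    return ranked[:top_n]


hits = [{"content": "a"}, {"content": "b"}, {"content": "c"}]
raw = '[{"index": 0, "score": 3}, {"index": 1, "score": 15}, {"index": 2, "score": 3}]'
reranked = apply_rerank_scores(hits, raw, top_n=2)
fallback = apply_rerank_scores(hits, "not json", top_n=2)
```

The out-of-range score of 15 is clamped to 10, the tie between the first and third hits is settled by retrieval order, and the malformed response simply returns the first two original hits.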
Code Showcase 13
Reranking, Call Chain and Cost
Reranking propagates through every public entry point via keyword arguments. At roughly $0.0005 per reranked retrieval with Haiku, $1 buys you 2,000 reranked retrievals. The 1 to 2 second latency add is fine for document generation pipelines. Less fine for interactive Q&A.

    # Full call chain: rerank propagates through all entry points
    BackendExecutor.process(rerank=True, rerank_top_n=5)
    #   -> opensearch_backend_process()     (pops rerank kwargs)
    #   -> generate_from_prompt(rerank=...) (passes through)
    #   -> retrieve(rerank=...)             (calls rerank_hits())

    answer_question(rerank=True, rerank_top_n=5)
    #   -> retrieve(rerank=...)             (same path)

    retrieve(rerank=True, rerank_top_n=5)
    #   -> rerank_hits() directly

    # Configuration via dataclass fields:
    rag = OpenSearchBedrockRAG(
        os_client=os_client,
        embed_client=embed_client,
        llm_client=llm_client,  # Sonnet for answers
        company="searchstax",
        namespace="irp",
        rerank_model_id="anthropic.claude-3-5-haiku-20241022-v1:0",
        rerank_preview_chars=800,
    )

| Property | Detail |
|---|---|
| Reranker Input Cost | ~20 chunks x 800 chars / 4 = ~4,000 tokens at ~$0.0004 (Haiku) |
| Reranker Output Cost | ~20 score objects = ~200 tokens at ~$0.0001 (Haiku) |
| Added Latency | 1 Bedrock round-trip = ~1-2 seconds. Acceptable for document generation, less so for interactive Q&A |
| Default top_n | k // 2 with floor of 3. k=20 produces 10 after reranking, k=8 produces 4, or override explicitly |
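
The default described in the last row is a one-liner. The function name below is hypothetical; the rule itself (k // 2 with a floor of 3) is taken from the table.

```python
def default_rerank_top_n(k: int) -> int:
    """Half of k, but never fewer than 3."""
    return max(3, k // 2)


examples = {k: default_rerank_top_n(k) for k in (20, 8, 4)}
```

The floor only matters for small k: with k=4, half would be 2, so the default rises to 3.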
Code Showcase 14
Consolidated Filter Builder, _build_filters()
The filter builder that used to be duplicated across search_knn and search_hybrid. Now it lives in one static method. Company and namespace are always applied. metadata_filters expand to term filters on doc.metadata.<key> when they're provided.

    @staticmethod
    def _build_filters(company, namespace, metadata_filters=None):
        filters = []
        if company:
            filters.append({"term": {"doc.company": company}})
        if namespace:
            filters.append({"term": {"doc.namespace": namespace}})
        for key, value in (metadata_filters or {}).items():
            filters.append(
                {"term": {f"doc.metadata.{key}": value}}
            )
        return filters

    # Both search_knn() and search_hybrid() now delegate
    # to _build_filters(); no duplicated filter logic

| Property | Detail |
|---|---|
| Consolidation | Filter logic extracted from two search methods into one static method. Single source of truth |
| Always-On Filters | Company and namespace are always applied; metadata_filters are additive and opt-in |
| Dynamic Key Expansion | Each metadata key-value pair becomes a term filter on doc.metadata.<key>. No hardcoded field names |
Code Showcase 15
Metadata Filtering, End to End Usage
Documents get tagged at index time through extra_metadata on the backend dataclass. At query time, metadata_filters flows from any entry point (BackendExecutor, retrieve, answer_question) down to OpenSearch's bool.filter clause. Filters run in filter context, so they never affect scores, and non-matching documents never appear in the result set.

    # At indexing time: tag documents with arbitrary metadata
    backend = OpenSearchIndexBackend(
        os_client=os_client,
        embed_fn=embed_fn,
        company="searchstax",
        namespace="irp",
        extra_metadata={
            "project_id": "fedramp-2024",
            "doc_type": "policy",
            "classification": "cui",
        },
    )
    # Every chunk now has:
    #   doc.metadata.project_id = "fedramp-2024"
    #   doc.metadata.doc_type = "policy"
    #   doc.metadata.classification = "cui"

    # At query time: filter to matching documents
    hits = rag.retrieve(
        "incident response roles and responsibilities",
        metadata_filters={"project_id": "fedramp-2024"},
    )

    # Combine with hybrid search + reranking:
    hits = rag.retrieve(
        "KSI-CMT-01 change advisory board",
        use_hybrid=True,
        rerank=True,
        metadata_filters={
            "project_id": "fedramp-2024",
            "doc_type": "policy",
        },
    )

    # Through BackendExecutor:
    text, meta = executor.process(
        prompt=prompt,
        instructions=instructions,
        metadata_filters={"project_id": "A"},
    )
    # meta["retrieval"]["metadata_filters"] == {"project_id": "A"}

| Property | Detail |
|---|---|
| Indexing | extra_metadata dict on the backend dataclass. Merged into every chunk's metadata alongside mime_type, drive_id, drive_name |
| Querying | metadata_filters dict at any entry point. Each key-value becomes a term filter that excludes non-matching documents without affecting scores |
| Composable | Combines freely with namespace, hybrid search, and reranking. All features are independent and additive |
| Provenance | BackendExecutor records metadata_filters in returned meta dict for downstream traceability |
Code Showcase 16
Generated OpenSearch Query with Metadata Filter
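The actual OpenSearch query sent when metadata_filters is provided. The filter clauses run in filter context, so they never contribute to the score. OpenSearch applies a bool filter placed next to a knn clause to the approximate kNN candidates rather than ahead of the vector search, so over-fetching (k is 40 here) helps keep enough hits after filtering. Documents without the specified metadata field never appear in the results, regardless of how similar their vectors are.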
{
  "query": {
    "bool": {
      "filter": [
        {"term": {"doc.company": "searchstax"}},
        {"term": {"doc.namespace": "irp"}},
        {"term": {"doc.metadata.project_id": "A"}}
      ],
      "must": [
        {
          "knn": {
            "doc.embedding": {
              "vector": ["..."],
              "k": 40
            }
          }
        }
      ]
    }
  }
}

| Property | Detail |
|---|---|
| Pre-Score Filtering | Filter clauses execute before kNN scoring. Non-matching documents are excluded from the candidate set entirely |
| No Migration | OpenSearch dynamic mapping auto-creates keyword fields for new metadata keys on first ingest |
| Missing Field Behavior | Documents indexed before extra_metadata was set don't have the field. Term filter returns no match, correctly excluding them |
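A query body of this shape could be assembled from the company, namespace, and `metadata_filters` values roughly as follows. This is a sketch under stated assumptions: `build_knn_query` is a hypothetical helper, not a function from the production code, and only the field names (`doc.company`, `doc.namespace`, `doc.metadata.*`, `doc.embedding`) come from the query shown above.

```python
from typing import Any, Dict, List, Optional


def build_knn_query(
    vector: List[float],
    company: str,
    namespace: str,
    k: int = 40,
    metadata_filters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a bool query: term filters plus one kNN clause (illustrative)."""
    # Company and namespace filters are always present.
    filters: List[Dict[str, Any]] = [
        {"term": {"doc.company": company}},
        {"term": {"doc.namespace": namespace}},
    ]
    # Each metadata key-value pair becomes one additional term filter.
    for key, value in (metadata_filters or {}).items():
        filters.append({"term": {f"doc.metadata.{key}": value}})
    return {
        "query": {
            "bool": {
                "filter": filters,
                "must": [{"knn": {"doc.embedding": {"vector": vector, "k": k}}}],
            }
        }
    }


body = build_knn_query(
    [0.1, 0.2], "searchstax", "irp", metadata_filters={"project_id": "A"}
)
```

Because the metadata filters are appended to the same list as the company and namespace filters, adding a new metadata key needs no change to the query builder.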
Code Showcase 17
Three Levels of Isolation
Three levels of isolation that compose on the same term filter mechanism. From coarse tenant isolation down to fine grained metadata attributes.
Level       Mechanism                        Use Case
─────────────────────────────────────────────────────────────────
Company     doc.company (always applied)     Tenant isolation: SearchStax vs. Acme
Namespace   doc.namespace (always applied)   Document collection: IRP vs. SCRM vs. CONMON
Metadata    doc.metadata.* (opt-in via       Fine-grained: project, doc type,
            metadata_filters)                classification, version

| Property | Detail |
|---|---|
| Company | Always applied. Tenant isolation between organizations sharing the same OpenSearch domain |
| Namespace | Always applied. Logical partition per compliance pipeline (IRP, SCRM, CONMON) |
| Metadata | Opt-in via metadata_filters. Filters on project_id, doc_type, classification, version, or any arbitrary key |
| Composable | All three levels stack. A query can filter by company + namespace + multiple metadata fields simultaneously |
Code Showcase 18
Latency Critical Path, 4 Network Hops
Critical path for a single retrieval. Embed query (~200ms Bedrock), kNN search (~100ms OpenSearch), build context (~0ms local), LLM answer (~4 to 6s Bedrock). Six optimizations keep the non-LLM stages fast.
# Critical path: embed     -> search       -> context -> answer
#                ~200ms       ~100ms          ~0ms       ~4-6s
#                (Bedrock)    (OpenSearch)    (local)    (Bedrock)

# 1. Aggressive boto3 timeout (bedrock_llm.py:42-46)
Config(
    read_timeout=120,
    connect_timeout=10,  # fail fast on unreachable
    retries={"max_attempts": 3, "mode": "standard"},
)

# 2. Client singleton with connection pool reuse
#    OpenSearchClient, BedrockEmbeddingsClient, BedrockLLMClient
#    constructed once, injected into RAG dataclass.
#    First call: ~200-500ms (credential + TLS handshake)
#    Subsequent: reuse warm connection

# 3. Credential caching with TTL (ttl_cache.py)
#    Google Drive SA keys + Vanta tokens cached 600s
#    Without: +100ms Secrets Manager round-trip per call

# 4. Over-fetch + local score cutoff (opensearch.py:229-287)
#    Fetch k * fetch_mult (5x), threshold in Python
#    Avoids unreliable server-side min_score

# 5. Single-text embedding for query path
#    One question -> one InvokeModel call, no batch overhead
#    Batch path (EMBED_BATCH_SIZE=200) reserved for ingest

# 6. Cross-region inference profile routing
modelId = os.getenv(
    "BEDROCK_EMBEDDINGS_INFERENCE_PROFILE_ID"
) or model_id
# Routes to lowest-latency region, saves 50-100ms

| Property | Detail |
|---|---|
| Connect Timeout | 10s connect_timeout fails fast on unreachable Bedrock. Retries immediately rather than blocking for 120s |
| Connection Reuse | Boto3 clients maintain internal HTTP pools; first call pays TLS cost, later calls reuse warm connections |
| Credential Cache | 600s TTL on Secrets Manager lookups. Eliminates ~100ms per call for Google Drive SA keys and Vanta tokens |
| Over-Fetch | Fetch 5x results, threshold locally. Trades larger payload for guaranteed filtering without a second query |
| Cross-Region Routing | Bedrock inference profile auto-routes to lowest-latency region, saving 50-100ms under load |
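The credential cache in optimization 3 follows the usual time-to-live pattern. The sketch below illustrates that pattern only; it is not the contents of ttl_cache.py. The `TTLCache` class, its method names, and the injectable clock are assumptions made so the example runs without AWS access.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Cache loader results for a fixed number of seconds (illustrative)."""

    def __init__(
        self,
        ttl_seconds: float = 600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get_or_load(self, key: str, loader: Callable[[], Any]) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # still fresh: skip the remote lookup
        value = loader()     # expired or missing: pay the round-trip once
        self._entries[key] = (now, value)
        return value


# Usage with a fake clock and a stand-in for the Secrets Manager call.
fake_now = [0.0]
calls = []


def load_secret() -> str:
    calls.append(1)
    return "secret-value"


cache = TTLCache(ttl_seconds=600.0, clock=lambda: fake_now[0])
first = cache.get_or_load("vanta-token", load_secret)
second = cache.get_or_load("vanta-token", load_secret)  # served from cache
fake_now[0] = 601.0
third = cache.get_or_load("vanta-token", load_secret)   # TTL elapsed, reloaded
```

Three lookups trigger only two loader calls here; in the pipeline, each avoided call is a Secrets Manager round-trip that never happens.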
Code Showcase 19
Latency by Configuration Mode
Wall time varies with retrieval mode. The hybrid BM25 pass adds roughly 80ms, which is negligible next to the LLM call. The rerank LLM call adds about 1.5s with Haiku, making it the only mode where pre-LLM latency is actually noticeable.
Mode                        Extra Hops                            Estimated Total
────────────────────────────────────────────────────────────────────────────
Pure kNN (default)          embed + search + LLM                  ~5-8s
Hybrid                      embed + 2 searches + fusion           ~5.5-8.5s
(use_hybrid=True)           + LLM
Hybrid + Rerank             embed + 2 searches + rerank LLM       ~7-11s
                            + answer LLM

Optimization Not Taken      Reason
────────────────────────────────────────────────────────────────────────────
Response streaming          Excluded from IAM policy as a deliberate
(invoke_model_with_         simplicity tradeoff. Would drop perceived
response_stream)            TTFT from ~5s to ~500ms.
OpenSearch connection       opensearch-py defaults used. Single-digit
pool tuning                 QPS on t3.small isn't connection-bound.
Formal query-path           RunMetrics tracks ingest perf (files/s,
latency instrumentation     bytes/s) but no perf_counter on retrieve().

| Property | Detail |
|---|---|
| Pure kNN | ~5-8s total. Embed (200ms) + search (100ms) + LLM answer (4-6s) |
| Hybrid | ~5.5-8.5s. Adds ~80ms for BM25 pass and score fusion (negligible) |
| Hybrid + Rerank | ~7-11s. Adds ~1.5s for Haiku reranker; the only mode where pre-LLM latency is noticeable |
| LLM Dominance | Answer generation is ~75% of wall time in every mode. Non-LLM stages total under 400ms |
Code Showcase 20
Vector Staleness, Compare and Swap Flow
Three step freshness check that runs on every file on every pipeline execution. 90% or more of files are skipped unchanged. The check is one lightweight OpenSearch query per file. Scanning 50 unchanged files adds about 250ms total.
# For each file in the Google Drive folder:

# Step 1 (COMPARE): query OpenSearch for stored timestamp
latest = get_latest_source_modified(
    drive_id=record.drive_file_id
)
# Guard against a never-indexed file (assumes the lookup returns None then)
if latest is not None and latest >= record.modified_time:
    return IndexResult(status="skipped_unchanged")  # ~5ms

# Step 2 (PURGE): scoped delete of all old chunks
delete_docs_for_drive_file(
    drive_id=record.drive_file_id,
    conflicts="proceed",  # concurrent searches never blocked
    refresh=True,         # cleared state immediately visible
)

# Step 3 (RE-INDEX): extract, chunk, embed, bulk-index
text = _extract_text(record)
chunks = _chunk_text(text, max_tokens=300)
safe = [_split_for_embed(c, max_chars) for c in chunks]
vectors = embed_fn(safe)  # Titan Embed v2, batch=200
os_client.index_chunks(
    docs=[
        {
            "chunk_id": f"{record.drive_file_id}:{i}",
            "source_modified_at": record.modified_time,
            # ... content, embedding, metadata
        }
        for i, vec in enumerate(vectors)
    ]
)

# Typical pipeline run log:
#   files=47 indexed=3 skipped_unchanged=42
#   skipped_no_text=2 total_time=38.12s

| Property | Detail |
|---|---|
| Compare Cost | ~5ms per file. Single OpenSearch query for the latest source_modified_at by drive_id |
| Skip Rate | 90%+ of files are skipped_unchanged in a typical re-run; scanning 50 files adds ~250ms total |
| Purge Safety | conflicts=proceed ensures concurrent searches are never blocked; refresh=true makes cleared state immediately visible |
| Chunk ID Scheme | {drive_id}:{index}. delete_by_query wipes all old chunks by drive_id regardless of how many the old version produced |
| Zero Downtime | 1-5 second gap per file while re-indexing; searches for other documents are unaffected |
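The compare, purge, and re-index steps can be exercised end to end against an in-memory stand-in for the index. Everything here is a simplified sketch: `FileRecord`, `InMemoryIndex`, and this `upsert_file` are hypothetical, and timestamps are assumed to be ISO 8601 UTC strings in one fixed format so that plain string comparison orders them correctly.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class FileRecord:
    drive_file_id: str
    modified_time: str  # assumed ISO 8601 UTC, fixed format
    chunks: List[str]


class InMemoryIndex:
    """Stand-in for the OpenSearch index (illustration only)."""

    def __init__(self) -> None:
        self.docs: Dict[str, dict] = {}

    def latest_modified(self, drive_id: str) -> Optional[str]:
        stamps = [
            d["source_modified_at"]
            for d in self.docs.values()
            if d["drive_id"] == drive_id
        ]
        return max(stamps) if stamps else None

    def delete_for_file(self, drive_id: str) -> None:
        # Scoped delete: drop every chunk that belongs to this file.
        self.docs = {
            cid: d for cid, d in self.docs.items() if d["drive_id"] != drive_id
        }

    def index_chunks(self, record: FileRecord) -> None:
        for i, text in enumerate(record.chunks):
            self.docs[f"{record.drive_file_id}:{i}"] = {
                "drive_id": record.drive_file_id,
                "source_modified_at": record.modified_time,
                "content": text,
            }


def upsert_file(index: InMemoryIndex, record: FileRecord) -> str:
    latest = index.latest_modified(record.drive_file_id)
    if latest is not None and latest >= record.modified_time:
        return "skipped_unchanged"                # Step 1: compare
    index.delete_for_file(record.drive_file_id)   # Step 2: purge
    index.index_chunks(record)                    # Step 3: re-index
    return "indexed"


index = InMemoryIndex()
v1 = FileRecord("f1", "2024-01-01T00:00:00Z", ["a", "b", "c"])
v2 = FileRecord("f1", "2024-02-01T00:00:00Z", ["a2"])
results = [upsert_file(index, v1), upsert_file(index, v1), upsert_file(index, v2)]
```

The second version has one chunk where the first had three. Purging by file ID rather than by chunk ID is what stops the stale `f1:1` and `f1:2` chunks from surviving the update.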
Code Showcase 21
Cross Backend Freshness Strategies
Both backends conform to IndexBackend Protocol, but each picks its own strategy. OpenSearch compares timestamps (catches all edits, 5ms per check). S3 compares md5 plus modified_time (cheaper but misses metadata-only edits). Caller controls the tradeoff via flags.
Backend      Compare Strategy        Catches                  Flags
──────────────────────────────────────────────────────────────────────
OpenSearch   source_modified_at      All edits (content       replace_if_newer
             timestamp comparison    + metadata changes)      skip_if_existing
S3 (AWS)     md5 + modified_time     Content edits only       replace_if_newer
             hash comparison         (misses metadata-        skip_if_existing
                                     only changes)

Both backends conform to the same IndexBackend Protocol:

class IndexBackend(Protocol):
    def upsert_file(
        self, *, company, index_id,
        file: DriveFileRecord,
        replace_if_newer: bool = True,
    ) -> None: ...

| Property | Detail |
|---|---|
| OpenSearch | Timestamp comparison. Catches all edits including metadata-only changes; higher per-check cost (~5ms) |
| S3 | MD5 + timestamp. Cheaper comparison but misses metadata-only edits; sufficient for content-driven workflows |
| Protocol Conformance | Both backends satisfy the same IndexBackend Protocol. Callers don't know which freshness strategy runs |
| Caller Control | replace_if_newer and skip_if_existing flags let pipelines choose thoroughness vs. cost per use case |
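Structural conformance means a caller can be written once against the Protocol and handed either backend. The sketch below shows that idea with two toy classes. `IndexBackendSketch`, `TimestampBackend`, `HashBackend`, and `ingest` are hypothetical names, and a plain `dict` stands in for `DriveFileRecord` to keep the example self-contained.

```python
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class IndexBackendSketch(Protocol):
    """Simplified stand-in for the IndexBackend Protocol."""

    def upsert_file(
        self, *, company: str, index_id: str, file: dict,
        replace_if_newer: bool = True,
    ) -> None: ...


class TimestampBackend:
    """Conforms by shape alone; note there is no base class."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    def upsert_file(self, *, company, index_id, file, replace_if_newer=True) -> None:
        self.seen.append(f"timestamp:{file['id']}")


class HashBackend:
    """A second conforming backend with a different freshness strategy."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    def upsert_file(self, *, company, index_id, file, replace_if_newer=True) -> None:
        self.seen.append(f"md5:{file['id']}")


def ingest(backend: IndexBackendSketch, files: List[dict]) -> None:
    # The caller never learns which freshness strategy runs.
    for f in files:
        backend.upsert_file(company="searchstax", index_id="irp", file=f)


backends = [TimestampBackend(), HashBackend()]
for b in backends:
    ingest(b, [{"id": "doc-1"}])
```

`runtime_checkable` is used only so that `isinstance` can confirm conformance in a test. It checks that the method exists, not that its signature matches, so a static type checker remains the real enforcement.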
Code Showcase 22
Context Assembly, build_context()
Context assembly that enforces hard budgets, relevance ordering, and structured document headers. Each chunk carries its index, score, chunk_id, and source path. Those double as boundary markers, citation anchors, and provenance signals for the LLM.
def build_context(hits, max_total=12000, max_per_hit=4000):
    """Assemble retrieved chunks into bounded, structured context."""
    context_parts = []
    total_chars = 0
    # Hits arrive in relevance order (kNN score or rerank score)
    for i, hit in enumerate(hits):
        content = hit["content"][:max_per_hit]  # cap per chunk
        if total_chars + len(content) > max_total:
            break  # budget exhausted; least important hits are dropped
        # Structured header: boundary marker + citation anchor
        header = (
            f"[Document {i+1}] "
            f"Score: {hit['_score']:.3f} | "
            f"Chunk: {hit['chunk_id']} | "
            f"Source: {hit['path']}"
        )
        context_parts.append(f"{header}\n{content}")
        total_chars += len(content)
    return "\n\n---\n\n".join(context_parts)

# Result: 5-8 focused chunks, ~12,000 chars total
# Highest-scoring chunk at position 1 (highest attention)
# Least important material truncated by budget (correct failure mode)

| Property | Detail |
|---|---|
| Total Budget | 12,000-char ceiling. Keeps the model under the threshold where lost-in-the-middle effects emerge |
| Per-Chunk Cap | 4,000-char limit per hit. Oversized chunks (extracted tables) are truncated, not allowed to consume the budget |
| Relevance Ordering | Highest-scoring chunk at position 1 (highest model attention); least important at the bottom and potentially dropped |
| Structured Headers | Document index, score, chunk_id, source path. Boundary markers prevent conflation, enable inline citation |
Code Showcase 23
Prompt Construction, System and User Split
Behavioral constraints live in the system prompt. Retrieved evidence and the question live in the user prompt with a clear separator and explicit label. That boundary stops the model from confusing task instructions with source evidence.
SYSTEM PROMPT:
You are a compliance documentation assistant.
Answer ONLY using the provided context.
Cite sources inline using [Document N] references.
If the context does not contain the answer, say so.
---
USER PROMPT:
Context (the ONLY allowed facts):
[Document 1] Score: 0.923 | Chunk: abc123:3 | Source: IRP_v2.docx
The incident response team shall acknowledge P1 incidents
within 15 minutes of detection. The SIEM platform generates
automated alerts...
---
[Document 2] Score: 0.891 | Chunk: def456:7 | Source: IRP_v2.docx
Escalation procedures require notification of the CISO
within 30 minutes for incidents classified as...
---
[Document 3] Score: 0.834 | Chunk: ghi789:1 | Source: roles.docx
The Security Operations Center (SOC) maintains 24/7...
---
Question: Describe the incident response acknowledgment SLA
and escalation procedures.

| Property | Detail |
|---|---|
| System/User Boundary | Behavioral rules in system prompt, evidence + question in user prompt. Model can't confuse instructions with source material |
| Explicit Label | 'Context (the ONLY allowed facts)'. Anchors the model to retrieved evidence, discourages fabrication |
| Document Boundaries | --- separators + [Document N] headers prevent the model from conflating content across different source files |
| Citation Anchors | Chunk IDs in headers enable inline citation. The model can reference specific chunks in its answer |
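The split could be produced by a helper along these lines. This is a hedged sketch: `build_prompts` is a hypothetical function, the prompt wording is copied from the showcase above, and the exact separator placement in production is an assumption.

```python
from typing import Tuple

# Behavioral rules only; no evidence is ever placed here.
SYSTEM_PROMPT = (
    "You are a compliance documentation assistant.\n"
    "Answer ONLY using the provided context.\n"
    "Cite sources inline using [Document N] references.\n"
    "If the context does not contain the answer, say so."
)


def build_prompts(context: str, question: str) -> Tuple[str, str]:
    """Return (system_prompt, user_prompt) with evidence kept in the user turn."""
    user_prompt = (
        "Context (the ONLY allowed facts):\n"
        f"{context}\n"
        "---\n"
        f"Question: {question}"
    )
    return SYSTEM_PROMPT, user_prompt


system_prompt, user_prompt = build_prompts(
    "[Document 1] Score: 0.923 | Chunk: abc123:3 | Source: IRP_v2.docx\n"
    "The incident response team shall acknowledge P1 incidents...",
    "Describe the incident response acknowledgment SLA.",
)
```

Keeping the system prompt a constant means retrieved text can never leak into the instruction channel, whatever the documents contain.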
Code Showcase 24
Tiered Placeholder Resolution
Template placeholders are resolved through a four-tier hierarchy, cheapest source first. Tiers 1 and 2 resolve known values before anything reaches the RAG layer, so those values never consume context window budget. Only genuinely unknown values trigger document retrieval.
Tier  Source                     Example                      Cost
──────────────────────────────────────────────────────────────────────
1     Static client profile      {{COMPANY_NAME}}             ~0ms
      (config/profile.json)      → "SearchStax"               (instant)
2     In-memory small docs       {{SIEM_TOOL}}                ~0ms
      (pre-loaded at startup)    → "Splunk Enterprise"        (cached)
3     RAG retrieval              {{INCIDENT_SLA}}             ~300ms
      (OpenSearch kNN)           → retrieved from IRP docs    (search)
4     Explicit unknown marker    {{RETENTION_PERIOD}}         ~0ms
      (unresolvable)             → "{{UNKNOWN}}"              (flagged)

Resolution order: try tier 1, then 2, then 3, then 4.
Known values are resolved BEFORE the prompt is sent to RAG,
so they never enter the context window. That preserves budget
for values that genuinely require document retrieval.

| Property | Detail |
|---|---|
| Tier 1: Profile | Company name, contact info, tool names. Resolved from static config at ~0ms |
| Tier 2: Cached Docs | Small reference documents pre-loaded at startup. No search overhead |
| Tier 3: RAG | Only values that genuinely require document retrieval consume context budget (~300ms) |
| Tier 4: Unknown | Unresolvable placeholders become explicit {{UNKNOWN}} markers. Visible to the reviewer, not hallucinated |
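The resolution order can be sketched as a short fall-through chain. The names here (`resolve_placeholder`, `fill_template`, `fake_rag`) are hypothetical, and the RAG tier is replaced by a stub that returns `None` when nothing is found, which is an assumption about how a miss is signaled.

```python
import re
from typing import Callable, Dict, List, Optional, Tuple

PLACEHOLDER = re.compile(r"\{\{([A-Z_]+)\}\}")


def resolve_placeholder(
    name: str,
    profile: Dict[str, str],
    cached_docs: Dict[str, str],
    rag_lookup: Callable[[str], Optional[str]],
) -> Tuple[str, int]:
    """Return (value, tier) using the first tier that knows the name."""
    if name in profile:
        return profile[name], 1        # tier 1: static client profile
    if name in cached_docs:
        return cached_docs[name], 2    # tier 2: pre-loaded small docs
    retrieved = rag_lookup(name)       # tier 3: only now pay for retrieval
    if retrieved is not None:
        return retrieved, 3
    return "{{UNKNOWN}}", 4            # tier 4: explicit marker for review


def fill_template(template, profile, cached_docs, rag_lookup) -> str:
    return PLACEHOLDER.sub(
        lambda m: resolve_placeholder(m.group(1), profile, cached_docs, rag_lookup)[0],
        template,
    )


rag_calls: List[str] = []


def fake_rag(name: str) -> Optional[str]:
    rag_calls.append(name)  # record which names actually reached retrieval
    return {"INCIDENT_SLA": "15 minutes"}.get(name)


filled = fill_template(
    "{{COMPANY_NAME}} uses {{SIEM_TOOL}}; SLA {{INCIDENT_SLA}}; "
    "retention {{RETENTION_PERIOD}}",
    {"COMPANY_NAME": "SearchStax"},
    {"SIEM_TOOL": "Splunk Enterprise"},
    fake_rag,
)
```

Only the two names that the first two tiers could not answer ever reach the retrieval stub, and the one that retrieval cannot answer surfaces as a visible `{{UNKNOWN}}` marker.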