AI Infrastructure / AWS / MLOps · Mar 2025

Production RAG Infrastructure on AWS

A reusable retrieval layer built on AWS OpenSearch and Amazon Bedrock embeddings that grounds multiple LLM pipelines in system-specific documentation.

Problem

LLM workflows for compliance and documentation need grounded retrieval to avoid hallucinations and provide relevant, system-specific answers.

Solution

Built a shared RAG backend that indexes chunked documents with Bedrock-generated embeddings into OpenSearch and exposes namespace-scoped retrieval to downstream pipelines.

Impact

  • Enabled grounded retrieval across multiple AI workflows
  • Centralized retrieval infrastructure for compliance and documentation pipelines
  • Improved reliability of generated outputs by grounding them in semantic search results

Architecture

  1. Documents are embedded using Titan embeddings from Bedrock
  2. Vectors and metadata are stored in OpenSearch
  3. A backend abstraction handles upserts and filtered retrieval
  4. Pipelines query the service using namespace and document-type filters

Capabilities

  • Semantic indexing and retrieval
  • Namespace-scoped search
  • Document chunk ingestion
  • Shared backend for multiple pipelines
  • AWS-authenticated service integration

Stack

Python · AWS OpenSearch Service · Amazon Bedrock · boto3 · AWS4Auth · SigV4 · AWS

Technical Deep Dive

Architecture internals and annotated code from the production system.

Architecture Overview

A ports-and-adapters (hexagonal) architecture with four distinct abstraction layers. Any pipeline — IRP generation, SCRM, CONMON, policy drafting — plugs into the same retrieval infrastructure through well-defined interfaces. The retrieval layer is deliberately decoupled so that backend changes (switching embedding providers, adding a new vector store) never require pipeline code changes.

  • DriveFileRecord — universal document contract (normalization point)
  • IndexBackend Protocol — storage interface (OpenSearch / S3+OpenAI / OpenAI Vector Store)
  • ensure_retrieval() — backend-agnostic ingestion dispatcher
  • BackendExecutor.process() — unified RAG + generation interface
  • Namespace isolation — single index, logical partitions per pipeline
  • SourceAdapter priority — RAG at priority=3, graceful degradation

Key Architectural Decisions

01

Protocol Over ABC for Backend Interface

IndexBackend uses Python Protocol (structural typing), not ABC (nominal typing). Any class with matching method signatures satisfies the interface without inheriting from it. Three implementations conform: OpenSearchIndexBackend, AwsIndexBackend (S3 + OpenAI embeddings), and the OpenAI Vector Store path — all without a shared base class.

02

DriveFileRecord as Universal Data Contract

Every document flowing through the system becomes a DriveFileRecord with drive_file_id, name, mime_type, modified_time, md5, and content bytes. This is the normalization point — upstream pipelines don't need to know whether the document will end up in OpenSearch, S3, or OpenAI's vector store.
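
A minimal sketch of what this contract could look like as a dataclass — the field names come from the text above, while the types and example values are assumptions:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class DriveFileRecord:
    """Universal document contract — the normalization point.

    Field names follow the text; types and the example below are assumptions.
    """
    drive_file_id: str
    name: str
    mime_type: str
    modified_time: str   # ISO-8601 timestamp from the source system
    md5: str
    content: bytes       # raw file bytes, format-agnostic

record = DriveFileRecord(
    drive_file_id="abc123",
    name="incident-response-plan.pdf",
    mime_type="application/pdf",
    modified_time="2025-03-01T12:00:00Z",
    md5="d41d8cd98f00b204e9800998ecf8427e",
    content=b"%PDF-...",
)
```

Because every backend consumes the same record, the decision of where the document lands stays entirely inside the retrieval layer.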

03

Separated retrieval_query from prompt

BackendExecutor.process() takes retrieval_query as a separate parameter from prompt. The text sent to OpenSearch for kNN search can be optimized independently from the text sent to the LLM. retrieval_query_builder.py extracts {{placeholders}} from markdown templates and builds targeted semantic queries, while the LLM receives the full template with instructions.
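
A minimal sketch of that extraction step, assuming a simple regex over {{placeholder}} markers — the function names and query format here are illustrative, not the actual retrieval_query_builder.py:

```python
import re

def extract_placeholders(template: str) -> list:
    """Pull {{placeholder}} names out of a markdown template."""
    return re.findall(r"\{\{(\w+)\}\}", template)

def build_retrieval_query(section: str, template: str) -> str:
    """Build a tight semantic query from the section name and its
    placeholders; the full template (with instructions) goes to the
    LLM separately."""
    names = extract_placeholders(template)
    return f"{section}. Find values for {', '.join(names)}"

template = "The SIEM in use is {{siem_tool}} and scanning is done by {{scanner}}."
query = build_retrieval_query("IRP section: tooling", template)
# the query carries only the section label and placeholder names —
# none of the template boilerplate that would dilute the embedding
```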

04

Namespace Isolation for Multi-Pipeline Reuse

A single OpenSearch domain and index stores documents for all compliance pipelines. The namespace field (keyword type) is added as a term filter on every kNN query. IRP generator only sees IRP-namespace documents, SCRM only sees SCRM documents. Same index, same embeddings, same infrastructure — different logical partitions.

05

RAG Factory as Dependency Injection Point

rag_factory.py's build_opensearch_bedrock_rag() wires OpenSearchClient + BedrockEmbeddingsClient + BedrockLLMClient into a single RAG instance. The factory reads all infra config from environment variables, so the same code deploys to dev (basic auth, t3.small) and prod (SigV4, encrypted domain) without code changes.
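
A sketch of what environment-driven wiring like this can look like — the variable names, defaults, and config dataclass are illustrative, not the actual rag_factory.py:

```python
import os
from dataclasses import dataclass

@dataclass
class RAGConfig:
    """Infra config resolved entirely from the environment, so the same
    code runs in dev (basic auth) and prod (SigV4) without changes."""
    os_host: str
    auth_mode: str        # e.g. "basic" in dev, "sigv4" in prod
    embed_model_id: str
    llm_model_id: str

def load_rag_config() -> RAGConfig:
    # Variable names and defaults here are assumptions for illustration.
    return RAGConfig(
        os_host=os.getenv("OPENSEARCH_HOST", "localhost:9200"),
        auth_mode=os.getenv("OPENSEARCH_AUTH_MODE", "basic"),
        embed_model_id=os.getenv("BEDROCK_EMBED_MODEL", "amazon.titan-embed-text-v2:0"),
        llm_model_id=os.getenv("BEDROCK_LLM_MODEL", "anthropic.claude-3-5-sonnet-20241022-v2:0"),
    )

cfg = load_rag_config()
```

The factory then passes this config into the OpenSearch, embeddings, and LLM clients, so deployment differences never leak into pipeline code.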

06

FedRAMP Layer 1 Deliberately Excluded from RAG

The enrichment pipeline intentionally does not use RAG retrieval. It trusts structured metadata, not retrieved documents, precisely to avoid the hallucination risk that RAG introduces. As the README states: 'Raw KSI prose must not be the sole LLM input. The LLM receives a structured package of source fields.'

07

Fixed-Size Word-Window Over Semantic Chunking

The source documents are compliance policy prose — incident response plans, supply chain risk management docs, security procedures. These are dense, self-contained paragraphs where any 300-word window captures a complete thought. Smaller uniform chunks (300 words, ~1,200 chars) produce tight embedding vectors that score sharply on kNN. Larger semantic chunks (2,000+ words) dilute the embedding — a 3-page section about incident severity classification would weakly match many queries. With the over-fetch strategy (fetch_mult=5), 5 sharp-match small chunks outperform 1 diluted-match large chunk.
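
The over-fetch strategy can be sketched as follows — fetch_mult=5 is from the text, while the helper, the stand-in search function, and the min_score cutoff value are illustrative:

```python
def overfetch_then_cut(search_fn, query_vec, k=5, fetch_mult=5, min_score=0.4):
    """Fetch k * fetch_mult candidates, apply a local score cutoff so
    only sharp matches survive, then return the top k. The min_score
    value here is an illustrative assumption."""
    candidates = search_fn(query_vec, size=k * fetch_mult)
    kept = [h for h in candidates if h["score"] >= min_score]
    return kept[:k]

def fake_knn_search(_vec, size):
    # Stand-in for OpenSearch kNN: scores decay down the result list.
    return [{"id": i, "score": round(1.0 - i * 0.05, 2)} for i in range(size)]

top = overfetch_then_cut(fake_knn_search, query_vec=None, k=5)
# 25 candidates fetched, the low-score tail cut, the best 5 returned
```

With small, tight chunks, the over-fetched pool contains several sharp matches to choose from rather than one diluted one.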

08

No Overlap to Avoid Embedding Cost Explosion

Chunks don't share context at their boundaries. Overlap-based chunking (common in LangChain recipes — 200-word chunks with 50-word overlap) would increase Titan Embed v2 API calls by ~25% with no measurable retrieval quality gain for prose documents. On a t3.small.search OpenSearch domain, the storage and compute savings matter.

09

Predictable Chunk IDs for Freshness-Aware Deduplication

index_drive_file() checks source_modified_at and skips unchanged documents. When a document changes, it deletes all existing chunks and re-indexes. Fixed-size chunking produces stable chunk_id = f"{drive_id}:{idx}" identifiers. Semantic chunking would make chunk boundaries shift unpredictably between document versions, breaking chunk-ID-based deduplication.
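
The stable-ID property is easy to see in a sketch — the {drive_id}:{idx} format is from the text; the helper is illustrative:

```python
def chunk_ids(drive_id: str, chunks: list) -> list:
    """Fixed-size chunking yields positional, predictable IDs: chunk 0
    of a document is always {drive_id}:0, whatever its content."""
    return [f"{drive_id}:{idx}" for idx, _ in enumerate(chunks)]

old_chunks = ["chunk a", "chunk b", "chunk c"]
new_chunks = ["chunk a'", "chunk b'"]  # the document shrank after an edit

old_ids = set(chunk_ids("drive-42", old_chunks))
new_ids = set(chunk_ids("drive-42", new_chunks))

# Re-indexing deletes everything under the drive_id before writing new
# chunks, so stale trailing IDs ("drive-42:2") are always covered.
stale = old_ids - new_ids
```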

10

Hybrid Search (kNN + BM25) as Opt-In Extension

Pure kNN works well for policy prose, but queries containing control IDs (AC-2(4)), tool names (sshd_config), or CLI flags need lexical precision that vector similarity alone misses. Hybrid search fuses normalized kNN and BM25 scores with configurable weights. Documents appearing in only one result set get 0.0 for the missing signal — this naturally penalizes results that score well on only one dimension.

11

Backwards Compatibility via Keyword-Only Defaults

Every hybrid parameter is keyword-only with a default value: use_hybrid=False (existing behavior unchanged), vector_weight=0.7, keyword_weight=0.3. No existing callers use positional args that would break. Zero test files directly call any of the modified methods. Opting in is a single flag change — opting out requires no code changes at all.
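
The pattern is plain keyword-only parameters with defaults; a minimal sketch (the return shape is illustrative):

```python
def retrieve(query, *, use_hybrid=False, vector_weight=0.7, keyword_weight=0.3):
    """Existing callers pass only `query` and get pure-kNN behavior;
    hybrid search is a single opt-in flag. Keyword-only parameters
    (after the *) can never be broken by positional calls."""
    mode = "hybrid" if use_hybrid else "knn"
    return {"mode": mode, "weights": (vector_weight, keyword_weight)}

default_call = retrieve("incident response roles")                      # unchanged behavior
opted_in = retrieve("KSI-CMT-01", use_hybrid=True,
                    vector_weight=0.5, keyword_weight=0.5)              # one-flag opt-in
```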

12

LLM Reranker as Late Interaction Stage

After bi-encoder retrieval returns 20 candidate hits, an LLM reranker (Claude Haiku at temperature=0) scores each chunk 0-10 against the actual question. This catches what vector similarity misses: chunks that share vocabulary but are from the wrong context (a different policy's 'advisory board'), the wrong control (KSI-CMT-02 vs. KSI-CMT-01), or irrelevant lexical overlap ('membership' in an org chart). Only the top-scoring chunks survive into context assembly.

13

Reranking as Quality Optimization, Not Correctness Gate

If the reranker LLM call fails or returns unparseable JSON, the function silently returns the original hits truncated to top_n. The pipeline never breaks. Reranking improves answer quality but isn't required for correct operation — this makes it safe to opt into without risk of pipeline failures from a downstream model error.

14

Metadata Filtering via Dynamic Mapping — No Migration Required

A single metadata_filters: Dict[str, str] parameter flows from any entry point down to OpenSearch's bool.filter clause. Each key-value pair becomes a term filter on doc.metadata.<key> — binary include/exclude applied before scoring. OpenSearch's dynamic: true setting auto-creates keyword fields for any new string written under doc.metadata, so no PUT _mapping call is needed. The first document indexed with project_id creates the field; the first query filtering on it uses the auto-created keyword index.

15

Three Levels of Isolation — Company, Namespace, Metadata

Company (doc.company, always applied) provides tenant isolation. Namespace (doc.namespace, always applied) provides document collection isolation (IRP vs. SCRM vs. CONMON). Metadata (doc.metadata.*, opt-in via metadata_filters) provides fine-grained filtering by project, doc type, classification, or version. Existing documents without a metadata field are correctly excluded from project-scoped queries — a term filter on a missing field returns no match.

16

Latency Budget — LLM Dominates, Everything Else Under 400ms

A full retrieve-and-answer cycle takes ~5-8s. The LLM answer generation dominates at ~75% of wall time (~4-6s). Everything upstream — query embedding (~200ms via Bedrock), kNN search (~100ms via OpenSearch), filtering, context assembly — totals under 400ms. Six optimizations keep the non-LLM stages fast: aggressive boto3 timeouts, client singleton with connection pool reuse, credential caching with 600s TTL, over-fetch with local score cutoff, single-text embedding for the query path, and cross-region inference profile routing.

17

Deliberate Non-Optimizations

Response streaming (invoke_model_with_response_stream) is excluded from the IAM policy — a deliberate simplicity tradeoff, even though streaming would drop perceived TTFT from ~5s to ~500ms. OpenSearch connection pool tuning uses opensearch-py defaults because single-digit QPS on a t3.small domain isn't connection-bound. Formal latency instrumentation exists for ingest (files/s, bytes/s via RunMetrics) but not for the query path — a known gap, not an oversight.

18

Vector Staleness Management — Per-File Compare-and-Swap

Compliance documentation changes constantly. When a source document changes, every embedding derived from the old version becomes stale — returning irrelevant context to the LLM. The ingestion pipeline uses a timestamp-driven freshness check against Google Drive's modified_time. Each file goes through compare (query OpenSearch for stored source_modified_at, skip if unchanged at ~5ms cost), purge (scoped delete_by_query for that drive_id with conflicts=proceed and refresh=true), and re-index (extract, chunk, embed, bulk-index with new timestamp).
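
A sketch of the freshness decision and the purge query body — source_modified_at is from the text, while the doc.drive_id field path and helper names are assumptions:

```python
def needs_reindex(stored_modified_at, source_modified_at):
    """Compare step: skip the file if the stored timestamp is at least
    as new as the source's modified_time. ISO-8601 strings compare
    correctly as plain strings."""
    if stored_modified_at is None:      # never indexed before
        return True
    return source_modified_at > stored_modified_at

def purge_query(drive_id):
    """Purge step: a delete_by_query body that wipes every chunk for
    one file. The doc.drive_id field path is an assumption."""
    return {"query": {"term": {"doc.drive_id": drive_id}}}
```

Per the text, the real purge also sends conflicts=proceed and refresh=true as request parameters so the subsequent re-index sees a clean slate.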

19

No Downtime During Re-Indexing

The delete-then-index sequence creates a brief window (1-5 seconds) in which one file has zero chunks. Searches that don't reference that file are unaffected; searches that do simply draw context from other relevant documents. Since the index serves document generation pipelines (not real-time user-facing search), this sub-5-second gap per file is invisible in practice. The chunk ID scheme ({drive_id}:{index}) means the delete_by_query wipes all old chunks by drive_id regardless of how many chunks the old version produced.

20

Cross-Backend Consistency via IndexBackend Protocol

The same IndexBackend Protocol governs both OpenSearch and S3 backends. OpenSearch compares timestamps (catches all edits). S3 compares md5 + modified_time (cheaper but misses metadata-only edits). Both backends support skip_if_existing and replace_if_newer flags, giving callers control over the tradeoff between thoroughness and cost.

21

Hard Context Budgets to Prevent Lost-in-the-Middle

The context assembly layer enforces a 12,000-character total ceiling and a 4,000-character cap per individual chunk. With 300-word chunks averaging ~1,200 characters, the model receives 5-8 focused chunks per query — well below the 20+ document threshold where lost-in-the-middle effects become significant in the literature. Oversized chunks are truncated rather than allowed to consume the entire budget.
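
The budget enforcement is simple to sketch — the 4,000/12,000-character caps come from the text; the implementation details are assumptions:

```python
def build_context(hits, max_chunk_chars=4_000, max_total_chars=12_000):
    """Concatenate retrieved hits under a hard context budget: each
    chunk is truncated to max_chunk_chars, and assembly stops before
    the total would exceed max_total_chars."""
    parts, total = [], 0
    for hit in hits:
        text = hit["content"][:max_chunk_chars]   # per-chunk cap
        if total + len(text) > max_total_chars:   # total ceiling
            break
        parts.append(text)
        total += len(text)
    return "\n\n".join(parts)

hits = [{"content": "x" * 5_000} for _ in range(4)]
context = build_context(hits)
# each 5,000-char chunk is cut to 4,000; the fourth would push the
# total past 12,000, so only three make it into the context
```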

22

Separated Retrieval Query and LLM Prompt

The text sent to OpenSearch for vector/keyword matching is a tight semantic query optimized for retrieval precision (e.g., 'IRP section: roles and responsibilities. Find values for SIEM, infrastructure scanner'). The text sent to the LLM is the full task instruction optimized for generation quality. Without this separation, boilerplate instructions would dilute the embedding vector and degrade retrieval results.

23

Tiered Placeholder Resolution Before RAG

A four-tier resolution hierarchy resolves template placeholders before they reach the RAG layer: static client profile (instant lookup), in-memory small documents (no search needed), RAG retrieval (OpenSearch), and explicit {{UNKNOWN}} markers for unresolvable values. Known values like company name or SIEM tool name are resolved from configuration — they never enter the context window, preserving the budget for values that genuinely require document retrieval.
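
A sketch of the four-tier dispatcher — the tier order and the {{UNKNOWN}} marker are from the text; the function signature, data shapes, and example values are assumptions:

```python
def resolve_placeholder(name, client_profile, small_docs, rag_lookup):
    """Tiered resolution: cheapest source first, RAG only when needed,
    and an explicit marker when nothing resolves."""
    if name in client_profile:         # Tier 1: static client profile
        return client_profile[name]
    if name in small_docs:             # Tier 2: in-memory small documents
        return small_docs[name]
    hit = rag_lookup(name)             # Tier 3: RAG retrieval (OpenSearch)
    if hit is not None:
        return hit
    return "{{UNKNOWN}}"               # Tier 4: explicit unresolved marker

# Illustrative values:
profile = {"company_name": "Acme Corp", "siem_tool": "Splunk"}
small_docs = {"scan_cadence": "weekly"}
no_rag_hit = lambda name: None         # stand-in: RAG finds nothing

resolved = resolve_placeholder("siem_tool", profile, small_docs, no_rag_hit)
# known config values never reach the RAG layer or the context window
```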

Code Showcase 1

Protocol Interface — IndexBackend

The contract that all storage backends must satisfy. Uses Python Protocol for structural typing — any class with matching method signatures conforms without inheritance.

python
from typing import Protocol

class IndexBackend(Protocol):
    def ensure_index(
        self, *, company, name, if_exists, namespace
    ) -> str: ...

    def upsert_file(
        self, *, company, index_id, file, replace_if_newer
    ) -> None: ...

# Three implementations conform:
# - OpenSearchIndexBackend  (services/opensearch_index_backend.py)
# - AwsIndexBackend          (services/aws_index_backend.py)
# - OpenAI Vector Store path (pipelines/google_to_openai)
  • Pattern: Ports-and-adapters — the Protocol is the port, each backend is an adapter
  • Structural Typing: Protocol, not ABC — implementations don't inherit, they just match the signature
  • Three Backends: OpenSearch (kNN + Bedrock embeddings), AWS S3 (OpenAI embeddings), OpenAI Vector Store

Code Showcase 2

Retrieval Dispatcher — ensure_retrieval()

The single entry point for all backend-agnostic document ingestion. A pipeline calls ensure_retrieval() with a backend_type and namespace and doesn't care about index creation, SigV4 auth, chunking strategies, or embedding dimensions.

python
def ensure_retrieval(
    *, app, backend_type, company,
    drive_path, namespace, ...
):
    if backend_type == "openai":
        ensure_vs_and_docs(...)
    elif backend_type == "opensearch":
        ensure_os_index_and_docs(...)
    elif backend_type in ("bedrock", "aws"):
        ensure_aws_index_and_docs(...)
  • Backend Routing: Dispatches to the correct ingestion path based on backend_type — OpenAI, OpenSearch, or Bedrock/AWS
  • Index Sanitization: Handles OpenSearch naming rules (lowercase, no special chars) internally
  • Caller Simplicity: The pipeline just specifies backend_type and namespace — all infra complexity is hidden

Code Showcase 3

Namespace Isolation — kNN Filter

The architectural keystone that makes the retrieval layer reusable across pipelines. A single OpenSearch index stores documents for all compliance workflows, partitioned by namespace.

python
@dataclass
class OpenSearchBedrockRAG:
    """
    Intentionally generic so IRP, SCRM, CONMON, etc.
    can all reuse it by just changing `namespace`.
    """
    company: str
    namespace: str = "irp"

# opensearch.py:search_knn
if namespace:
    must_filters.append(
        {"term": {"doc.namespace": namespace}}
    )
  • Single Index: One OpenSearch domain, one index, one embedding pipeline — no per-pipeline infrastructure duplication
  • Logical Partitions: A namespace term filter on every kNN query isolates IRP, SCRM, and CONMON documents
  • Swap One Field: Changing namespace='irp' to namespace='scrm' switches the entire retrieval context

Code Showcase 4

Source Priority System

RAG is one data source among many, with a defined priority in the multi-source evidence merge. Live API responses win over RAG-retrieved documents, which win over manual exports — the retrieval layer gracefully degrades.

python
class SourceAdapter(ABC):
    def priority(self) -> int:
        """
        1 = live API
        2 = custom/service
        3 = rag
        4 = export
        5 = manual
        """

# When the compliance engine merges results,
# a live Vanta API response (priority 1) wins over
# a RAG-retrieved document (priority 3), which wins
# over a manual CSV export (priority 5).
  • Graceful Degradation: If a live API has the evidence, RAG results are deprioritized automatically — no manual override needed
  • Explicit Ranking: The 5-tier priority system makes data source precedence auditable, not implicit
  • Extensible: New source adapters slot in at any priority tier without changing existing merge logic

Code Showcase 5

Pipeline Plug-In Points

Every compliance pipeline connects to the same retrieval infrastructure through ensure_retrieval() and BackendExecutor.process(). The FedRAMP Layer 1 pipeline is the notable exception — it deliberately avoids RAG to prevent hallucination from retrieved prose.

text
Pipeline         What It Does              What It Calls
─────────────────────────────────────────────────────────────────────
IRP Generator    Drafts incident           ensure_retrieval(namespace="irp")
                 response plan             → BackendExecutor.process()

SCRM Generator   Drafts supply chain       ensure_retrieval(namespace="scrm")
                 risk plan                 → BackendExecutor.process()

CONMON Generator Continuous monitoring      ensure_retrieval(namespace="conmon")
                 docs                      → BackendExecutor.process()

FedRAMP Layer 1  Requirement enrichment    Uses structured JSON input, not RAG
                                           — deliberately excluded

Custom Tests     Evidence extraction       Can query any namespace for
                 prompts                   supporting documentation

Compliance       Multi-source evidence     RAG adapter at priority=3
Engine           merge                     alongside live APIs and exports
  • Same Interface: Every RAG-enabled pipeline calls ensure_retrieval() + BackendExecutor.process() — no per-pipeline retrieval code
  • Namespace Swap: Switching from IRP to SCRM to CONMON is a single parameter change
  • Deliberate Exclusion: FedRAMP Layer 1 intentionally skips RAG — structured metadata over retrieved prose to avoid hallucination risk

Code Showcase 6

Stage 1: Word-Based Fixed-Size Chunker

_chunk_text() is a straightforward fixed-size word-window chunker. It splits on whitespace, accumulates words into a buffer, and emits a chunk when the buffer hits the limit. No overlap, no sentence detection, no heading-aware splitting. Called with max_tokens=300 in practice.

python
def _chunk_text(self, text, max_tokens=500):
    words = text.split()
    chunks = []
    current = []
    for w in words:
        current.append(w)
        if len(current) >= max_tokens:
            chunks.append(" ".join(current))
            current = []
    if current:                       # flush the trailing partial window
        chunks.append(" ".join(current))
    return chunks
  • Strategy: Fixed-size word-window — not semantic, not recursive, not overlap-based
  • Window Size: Default 500 words, called with max_tokens=300 in production (~1,200 chars per chunk)
  • Why No Overlap: Overlap would increase Titan Embed v2 API calls by ~25% with no retrieval quality gain for prose
  • Why No Semantic Split: Compliance prose degrades gracefully with fixed-size chunking — unlike code or structured data

Code Showcase 7

Stage 2: Boundary-Aware Safety Splitter

_split_for_embed() enforces the embedding model's hard character limit after the word chunker runs. Default max_chars is 12,000 (from EMBED_MAX_CHARS env var), tuned to keep Titan Embed v2 well under its 8,192-token limit (~4 chars/token heuristic).

python
def _split_for_embed(self, text, max_chars):
    # excerpt of the inner loop: [start, end) is the current window
    cut = text.rfind("\n", start, end)     # prefer a newline break
    if cut == -1:
        cut = text.rfind(" ", start, end)  # fall back to a space break
    if cut == -1 or cut <= start + (max_chars * 0.5):
        cut = end                          # hard cut if no good boundary

# The two stages chain together in index_drive_file():
chunks = self._chunk_text(text, max_tokens=300)
max_chars = int(os.getenv("EMBED_MAX_CHARS", "12000"))
safe_chunks = []
for c in chunks:
    safe_chunks.extend(
        self._split_for_embed(c, max_chars=max_chars)
    )
  • Split Priority: Try a newline break first, then a space break, then a hard cut only if no boundary exists in the back 50% of the window
  • Safety Net: Ensures no chunk exceeds Titan Embed v2's token limit regardless of word-chunker output
  • Two-Stage Chain: Stage 1 (word window) sizes content, Stage 2 (character safety) enforces embedding model limits

Code Showcase 8

Full Ingestion Pipeline Flow

The complete path from Google Drive document to OpenSearch kNN index. Includes freshness-aware deduplication, multi-format text extraction with binary guardrails, two-stage chunking, batch embedding, and bulk upsert.

text
Google Drive document (PDF, DOCX, Google Doc)
        |
        v
GoogleDriveIngestor.iter_files()
  -> exports Google-native docs to DOCX/XLSX/PPTX
  -> downloads binary files directly
  -> yields DriveFileRecord(content=bytes)
        |
        v
OpenSearchIndexBackend.index_drive_file()
  |
  +- freshness check: get_latest_source_modified()
  |   +- skip if unchanged (IndexResult: skipped_unchanged)
  |
  +- _extract_text(record)
  |   +- text/plain, text/markdown -> direct decode
  |   +- PDF, DOCX, PPTX, XLSX -> MarkItDown -> markdown
  |   +- DOCX fallback -> python-docx paragraph + table extraction
  |   +- binary guardrail: reject if "%PDF-" or "/FlateDecode" in output
  |
  +- _chunk_text(text, max_tokens=300)         <- Stage 1: word-window
  |
  +- _split_for_embed(chunk, max_chars=12000)  <- Stage 2: safety split
  |
  +- embed_fn(chunks)                          <- Titan Embed v2, batch=200
  |
  +- delete_docs_for_drive_file()              <- wipe stale chunks
  |
  +- os_client.index_chunks(docs)              <- bulk upsert with retry
        |
        v
OpenSearch kNN index
  mapping: doc.embedding (knn_vector, dim=1024)
  filters: doc.company (keyword), doc.namespace (keyword)
  metadata: doc.path, doc.chunk_id, doc.source_modified_at
  • Freshness Check: Compares source_modified_at before re-indexing — unchanged documents are skipped entirely
  • Multi-Format Extraction: MarkItDown handles PDF, DOCX, PPTX, XLSX; a binary guardrail catches corrupt or unconvertible files
  • Two-Stage Chunking: 300-word windows (Stage 1) then 12,000-char safety splits (Stage 2) before embedding
  • Batch Embedding: Titan Embed v2 via Bedrock in batches of 200, producing 1024-dim vectors
  • Atomic Re-Index: On document change, delete all stale chunks first, then bulk upsert new chunks

Code Showcase 9

Hybrid Search — Score Fusion

When use_hybrid=True, kNN and BM25 results are independently normalized to [0, 1], then fused with configurable weights. Documents appearing in only one result set get 0.0 for the missing signal, naturally penalizing single-dimension matches. The scoring breakdown is attached to each hit for provenance.

python
# Score fusion algorithm
# For each document appearing in either kNN or BM25 results:

# 1. Normalize kNN scores
knn_norm = (score - min) / (max - min)      # -> [0, 1]

# 2. Normalize BM25 scores
bm25_norm = (score - min) / (max - min)     # -> [0, 1]

# 3. Fuse with configurable weights
final_score = (
    knn_norm * vector_weight
    + bm25_norm * keyword_weight
)

# 4. Sort by final_score descending, return top k

# Each hit includes scoring breakdown:
# hit["_score_detail"] = {
#     "knn_normalized": 0.8234,
#     "bm25_normalized": 0.9512,
#     "vector_weight": 0.5,
#     "keyword_weight": 0.5,
# }
  • Min-Max Normalization: Both kNN and BM25 scores are normalized to [0, 1] before fusion — different score scales don't bias the result
  • Missing Signal = 0.0: Documents in only one result set get 0.0 for the other, penalizing single-dimension matches
  • Score Provenance: Every hit carries _score_detail with normalized scores and weights for debugging and auditability

Code Showcase 10

Hybrid Search — Usage and Recommended Weights

Hybrid search is opt-in at both the BackendExecutor level (for pipelines) and the RAG layer directly. Weight recommendations vary by query type — semantic similarity dominates for policy prose, while lexical precision matters for control IDs and configuration artifacts.

python
# Via BackendExecutor (any pipeline):
text, meta = executor.process(
    prompt=prompt,
    instructions=instructions,
    retrieval_query=query,
    use_hybrid=True,
    vector_weight=0.5,
    keyword_weight=0.5,
)
# meta["retrieval"]["hybrid"] == True

# Directly from RAG layer:
# Pure kNN (default, unchanged behavior)
hits = rag.retrieve(
    "incident response roles and responsibilities"
)

# Hybrid (kNN + BM25)
hits = rag.retrieve(
    "KSI-CMT-01 CM-3 change advisory board",
    use_hybrid=True,
    vector_weight=0.5,
    keyword_weight=0.5,
)
  • Policy prose (IRP, SCRM, CONMON): vector=0.7, keyword=0.3 — semantic similarity dominates; keywords break ties
  • Control/KSI evidence lookup: vector=0.5, keyword=0.5 — control IDs like AC-2(4) and tool names need lexical precision
  • Configuration artifact search: vector=0.3, keyword=0.7 — sshd_config, SSLProtocol, and exact CLI flags need lexical match
  • Backwards Compatibility: Default use_hybrid=False — all existing callers get unchanged pure kNN behavior with zero code changes

Code Showcase 11

Full Retrieval Pipeline — 3-Stage Flow

The complete retrieve() call chain: bi-encoder retrieval (kNN or hybrid) fetches 20 candidates, the LLM reranker scores and prunes to the top 5, then context assembly caps and concatenates hits for the final LLM answer generation.

text
retrieve(question, k=20, rerank=True, rerank_top_n=5)
                     |
  +------------------+------------------+
  | STAGE 1: Bi-Encoder Retrieval       |
  |                                     |
  | question --> Titan Embed v2 --> vec  |
  |                              |      |
  | OpenSearch <-- kNN or hybrid <+     |
  |     |                               |
  |     +-> 20 candidate hits (scored   |
  |         by cosine +/- BM25)         |
  +------------------+------------------+
                     |
  +------------------+------------------+
  | STAGE 2: LLM Reranker              |
  |                                     |
  | For each of 20 hits:                |
  |   send first 800 chars + question   |
  |   to Claude (Haiku) at temp=0       |
  |                                     |
  | Claude scores each chunk 0-10:      |
  |   10 = directly answers question    |
  |   7-9 = highly relevant detail      |
  |   4-6 = partially relevant          |
  |   1-3 = mostly irrelevant           |
  |   0 = completely irrelevant         |
  |                                     |
  | Sort by score desc -> return top 5  |
  +------------------+------------------+
                     |
  +------------------+------------------+
  | STAGE 3: Context Assembly + Answer  |
  |                                     |
  | build_context(5 hits)               |
  |   cap each hit at 4,000 chars       |
  |   cap total at 12,000 chars         |
  |                                     |
  | llm_client.chat() -> final answer   |
  +-------------------------------------+
  • Stage 1: Bi-encoder retrieval via Titan Embed v2 — kNN or hybrid (kNN + BM25) returning 20 candidates
  • Stage 2: LLM reranker (Claude Haiku, temp=0) scores each chunk 0-10, prunes to top 5
  • Stage 3: Context assembly caps hits (4K chars each, 12K total) then generates the final answer
  • Default Flow: k=20 -> rerank -> top 5 (or k//2 with floor of 3) -> context -> answer

Code Showcase 12

LLM Reranker — rerank_hits()

The 5-step reranking function at rag_bedrock_opensearch.py:35-118. Builds chunk previews, scores via a single Bedrock call with strict JSON output contract, normalizes and clamps scores, sorts with tiebreaker on original retrieval order, and falls back gracefully on failure.

python
def rerank_hits(*, question, hits, llm_client,
                top_n, preview_chars=800, model_id):

    # Step 1: Build chunk previews (lines 56-63)
    # First 800 chars of content + source path per hit
    previews = [
        {"index": i, "text": hit["content"][:preview_chars],
         "source": hit["path"]}
        for i, hit in enumerate(hits)
    ]

    # Step 2: LLM relevance scoring (lines 71-78)
    # One Bedrock call, temperature=0.0, strict JSON contract:
    # [{"index": 0, "score": 8}, {"index": 1, "score": 3}, ...]

    # Step 3: Score normalization + clamping (lines 93-98)
    # Each score clamped to [0, 10] — prevents hallucinated
    # scores of 15 or -1 from distorting the ranking

    # Step 4: Sort and prune (lines 101-113)
    # Sort by rerank score desc, original order as tiebreaker
    # Only top_n survive; each gets _rerank_score attached

    # Step 5: Graceful fallback (lines 115-118)
    # If LLM call fails or returns unparseable JSON,
    # silently return original hits truncated to top_n
    # Reranking is a quality optimization, not a gate
  • Preview Limit: Only the first 800 chars per chunk are sent to the reranker — controls cost without sacrificing judgment quality on compliance prose
  • Score Clamping: Scores are clamped to [0, 10] — prevents LLM hallucination of out-of-range scores from distorting the ranking
  • Tiebreaker: Original retrieval order breaks ties — preserves the kNN/hybrid ranking signal when rerank scores are equal
  • Graceful Fallback: On any failure, returns the original hits truncated to top_n — the pipeline never breaks from a reranker error

Code Showcase 13

Reranking — Call Chain and Cost

Reranking propagates through every public entry point via keyword arguments. At ~$0.0005 per reranked retrieval using Haiku, you could run 2,000 reranked retrievals for $1. The latency add of 1-2 seconds is acceptable for document generation pipelines.

python
# Full call chain — rerank propagates through all entry points:

BackendExecutor.process(rerank=True, rerank_top_n=5)
  # -> opensearch_backend_process()  (pops rerank kwargs)
  #      -> generate_from_prompt(rerank=...)  (passes through)
  #           -> retrieve(rerank=...)  (calls rerank_hits())

answer_question(rerank=True, rerank_top_n=5)
  # -> retrieve(rerank=...)  (same path)

retrieve(rerank=True, rerank_top_n=5)
  # -> rerank_hits() directly

# Configuration via dataclass fields:
rag = OpenSearchBedrockRAG(
    os_client=os_client,
    embed_client=embed_client,
    llm_client=llm_client,          # Sonnet for answers
    company="searchstax",
    namespace="irp",
    rerank_model_id="anthropic.claude-3-5-haiku-20241022-v1:0",
    rerank_preview_chars=800,
)
  • Reranker Input Cost: ~20 chunks x 800 chars / 4 = ~4,000 tokens at ~$0.0004 (Haiku)
  • Reranker Output Cost: ~20 score objects = ~200 tokens at ~$0.0001 (Haiku)
  • Added Latency: 1 Bedrock round-trip = ~1-2 seconds — acceptable for document generation, less so for interactive Q&A
  • Default top_n: k // 2 with a floor of 3 — k=20 produces 10 after reranking, k=8 produces 4, or override explicitly

Code Showcase 14

Consolidated Filter Builder — _build_filters()

The filter-building logic that was duplicated across search_knn and search_hybrid is consolidated into one static method. Company and namespace filters are always applied; metadata_filters are opt-in and expand to term filters on doc.metadata.<key>.

python
@staticmethod
def _build_filters(company, namespace, metadata_filters=None):
    filters = []
    if company:
        filters.append({"term": {"doc.company": company}})
    if namespace:
        filters.append({"term": {"doc.namespace": namespace}})
    for key, value in (metadata_filters or {}).items():
        filters.append(
            {"term": {f"doc.metadata.{key}": value}}
        )
    return filters

# Both search_knn() and search_hybrid() now delegate
# to _build_filters() — no duplicated filter logic
Property               Detail
─────────────────────────────────────────────────────────────────
Consolidation          Filter logic extracted from two search methods into one static method — single source of truth
Always-On Filters      Company and namespace are always applied; metadata_filters are additive and opt-in
Dynamic Key Expansion  Each metadata key-value pair becomes a term filter on doc.metadata.<key> — no hardcoded field names

Code Showcase 15

Metadata Filtering — End-to-End Usage

Documents are tagged at indexing time via extra_metadata on the backend dataclass. At query time, metadata_filters flows from any entry point (BackendExecutor, retrieve, answer_question) down to OpenSearch's bool.filter clause. Filters are applied before scoring — non-matching documents never appear in results.

python
# At indexing time — tag documents with arbitrary metadata:
backend = OpenSearchIndexBackend(
    os_client=os_client,
    embed_fn=embed_fn,
    company="searchstax",
    namespace="irp",
    extra_metadata={
        "project_id": "fedramp-2024",
        "doc_type": "policy",
        "classification": "cui",
    },
)
# Every chunk now has:
# doc.metadata.project_id = "fedramp-2024"
# doc.metadata.doc_type = "policy"
# doc.metadata.classification = "cui"

# At query time — filter to matching documents:
hits = rag.retrieve(
    "incident response roles and responsibilities",
    metadata_filters={"project_id": "fedramp-2024"},
)

# Combine with hybrid search + reranking:
hits = rag.retrieve(
    "KSI-CMT-01 change advisory board",
    use_hybrid=True,
    rerank=True,
    metadata_filters={
        "project_id": "fedramp-2024",
        "doc_type": "policy",
    },
)

# Through BackendExecutor:
text, meta = executor.process(
    prompt=prompt,
    instructions=instructions,
    metadata_filters={"project_id": "A"},
)
# meta["retrieval"]["metadata_filters"] == {"project_id": "A"}
Property    Detail
─────────────────────────────────────────────────────────────────
Indexing    extra_metadata dict on the backend dataclass — merged into every chunk's metadata alongside mime_type, drive_id, drive_name
Querying    metadata_filters dict at any entry point — each key-value becomes a term filter applied before scoring
Composable  Combines freely with namespace, hybrid search, and reranking — all features are independent and additive
Provenance  BackendExecutor records metadata_filters in returned meta dict for downstream traceability

Code Showcase 16

Generated OpenSearch Query — With Metadata Filter

The actual query sent to OpenSearch when metadata_filters are provided. Filter clauses are applied before kNN scoring — documents without the specified metadata field are excluded from the candidate set entirely, regardless of vector similarity.

json
{
  "query": {
    "bool": {
      "filter": [
        {"term": {"doc.company": "searchstax"}},
        {"term": {"doc.namespace": "irp"}},
        {"term": {"doc.metadata.project_id": "A"}}
      ],
      "must": [
        {
          "knn": {
            "doc.embedding": {
              "vector": ["..."],
              "k": 40
            }
          }
        }
      ]
    }
  }
}
Property                Detail
─────────────────────────────────────────────────────────────────
Pre-Score Filtering     Filter clauses execute before kNN scoring — non-matching documents are excluded from the candidate set entirely
No Migration            OpenSearch dynamic mapping auto-creates keyword fields for new metadata keys on first ingest
Missing Field Behavior  Documents indexed before extra_metadata was set don't have the field — term filter returns no match, correctly excluding them
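The query above can be assembled in a few lines. A hedged sketch of how the filter list slots into the bool query alongside the kNN clause (`build_knn_query` and the inline filter list are illustrative, not the production helpers):

```python
def build_knn_query(vector, k, filters):
    """Compose pre-scoring term filters with a kNN must clause."""
    return {
        "query": {
            "bool": {
                "filter": filters,  # applied before kNN scoring
                "must": [
                    {"knn": {"doc.embedding": {"vector": vector, "k": k}}}
                ],
            }
        }
    }

query = build_knn_query(
    vector=[0.1, 0.2],  # placeholder embedding
    k=40,
    filters=[
        {"term": {"doc.company": "searchstax"}},
        {"term": {"doc.namespace": "irp"}},
        {"term": {"doc.metadata.project_id": "A"}},
    ],
)
```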

Code Showcase 17

Three Levels of Isolation

The retrieval layer now supports three composable levels of document isolation — from tenant-level down to fine-grained metadata attributes — all using the same OpenSearch term filter mechanism.

text
Level       Mechanism                       Use Case
─────────────────────────────────────────────────────────────────
Company     doc.company (always applied)    Tenant isolation:
                                            SearchStax vs. Acme

Namespace   doc.namespace (always applied)  Document collection:
                                            IRP vs. SCRM vs. CONMON

Metadata    doc.metadata.* (opt-in via      Fine-grained:
            metadata_filters)               project, doc type,
                                            classification, version
Property    Detail
─────────────────────────────────────────────────────────────────
Company     Always applied — tenant isolation between organizations sharing the same OpenSearch domain
Namespace   Always applied — logical partition per compliance pipeline (IRP, SCRM, CONMON)
Metadata    Opt-in via metadata_filters — project_id, doc_type, classification, version, or any arbitrary key
Composable  All three levels stack — a query can filter by company + namespace + multiple metadata fields simultaneously
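The stacking behavior can be demonstrated with a standalone copy of the filter builder from Code Showcase 14 (re-implemented here for illustration so the snippet runs on its own):

```python
def build_filters(company, namespace, metadata_filters=None):
    # Standalone copy of the consolidated filter builder.
    filters = []
    if company:
        filters.append({"term": {"doc.company": company}})
    if namespace:
        filters.append({"term": {"doc.namespace": namespace}})
    for key, value in (metadata_filters or {}).items():
        filters.append({"term": {f"doc.metadata.{key}": value}})
    return filters

# All three isolation levels stack in one filter list:
stacked = build_filters(
    "searchstax", "irp",
    {"project_id": "fedramp-2024", "doc_type": "policy"},
)
# -> 4 term clauses: company, namespace, and two metadata fields
```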

Code Showcase 18

Latency Critical Path — 4 Network Hops

The critical path for a single retrieval: embed query (~200ms Bedrock) -> kNN search (~100ms OpenSearch) -> build context (~0ms local) -> LLM answer (~4-6s Bedrock). Six optimizations keep the non-LLM stages fast.

python
# Critical path: embed -> search -> context -> answer
# ~200ms      ~100ms     ~0ms       ~4-6s
# (Bedrock)   (OpenSearch) (local)   (Bedrock)

# 1. Aggressive boto3 timeout (bedrock_llm.py:42-46)
Config(
    read_timeout=120,
    connect_timeout=10,    # fail fast on unreachable
    retries={"max_attempts": 3, "mode": "standard"},
)

# 2. Client singleton with connection pool reuse
# OpenSearchClient, BedrockEmbeddingsClient, BedrockLLMClient
# constructed once, injected into RAG dataclass.
# First call: ~200-500ms (credential + TLS handshake)
# Subsequent: reuse warm connection

# 3. Credential caching with TTL (ttl_cache.py)
# Google Drive SA keys + Vanta tokens cached 600s
# Without: +100ms Secrets Manager round-trip per call

# 4. Over-fetch + local score cutoff (opensearch.py:229-287)
# Fetch k * fetch_mult (5x), threshold in Python
# Avoids unreliable server-side min_score

# 5. Single-text embedding for query path
# One question -> one InvokeModel call, no batch overhead
# Batch path (EMBED_BATCH_SIZE=200) reserved for ingest

# 6. Cross-region inference profile routing
modelId = os.getenv(
    "BEDROCK_EMBEDDINGS_INFERENCE_PROFILE_ID"
) or model_id
# Routes to lowest-latency region, saves 50-100ms
Property              Detail
─────────────────────────────────────────────────────────────────
Connect Timeout       10s connect_timeout fails fast on unreachable Bedrock — retries immediately rather than blocking for 120s
Connection Reuse      Boto3 clients maintain internal HTTP pools; first call pays TLS cost, subsequent calls reuse warm connections
Credential Cache      600s TTL on Secrets Manager lookups — eliminates ~100ms per call for Google Drive SA keys and Vanta tokens
Over-Fetch            Fetch 5x results, threshold locally — trades larger payload for guaranteed filtering without a second query
Cross-Region Routing  Bedrock inference profile auto-routes to lowest-latency region, saving 50-100ms under load
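Optimization 4 (over-fetch plus a local score cutoff) can be sketched as follows; `local_score_cutoff` is an illustrative name, and hits are assumed to carry a `_score` field:

```python
def local_score_cutoff(hits, k, min_score):
    """Keep at most k hits whose score clears min_score.

    The caller over-fetches k * fetch_mult candidates so that
    thresholding locally still leaves enough survivors, avoiding
    a second query and unreliable server-side min_score behavior.
    """
    kept = [h for h in hits if h["_score"] >= min_score]
    return kept[:k]

hits = [{"_score": s} for s in (0.92, 0.88, 0.41, 0.90, 0.12)]
top = local_score_cutoff(hits, k=2, min_score=0.5)
# -> the first two hits clearing 0.5: scores 0.92 and 0.88
```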

Code Showcase 19

Latency by Configuration Mode

Wall time varies by retrieval mode. The hybrid BM25 pass adds ~80ms (negligible). The rerank LLM call adds ~1.5s (Haiku) — the only mode where pre-LLM latency becomes noticeable.

text
Mode                     Extra Hops                         Estimated Total
────────────────────────────────────────────────────────────────────────────
Pure kNN (default)       embed + search + LLM               ~5-8s

Hybrid                   embed + 2 searches + fusion         ~5.5-8.5s
(use_hybrid=True)        + LLM

Hybrid + Rerank          embed + 2 searches + rerank LLM     ~7-11s
                         + answer LLM


Optimization Not Taken           Reason
────────────────────────────────────────────────────────────────────────────
Response streaming               Excluded from IAM policy — deliberate
(invoke_model_with_               simplicity tradeoff. Would drop perceived
response_stream)                 TTFT from ~5s to ~500ms.

OpenSearch connection            opensearch-py defaults used. Single-digit
pool tuning                      QPS on t3.small isn't connection-bound.

Formal query-path                RunMetrics tracks ingest perf (files/s,
latency instrumentation          bytes/s) but no perf_counter on retrieve().
Property         Detail
─────────────────────────────────────────────────────────────────
Pure kNN         ~5-8s total — embed (200ms) + search (100ms) + LLM answer (4-6s)
Hybrid           ~5.5-8.5s — adds ~80ms for BM25 pass and score fusion (negligible)
Hybrid + Rerank  ~7-11s — adds ~1.5s for Haiku reranker; the only mode where pre-LLM latency is noticeable
LLM Dominance    Answer generation is ~75% of wall time in every mode — non-LLM stages total under 400ms

Code Showcase 20

Vector Staleness — Compare-and-Swap Flow

The three-step freshness check that runs for every file on every pipeline execution. 90%+ of files are skipped_unchanged — the freshness check is a single lightweight OpenSearch query per file. Scanning 50 unchanged files adds ~250ms total.

python
# For each file in the Google Drive folder:

# Step 1: COMPARE — query OpenSearch for stored timestamp
latest = get_latest_source_modified(
    drive_id=record.drive_file_id
)
if latest >= record.modified_time:
    return IndexResult(status="skipped_unchanged")  # ~5ms

# Step 2: PURGE — scoped delete of all old chunks
delete_docs_for_drive_file(
    drive_id=record.drive_file_id,
    conflicts="proceed",  # concurrent searches never blocked
    refresh=True,         # cleared state immediately visible
)

# Step 3: RE-INDEX — extract, chunk, embed, bulk-index
text = _extract_text(record)
chunks = _chunk_text(text, max_tokens=300)
safe = [_split_for_embed(c, max_chars) for c in chunks]
vectors = embed_fn(safe)  # Titan Embed v2, batch=200
os_client.index_chunks(
    docs=[
        {
            "chunk_id": f"{record.drive_file_id}:{i}",
            "source_modified_at": record.modified_time,
            # ... content, embedding, metadata
        }
        for i, vec in enumerate(vectors)
    ]
)

# Typical pipeline run log:
# files=47 indexed=3 skipped_unchanged=42
# skipped_no_text=2 total_time=38.12s
Property         Detail
─────────────────────────────────────────────────────────────────
Compare Cost     ~5ms per file — single OpenSearch query for the latest source_modified_at by drive_id
Skip Rate        90%+ of files are skipped_unchanged in a typical re-run; scanning 50 files adds ~250ms total
Purge Safety     conflicts=proceed ensures concurrent searches are never blocked; refresh=true makes cleared state immediately visible
Chunk ID Scheme  {drive_id}:{index} — delete_by_query wipes all old chunks by drive_id regardless of how many the old version produced
Zero Downtime    1-5 second gap per file while re-indexing; searches for other documents are unaffected

Code Showcase 21

Cross-Backend Freshness Strategies

Both backends implement freshness checking through the IndexBackend Protocol, but with different strategies suited to their storage characteristics. Callers control the tradeoff between thoroughness and cost via flags.

text
Backend       Compare Strategy         Catches               Flags
──────────────────────────────────────────────────────────────────────
OpenSearch    source_modified_at       All edits (content     replace_if_newer
              timestamp comparison     + metadata changes)    skip_if_existing

S3 (AWS)      md5 + modified_time      Content edits only     replace_if_newer
              hash comparison          (misses metadata-      skip_if_existing
                                       only changes)


Both backends conform to the same IndexBackend Protocol:

class IndexBackend(Protocol):
    def upsert_file(
        self, *, company, index_id,
        file: DriveFileRecord,
        replace_if_newer: bool = True,
    ) -> None: ...
Property              Detail
─────────────────────────────────────────────────────────────────
OpenSearch            Timestamp comparison — catches all edits including metadata-only changes; higher per-check cost (~5ms)
S3                    MD5 + timestamp — cheaper comparison but misses metadata-only edits; sufficient for content-driven workflows
Protocol Conformance  Both backends satisfy the same IndexBackend Protocol — callers don't know which freshness strategy runs
Caller Control        replace_if_newer and skip_if_existing flags let pipelines choose thoroughness vs. cost per use case
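A minimal sketch of the S3 side's content comparison, assuming the stored object's MD5 and modified time are available to the caller (`s3_needs_reindex` is an illustrative name, not the production function):

```python
import hashlib

def s3_needs_reindex(new_bytes, new_modified_time,
                     stored_md5, stored_modified_time):
    """MD5 + timestamp compare: cheap, but blind to metadata-only edits."""
    if stored_md5 is None:            # never indexed before
        return True
    if new_modified_time <= stored_modified_time:
        return False                  # not newer -> skip
    # Newer timestamp but identical content (e.g. a touch) -> still skip
    return hashlib.md5(new_bytes).hexdigest() != stored_md5

stored = hashlib.md5(b"policy text").hexdigest()
touched = s3_needs_reindex(b"policy text", 2, stored, 1)     # False: same bytes
edited = s3_needs_reindex(b"policy text v2", 2, stored, 1)   # True: new content
```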

Code Showcase 22

Context Assembly — build_context()

The context assembly function that enforces hard budgets, relevance ordering, and structured document headers. Each chunk is prefixed with a header carrying metadata (index, score, chunk ID, source path) that serves as a document boundary marker, citation anchor, and provenance signal.

python
def build_context(hits, max_total=12000, max_per_hit=4000):
    """Assemble retrieved chunks into bounded, structured context."""
    context_parts = []
    total_chars = 0

    # Hits arrive in relevance order (kNN score or rerank score)
    for i, hit in enumerate(hits):
        content = hit["content"][:max_per_hit]  # cap per chunk
        if total_chars + len(content) > max_total:
            break  # budget exhausted — least important dropped

        # Structured header: boundary marker + citation anchor
        header = (
            f"[Document {i+1}] "
            f"Score: {hit['_score']:.3f} | "
            f"Chunk: {hit['chunk_id']} | "
            f"Source: {hit['path']}"
        )
        context_parts.append(f"{header}\n{content}")
        total_chars += len(content)

    return "\n\n---\n\n".join(context_parts)

# Result: 5-8 focused chunks, ~12,000 chars total
# Highest-scoring chunk at position 1 (highest attention)
# Least important material truncated by budget (correct failure mode)
Property            Detail
─────────────────────────────────────────────────────────────────
Total Budget        12,000-char ceiling — keeps the model under the threshold where lost-in-the-middle effects emerge
Per-Chunk Cap       4,000-char limit per hit — oversized chunks (extracted tables) are truncated, not allowed to consume the budget
Relevance Ordering  Highest-scoring chunk at position 1 (highest model attention); least important at the bottom and potentially dropped
Structured Headers  Document index, score, chunk_id, source path — boundary markers prevent conflation, enable inline citation
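Running the function on fabricated hits illustrates the budget and ordering guarantees (this is a standalone, condensed copy of build_context() so the snippet runs on its own; scores, chunk IDs, and paths are made up):

```python
def build_context(hits, max_total=12000, max_per_hit=4000):
    parts, total = [], 0
    for i, hit in enumerate(hits):
        content = hit["content"][:max_per_hit]      # per-chunk cap
        if total + len(content) > max_total:
            break                                   # budget exhausted
        header = (f"[Document {i+1}] Score: {hit['_score']:.3f} | "
                  f"Chunk: {hit['chunk_id']} | Source: {hit['path']}")
        parts.append(f"{header}\n{content}")
        total += len(content)
    return "\n\n---\n\n".join(parts)

hits = [
    {"content": "A" * 5000, "_score": 0.9, "chunk_id": "f:0", "path": "a.docx"},
    {"content": "B" * 5000, "_score": 0.8, "chunk_id": "f:1", "path": "a.docx"},
]
ctx = build_context(hits, max_total=6000, max_per_hit=4000)
# First chunk is capped to 4,000 chars; the second (4,000 more) would blow
# the 6,000 budget and is dropped -> only [Document 1] appears
```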

Code Showcase 23

Prompt Construction — System/User Split

Behavioral constraints go in the system prompt. Retrieved evidence and the question go in the user prompt with a clear separator and an explicit label. This boundary helps the model distinguish task instructions from source evidence.

text
SYSTEM PROMPT:
  You are a compliance documentation assistant.
  Answer ONLY using the provided context.
  Cite sources inline using [Document N] references.
  If the context does not contain the answer, say so.

---

USER PROMPT:
  Context (the ONLY allowed facts):

  [Document 1] Score: 0.923 | Chunk: abc123:3 | Source: IRP_v2.docx
  The incident response team shall acknowledge P1 incidents
  within 15 minutes of detection. The SIEM platform generates
  automated alerts...

  ---

  [Document 2] Score: 0.891 | Chunk: def456:7 | Source: IRP_v2.docx
  Escalation procedures require notification of the CISO
  within 30 minutes for incidents classified as...

  ---

  [Document 3] Score: 0.834 | Chunk: ghi789:1 | Source: roles.docx
  The Security Operations Center (SOC) maintains 24/7...

  ---

  Question: Describe the incident response acknowledgment SLA
  and escalation procedures.
Property              Detail
─────────────────────────────────────────────────────────────────
System/User Boundary  Behavioral rules in system prompt, evidence + question in user prompt — model can't confuse instructions with source material
Explicit Label        'Context (the ONLY allowed facts)' — anchors the model to retrieved evidence, discourages fabrication
Document Boundaries   --- separators + [Document N] headers prevent the model from conflating content across different source files
Citation Anchors      Chunk IDs in headers enable inline citation — the model can reference specific chunks in its answer
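The system/user split can be sketched as a small builder. A hedged illustration (`build_user_prompt` and the constant name are assumptions, not the production code; the context string here is fabricated):

```python
SYSTEM_PROMPT = (
    "You are a compliance documentation assistant.\n"
    "Answer ONLY using the provided context.\n"
    "Cite sources inline using [Document N] references.\n"
    "If the context does not contain the answer, say so."
)

def build_user_prompt(context: str, question: str) -> str:
    """Evidence and question in the user turn, behind an explicit label."""
    return (
        f"Context (the ONLY allowed facts):\n\n{context}\n\n"
        f"Question: {question}"
    )

user = build_user_prompt(
    "[Document 1] Score: 0.923 | Chunk: abc123:3 | Source: IRP_v2.docx\n"
    "The incident response team shall acknowledge P1 incidents ...",
    "Describe the incident response acknowledgment SLA.",
)
# A Bedrock messages payload would pair SYSTEM_PROMPT with this user turn.
```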

Code Showcase 24

Tiered Placeholder Resolution

Template placeholders are resolved through a four-tier hierarchy before reaching the RAG layer. Known values never consume context window budget — only genuinely unknown values trigger document retrieval.

text
Tier    Source                    Example                    Cost
──────────────────────────────────────────────────────────────────────
1       Static client profile     {{COMPANY_NAME}}           ~0ms
        (config/profile.json)     → "SearchStax"             (instant)

2       In-memory small docs      {{SIEM_TOOL}}              ~0ms
        (pre-loaded at startup)   → "Splunk Enterprise"      (cached)

3       RAG retrieval             {{INCIDENT_SLA}}           ~300ms
        (OpenSearch kNN)          → retrieved from IRP docs  (search)

4       Explicit unknown marker   {{RETENTION_PERIOD}}       ~0ms
        (unresolvable)            → "{{UNKNOWN}}"            (flagged)


Resolution order: try tier 1, then 2, then 3, then 4.
Known values are resolved BEFORE the prompt is sent to RAG —
they never enter the context window, preserving budget for
values that genuinely require document retrieval.
Property             Detail
─────────────────────────────────────────────────────────────────
Tier 1: Profile      Company name, contact info, tool names — resolved from static config at ~0ms
Tier 2: Cached Docs  Small reference documents pre-loaded at startup — no search overhead
Tier 3: RAG          Only values that genuinely require document retrieval consume context budget (~300ms)
Tier 4: Unknown      Unresolvable placeholders become explicit {{UNKNOWN}} markers — visible to the reviewer, not hallucinated
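The four-tier fallthrough can be sketched in a few lines. A minimal illustration (the profile/cache dicts and `rag_lookup` callable are stand-ins; tier 3 here is a stubbed lookup rather than a real OpenSearch call):

```python
def resolve_placeholder(key, profile, cached_docs, rag_lookup):
    """Try tiers in order; only tier 3 costs a retrieval round-trip."""
    if key in profile:            # Tier 1: static client profile (~0ms)
        return profile[key]
    if key in cached_docs:        # Tier 2: pre-loaded small docs (~0ms)
        return cached_docs[key]
    hit = rag_lookup(key)         # Tier 3: RAG retrieval (~300ms)
    if hit is not None:
        return hit
    return "{{UNKNOWN}}"          # Tier 4: explicit unknown marker

profile = {"COMPANY_NAME": "SearchStax"}
cached = {"SIEM_TOOL": "Splunk Enterprise"}
lookup = lambda key: "15 minutes" if key == "INCIDENT_SLA" else None

resolve_placeholder("COMPANY_NAME", profile, cached, lookup)      # tier 1 hit
resolve_placeholder("INCIDENT_SLA", profile, cached, lookup)      # tier 3 hit
resolve_placeholder("RETENTION_PERIOD", profile, cached, lookup)  # tier 4 marker
```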