← Back to projects
AI / Multi-Agent Systems / Cloud ArchitectureApr 20, 2026

Multi-Agent Compliance Orchestration Platform

A multi-agent compliance orchestration platform built on LangGraph and CrewAI. It took a brittle linear 8-stage pipeline and turned it into a resilient, self-healing distributed system with bounded loops, checkpointed pause/resume, and a terminal human-in-the-loop gate that refuses to file tickets when the system can't tell a real gap from an evidence-collection issue.

Multi-Agent Compliance Orchestration Platform diagram

Problem

Linear compliance automation scripts break the moment one stage fails. No recovery, no branching, no way to escalate low-confidence output to a human. DynamoDB won't hold the artifacts (400KB item cap), container restarts produce duplicate Jira tickets, and the pipeline has no way to distinguish a true compliance gap from missing evidence or a tooling problem, so it either files false tickets or silently drops findings.

Solution

Rebuilt the pipeline as a stateful LangGraph workflow with CrewAI agents at the reasoning nodes. State is durable through a LangGraph checkpointer (DynamoDB in prod, MemorySaver in smoke tests) and a Claim Check serializer that offloads payloads over 50KB to S3. A deterministic Reviewer node grades every finding; low confidence routes to an evidence_enricher that loops back through gap_analyzer until convergence or max_revisions. Max revisions triggers a terminal human_interrupt that writes ESCALATION REQUIRED to the log and refuses to call the Jira publisher.

Impact

  • Replaced a linear 8-stage script with a self-healing graph. Failed nodes retry from the last checkpoint; the @idempotent decorator prevents double-execution on resume.
  • One graph runs FEDRAMP_20X and SOC2_TYPE2 today, with ISO 27001 and HIPAA wired as stub profiles. Switching frameworks is a FrameworkProfile instance, not a pipeline fork.
  • Confidence-gated Reviewer loop (threshold 0.75) catches low-confidence findings and dispatches evidence_enricher to fetch only the missing signals, then re-enters gap_analyzer. Bounded by max_revisions so the loop can't run forever.
  • human_interrupt is terminal. No LLM call, no Jira write, no external side effects. Sets human_approval_status="pending" and ends the run. Zero false-positive tickets when evidence is incomplete.
  • ui_mapper fails closed. Any status outside {complete, skipped, not_applicable} raises; the graph re-pauses at the interrupt_before checkpoint instead of pushing unverified state to jira_publisher.
  • Multi-megabyte artifacts no longer crash DynamoDB writes. 50KB threshold moves oversized payloads to S3 transparently; pointers live in the state record.

Architecture

  1. 01LangGraph defines the directed graph. Nodes are CrewAI agents or deterministic functions.
  2. 02prompt_init validates the FrameworkProfile and logs the run. gap_analyzer produces ControlFinding objects.
  3. 03evidence_merger combines AI judgment with hard evidence signals; custom_test_runner emits test definitions per gap.
  4. 04Reviewer computes enrichment_confidence, missing_evidence, and revision_count. route_after_review branches from there.
  5. 05Low confidence routes to evidence_enricher. Enrichments append to raw_evidence.enrichments[] and the graph re-enters gap_analyzer.
  6. 06Convergence routes to remediation_drafter, which builds RemediationPayload markdown ticket bodies.
  7. 07Graph pauses before ui_mapper via interrupt_before. Operator uploads evidence, sets ui_upload_status="complete", resumes.
  8. 08ui_mapper validates status (fails closed on pending or anything outside the allowed set), then jira_publisher posts deduped tickets.
  9. 09If revision_count hits max_revisions before convergence, route_after_review dispatches to human_interrupt, which is terminal and files nothing.
  10. 10Every state read/write goes through the Claim Check serializer. @idempotent wraps every node for safe pause/resume.

Capabilities

  • ·Stateful multi-agent graph orchestration with LangGraph checkpointing (DynamoDB in prod, MemorySaver for smoke tests)
  • ·Confidence-gated Reviewer node with 0.75 threshold and bounded self-healing loop via max_revisions
  • ·evidence_enricher that fetches only missing evidence per reviewer feedback and re-enters gap_analyzer
  • ·FrameworkProfile strategy with FEDRAMP_20X and SOC2_TYPE2 active; ISO 27001 and HIPAA stubbed
  • ·Claim Check state serialization. Payloads over 50KB offloaded to S3, S3Ref pointer held in DynamoDB
  • ·@idempotent decorator with per_revision=True for loopable nodes and once-per-thread for terminal nodes
  • ·ui_mapper fails closed. Pending or unknown status raises, graph re-pauses at interrupt_before
  • ·human_interrupt as terminal escalation. ESCALATION REQUIRED log, no LLM, no Jira write, no side effects
  • ·Pydantic schema enforcement at every graph edge
  • ·Terraform-provisioned ECS Fargate runtime

Stack

PythonLangGraphCrewAIPydanticAWS DynamoDBAWS S3AWS ECS FargateTerraformJira REST APIVanta GraphQL APIDrata APIOpenAI GPT-5

Technical Deep Dive

Architecture internals and annotated code from the production system.

Architecture Overview

LangGraph owns control flow. CrewAI owns reasoning inside the nodes. Between them sits a Claim Check state serializer that makes DynamoDB's 400KB item cap disappear by offloading payloads over 50KB to S3. The Reviewer is the self-healing pivot. It grades every finding against a deterministic rubric, and anything under a 0.75 confidence threshold routes into an evidence_enricher that fetches only the specific missing signals and re-enters gap_analyzer. If the loop can't converge before max_revisions, route_after_review dispatches to human_interrupt, which is terminal, calls no LLM, and refuses to file tickets.

 FrameworkProfile (FEDRAMP_20X or SOC2_TYPE2; ISO 27001 and HIPAA stubbed)
[prompt_init] Validates profile, logs run, seeds state
[gap_analyzer] CrewAI analyst produces ControlFinding objects
[evidence_merger] Combines AI judgment with hard evidence signals
[custom_test_runner] Emits test definitions per gap finding
[reviewer] Computes enrichment_confidence, missing_evidence[], revision_count
[route_after_review] Deterministic branch on confidence and revision_count
→ confidence < 0.75: [evidence_enricher] fetches missing evidence, loops back to gap_analyzer
→ confidence OK: [remediation_drafter] builds RemediationPayload markdown
→ revision_count >= max_revisions: [human_interrupt] terminal, no tickets filed
[interrupt_before ui_mapper] Graph pauses. Operator uploads evidence, sets ui_upload_status="complete", resumes.
[ui_mapper] Fails closed on pending or unknown status; otherwise passes through
[jira_publisher] Posts deduped Epic/Task/Subtask hierarchy
[END]

Key Architectural Decisions

01

Claim Check Pattern for State Serialization

DynamoDB caps items at 400KB. Compliance artifacts (SSP snippets, evidence screenshots, full audit narratives) routinely blow past that. A naive serializer crashes on write. Instead, every state field runs through a serializer that compares its size against a 50KB threshold. Under 50KB it stays inline in DynamoDB. Over 50KB it gets written to S3, and an S3Ref pointer (bucket, key, sha256) takes its place in the state record. Reads invert the process transparently. The graph code never knows whether a field lives inline or behind a pointer.

02

Bounded Self-Healing Loop

Every CrewAI finding gets graded by a deterministic Reviewer. The Reviewer applies a fixed rubric (schema completeness, evidence citation, no unsupported claims) and emits enrichment_confidence plus a list of specific missing_evidence items. Confidence under 0.75 routes to evidence_enricher, which fetches only the flagged missing signals, appends them to raw_evidence.enrichments[], and the graph loops back into gap_analyzer with the new evidence. The loop is bounded by max_revisions. It cannot spin forever.

03

Terminal Human-in-the-Loop Escalation

When the self-healing loop can't converge (revision_count >= max_revisions), route_after_review dispatches to human_interrupt. This node is terminal by design. It writes ESCALATION REQUIRED to the log with structured key=value metadata (thread_id, client, framework, revision_count, controls) plus per-control status/confidence details. It does not call jira_publisher. It does not invoke an LLM. It does not touch external services. It sets human_approval_status="pending" and ends the run. The rule is: if the system cannot distinguish a real compliance gap from an evidence-collection or tooling problem, it files nothing rather than risk a false-positive ticket.

04

ui_mapper Fails Closed

The graph pauses before ui_mapper via interrupt_before. An operator uploads evidence manually and updates state with ui_upload_status="complete". On resume, ui_mapper validates. Anything outside {"complete", "skipped", "not_applicable"} raises ValueError — including the common "pending" case where the operator resumed early. LangGraph discards the partial return (because the @idempotent decorator only records completion on normal return), and the graph re-pauses at the checkpoint. The result: unverified state never reaches jira_publisher.

05

FrameworkProfile Strategy Pattern

FrameworkProfile is the pluggable layer that decouples graph orchestration from regulatory business logic. It controls the analyst backstory and standards, the assessor persona, the writer's citation style, and framework-native reporting vocabulary. FEDRAMP_20X and SOC2_TYPE2 are live today; ISO 27001 and HIPAA ship as future-ready stubs. Adding a new framework is a new profile instance, not a pipeline rewrite.

06

@idempotent Decorator with per_revision Semantics

Every node is wrapped in @idempotent, which records completion only after a normal return. If a node raises, LangGraph discards the partial return and the node runs again on resume. Two modes: per_revision=True for loopable nodes (gap_analyzer, evidence_enricher, evidence_merger, custom_test_runner, reviewer) where each revision is a legitimate new execution; once-per-thread for terminal or single-shot nodes (remediation_drafter, ui_mapper, jira_publisher, human_interrupt) where replay would produce duplicates. The semantics line up exactly with LangGraph's checkpointer behavior, so pause/resume and raise-on-validation-failure both stay correct.

07

Checkpointer Strategy

LangGraph's checkpointer persists state after every node transition. In production, the checkpointer is DynamoDB-backed so a container can die mid-run and resume on the next task. In smoke tests, MemorySaver gives a deterministic in-process equivalent that runs in milliseconds. Swapping between them is a constructor argument. The Claim Check serializer sits in front of both.

08

Pydantic at Every Edge

Graph edges in LangGraph pass whatever dict you hand them. That is a production liability. Every edge in this graph is typed with a Pydantic model, and state transitions are validated on read and write. A malformed finding from a CrewAI agent fails fast at the edge instead of propagating silently three nodes downstream — and when it fails, @idempotent ensures the node reruns on resume instead of being marked complete.

09

ECS Fargate over Lambda

LangGraph runs are stateful and often exceed Lambda's 15-minute ceiling when an interrupt_before pause is in play. ECS Fargate with a DynamoDB-backed checkpointer handles long-running graphs without the split-brain you get from chunking a single run across Lambda invocations. Terraform provisions the task definition, the VPC, and the IAM roles.

Code Showcase 1

Claim Check State Serializer

The serializer wraps every DynamoDB read and write. Fields over the size threshold get offloaded to S3 and replaced with a typed pointer. The graph code stays agnostic. Puts never crash on oversized artifacts, and hot-path reads avoid the S3 round trip unless they actually need the big payload.

python
from pydantic import BaseModel
import hashlib, json, boto3

CLAIM_CHECK_THRESHOLD = 50_000  # bytes; 50KB keeps inline state well under 400KB DDB cap

class S3Ref(BaseModel):
    """Pointer that replaces oversized fields in DynamoDB state."""
    kind: str = "s3_ref"
    bucket: str
    key: str
    sha256: str
    size_bytes: int

def serialize_field(value, run_id: str, field_name: str) -> dict | S3Ref:
    payload = json.dumps(value).encode("utf-8")
    if len(payload) < CLAIM_CHECK_THRESHOLD:
        return value  # stays inline in DynamoDB

    digest = hashlib.sha256(payload).hexdigest()
    key = f"state/{run_id}/{field_name}/{digest}.json"
    s3.put_object(Bucket=STATE_BUCKET, Key=key, Body=payload)

    return S3Ref(
        bucket=STATE_BUCKET,
        key=key,
        sha256=digest,
        size_bytes=len(payload),
    )

def hydrate_field(value):
    if isinstance(value, dict) and value.get("kind") == "s3_ref":
        obj = s3.get_object(Bucket=value["bucket"], Key=value["key"])
        return json.loads(obj["Body"].read())
    return value
PropertyDetail
Transparent to Graph CodeNodes read and write state fields the same way whether they live inline or behind an S3Ref. The serializer decides at write time.
50KB ThresholdSmall threshold keeps inline DynamoDB state lean so total item size stays well under the 400KB cap even when many fields coexist.
Content-Addressed S3 KeysKeys include the sha256 digest. Identical payloads across runs deduplicate naturally in S3.
Hot Path Stays InlineSmall fields never hit S3. Sub-second reads are preserved for the majority of state that fits inline.

Code Showcase 2

Bounded Self-Healing Loop with Terminal Escalation

The Reviewer grades every finding deterministically. route_after_review is pure Python and picks one of three destinations: loop back through evidence_enricher for low confidence, proceed to remediation_drafter on convergence, or escalate to a terminal human_interrupt when the loop can't converge before max_revisions. No LLM picks the route.

python
from langgraph.graph import StateGraph, END

CONFIDENCE_THRESHOLD = 0.75

def reviewer_node(state: ComplianceState) -> ComplianceState:
    score = grade_finding(state.latest_finding)  # deterministic rubric, no LLM
    state.review = ReviewResult(
        finding_id=state.latest_finding.id,
        confidence=score.confidence,
        missing_evidence=score.missing_evidence,
        schema_ok=score.schema_ok,
    )
    return state

def route_after_review(state: ComplianceState) -> str:
    review = state.review
    if state.revision_count >= state.max_revisions:
        return "human_interrupt"              # terminal, no tickets
    if not review.schema_ok:
        return "evidence_enricher"
    if review.confidence < CONFIDENCE_THRESHOLD:
        return "evidence_enricher"
    return "remediation_drafter"

def evidence_enricher_node(state: ComplianceState) -> ComplianceState:
    for gap in state.review.missing_evidence:
        state.raw_evidence.enrichments.append(fetch_targeted(gap))
    state.revision_count += 1
    return state

def human_interrupt_node(state: ComplianceState) -> ComplianceState:
    # Terminal. No LLM, no Jira, no external calls.
    unresolved = [c for c in state.controls if c.status != "complete"]
    log.warning(
        "ESCALATION REQUIRED thread_id=%s client=%s framework=%s "
        "revision_count=%d controls=%s",
        state.thread_id, state.client, state.framework,
        state.revision_count, [c.id for c in unresolved],
    )
    state.human_approval_status = "pending"
    return state  # graph ends here

graph = StateGraph(ComplianceState)
graph.add_node("reviewer", reviewer_node)
graph.add_node("evidence_enricher", evidence_enricher_node)
graph.add_node("human_interrupt", human_interrupt_node)
graph.add_conditional_edges("reviewer", route_after_review, {
    "evidence_enricher": "evidence_enricher",
    "remediation_drafter": "remediation_drafter",
    "human_interrupt": "human_interrupt",
})
graph.add_edge("evidence_enricher", "gap_analyzer")  # loop back with new evidence
graph.add_edge("human_interrupt", END)               # terminal
PropertyDetail
Deterministic Routingroute_after_review is pure Python. No LLM gets to decide whether to loop, proceed, or escalate.
Targeted Evidence Fetchevidence_enricher only fetches the specific signals the Reviewer flagged as missing, then appends to raw_evidence.enrichments[].
Bounded Looprevision_count increments on every enrichment. Hitting max_revisions routes to human_interrupt so the graph can't spin forever.
Terminal Escalationhuman_interrupt logs ESCALATION REQUIRED, sets human_approval_status="pending", and ends. Zero LLM calls, zero Jira writes.

Code Showcase 3

FrameworkProfile Strategy Pattern

The graph itself is framework-agnostic. A FrameworkProfile strategy object supplies analyst backstory and standards, assessor persona, writer citation style, and framework-native reporting vocabulary. FEDRAMP_20X and SOC2_TYPE2 are live; ISO 27001 and HIPAA are future-ready stubs so the extension point is already compiled.

python
from abc import ABC, abstractmethod
from enum import Enum

class Framework(str, Enum):
    FEDRAMP_20X = "FEDRAMP_20X"
    SOC2_TYPE2  = "SOC2_TYPE2"
    ISO_27001   = "ISO_27001"   # stub
    HIPAA       = "HIPAA"       # stub

class FrameworkProfile(ABC):
    framework: Framework

    @abstractmethod
    def analyst_backstory(self) -> str: ...
    @abstractmethod
    def standards_corpus(self) -> list[str]: ...
    @abstractmethod
    def assessor_persona(self) -> str: ...
    @abstractmethod
    def writer_citation_style(self) -> str: ...
    @abstractmethod
    def reporting_vocabulary(self) -> dict[str, str]: ...

class FedRAMP20xProfile(FrameworkProfile):
    framework = Framework.FEDRAMP_20X
    def analyst_backstory(self) -> str:
        return "FedRAMP 20x 3PAO assessor specializing in KSI, ADS, and CCM families."
    def writer_citation_style(self) -> str:
        return "KSI-<family>-<number>"  # e.g. KSI-IAM-02

class SOC2TypeIIProfile(FrameworkProfile):
    framework = Framework.SOC2_TYPE2
    def analyst_backstory(self) -> str:
        return "AICPA SOC 2 Type II practitioner focused on Trust Services Criteria."
    def writer_citation_style(self) -> str:
        return "TSC <criterion>.<point>"  # e.g. CC6.1

class ISO27001Profile(FrameworkProfile):    # stub — wired for future
    framework = Framework.ISO_27001

class HIPAAProfile(FrameworkProfile):        # stub — wired for future
    framework = Framework.HIPAA

def build_graph(profile: FrameworkProfile) -> StateGraph:
    graph = StateGraph(ComplianceState)
    graph.add_node("gap_analyzer", gap_analyzer_factory(profile))
    graph.add_node("reviewer", reviewer_factory(profile))
    # ...same graph topology for every framework
    return graph.compile(checkpointer=DynamoDBSaver())
PropertyDetail
One Graph, Many FrameworksGraph topology is identical across FEDRAMP_20X, SOC2_TYPE2, and the stubs. Only profile-supplied data differs.
Framework-Native VocabularyFedRAMP emits KSI-IAM-02 style citations; SOC 2 emits CC6.1 style. The writer pulls its style from the profile.
Two Live, Two StubbedFEDRAMP_20X and SOC2_TYPE2 run in production today. ISO 27001 and HIPAA are compiled stubs ready for content.
Extension PointAdding a new framework is a new FrameworkProfile subclass. Zero graph changes.

Code Showcase 4

@idempotent Decorator with per_revision Semantics

Every node is wrapped in @idempotent. Completion is recorded only after a normal return — if a node raises, LangGraph discards the partial return and the node runs again on resume. per_revision=True is set on loopable nodes that should legitimately execute once per revision. Terminal nodes use once-per-thread to prevent replay duplicates.

python
from functools import wraps

def idempotent(per_revision: bool = False):
    """Record completion only on normal return. Raises retry on resume."""
    def decorator(node_fn):
        @wraps(node_fn)
        def wrapped(state: ComplianceState) -> ComplianceState:
            key = _ledger_key(state, node_fn.__name__, per_revision)
            if ledger.get(key):
                log.info("skip %s: already completed", node_fn.__name__)
                return state

            result = node_fn(state)       # may raise
            ledger.put(key)               # only reached on normal return
            return result
        return wrapped
    return decorator

def _ledger_key(state, node_name, per_revision):
    base = f"{state.thread_id}:{node_name}"
    return f"{base}:r{state.revision_count}" if per_revision else base

# --- loopable nodes: one execution per revision is legitimate ---
@idempotent(per_revision=True)
def gap_analyzer(state): ...

@idempotent(per_revision=True)
def evidence_enricher(state): ...

@idempotent(per_revision=True)
def evidence_merger(state): ...

@idempotent(per_revision=True)
def custom_test_runner(state): ...

@idempotent(per_revision=True)
def reviewer(state): ...

# --- terminal / single-shot nodes: replay would produce duplicates ---
@idempotent(per_revision=False)
def remediation_drafter(state): ...

@idempotent(per_revision=False)
def ui_mapper(state):
    status = state.ui_upload_status
    if status not in {"complete", "skipped", "not_applicable"}:
        # raise -> partial return discarded, ledger NOT written,
        # graph re-pauses at interrupt_before on resume
        raise ValueError(f"ui_upload_status={status!r} not allowed")
    return state

@idempotent(per_revision=False)
def jira_publisher(state): ...

@idempotent(per_revision=False)
def human_interrupt(state): ...
PropertyDetail
Raise = RetryCompletion is recorded only after a normal return. Raising discards the partial return; the node runs again on resume.
per_revision=TrueLoopable nodes (gap_analyzer, evidence_enricher, evidence_merger, custom_test_runner, reviewer) execute legitimately once per revision.
once-per-threadTerminal nodes (remediation_drafter, ui_mapper, jira_publisher, human_interrupt) should run at most once per thread — replay would file duplicate tickets.
ui_mapper Fails ClosedRaising on pending/unknown status skips ledger write, so the graph re-pauses at interrupt_before instead of pushing unverified state forward.