Multi-Agent Compliance Orchestration Platform
A multi-agent compliance orchestration platform built on LangGraph and CrewAI. It took a brittle linear 8-stage pipeline and turned it into a resilient, self-healing distributed system with bounded loops, checkpointed pause/resume, and a terminal human-in-the-loop gate that refuses to file tickets when the system can't tell a real gap from an evidence-collection issue.

Problem
Linear compliance automation scripts break the moment one stage fails. No recovery, no branching, no way to escalate low-confidence output to a human. DynamoDB won't hold the artifacts (400KB item cap), container restarts produce duplicate Jira tickets, and the pipeline has no way to distinguish a true compliance gap from missing evidence or a tooling problem, so it either files false tickets or silently drops findings.
Solution
Rebuilt the pipeline as a stateful LangGraph workflow with CrewAI agents at the reasoning nodes. State is durable through a LangGraph checkpointer (DynamoDB in prod, MemorySaver in smoke tests) and a Claim Check serializer that offloads payloads over 50KB to S3. A deterministic Reviewer node grades every finding; low confidence routes to an evidence_enricher that loops back through gap_analyzer until convergence or max_revisions. Max revisions triggers a terminal human_interrupt that writes ESCALATION REQUIRED to the log and refuses to call the Jira publisher.
Impact
- Replaced a linear 8-stage script with a self-healing graph. Failed nodes retry from the last checkpoint; the @idempotent decorator prevents double-execution on resume.
- One graph runs FEDRAMP_20X and SOC2_TYPE2 today, with ISO 27001 and HIPAA wired as stub profiles. Switching frameworks means passing a different FrameworkProfile instance, not forking the pipeline.
- Confidence-gated Reviewer loop (threshold 0.75) catches low-confidence findings and dispatches evidence_enricher to fetch only the missing signals, then re-enters gap_analyzer. Bounded by max_revisions so the loop can't run forever.
- human_interrupt is terminal. No LLM call, no Jira write, no external side effects. Sets human_approval_status="pending" and ends the run. Zero false-positive tickets when evidence is incomplete.
- ui_mapper fails closed. Any status outside {complete, skipped, not_applicable} raises; the graph re-pauses at the interrupt_before checkpoint instead of pushing unverified state to jira_publisher.
- Multi-megabyte artifacts no longer crash DynamoDB writes. A 50KB threshold moves oversized payloads to S3 transparently; pointers live in the state record.
Architecture
1. LangGraph defines the directed graph. Nodes are CrewAI agents or deterministic functions.
2. prompt_init validates the FrameworkProfile and logs the run. gap_analyzer produces ControlFinding objects.
3. evidence_merger combines AI judgment with hard evidence signals; custom_test_runner emits test definitions per gap.
4. Reviewer computes enrichment_confidence and missing_evidence. route_after_review branches on those values and on revision_count, which evidence_enricher increments.
5. Low confidence routes to evidence_enricher. Enrichments append to raw_evidence.enrichments[] and the graph re-enters gap_analyzer.
6. Convergence routes to remediation_drafter, which builds RemediationPayload markdown ticket bodies.
7. The graph pauses before ui_mapper via interrupt_before. The operator uploads evidence, sets ui_upload_status="complete", and resumes.
8. ui_mapper validates the status (failing closed on pending or anything outside the allowed set), then jira_publisher posts deduped tickets.
9. If revision_count hits max_revisions before convergence, route_after_review dispatches to human_interrupt, which is terminal and files nothing.
10. Every state read/write goes through the Claim Check serializer. @idempotent wraps every node for safe pause/resume.
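The ten steps above describe a small directed graph. A minimal sketch, using a plain adjacency dict rather than LangGraph, encodes the edges as read from the step list and checks three properties the design relies on. Every node can reach END. human_interrupt can never reach jira_publisher. Every cycle passes through evidence_enricher, the node that increments revision_count. The evidence_merger → custom_test_runner ordering is inferred from step 3, and the helper names are illustrative.

```python
# Edges as read from the numbered steps; "END" marks the end of a run.
TOPOLOGY: dict[str, list[str]] = {
    "prompt_init": ["gap_analyzer"],
    "gap_analyzer": ["evidence_merger"],
    "evidence_merger": ["custom_test_runner"],  # ordering inferred from step 3
    "custom_test_runner": ["reviewer"],
    "reviewer": ["evidence_enricher", "remediation_drafter", "human_interrupt"],  # route_after_review picks one
    "evidence_enricher": ["gap_analyzer"],  # the only back edge
    "remediation_drafter": ["ui_mapper"],   # interrupt_before pauses the run ahead of ui_mapper
    "ui_mapper": ["jira_publisher"],
    "jira_publisher": ["END"],
    "human_interrupt": ["END"],             # terminal: files nothing
}


def reachable(graph: dict[str, list[str]], start: str) -> set[str]:
    """Every node reachable from `start` (iterative DFS), including `start` itself."""
    seen: set[str] = set()
    stack = [start]
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(graph.get(node, []))
    return seen


def has_cycle_without(graph: dict[str, list[str]], removed: str) -> bool:
    """True if some cycle survives after deleting `removed` and its incoming edges."""
    pruned = {n: [m for m in succ if m != removed] for n, succ in graph.items() if n != removed}
    visiting: set[str] = set()
    done: set[str] = set()

    def dfs(node: str) -> bool:
        if node in visiting:
            return True  # back edge: cycle found
        if node in done or node not in pruned:
            return False
        visiting.add(node)
        found = any(dfs(nxt) for nxt in pruned[node])
        visiting.discard(node)
        done.add(node)
        return found

    return any(dfs(n) for n in pruned)


assert all("END" in reachable(TOPOLOGY, n) for n in TOPOLOGY)          # every node can finish
assert "jira_publisher" not in reachable(TOPOLOGY, "human_interrupt")  # escalation never files
assert not has_cycle_without(TOPOLOGY, "evidence_enricher")            # every loop passes the counter
```

The last check matters because the loop is bounded only if every cycle passes through the node that increments revision_count. If a second back edge skipped evidence_enricher, max_revisions would no longer bound the loop.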
Capabilities
- Stateful multi-agent graph orchestration with LangGraph checkpointing (DynamoDB in prod, MemorySaver for smoke tests)
- Confidence-gated Reviewer node with 0.75 threshold and bounded self-healing loop via max_revisions
- evidence_enricher that fetches only missing evidence per reviewer feedback and re-enters gap_analyzer
- FrameworkProfile strategy with FEDRAMP_20X and SOC2_TYPE2 active; ISO 27001 and HIPAA stubbed
- Claim Check state serialization. Payloads over 50KB offloaded to S3, S3Ref pointer held in DynamoDB
- @idempotent decorator with per_revision=True for loopable nodes and once-per-thread for terminal nodes
- ui_mapper fails closed. Pending or unknown status raises, graph re-pauses at interrupt_before
- human_interrupt as terminal escalation. ESCALATION REQUIRED log, no LLM, no Jira write, no side effects
- Pydantic schema enforcement at every graph edge
- Terraform-provisioned ECS Fargate runtime
Technical Deep Dive
Architecture internals and annotated code from the production system.
Architecture Overview
LangGraph owns control flow. CrewAI owns reasoning inside the nodes. Between them sits a Claim Check state serializer that keeps state records under DynamoDB's 400KB item cap by offloading payloads over 50KB to S3. The Reviewer is the self-healing pivot. It grades every finding against a deterministic rubric, and anything under a 0.75 confidence threshold routes into an evidence_enricher that fetches only the specific missing signals and re-enters gap_analyzer. If the loop can't converge before max_revisions, route_after_review dispatches to human_interrupt, which is terminal, calls no LLM, and refuses to file tickets.
Key Architectural Decisions
Claim Check Pattern for State Serialization
DynamoDB caps items at 400KB. Compliance artifacts (SSP snippets, evidence screenshots, full audit narratives) routinely blow past that. A naive serializer crashes on write. Instead, every state field runs through a serializer that compares its size against a 50KB threshold. Under 50KB it stays inline in DynamoDB. Over 50KB it gets written to S3, and an S3Ref pointer (bucket, key, sha256) takes its place in the state record. Reads invert the process transparently. The graph code never knows whether a field lives inline or behind a pointer.
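The 50KB threshold bounds each inline field, not the item as a whole. How much headroom remains under the 400KB cap therefore depends on how many fields can sit just under the threshold at the same time. A quick worst-case check follows. It assumes the 400KB limit means 409,600 bytes and ignores attribute-name overhead, which DynamoDB also counts toward the limit. The function names are illustrative.

```python
DDB_ITEM_CAP = 400 * 1024       # assumption: 400KB read as 409,600 bytes
CLAIM_CHECK_THRESHOLD = 50_000  # fields serialize inline only when < 50,000 bytes


def worst_case_inline_bytes(n_fields: int, threshold: int = CLAIM_CHECK_THRESHOLD) -> int:
    """Total inline payload if every field sits one byte under the threshold."""
    return n_fields * (threshold - 1)


def max_fields_at_threshold(cap: int = DDB_ITEM_CAP, threshold: int = CLAIM_CHECK_THRESHOLD) -> int:
    """How many near-threshold fields fit before the item could exceed the cap."""
    return cap // (threshold - 1)


print(max_fields_at_threshold())   # 8
print(worst_case_inline_bytes(8))  # 399992
print(worst_case_inline_bytes(9))  # 449991
```

Under these assumptions, the per-field threshold keeps an item under the cap as long as no more than eight fields can approach 50KB at once. Offloaded fields contribute only their small S3Ref pointers.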
Bounded Self-Healing Loop
Every CrewAI finding gets graded by a deterministic Reviewer. The Reviewer applies a fixed rubric (schema completeness, evidence citation, no unsupported claims) and emits enrichment_confidence plus a list of specific missing_evidence items. Confidence under 0.75 routes to evidence_enricher, which fetches only the flagged missing signals, appends them to raw_evidence.enrichments[], and the graph loops back into gap_analyzer with the new evidence. The loop is bounded by max_revisions. It cannot spin forever.
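The Reviewer code calls grade_finding, but its rubric is not shown. The sketch below illustrates one way a deterministic rubric could cover the three named criteria. The Finding/Claim/Score shapes, the required-field list, and the weights are all hypothetical and do not come from the production system.

```python
from dataclasses import dataclass, field

# Hypothetical shapes and constants; the production rubric is not shown in this write-up.
REQUIRED_FIELDS = ("control_id", "status", "rationale")
WEIGHTS = {"schema": 0.4, "citation": 0.4, "support": 0.2}  # illustrative, sum to 1.0


@dataclass
class Claim:
    text: str
    evidence_ids: list[str] = field(default_factory=list)


@dataclass
class Finding:
    fields: dict[str, str]
    claims: list[Claim]
    expected_evidence: list[str]  # signals the control is expected to cite


@dataclass
class Score:
    confidence: float
    missing_evidence: list[str]
    schema_ok: bool


def grade_finding(finding: Finding) -> Score:
    """Deterministic rubric: the same finding always yields the same score. No LLM."""
    # 1. Schema completeness: share of required fields that are present and non-empty.
    schema_ratio = sum(1 for f in REQUIRED_FIELDS if finding.fields.get(f)) / len(REQUIRED_FIELDS)

    # 2. Evidence citation: expected signals that no claim cites become missing_evidence.
    cited = {eid for claim in finding.claims for eid in claim.evidence_ids}
    missing = [e for e in finding.expected_evidence if e not in cited]
    if finding.expected_evidence:
        citation_ratio = 1 - len(missing) / len(finding.expected_evidence)
    else:
        citation_ratio = 1.0

    # 3. No unsupported claims: share of claims backed by at least one evidence id.
    if finding.claims:
        support_ratio = sum(1 for c in finding.claims if c.evidence_ids) / len(finding.claims)
    else:
        support_ratio = 0.0

    confidence = (WEIGHTS["schema"] * schema_ratio
                  + WEIGHTS["citation"] * citation_ratio
                  + WEIGHTS["support"] * support_ratio)
    return Score(round(confidence, 4), missing, schema_ratio == 1.0)


finding = Finding(
    fields={"control_id": "KSI-IAM-02", "status": "gap", "rationale": "MFA not enforced for admins"},
    claims=[Claim("Admin MFA disabled", ["mfa_config"]), Claim("No quarterly access review")],
    expected_evidence=["mfa_config", "access_review"],
)
score = grade_finding(finding)
# score.confidence == 0.7 (below 0.75); score.missing_evidence == ["access_review"]
```

In this example the finding falls below the 0.75 threshold. Its missing_evidence list names exactly one signal, and that is the only thing evidence_enricher would fetch.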
Terminal Human-in-the-Loop Escalation
When the self-healing loop can't converge (revision_count >= max_revisions), route_after_review dispatches to human_interrupt. This node is terminal by design. It writes ESCALATION REQUIRED to the log with structured key=value metadata (thread_id, client, framework, revision_count, controls) plus per-control status/confidence details. It does not call jira_publisher. It does not invoke an LLM. It does not touch external services. It sets human_approval_status="pending" and ends the run. The rule is: if the system cannot distinguish a real compliance gap from an evidence-collection or tooling problem, it files nothing rather than risk a false-positive ticket.
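The paragraph describes a key=value header plus per-control status/confidence details. The human_interrupt code further down logs only control IDs, so here is a small sketch of the fuller record. The ControlStatus shape, the logger name, and the exact line format are assumptions. Logging is the function's only side effect.

```python
import logging
from dataclasses import dataclass

log = logging.getLogger("compliance.escalation")  # hypothetical logger name


@dataclass
class ControlStatus:
    """Hypothetical per-control summary carried into the escalation record."""
    control_id: str
    status: str
    confidence: float


def log_escalation(thread_id: str, client: str, framework: str,
                   revision_count: int, controls: list[ControlStatus]) -> list[str]:
    """Write ESCALATION REQUIRED plus one detail line per control; return the lines."""
    # key=value pairs assume values contain no whitespace.
    header = (
        f"ESCALATION REQUIRED thread_id={thread_id} client={client} "
        f"framework={framework} revision_count={revision_count} "
        f"controls={','.join(c.control_id for c in controls)}"
    )
    details = [
        f"control={c.control_id} status={c.status} confidence={c.confidence:.2f}"
        for c in controls
    ]
    lines = [header, *details]
    for line in lines:
        log.warning(line)  # the only side effect: no LLM call, no Jira write, no network I/O
    return lines
```

Because the function returns the lines it logged, tests can assert the exact escalation record without capturing log output.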
ui_mapper Fails Closed
The graph pauses before ui_mapper via interrupt_before. An operator uploads evidence manually and updates state with ui_upload_status="complete". On resume, ui_mapper validates the status. Anything outside {"complete", "skipped", "not_applicable"} raises ValueError, including the common "pending" case where the operator resumed early. Because the node raised, LangGraph discards its partial return. Because @idempotent records completion only on a normal return, no ledger entry is written, so the graph re-pauses at the checkpoint and ui_mapper runs again on the next resume. The result: unverified state never reaches jira_publisher.
FrameworkProfile Strategy Pattern
FrameworkProfile is the pluggable layer that decouples graph orchestration from regulatory business logic. It controls the analyst backstory and standards, the assessor persona, the writer's citation style, and framework-native reporting vocabulary. FEDRAMP_20X and SOC2_TYPE2 are live today; ISO 27001 and HIPAA ship as future-ready stubs. Adding a new framework is a new profile instance, not a pipeline rewrite.
@idempotent Decorator with per_revision Semantics
Every node is wrapped in @idempotent, which records completion only after a normal return. If a node raises, LangGraph discards the partial return and the node runs again on resume. Two modes: per_revision=True for loopable nodes (gap_analyzer, evidence_enricher, evidence_merger, custom_test_runner, reviewer) where each revision is a legitimate new execution; once-per-thread for terminal or single-shot nodes (remediation_drafter, ui_mapper, jira_publisher, human_interrupt) where replay would produce duplicates. The semantics line up exactly with LangGraph's checkpointer behavior, so pause/resume and raise-on-validation-failure both stay correct.
Checkpointer Strategy
LangGraph's checkpointer persists state after every node transition. In production, the checkpointer is DynamoDB-backed so a container can die mid-run and resume on the next task. In smoke tests, MemorySaver gives a deterministic in-process equivalent that runs in milliseconds. Swapping between them is a constructor argument. The Claim Check serializer sits in front of both.
Pydantic at Every Edge
Graph edges in LangGraph pass whatever dict you hand them. That is a production liability. Every edge in this graph is typed with a Pydantic model, and state transitions are validated on read and write. A malformed finding from a CrewAI agent fails fast at the edge instead of propagating silently three nodes downstream — and when it fails, @idempotent ensures the node reruns on resume instead of being marked complete.
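Here is a dependency-free sketch of the fail-fast idea. A frozen dataclass with __post_init__ checks stands in for the Pydantic model. The ControlFinding fields, the allowed status values, and the validated_output decorator are hypothetical. Each node's raw output is coerced into the typed model before it leaves the node, so a malformed agent result raises at the edge that produced it.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable

ALLOWED_STATUS = {"satisfied", "gap", "needs_evidence"}  # hypothetical vocabulary


@dataclass(frozen=True)
class ControlFinding:
    control_id: str
    status: str
    confidence: float

    def __post_init__(self) -> None:
        # Stand-in for Pydantic field validation: reject bad values at construction time.
        if not self.control_id:
            raise ValueError("control_id is required")
        if self.status not in ALLOWED_STATUS:
            raise ValueError(f"status={self.status!r} not in {sorted(ALLOWED_STATUS)}")
        if not 0.0 <= self.confidence <= 1.0:
            raise ValueError(f"confidence={self.confidence} outside [0, 1]")


def validated_output(node_fn: Callable[..., dict[str, Any]]) -> Callable[..., ControlFinding]:
    """Coerce a node's raw dict into ControlFinding; missing/unknown keys raise TypeError."""
    @wraps(node_fn)
    def wrapper(*args: Any, **kwargs: Any) -> ControlFinding:
        return ControlFinding(**node_fn(*args, **kwargs))
    return wrapper


@validated_output
def fake_agent_node(raw: dict[str, Any]) -> dict[str, Any]:
    # Stand-in for a CrewAI agent returning loosely structured output.
    return raw
```

For example, `fake_agent_node({"control_id": "CC6.1", "status": "gap", "confidence": 0.8})` returns a typed finding. A status of "maybe" or a confidence of 1.7 raises before the value can flow downstream.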
ECS Fargate over Lambda
LangGraph runs are stateful and often exceed Lambda's 15-minute ceiling when an interrupt_before pause is in play. ECS Fargate with a DynamoDB-backed checkpointer handles long-running graphs without the split-brain you get from chunking a single run across Lambda invocations. Terraform provisions the task definition, the VPC, and the IAM roles.
Code Showcase 1
Claim Check State Serializer
The serializer wraps every DynamoDB read and write. Fields over the size threshold get offloaded to S3 and replaced with a typed pointer. The graph code stays agnostic. Puts never crash on oversized artifacts, and hot-path reads avoid the S3 round trip unless they actually need the big payload.
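```python
import hashlib
import json
import os
from typing import Any

import boto3
from pydantic import BaseModel

CLAIM_CHECK_THRESHOLD = 50_000  # bytes; 50KB keeps inline state well under the 400KB DDB cap
STATE_BUCKET = os.environ["STATE_BUCKET"]  # bucket name assumed to be supplied via environment
s3 = boto3.client("s3")


class S3Ref(BaseModel):
    """Pointer that replaces oversized fields in DynamoDB state."""
    kind: str = "s3_ref"
    bucket: str
    key: str
    sha256: str
    size_bytes: int


def serialize_field(value: Any, run_id: str, field_name: str) -> Any:
    payload = json.dumps(value).encode("utf-8")
    if len(payload) < CLAIM_CHECK_THRESHOLD:
        return value  # stays inline in DynamoDB
    digest = hashlib.sha256(payload).hexdigest()
    key = f"state/{run_id}/{field_name}/{digest}.json"
    s3.put_object(Bucket=STATE_BUCKET, Key=key, Body=payload)
    ref = S3Ref(bucket=STATE_BUCKET, key=key, sha256=digest, size_bytes=len(payload))
    # Store a plain dict (Pydantic v2 model_dump) so hydrate_field recognizes it after a DynamoDB round trip.
    return ref.model_dump()


def hydrate_field(value: Any) -> Any:
    if isinstance(value, dict) and value.get("kind") == "s3_ref":
        body = s3.get_object(Bucket=value["bucket"], Key=value["key"])["Body"].read()
        # Verify the payload against the digest recorded at write time.
        if hashlib.sha256(body).hexdigest() != value["sha256"]:
            raise ValueError(f"sha256 mismatch for s3://{value['bucket']}/{value['key']}")
        return json.loads(body)
    return value  # inline field, returned unchanged
```

| Property | Detail |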
|---|---|
| Transparent to Graph Code | Nodes read and write state fields the same way whether they live inline or behind an S3Ref. The serializer decides at write time. |
| 50KB Threshold | Small threshold keeps inline DynamoDB state lean so total item size stays well under the 400KB cap even when many fields coexist. |
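| Content-Addressed S3 Keys | Keys combine run_id, field name, and the sha256 digest, so re-serializing an unchanged payload on resume overwrites the same object rather than adding a new one. Because run_id is part of the key, identical payloads in different runs are stored separately. |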
| Hot Path Stays Inline | Small fields never hit S3. Sub-second reads are preserved for the majority of state that fits inline. |
Code Showcase 2
Bounded Self-Healing Loop with Terminal Escalation
The Reviewer grades every finding deterministically. route_after_review is pure Python and picks one of three destinations: loop back through evidence_enricher for low confidence, proceed to remediation_drafter on convergence, or escalate to a terminal human_interrupt when the loop can't converge before max_revisions. No LLM picks the route.
```python
import logging

from langgraph.graph import StateGraph, END

log = logging.getLogger(__name__)
CONFIDENCE_THRESHOLD = 0.75

# ComplianceState, ReviewResult, grade_finding, and fetch_targeted are project types/helpers (not shown).


def reviewer_node(state: ComplianceState) -> ComplianceState:
    score = grade_finding(state.latest_finding)  # deterministic rubric, no LLM
    state.review = ReviewResult(
        finding_id=state.latest_finding.id,
        confidence=score.confidence,
        missing_evidence=score.missing_evidence,
        schema_ok=score.schema_ok,
    )
    return state


def route_after_review(state: ComplianceState) -> str:
    review = state.review
    if review.schema_ok and review.confidence >= CONFIDENCE_THRESHOLD:
        return "remediation_drafter"  # converged, even on the last allowed revision
    if state.revision_count >= state.max_revisions:
        return "human_interrupt"  # bound reached without convergence: terminal, no tickets
    return "evidence_enricher"  # schema gap or low confidence: fetch the missing signals


def evidence_enricher_node(state: ComplianceState) -> ComplianceState:
    for gap in state.review.missing_evidence:
        state.raw_evidence.enrichments.append(fetch_targeted(gap))
    state.revision_count += 1
    return state


def human_interrupt_node(state: ComplianceState) -> ComplianceState:
    # Terminal. No LLM, no Jira, no external calls.
    unresolved = [c for c in state.controls if c.status != "complete"]
    log.warning(
        "ESCALATION REQUIRED thread_id=%s client=%s framework=%s "
        "revision_count=%d controls=%s",
        state.thread_id, state.client, state.framework,
        state.revision_count, [c.id for c in unresolved],
    )
    state.human_approval_status = "pending"
    return state  # graph ends here


graph = StateGraph(ComplianceState)
# gap_analyzer, remediation_drafter, and the other pipeline nodes are registered the same way (omitted).
graph.add_node("reviewer", reviewer_node)
graph.add_node("evidence_enricher", evidence_enricher_node)
graph.add_node("human_interrupt", human_interrupt_node)
graph.add_conditional_edges("reviewer", route_after_review, {
    "evidence_enricher": "evidence_enricher",
    "remediation_drafter": "remediation_drafter",
    "human_interrupt": "human_interrupt",
})
graph.add_edge("evidence_enricher", "gap_analyzer")  # loop back with new evidence
graph.add_edge("human_interrupt", END)  # terminal
```

| Property | Detail |
|---|---|
| Deterministic Routing | route_after_review is pure Python. No LLM gets to decide whether to loop, proceed, or escalate. |
| Targeted Evidence Fetch | evidence_enricher only fetches the specific signals the Reviewer flagged as missing, then appends to raw_evidence.enrichments[]. |
| Bounded Loop | revision_count increments on every enrichment. Hitting max_revisions routes to human_interrupt so the graph can't spin forever. |
| Terminal Escalation | human_interrupt logs ESCALATION REQUIRED, sets human_approval_status="pending", and ends. Zero LLM calls, zero Jira writes. |
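A stdlib-only trace makes the bound concrete. It replays the routing rule the prose describes against scripted reviewer confidences: proceed on convergence, escalate once revision_count reaches max_revisions, otherwise enrich. The route/run_loop helpers, the score sequences, and max_revisions=3 are illustrative, and schema_ok is held at True.

```python
CONFIDENCE_THRESHOLD = 0.75


def route(confidence: float, schema_ok: bool, revision_count: int, max_revisions: int) -> str:
    """Pure routing decision: converge, else escalate at the bound, else enrich."""
    if schema_ok and confidence >= CONFIDENCE_THRESHOLD:
        return "remediation_drafter"
    if revision_count >= max_revisions:
        return "human_interrupt"
    return "evidence_enricher"


def run_loop(confidences: list[float], max_revisions: int) -> tuple[str, int]:
    """Feed one scripted reviewer score per pass; return (exit destination, revision_count)."""
    revision_count = 0
    for confidence in confidences:
        destination = route(confidence, True, revision_count, max_revisions)
        if destination != "evidence_enricher":
            return destination, revision_count
        revision_count += 1  # evidence_enricher increments, then gap_analyzer re-runs
    raise ValueError("scripted scores ran out before the loop exited")


print(run_loop([0.50, 0.60, 0.80], max_revisions=3))  # ('remediation_drafter', 2)
print(run_loop([0.50] * 10, max_revisions=3))         # ('human_interrupt', 3)
```

With max_revisions=3, the reviewer runs at most four times and evidence_enricher at most three times before the run either converges or escalates, regardless of how many low scores the agents produce.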
Code Showcase 3
FrameworkProfile Strategy Pattern
The graph itself is framework-agnostic. A FrameworkProfile strategy object supplies analyst backstory and standards, assessor persona, writer citation style, and framework-native reporting vocabulary. FEDRAMP_20X and SOC2_TYPE2 are live; ISO 27001 and HIPAA are future-ready stubs so the extension point is already compiled.
```python
from abc import ABC, abstractmethod
from enum import Enum

from langgraph.graph import StateGraph
from langgraph.graph.state import CompiledStateGraph

# ComplianceState, gap_analyzer_factory, reviewer_factory, and DynamoDBSaver are project pieces (not shown).


class Framework(str, Enum):
    FEDRAMP_20X = "FEDRAMP_20X"
    SOC2_TYPE2 = "SOC2_TYPE2"
    ISO_27001 = "ISO_27001"  # stub
    HIPAA = "HIPAA"  # stub


class FrameworkProfile(ABC):
    framework: Framework

    @abstractmethod
    def analyst_backstory(self) -> str: ...

    @abstractmethod
    def standards_corpus(self) -> list[str]: ...

    @abstractmethod
    def assessor_persona(self) -> str: ...

    @abstractmethod
    def writer_citation_style(self) -> str: ...

    @abstractmethod
    def reporting_vocabulary(self) -> dict[str, str]: ...


# A profile must implement all five abstract methods before it can be instantiated;
# standards_corpus, assessor_persona, and reporting_vocabulary are omitted below for brevity.
class FedRAMP20xProfile(FrameworkProfile):
    framework = Framework.FEDRAMP_20X

    def analyst_backstory(self) -> str:
        return "FedRAMP 20x 3PAO assessor specializing in KSI, ADS, and CCM families."

    def writer_citation_style(self) -> str:
        return "KSI-<family>-<number>"  # e.g. KSI-IAM-02


class SOC2TypeIIProfile(FrameworkProfile):
    framework = Framework.SOC2_TYPE2

    def analyst_backstory(self) -> str:
        return "AICPA SOC 2 Type II practitioner focused on Trust Services Criteria."

    def writer_citation_style(self) -> str:
        return "<category><series>.<number>"  # TSC reference, e.g. CC6.1


# Stubs stay abstract (instantiating them raises TypeError) until their content lands.
class ISO27001Profile(FrameworkProfile):  # stub, wired for future
    framework = Framework.ISO_27001


class HIPAAProfile(FrameworkProfile):  # stub, wired for future
    framework = Framework.HIPAA


def build_graph(profile: FrameworkProfile) -> CompiledStateGraph:
    graph = StateGraph(ComplianceState)
    graph.add_node("gap_analyzer", gap_analyzer_factory(profile))
    graph.add_node("reviewer", reviewer_factory(profile))
    # ...same graph topology for every framework
    return graph.compile(checkpointer=DynamoDBSaver())
```

| Property | Detail |
|---|---|
| One Graph, Many Frameworks | Graph topology is identical across FEDRAMP_20X, SOC2_TYPE2, and the stubs. Only profile-supplied data differs. |
| Framework-Native Vocabulary | FedRAMP emits KSI-IAM-02 style citations; SOC 2 emits CC6.1 style. The writer pulls its style from the profile. |
| Two Live, Two Stubbed | FEDRAMP_20X and SOC2_TYPE2 run in production today. ISO 27001 and HIPAA are compiled stubs ready for content. |
| Extension Point | Adding a new framework is a new FrameworkProfile subclass. Zero graph changes. |
Code Showcase 4
@idempotent Decorator with per_revision Semantics
Every node is wrapped in @idempotent. Completion is recorded only after a normal return — if a node raises, LangGraph discards the partial return and the node runs again on resume. per_revision=True is set on loopable nodes that should legitimately execute once per revision. Terminal nodes use once-per-thread to prevent replay duplicates.
```python
import logging
from functools import wraps

log = logging.getLogger(__name__)
# `ledger` is a durable completion store exposing get(key) / put(key) (not shown).


def idempotent(per_revision: bool = False):
    """Record completion only on normal return; a node that raises is retried on resume."""
    def decorator(node_fn):
        @wraps(node_fn)
        def wrapped(state: ComplianceState) -> ComplianceState:
            key = _ledger_key(state, node_fn.__name__, per_revision)
            if ledger.get(key):
                log.info("skip %s: already completed", node_fn.__name__)
                return state
            result = node_fn(state)  # may raise
            ledger.put(key)  # only reached on normal return
            return result
        return wrapped
    return decorator


def _ledger_key(state, node_name, per_revision):
    base = f"{state.thread_id}:{node_name}"
    return f"{base}:r{state.revision_count}" if per_revision else base


# --- loopable nodes: one execution per revision is legitimate ---
@idempotent(per_revision=True)
def gap_analyzer(state): ...

@idempotent(per_revision=True)
def evidence_enricher(state): ...

@idempotent(per_revision=True)
def evidence_merger(state): ...

@idempotent(per_revision=True)
def custom_test_runner(state): ...

@idempotent(per_revision=True)
def reviewer(state): ...


# --- terminal / single-shot nodes: replay would produce duplicates ---
@idempotent(per_revision=False)
def remediation_drafter(state): ...

@idempotent(per_revision=False)
def ui_mapper(state):
    status = state.ui_upload_status
    if status not in {"complete", "skipped", "not_applicable"}:
        # raise -> partial return discarded, ledger NOT written,
        # graph re-pauses at interrupt_before on resume
        raise ValueError(f"ui_upload_status={status!r} not allowed")
    return state

@idempotent(per_revision=False)
def jira_publisher(state): ...

@idempotent(per_revision=False)
def human_interrupt(state): ...
```

| Property | Detail |
|---|---|
| Raise = Retry | Completion is recorded only after a normal return. Raising discards the partial return; the node runs again on resume. |
| per_revision=True | Loopable nodes (gap_analyzer, evidence_enricher, evidence_merger, custom_test_runner, reviewer) execute legitimately once per revision. |
| once-per-thread | Terminal nodes (remediation_drafter, ui_mapper, jira_publisher, human_interrupt) complete at most once per thread; replaying jira_publisher, for example, would file duplicate tickets. |
| ui_mapper Fails Closed | Raising on pending/unknown status skips ledger write, so the graph re-pauses at interrupt_before instead of pushing unverified state forward. |