Pipeline Architecture

CMS Claims Data Comparison Pipeline — 6-step design

End-to-End Pipeline

The pipeline processes CMS Medicare claims data in six sequential steps. Each step receives a PipelineContext object containing the DuckDB connection, configuration, and accumulated results from prior steps. Each step returns a StepResult with status, metrics, and warnings.
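The step contract above can be sketched as follows. The class and field names `PipelineContext`, `StepResult`, `status`, `metrics`, and `warnings` come from this document; the exact field types, the `run_pipeline` driver, and the halt convention are illustrative assumptions, not the real implementation in `src/pipeline/`.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class StepResult:
    """Outcome of one pipeline step (field shapes are illustrative)."""
    status: str                                   # e.g. "ok" | "warn" | "halted"
    metrics: dict[str, Any] = field(default_factory=dict)
    warnings: list[str] = field(default_factory=list)

@dataclass
class PipelineContext:
    """Shared state threaded through all six steps."""
    con: Any = None                               # DuckDB connection in the real pipeline
    config: dict[str, Any] = field(default_factory=dict)
    results: dict[str, StepResult] = field(default_factory=dict)

def run_pipeline(steps: list[tuple[str, Callable[[PipelineContext], StepResult]]],
                 ctx: PipelineContext) -> PipelineContext:
    """Run steps in order, accumulating each StepResult; stop on a halt."""
    for name, step in steps:
        result = step(ctx)
        ctx.results[name] = result
        if result.status == "halted":
            break
    return ctx
```

The key property is that every step sees the accumulated `results` of the steps before it, so later steps (e.g. Compare) can read what Matching produced without re-querying.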

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart LR
    subgraph INPUT["📁 Input"]
        OLD["Old System CSVs<br/>(3 Beneficiary + 2 Carrier)"]
        NEW["New System CSVs<br/>(3 Beneficiary + 2 Carrier)"]
    end
    S1["1<br/>Receive<br/>& Verify"]
    S2["2<br/>Schema<br/>Validation"]
    S3["3<br/>Ingest<br/>& Profile"]
    S4["4<br/>Record<br/>Matching"]
    S5["5<br/>Compare<br/>& Analyze"]
    S6["6<br/>Report"]
    subgraph OUTPUT["📊 Output"]
        JSON["report_data.json"]
        HTML["comparison_report.html"]
        CSV["CSV Exports"]
    end
    INPUT --> S1 --> S2 --> S3 --> S4 --> S5 --> S6 --> OUTPUT
    style S1 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
    style S2 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
    style S3 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
    style S4 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
    style S5 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
    style S6 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
    style INPUT fill:#0f172a,stroke:#334155,color:#94a3b8
    style OUTPUT fill:#0f172a,stroke:#4ade80,color:#94a3b8
Step 1: Receive & Verify

Discovers CSV files on disk (or from a StorageAdapter for cloud sources), verifies file integrity via checksums, handles zip extraction, and builds a complete file inventory with sizes and metadata. Separate discovery paths for old and new systems.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
    subgraph SOURCES["Data Sources"]
        LOCAL["Local Filesystem<br/>data/"]
        CLOUD["Cloud Storage<br/>(S3 via StorageAdapter)"]
        NEWDIR["New System Dir<br/>--new-data path/"]
    end
    DISCOVER["File Discovery<br/>Glob: *Beneficiary*, *Carrier*"]
    ZIP["Zip Extraction<br/>(if .zip detected)"]
    CHECKSUM["SHA-256 Checksum<br/>Integrity Verification"]
    INVENTORY["Build Inventory<br/>{name, path, size_mb, checksum}"]
    RES["StepResult<br/>• old_system.inventory<br/>• new_system.inventory<br/>• file_count"]
    SOURCES --> DISCOVER
    DISCOVER --> ZIP
    ZIP --> CHECKSUM
    CHECKSUM --> INVENTORY
    INVENTORY --> RES
    style DISCOVER fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
    style ZIP fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
    style CHECKSUM fill:#1e293b,stroke:#4ade80,color:#e2e8f0
    style INVENTORY fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
    style RES fill:#0f172a,stroke:#4ade80,color:#4ade80

Key Files

  • src/pipeline/step1_receive.py
  • src/storage.py — StorageAdapter protocol

Technologies

pathlib · hashlib (SHA-256) · zipfile · StorageAdapter (S3/local)
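The checksum-and-inventory flow can be sketched with the standard library alone. The glob patterns and inventory keys (`name`, `path`, `size_mb`, `checksum`) come from the diagram above; the function names and streaming chunk size are illustrative assumptions about `src/pipeline/step1_receive.py`.

```python
import hashlib
from pathlib import Path

def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Stream the file in chunks so large claims CSVs are never fully in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def build_inventory(data_dir: Path) -> list[dict]:
    """One entry per discovered CSV: {name, path, size_mb, checksum}."""
    inventory = []
    for pattern in ("*Beneficiary*", "*Carrier*"):
        for path in sorted(data_dir.glob(pattern)):
            inventory.append({
                "name": path.name,
                "path": str(path),
                "size_mb": round(path.stat().st_size / 1_048_576, 2),
                "checksum": sha256_of(path),
            })
    return inventory
```

In the real step the same logic would run twice, once per system directory, producing the separate `old_system.inventory` and `new_system.inventory` results.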
Step 2: Schema Validation

Reads CSV headers without loading full data and validates them against expected schemas. Classifies each file as beneficiary or carrier claims based on header fingerprints. Detects missing columns, extra columns, and type mismatches early — before the expensive ingestion step.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart LR
    INV["File Inventory
from Step 1"] --> READ["Read CSV Headers
(first row only)"] READ --> CLASS["Classify File Type
Beneficiary vs Carrier"] CLASS --> BENE["Beneficiary Schema
32 expected columns
DESYNPUF_ID, BENE_BIRTH_DT, ..."] CLASS --> CARR["Carrier Schema
141 expected columns
DESYNPUF_ID, CLM_ID, ..."] BENE --> CHECK["Column Validation
• Missing columns?
• Extra columns?
• Name mismatches?"] CARR --> CHECK CHECK --> RES["StepResult
• validated_count
• beneficiary_count
• carrier_count
• schema_errors[]"] style READ fill:#1e293b,stroke:#38bdf8,color:#e2e8f0 style CLASS fill:#1e293b,stroke:#fbbf24,color:#e2e8f0 style BENE fill:#1e293b,stroke:#a78bfa,color:#e2e8f0 style CARR fill:#1e293b,stroke:#a78bfa,color:#e2e8f0 style CHECK fill:#1e293b,stroke:#4ade80,color:#e2e8f0 style RES fill:#0f172a,stroke:#4ade80,color:#4ade80

Key Files

  • src/pipeline/step2_schema.py

Design Choice

  • Headers only — no full data scan
  • Fails fast before expensive DuckDB ingestion
  • Tolerates extra columns (forward-compatible)
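The header-only check described above amounts to one `csv.reader` call per file plus set arithmetic. The column names are from this document's schemas; the function names and return shape are illustrative, not the actual API of `src/pipeline/step2_schema.py`.

```python
import csv
from pathlib import Path

def read_header(path: Path) -> list[str]:
    """Read only the first row -- no full data scan."""
    with path.open(newline="") as f:
        return next(csv.reader(f))

def validate_header(header: list[str], expected: set[str]) -> dict:
    """Missing columns fail the check; extra columns are tolerated (forward-compatible)."""
    cols = set(header)
    return {
        "missing": sorted(expected - cols),
        "extra": sorted(cols - expected),
        "ok": expected <= cols,           # subset test: all expected columns present
    }
```

Because only headers are read, a schema failure surfaces in milliseconds instead of after minutes of DuckDB ingestion.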
Step 3: Ingest & Profile

The heaviest step. Loads all CSVs into DuckDB in-memory tables using read_csv_auto, adds derived columns (e.g., summary_year from filename), then profiles every column in every table — computing null rates, distinct counts, min/max/mean, and detecting statistical anomalies.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
    subgraph INGEST["Ingestion"]
        CSV_OLD["Old System CSVs"]
        DUCK_OLD["DuckDB Tables<br/>• beneficiary_summary<br/>• carrier_claims"]
        CSV_NEW["New System CSVs"]
        DUCK_NEW["DuckDB Tables<br/>• new_beneficiary_summary<br/>• new_carrier_claims"]
        CSV_OLD --> DUCK_OLD
        CSV_NEW --> DUCK_NEW
    end
    subgraph ENRICH["Enrichment"]
        YEAR["Add summary_year<br/>(extracted from filename)"]
        YEAR_N["Add summary_year<br/>(extracted from filename)"]
        DUCK_OLD --> YEAR
        DUCK_NEW --> YEAR_N
    end
    subgraph PROFILE["Profiling (per table)"]
        COLS["For each column:<br/>• null_count, null_pct<br/>• distinct_count<br/>• min, max, mean<br/>• dtype detection"]
        ANOMALY["Anomaly Detection<br/>• High null columns (>50%)<br/>• Zero-variance columns<br/>• Outlier distributions"]
    end
    ENRICH --> PROFILE
    PROFILE --> RES["StepResult<br/>• profiles{table → TableProfile}<br/>• anomalies[]<br/>• row counts"]
    style DUCK_OLD fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
    style DUCK_NEW fill:#1e293b,stroke:#4ade80,color:#e2e8f0
    style COLS fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
    style ANOMALY fill:#1e293b,stroke:#f87171,color:#e2e8f0
    style RES fill:#0f172a,stroke:#4ade80,color:#4ade80

Key Files

  • src/pipeline/step3_ingest.py
  • src/ingest.py — DuckDB loading logic
  • src/profile.py — column-level profiling

Technologies

DuckDB · read_csv_auto · SQL aggregations · In-memory analytics
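The per-column profiling pass can be expressed as one aggregate query per column. The metrics (`null_count`, `null_pct`, `distinct_count`, min/max) and anomaly rules (>50% nulls, zero variance) are from this document; the query shape and function names below are illustrative sketches, not the exact SQL in `src/profile.py`.

```python
def profile_sql(table: str, column: str) -> str:
    """Build the single-pass aggregate query run against DuckDB for one column."""
    return (
        f"SELECT COUNT(*) - COUNT({column}) AS null_count, "
        f"ROUND(100.0 * (COUNT(*) - COUNT({column})) / COUNT(*), 2) AS null_pct, "
        f"COUNT(DISTINCT {column}) AS distinct_count, "
        f"MIN({column}) AS min_v, MAX({column}) AS max_v "
        f"FROM {table}"
    )

def flag_anomalies(profile: dict) -> list[str]:
    """Mirror the documented checks: high-null (>50%) and zero-variance columns."""
    flags = []
    if profile["null_pct"] > 50:
        flags.append("high_null")
    if profile["distinct_count"] <= 1:
        flags.append("zero_variance")
    return flags
```

Running the aggregates inside DuckDB keeps profiling fast even for the multi-million-row carrier tables, since only per-column summaries cross into Python.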
Step 4: Record Matching

Joins old and new system tables on primary keys using FULL OUTER JOIN. Classifies every record as matched, old-only, or new-only. Also runs internal consistency validations (key integrity, temporal checks, demographic checks, financial reconciliation) on the old system data.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
    subgraph TABLES["DuckDB Tables"]
        OLD_B["beneficiary_summary
343,644 rows"] NEW_B["new_beneficiary_summary
343,644 rows"] OLD_C["carrier_claims
4,741,335 rows"] NEW_C["new_carrier_claims
4,746,112 rows"] end subgraph KEYS["Key Identification"] K1["Beneficiary Key:
DESYNPUF_ID + summary_year"] K2["Claims Key:
CLM_ID"] end subgraph JOIN["FULL OUTER JOIN"] J1["CAST keys to VARCHAR
(handle type mismatches)"] J1 --> CLASSIFY["Classify Records
• COALESCE(old, new) not null → matched
• new IS NULL → old_only
• old IS NULL → new_only"] end subgraph VALIDATE["Internal Validation"] V1["Key Integrity (3 checks)"] V2["Temporal Consistency"] V3["Demographic Checks"] V4["Financial Reconciliation"] end TABLES --> KEYS --> JOIN TABLES --> VALIDATE JOIN --> MATCH_T["_match_beneficiary
_match_claims"] CLASSIFY --> RATES["Match Rates
Bene: 99.91%
Claims: 99.9%"] VALIDATE --> VRES["ValidationResults
7 passed, 4 failed"] style OLD_B fill:#1e293b,stroke:#38bdf8,color:#e2e8f0 style NEW_B fill:#1e293b,stroke:#4ade80,color:#e2e8f0 style OLD_C fill:#1e293b,stroke:#38bdf8,color:#e2e8f0 style NEW_C fill:#1e293b,stroke:#4ade80,color:#e2e8f0 style CLASSIFY fill:#1e293b,stroke:#fbbf24,color:#e2e8f0 style MATCH_T fill:#1e293b,stroke:#a78bfa,color:#e2e8f0 style RATES fill:#0f172a,stroke:#4ade80,color:#4ade80 style VRES fill:#0f172a,stroke:#f87171,color:#f87171

Key Files

  • src/pipeline/step4_match.py — join + classify
  • src/validate.py — 11 validation checks

Design Decisions

  • CAST to VARCHAR prevents type mismatch errors
  • FULL OUTER JOIN captures all records from both sides
  • Match tables stored for downstream comparison
  • Validation runs in parallel on old system data
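The join-and-classify pattern above can be sketched as a SQL generator. The VARCHAR casts, FULL OUTER JOIN, and three-way classification are exactly what this document describes; the builder function and the precise column list are illustrative, and the real SQL in `src/pipeline/step4_match.py` may differ.

```python
def match_sql(old_table: str, new_table: str, keys: list[str]) -> str:
    """FULL OUTER JOIN on VARCHAR-cast keys, classifying every record
    as matched / old_only / new_only."""
    on = " AND ".join(
        f"CAST(o.{k} AS VARCHAR) = CAST(n.{k} AS VARCHAR)" for k in keys
    )
    k0 = keys[0]
    return (
        f"SELECT "
        f"COALESCE(CAST(o.{k0} AS VARCHAR), CAST(n.{k0} AS VARCHAR)) AS match_key, "
        f"CASE "
        f"WHEN o.{k0} IS NOT NULL AND n.{k0} IS NOT NULL THEN 'matched' "
        f"WHEN n.{k0} IS NULL THEN 'old_only' "
        f"ELSE 'new_only' END AS match_status "
        f"FROM {old_table} o "
        f"FULL OUTER JOIN {new_table} n ON {on}"
    )
```

For beneficiaries the key list would be `["DESYNPUF_ID", "summary_year"]`; for claims, `["CLM_ID"]`. Materializing the result as `_match_beneficiary` / `_match_claims` gives Step 5 a stable table to diff against.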
Step 5: Compare & Analyze

For every matched record pair, compares each field value between old and new systems. Aggregates discrepancies by column and category. Computes financial divergence totals and builds a detailed discrepancy table with per-record diff information.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
    MATCHED["Matched Records
from Step 4"] --> COMPARE["Field-by-Field Comparison
For each column:
old_value != new_value?"] COMPARE --> AGG["Aggregate Discrepancies
• By column (which fields differ?)
• By category (demographic, financial, etc.)
• By severity (count + percentage)"] COMPARE --> DETAIL["Discrepancy Detail Table
_discrepancy_detail
343,485 rows × 37 cols"] COMPARE --> FINREC["Financial Reconciliation
_financial_recon
262,017 rows × 11 cols"] AGG --> CHECKS["46 ComparisonResults
• check_name
• category
• table_pair
• metric_value"] subgraph METRICS["Key Metrics"] M1["446 beneficiaries mismatched (0.13%)"] M2["$35,624.71 total financial divergence"] M3["4,777 phantom claims in new system"] end CHECKS --> METRICS style MATCHED fill:#1e293b,stroke:#38bdf8,color:#e2e8f0 style COMPARE fill:#1e293b,stroke:#fbbf24,color:#e2e8f0 style AGG fill:#1e293b,stroke:#a78bfa,color:#e2e8f0 style DETAIL fill:#1e293b,stroke:#38bdf8,color:#e2e8f0 style FINREC fill:#1e293b,stroke:#4ade80,color:#e2e8f0 style CHECKS fill:#0f172a,stroke:#4ade80,color:#4ade80

Key Files

  • src/pipeline/step5_compare.py
  • src/compare.py — comparison logic

Output Tables

  • _discrepancy_detail — row-level diffs
  • _financial_recon — dollar-level reconciliation
  • 46 ComparisonResult objects
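The field-by-field comparison reduces to a per-record diff plus a per-column tally. In the real step this runs as SQL inside DuckDB; the pure-Python sketch below (function names and shapes are assumptions, not the `src/compare.py` API) shows the same logic on matched row pairs.

```python
from collections import Counter

def diff_record(old: dict, new: dict, key_cols: set[str]) -> list[tuple[str, object, object]]:
    """Return (column, old_value, new_value) for every non-key field that differs."""
    return [
        (col, old[col], new[col])
        for col in old
        if col not in key_cols and col in new and old[col] != new[col]
    ]

def aggregate_discrepancies(pairs, key_cols):
    """Tally differing fields by column across all matched pairs --
    the 'which fields differ?' aggregation feeding the ComparisonResults."""
    by_column = Counter()
    for old, new in pairs:
        for col, _, _ in diff_record(old, new, key_cols):
            by_column[col] += 1
    return by_column
```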
Step 6: Report

Generates the final deliverables. First builds a canonical report_data.json (the data layer), then renders the HTML report with client-side Plotly charts from the embedded JSON. Also exports raw diff tables as CSVs for external analysis.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
    subgraph INPUTS["Inputs from Steps 1-5"]
        PROF["TableProfiles
(8 tables)"] VAL["ValidationResults
(11 checks)"] COMP["ComparisonResults
(46 checks)"] CTX["PipelineContext
(file inventories, match rates)"] CHART_Q["Chart Data Queries
(DuckDB → aggregations)"] end BUILD["build_report_data()
Assemble canonical dict"] INPUTS --> BUILD BUILD --> JSON["report_data.json
292 KB
(presentation-agnostic)"] BUILD --> RENDER["Jinja2 Template
+ Embedded chart_data JSON"] RENDER --> HTML["comparison_report.html
• Data Context section
• Executive Summary
• 6 Plotly charts (client-side)
• Sortable tables
• Collapsible profiles"] BUILD --> EXPORT["CSV Exports"] EXPORT --> CSV1["_discrepancy_detail.csv"] EXPORT --> CSV2["_financial_recon.csv"] EXPORT --> CSV3["_match_beneficiary.csv"] EXPORT --> CSV4["_match_claims.csv"] JSON -.->|"fetch()"|REACT["React Diff Viewer
(separate app)"] JSON -.->|"Any consumer"|OTHER["CLI / PDF / API
(future)"] style BUILD fill:#1e293b,stroke:#38bdf8,color:#e2e8f0 style JSON fill:#1e293b,stroke:#4ade80,color:#e2e8f0 style HTML fill:#1e293b,stroke:#fbbf24,color:#e2e8f0 style RENDER fill:#1e293b,stroke:#a78bfa,color:#e2e8f0 style REACT fill:#0f172a,stroke:#a78bfa,color:#a78bfa,stroke-dasharray: 5 5 style OTHER fill:#0f172a,stroke:#94a3b8,color:#94a3b8,stroke-dasharray: 5 5

Key Files

  • src/pipeline/step6_report.py
  • src/report.py — template + rendering

Architecture Insight

  • Data and presentation are separated
  • report_data.json is the canonical artifact
  • HTML is one presentation layer; React viewer is another
  • Charts render client-side from embedded JSON
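The data/presentation split can be sketched as follows. `build_report_data()` is named in this document; its signature, the top-level dict keys, and the writer helper below are illustrative assumptions about `src/pipeline/step6_report.py`, not its actual interface.

```python
import json

def build_report_data(profiles, validations, comparisons, match_rates) -> dict:
    """Assemble the canonical, presentation-agnostic dict behind report_data.json.
    (Top-level keys here are illustrative.)"""
    return {
        "profiles": profiles,
        "validations": validations,
        "comparisons": comparisons,
        "match_rates": match_rates,
        "summary": {
            "validation_failures": sum(1 for v in validations if not v["passed"]),
            "comparison_checks": len(comparisons),
        },
    }

def write_report_json(data: dict, path: str) -> None:
    """Serialize once; HTML embeds this same dict, other consumers fetch() the file."""
    with open(path, "w") as f:
        json.dump(data, f, indent=2, default=str)   # default=str handles Decimal/date
```

Because every consumer reads the same serialized dict, adding a new presentation layer (PDF, CLI) never requires re-running the pipeline.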

Data Flow & Storage

All intermediate data lives in DuckDB in-memory tables. The pipeline context accumulates results from each step. Only the final report step writes to disk (JSON, HTML, CSVs).

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
    subgraph DISK["Disk (Input)"]
        F1["data/old_system/*.csv<br/>(old system)"]
        F2["data/new_system/*.csv<br/>(new system)"]
    end
    subgraph DUCKDB["DuckDB In-Memory"]
        T1["beneficiary_summary"]
        T2["carrier_claims"]
        T3["new_beneficiary_summary"]
        T4["new_carrier_claims"]
        T5["_match_beneficiary"]
        T6["_match_claims"]
        T7["_discrepancy_detail"]
        T8["_financial_recon"]
    end
    subgraph PYTHON["Python Objects (PipelineContext)"]
        P1["profiles: dict[str, TableProfile]"]
        P2["validations: list[ValidationResult]"]
        P3["comparisons: list[ComparisonResult]"]
        P4["results: dict (step outputs)"]
    end
    subgraph OUTPUT_DISK["Disk (Output)"]
        O1["reports/report_data.json"]
        O2["reports/comparison_report.html"]
        O3["reports/exports/*.csv"]
    end
    F1 --> T1 & T2
    F2 --> T3 & T4
    T1 & T2 & T3 & T4 --> T5 & T6
    T5 & T6 --> T7 & T8
    DUCKDB --> PYTHON
    PYTHON --> OUTPUT_DISK
    style T1 fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
    style T2 fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
    style T3 fill:#1e293b,stroke:#4ade80,color:#e2e8f0
    style T4 fill:#1e293b,stroke:#4ade80,color:#e2e8f0
    style T5 fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
    style T6 fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
    style T7 fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
    style T8 fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
    style O1 fill:#0f172a,stroke:#4ade80,color:#4ade80
    style O2 fill:#0f172a,stroke:#fbbf24,color:#fbbf24
    style O3 fill:#0f172a,stroke:#94a3b8,color:#94a3b8

AWS Cloud Architecture

Serverless deployment using Lambda container images, Step Functions orchestration, and S3 for storage + static site hosting. Status: scaffolded, not production-tested.

Pipeline Execution

Each pipeline step runs as a Lambda function (container image). Step Functions chains them with gate logic — if a step sets halted=true, execution stops immediately.

  • Memory: 1 GB (light steps) – 10 GB (ingest/compare)
  • Timeout: 5 min (light) – 15 min (heavy)
  • Storage: DuckDB in /tmp (10 GB ephemeral), snapshot to S3 between steps
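The gate logic can be sketched as a generic per-step wrapper. Step Functions passes each Lambda's output as the next Lambda's input event, and a Choice state can check `halted` to end the execution early. The wrapper below is an illustrative assumption about how `cloud/handlers.py` is structured, not its actual code.

```python
def run_step(step_fn, event: dict) -> dict:
    """Wrap one pipeline step for Step Functions orchestration (illustrative).
    An upstream halt passes through untouched; a failure flips the gate so the
    state machine's Choice state stops the execution."""
    if event.get("halted"):
        return event                         # gate already tripped upstream
    try:
        payload = step_fn(event)             # the real step body runs here
        return {**event, **payload, "halted": False}
    except Exception as exc:
        return {**event, "status": "error", "error": str(exc), "halted": True}
```

Keeping the gate in the payload (rather than relying on Lambda errors alone) lets a step halt the pipeline deliberately, e.g. when Step 2 finds fatal schema errors.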

Static Site Hosting

After the report step, docs/ and reports/ (including Parquet exports) are synced to an S3 static website bucket. The Parquet Viewer and SQL Explorer work identically to the local deployment: hyparquet and Squirreling run in the browser, loading .parquet files via HTTP GET from S3.

  • S3 Website: index.html, all interactive tools
  • CORS: enabled for cross-origin Parquet fetches
  • CloudFront: recommended for production (not yet configured)
flowchart TB
    USER["Browser"]
    APIGW["API Gateway"]
    START["Lambda: Start<br/>(trigger)"]
    SFN["Step Functions<br/>State Machine"]
    S3SITE["S3 Static Website<br/>docs + reports + exports"]
    S3DATA["S3 Data Bucket<br/>CSVs, DuckDB snapshots,<br/>reports, exports"]
    subgraph PIPELINE["Pipeline Steps (Lambda Containers)"]
        L1["Step 1: Receive<br/>S3 file listing"]
        L2["Step 2: Schema Validate<br/>CSV header checks"]
        L3["Step 3: Ingest<br/>DuckDB + httpfs"]
        L4["Step 4: Match<br/>FULL OUTER JOIN"]
        L5["Step 5: Compare<br/>field-level diffs"]
        L6["Step 6: Report<br/>HTML + Parquet"]
    end
    USER -->|"GET"| S3SITE
    USER -->|"POST /pipeline/start"| APIGW
    APIGW --> START
    START -->|"StartExecution"| SFN
    SFN --> L1 --> L2 --> L3 --> L4 --> L5 --> L6
    L1 <-->|"read files"| S3DATA
    L3 <-->|"httpfs + snapshot"| S3DATA
    L4 <-->|"restore + snapshot"| S3DATA
    L5 <-->|"restore + snapshot"| S3DATA
    L6 -->|"write reports"| S3DATA
    L6 -->|"sync docs/"| S3SITE
    style USER fill:#0f172a,stroke:#38bdf8,color:#38bdf8
    style APIGW fill:#1e293b,stroke:#f59e0b,color:#f59e0b
    style START fill:#1e293b,stroke:#f59e0b,color:#e2e8f0
    style SFN fill:#1e293b,stroke:#a78bfa,color:#a78bfa
    style S3DATA fill:#1e293b,stroke:#4ade80,color:#4ade80
    style S3SITE fill:#1e293b,stroke:#38bdf8,color:#38bdf8
    style L1 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
    style L2 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
    style L3 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
    style L4 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
    style L5 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
    style L6 fill:#0f172a,stroke:#fb923c,color:#e2e8f0

Implementation Status

| Component | Status | Notes |
| --- | --- | --- |
| src/pipeline/step1–6 | ✅ Implemented | Shared with local — same code |
| src/adapters/aws.py | ✅ Implemented | S3 read/write/list via boto3 |
| cloud/handlers.py | ✅ Implemented | Lambda entry points + API trigger |
| infra/template.yaml | ✅ Implemented | SAM: S3, Lambda ×7, Step Functions, API GW, static site |
| S3 static site for docs | ⚠️ Scaffolded | SAM resource defined; sync not yet wired |
| DuckDB S3 snapshotting | ⚠️ Scaffolded | Handler code exists; not tested on real AWS |
| End-to-end cloud test | ❌ Not tested | Not deployed or run on real AWS |
| CloudFront CDN | ❌ Not implemented | Recommended for production |
| CI/CD + Monitoring | ❌ Not implemented | No GitHub Actions or CloudWatch alarms |

CMS Claims Comparison Pipeline — Architecture Diagrams

Python · DuckDB · Jinja2 · Plotly.js · Mermaid.js