Data Engineering · 8 min read

Building a Multi-Agent Data Wrangling Pipeline

Making the data cleaning process auditable, configurable, and composable with a DAG-based executor.

Python · Streamlit · Pydantic · Data Engineering · Multi-Agent

Most data quality tools are monoliths: you feed in a CSV and get back a cleaned version, with no visibility into what changed or why. I wanted something that makes the cleaning process auditable, configurable, and composable — and that forced me to think about data transformation as a graph problem, not just a list of if-statements.

Why multi-agent for this problem?

The short answer: the problem decomposes naturally into specialized concerns that shouldn't bleed into each other. Profiling a dataset requires different logic than generating transformation candidates, and generating candidates requires different logic than validating that those transformations are safe to apply. Cramming that into a single pipeline function creates a mess that's hard to test and harder to change.

So I structured it as five distinct agents: DataProfiler, CandidateGenerator, ValidationService, QualityScorer, and RankingService.

Each one owns a single contract, takes typed inputs, and returns typed outputs. The AgentCoordinator wires them together, and the PipelineManager handles iteration, state, and failure. The result is a system where you can swap in a different ranking policy without touching validation logic.
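The "typed inputs, typed outputs" contract can be sketched roughly like this. Class and field names here (`ProfileResult`, `Candidate`, `propose`) are illustrative assumptions, not the project's actual API, and I use stdlib dataclasses to keep the sketch dependency-free where the real project uses Pydantic models:

```python
from dataclasses import dataclass

# Illustrative typed contracts for two of the five agents.
@dataclass
class ProfileResult:
    row_count: int
    null_fractions: dict  # column name -> fraction of missing values

@dataclass
class Candidate:
    name: str
    target_column: str

class DataProfiler:
    """Profiling only: summary statistics in, ProfileResult out."""
    def run(self, null_fractions: dict, row_count: int) -> ProfileResult:
        return ProfileResult(row_count=row_count, null_fractions=null_fractions)

class CandidateGenerator:
    """Candidate generation only: ProfileResult in, candidates out."""
    def run(self, profile: ProfileResult) -> list:
        return [Candidate(name="fill_missing", target_column=col)
                for col, frac in profile.null_fractions.items() if frac > 0.0]

class AgentCoordinator:
    """Wires agents together; each stays swappable behind its contract."""
    def __init__(self, profiler, generator):
        self.profiler = profiler
        self.generator = generator

    def propose(self, null_fractions: dict, row_count: int) -> list:
        profile = self.profiler.run(null_fractions, row_count)
        return self.generator.run(profile)
```

Because the coordinator only sees the contracts, replacing `CandidateGenerator` with a different implementation never touches profiling code.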

Transformations as a DAG

One non-obvious design choice was modeling transformations as a DAG (Directed Acyclic Graph) rather than an ordered list. The reason: some transformations have implicit ordering constraints.

Why ordering matters: You should drop duplicates before normalizing, and you should fill missing values before encoding categoricals — otherwise you risk introducing new nulls or creating spurious categories.

Making the dependency graph explicit means the executor can respect those constraints without baking ordering assumptions into the application logic itself.
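A minimal version of this falls out of the standard library: declare the edges, let a topological sort produce a valid order. The transformation names below are stand-ins for whatever the pipeline actually registers:

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each key runs only after all the transformations it maps to.
# These edges encode the ordering constraints from the text.
dependencies = {
    "drop_duplicates": set(),
    "normalize": {"drop_duplicates"},         # dedupe before normalizing
    "fill_missing": set(),
    "encode_categoricals": {"fill_missing"},  # fill nulls before encoding
}

# Any order the sorter emits satisfies every constraint, and the
# executor never hard-codes a sequence.
order = list(TopologicalSorter(dependencies).static_order())
```

The win is that adding a new transformation means declaring its edges, not re-deriving a global ordering by hand.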

Reversibility

Transformations also support reversibility. Every applied transformation can be undone, which turned out to be important for the iterative loop: the pipeline runs up to N iterations, and if a candidate transformation reduces the quality score, it gets rolled back instead of committed.
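The iterate-score-rollback loop looks roughly like the sketch below. The toy score and the snapshot-based undo are my simplifications; the real pipeline uses the QualityScorer's composite score and each transformation's declared inverse rather than a deep copy:

```python
import copy

def quality_score(data: list) -> float:
    # Toy metric: fraction of non-None values.
    return sum(v is not None for v in data) / len(data)

def run_iterations(data: list, candidates: list, max_iterations: int = 5):
    for transform in candidates[:max_iterations]:
        snapshot = copy.deepcopy(data)   # cheap stand-in for reversibility
        before = quality_score(data)
        transform(data)                  # apply in place
        if quality_score(data) < before:
            data[:] = snapshot           # roll back instead of committing
    return data

def fill_none_with_zero(data):
    for i, v in enumerate(data):
        if v is None:
            data[i] = 0

def break_everything(data):
    data[:] = [None] * len(data)         # tanks the score -> gets rolled back

result = run_iterations([1, None, 3], [fill_none_with_zero, break_everything])
# The harmful transformation is reverted; the helpful one is kept.
```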

Quality scoring that's actually meaningful

Most quality metrics I've seen are single-dimensional (e.g. "percent non-null"). Here the scorer evaluates four orthogonal dimensions:

- Completeness: measures missing values
- Validity: checks type correctness
- Uniqueness: evaluates duplicates
- Consistency: checks value distributions

Each one is computed independently and then combined into a composite score using configurable weights. This means you can tell the pipeline to optimize for a completeness-heavy score if you're dealing with survey data, or a uniqueness-heavy score if you're preparing training data for a model.
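The weighting itself is a small normalized dot product. The specific weight values below are made up for illustration; only the shape of the computation is the point:

```python
# Equal weights by default; callers override per run.
DEFAULT_WEIGHTS = {"completeness": 0.25, "validity": 0.25,
                   "uniqueness": 0.25, "consistency": 0.25}

def composite_score(dimensions: dict, weights: dict = DEFAULT_WEIGHTS) -> float:
    # Normalize so weights don't need to sum to 1.
    total = sum(weights.values())
    return sum(dimensions[k] * w for k, w in weights.items()) / total

scores = {"completeness": 0.9, "validity": 1.0,
          "uniqueness": 0.8, "consistency": 0.7}

baseline = composite_score(scores)
# Survey data: lean heavily on completeness.
survey = composite_score(scores, {"completeness": 0.7, "validity": 0.1,
                                  "uniqueness": 0.1, "consistency": 0.1})
```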

The ranking system then uses those scores to order transformation candidates by expected improvement. The default policy is improvement-based, but it's pluggable — the RankingService takes any object that conforms to the policy interface.
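"Any object that conforms to the policy interface" maps naturally onto a structural protocol. `ImprovementPolicy` and the `key` method name are assumptions for this sketch, not the project's real names:

```python
from typing import Protocol

class RankingPolicy(Protocol):
    def key(self, candidate: dict) -> float: ...

class ImprovementPolicy:
    """Default policy: order by expected quality-score improvement."""
    def key(self, candidate: dict) -> float:
        return candidate["expected_improvement"]

class RankingService:
    def __init__(self, policy: RankingPolicy):
        self.policy = policy

    def rank(self, candidates: list) -> list:
        return sorted(candidates, key=self.policy.key, reverse=True)

service = RankingService(ImprovementPolicy())
ranked = service.rank([
    {"name": "fill_missing", "expected_improvement": 0.05},
    {"name": "drop_duplicates", "expected_improvement": 0.12},
])
```

Swapping in a cost-aware or risk-averse policy is a one-line change at construction time.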

Failure recovery as a first-class concern

Data is messy and transformations can fail — a type cast on a column with mixed content, an outlier removal that leaves an empty frame, a normalization on a zero-variance column. Instead of letting exceptions bubble up and kill the run, the pipeline has four configurable recovery strategies:

SKIP, RETRY, ABORT, FALLBACK

This is set per-run via config, which means a production batch job and an exploratory notebook session can use the same pipeline with different tolerance for failure.
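A stripped-down sketch of how those strategies might dispatch, with the function shape and retry-then-skip fallthrough being my assumptions rather than the project's actual behavior:

```python
from enum import Enum

class RecoveryStrategy(Enum):
    SKIP = "skip"
    RETRY = "retry"
    ABORT = "abort"
    FALLBACK = "fallback"

def apply_with_recovery(transform, data, strategy, fallback=None, retries=1):
    # RETRY gets extra attempts; everything else gets one.
    attempts = retries + 1 if strategy is RecoveryStrategy.RETRY else 1
    for _ in range(attempts):
        try:
            return transform(data)
        except Exception:
            continue
    if strategy is RecoveryStrategy.FALLBACK and fallback is not None:
        return fallback(data)            # apply a safe alternative
    if strategy is RecoveryStrategy.ABORT:
        raise RuntimeError("transformation failed; aborting run")
    return data                          # SKIP (or exhausted RETRY): keep data as-is

def bad_cast(data):
    raise ValueError("mixed-content column")

# An exploratory run tolerates the failure and moves on.
result = apply_with_recovery(bad_cast, [1, "a"], RecoveryStrategy.SKIP)
```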

Leakage Detector: The validation layer checks whether a proposed transformation would create target-correlated features or expose information that shouldn't be available at inference time. This is the kind of thing that's easy to skip in a quick script but shows up as an embarrassing bug in production.

What I'd do differently

Execution Separation

The Streamlit interface is intentionally thin, but state management gets awkward with long-running pipeline executions. Ideally, I'd separate backend and frontend more aggressively (e.g., an async job queue behind a lightweight API) rather than tying the computation lifecycle to a browser session.

Execution Engine

The pipeline was load-tested up to 200K rows × 42 columns, enough for its intended use. At significantly larger scale, transformation execution should be pushed to Polars or Spark — pandas stops being the right tool at that point.

Want to try it out?