
COCO Lakehouse Demo — Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Build the lakehouse/ Python package and the COCO-Caption Experiment #0 demo (script + notebook) that exercises every layer of the full architecture at 1,000-sample scale.

Architecture: A layered Python package (catalog → sync → query → stream → tools → visualize) backed by DuckDB/DuckLake for catalog management, MinIO for object storage, and HF datasets for the streaming egress interface. Pydantic-AI registers the query/sample functions as agent tools. W&B is an optional loose adapter (no W&B = everything still works). Rerun visualises spatial/temporal samples.

Tech Stack: Python 3.12, DuckDB 1.3+, DuckLake (DuckDB extension), s3fs, huggingface-hub, datasets, pydantic-ai, wandb, rerun-sdk, moto (test mocking), pytest

Design doc: docs/plans/2026-02-22-coco-lakehouse-demo-design.md


Prerequisites

Docker Compose services must be running for integration tests:

docker-compose up -d
# MinIO: http://localhost:29000 (key=minio, secret=minio123)
# PostgreSQL: localhost:55432 (db=ducklake, user=ducklake, pass=123456)

Unit tests use moto (S3 mock) and a tmp file catalog — no Docker needed.


Task 1: Add Dependencies

Files:

  • Modify: pyproject.toml

Step 1: Add the nine new runtime dependencies

Edit the dependencies list in pyproject.toml to add after "duckdb>=1.3.0":

    "huggingface-hub>=0.23.0",
    "datasets>=2.20.0",
    "wandb>=0.17.0",
    "rerun-sdk>=0.16.0",
    "nats-py>=2.7.0",
    "pydantic-ai>=0.0.14",
    "moto[s3]>=5.0.0",
    "boto3>=1.35.0",
    "Pillow>=10.0.0",

Step 2: Sync the environment

uv sync

Expected: lock file updated, packages installed with no errors.

Step 3: Verify key imports

uv run python -c "import duckdb, datasets, wandb, rerun, pydantic_ai, nats; print('all imports OK')"

Expected: all imports OK

Step 4: Commit

git add pyproject.toml uv.lock
git commit -m "chore: add huggingface, wandb, rerun, pydantic-ai, nats dependencies"

Task 2: Package Skeleton

Files:

  • Create: lakehouse/__init__.py
  • Create: lakehouse/catalog.py
  • Create: lakehouse/sync.py
  • Create: lakehouse/query.py
  • Create: lakehouse/stream.py
  • Create: lakehouse/tools.py
  • Create: lakehouse/visualize.py
  • Create: tests/test_catalog.py
  • Create: tests/test_sync.py
  • Create: tests/test_query.py
  • Create: tests/test_stream.py
  • Create: tests/test_tools.py
  • Create: tests/test_visualize.py
  • Create: experiments/__init__.py
  • Create: notebooks/ (directory only)

Step 1: Create directory structure

mkdir -p lakehouse experiments notebooks

Step 2: Create lakehouse/__init__.py

from lakehouse.catalog import LakehouseCatalog, Experiment, SimulationRun
from lakehouse.sync import sync_from_hf
from lakehouse.query import QueryResult
from lakehouse.stream import as_iterable_dataset
from lakehouse.visualize import visualize

__all__ = [
    "LakehouseCatalog",
    "Experiment",
    "SimulationRun",
    "sync_from_hf",
    "QueryResult",
    "as_iterable_dataset",
    "visualize",
]

Step 3: Create all other files as empty stubs

Each file should contain only:

# stub — implemented in subsequent tasks

Step 4: Create experiments/__init__.py as empty.

Step 5: Verify package is importable

uv run python -c "import lakehouse; print('package OK')"

Expected: package OK

Step 6: Commit

git add lakehouse/ experiments/ notebooks/ tests/test_catalog.py tests/test_sync.py tests/test_query.py tests/test_stream.py tests/test_tools.py tests/test_visualize.py
git commit -m "chore: scaffold lakehouse package and test files"

Task 3: Catalog — Experiment Schema and DuckLake Attach

Files:

  • Modify: lakehouse/catalog.py
  • Modify: tests/test_catalog.py

What this builds

LakehouseCatalog wraps a DuckDB connection with a DuckLake catalog attached. It owns the experiments and simulation_runs tables and provides typed methods for registering and updating records.

Step 1: Write failing tests in tests/test_catalog.py

import pytest
import tempfile
import os
from pathlib import Path
from lakehouse.catalog import LakehouseCatalog, Experiment, SimulationRun


@pytest.fixture
def catalog(tmp_path):
    db_path = str(tmp_path / "test.ducklake")
    # file-based DuckLake, no S3 needed for catalog tests
    cat = LakehouseCatalog(catalog_path=db_path, data_path=str(tmp_path / "data"))
    yield cat
    cat.close()


def test_register_experiment(catalog):
    exp = Experiment(experiment_id="exp-001", project="test-project", description="unit test")
    catalog.register_experiment(exp)
    row = catalog.get_experiment("exp-001")
    assert row.experiment_id == "exp-001"
    assert row.project == "test-project"


def test_register_experiment_idempotent(catalog):
    exp = Experiment(experiment_id="exp-dup", project="proj")
    catalog.register_experiment(exp)
    catalog.register_experiment(exp)  # second call must not raise
    row = catalog.get_experiment("exp-dup")
    assert row is not None


def test_register_simulation_run(catalog):
    catalog.register_experiment(Experiment(experiment_id="exp-001", project="proj"))
    catalog.register_run(
        SimulationRun(
            run_id="run-001",
            experiment_id="exp-001",
            sim_index=0,
            config={"lr": 0.001},
            s3_prefix="s3://warehouse/exp-001/sim_0/",
        )
    )
    run = catalog.get_run("run-001")
    assert run.sim_index == 0
    assert run.status == "started"
    assert run.config["lr"] == pytest.approx(0.001)


def test_complete_run(catalog):
    catalog.register_experiment(Experiment(experiment_id="exp-001", project="proj"))
    catalog.register_run(
        SimulationRun(run_id="run-001", experiment_id="exp-001", sim_index=0,
                      config={}, s3_prefix="s3://warehouse/exp-001/sim_0/")
    )
    catalog.complete_run("run-001")
    run = catalog.get_run("run-001")
    assert run.status == "completed"
    assert run.completed_at is not None


def test_list_runs_for_experiment(catalog):
    catalog.register_experiment(Experiment(experiment_id="exp-002", project="proj"))
    for k in range(3):
        catalog.register_run(
            SimulationRun(run_id=f"run-{k}", experiment_id="exp-002",
                          sim_index=k, config={}, s3_prefix=f"s3://w/exp-002/sim_{k}/")
        )
    runs = catalog.list_runs("exp-002")
    assert len(runs) == 3
    assert {r.sim_index for r in runs} == {0, 1, 2}

Step 2: Run to verify they fail

uv run pytest tests/test_catalog.py -v

Expected: ImportError or AttributeError, because LakehouseCatalog is not implemented yet.

Step 3: Implement lakehouse/catalog.py

from __future__ import annotations

import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

import duckdb


@dataclass
class Experiment:
    experiment_id: str
    project: str
    description: str = ""


@dataclass
class SimulationRun:
    run_id: str
    experiment_id: str
    sim_index: int
    config: dict
    s3_prefix: str
    status: str = "started"
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None


class LakehouseCatalog:
    def __init__(self, catalog_path: str, data_path: str) -> None:
        self._conn = duckdb.connect()
        self._conn.execute(
            f"ATTACH 'ducklake:{catalog_path}' AS lh (DATA_PATH '{data_path}')"
        )
        self._conn.execute("USE lh")
        self._init_schema()

    def _init_schema(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS experiments (
                experiment_id TEXT PRIMARY KEY,
                project TEXT NOT NULL,
                description TEXT DEFAULT '',
                created_at TIMESTAMPTZ DEFAULT now()
            )
        """)
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS simulation_runs (
                run_id TEXT PRIMARY KEY,
                experiment_id TEXT NOT NULL REFERENCES experiments(experiment_id),
                sim_index INTEGER NOT NULL,
                config JSON DEFAULT '{}',
                status TEXT DEFAULT 'started',
                s3_prefix TEXT,
                started_at TIMESTAMPTZ DEFAULT now(),
                completed_at TIMESTAMPTZ
            )
        """)

    def register_experiment(self, exp: Experiment) -> None:
        self._conn.execute(
            """
            INSERT INTO experiments (experiment_id, project, description)
            VALUES (?, ?, ?)
            ON CONFLICT (experiment_id) DO NOTHING
            """,
            [exp.experiment_id, exp.project, exp.description],
        )

    def get_experiment(self, experiment_id: str) -> Optional[Experiment]:
        row = self._conn.execute(
            "SELECT experiment_id, project, description FROM experiments WHERE experiment_id = ?",
            [experiment_id],
        ).fetchone()
        if row is None:
            return None
        return Experiment(experiment_id=row[0], project=row[1], description=row[2])

    def register_run(self, run: SimulationRun) -> None:
        self._conn.execute(
            """
            INSERT INTO simulation_runs
                (run_id, experiment_id, sim_index, config, s3_prefix)
            VALUES (?, ?, ?, ?, ?)
            """,
            [run.run_id, run.experiment_id, run.sim_index,
             json.dumps(run.config), run.s3_prefix],
        )

    def get_run(self, run_id: str) -> Optional[SimulationRun]:
        row = self._conn.execute(
            """
            SELECT run_id, experiment_id, sim_index, config, status,
                   s3_prefix, started_at, completed_at
            FROM simulation_runs WHERE run_id = ?
            """,
            [run_id],
        ).fetchone()
        if row is None:
            return None
        return SimulationRun(
            run_id=row[0], experiment_id=row[1], sim_index=row[2],
            config=json.loads(row[3]) if isinstance(row[3], str) else row[3],
            status=row[4], s3_prefix=row[5],
            started_at=row[6], completed_at=row[7],
        )

    def complete_run(self, run_id: str) -> None:
        self._conn.execute(
            "UPDATE simulation_runs SET status='completed', completed_at=now() WHERE run_id=?",
            [run_id],
        )

    def list_runs(self, experiment_id: str) -> list[SimulationRun]:
        rows = self._conn.execute(
            """
            SELECT run_id, experiment_id, sim_index, config, status,
                   s3_prefix, started_at, completed_at
            FROM simulation_runs WHERE experiment_id = ?
            ORDER BY sim_index
            """,
            [experiment_id],
        ).fetchall()
        return [
            SimulationRun(
                run_id=r[0], experiment_id=r[1], sim_index=r[2],
                config=json.loads(r[3]) if isinstance(r[3], str) else r[3],
                status=r[4], s3_prefix=r[5], started_at=r[6], completed_at=r[7],
            )
            for r in rows
        ]

    def close(self) -> None:
        self._conn.close()

Step 4: Run tests and verify they pass

uv run pytest tests/test_catalog.py -v

Expected: 5 tests pass.

Step 5: Commit

git add lakehouse/catalog.py tests/test_catalog.py
git commit -m "feat: implement LakehouseCatalog with experiment/run schema"

Task 4: Sync — HF Hub to MinIO

Files:

  • Modify: lakehouse/sync.py
  • Modify: tests/test_sync.py

What this builds

sync_from_hf() downloads Parquet files from HF Hub and uploads them to a MinIO S3 bucket. Tests use moto to mock S3 so Docker is not required.

Step 1: Write failing tests in tests/test_sync.py

import pytest
import io
from unittest.mock import patch, MagicMock
import pyarrow as pa
import pyarrow.parquet as pq
from moto import mock_aws
import boto3
from lakehouse.sync import sync_from_hf, SyncResult


def _make_parquet_bytes(n: int = 10) -> bytes:
    table = pa.table({
        "image_id": list(range(n)),
        "caption": [f"a photo of object {i}" for i in range(n)],
    })
    buf = io.BytesIO()
    pq.write_table(table, buf)
    return buf.getvalue()


@mock_aws
def test_sync_uploads_parquet_to_s3():
    # Create the mock bucket
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="warehouse")

    fake_files = {"data/train-00000-of-00001.parquet": _make_parquet_bytes(10)}

    with patch("lakehouse.sync._download_hf_parquet_files", return_value=fake_files):
        result = sync_from_hf(
            hf_repo="lmms-lab/COCO-Caption",
            bucket="warehouse",
            s3_prefix="coco/v1",
            endpoint_url="http://localhost:29000",
            access_key="minio",
            secret_key="minio123",
            max_samples=None,
        )

    assert result.files_uploaded == 1
    assert result.rows_total == 10
    assert "s3://warehouse/coco/v1/" in result.s3_prefix


@mock_aws
def test_sync_respects_max_samples():
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="warehouse")

    fake_files = {"data/part-0.parquet": _make_parquet_bytes(100)}

    with patch("lakehouse.sync._download_hf_parquet_files", return_value=fake_files):
        result = sync_from_hf(
            hf_repo="lmms-lab/COCO-Caption",
            bucket="warehouse",
            s3_prefix="coco/v1-small",
            endpoint_url="http://localhost:29000",
            access_key="minio",
            secret_key="minio123",
            max_samples=20,
        )

    assert result.rows_total == 20

Step 2: Run to verify they fail

uv run pytest tests/test_sync.py -v

Expected: ImportError, because sync_from_hf is not implemented yet.

Step 3: Implement lakehouse/sync.py

from __future__ import annotations

import io
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import boto3
import pyarrow as pa
import pyarrow.parquet as pq
from huggingface_hub import list_repo_files, hf_hub_download


@dataclass
class SyncResult:
    hf_repo: str
    s3_prefix: str
    files_uploaded: int
    rows_total: int


def _download_hf_parquet_files(hf_repo: str) -> dict[str, bytes]:
    """Return {relative_path: parquet_bytes} for all parquet files in the repo."""
    files = {}
    for filename in list_repo_files(hf_repo, repo_type="dataset"):
        if not filename.endswith(".parquet"):
            continue
        local = hf_hub_download(repo_id=hf_repo, filename=filename, repo_type="dataset")
        with open(local, "rb") as f:
            files[filename] = f.read()
    return files


def sync_from_hf(
    hf_repo: str,
    bucket: str,
    s3_prefix: str,
    endpoint_url: str,
    access_key: str,
    secret_key: str,
    max_samples: Optional[int] = None,
) -> SyncResult:
    """Download Parquet files from HF Hub and upload to MinIO S3."""
    raw_files = _download_hf_parquet_files(hf_repo)

    s3 = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="local",
    )

    files_uploaded = 0
    rows_total = 0

    for relative_path, parquet_bytes in raw_files.items():
        table = pq.read_table(io.BytesIO(parquet_bytes))

        if max_samples is not None:
            remaining = max_samples - rows_total
            if remaining <= 0:
                break
            table = table.slice(0, min(remaining, len(table)))

        rows_total += len(table)
        out_buf = io.BytesIO()
        pq.write_table(table, out_buf)
        out_buf.seek(0)

        filename = Path(relative_path).name
        key = f"{s3_prefix}/{filename}"
        s3.put_object(Bucket=bucket, Key=key, Body=out_buf.read())
        files_uploaded += 1

        if max_samples is not None and rows_total >= max_samples:
            break

    return SyncResult(
        hf_repo=hf_repo,
        s3_prefix=f"s3://{bucket}/{s3_prefix}/",
        files_uploaded=files_uploaded,
        rows_total=rows_total,
    )

Step 4: Run tests and verify they pass

uv run pytest tests/test_sync.py -v

Expected: 2 tests pass.

Step 5: Commit

git add lakehouse/sync.py tests/test_sync.py
git commit -m "feat: implement sync_from_hf for HF Hub → MinIO Parquet ingestion"

Task 5: Query — Typed DuckDB Queries Over the Catalog

Files:

  • Modify: lakehouse/query.py
  • Modify: tests/test_query.py

What this builds

LakehouseQuery wraps a DuckDB connection with S3 secrets configured and exposes typed query methods. Tests build a small in-memory Parquet fixture via DuckDB without touching MinIO.

Step 1: Write failing tests in tests/test_query.py

import pytest
import tempfile
import os
import io
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from lakehouse.query import LakehouseQuery, QueryResult


@pytest.fixture
def parquet_dir(tmp_path):
    """Create a small Parquet file the query layer can read locally."""
    table = pa.table({
        "image_id": list(range(50)),
        "caption": [f"a photo showing object number {i}" for i in range(50)],
        "width": [640] * 50,
        "height": [480] * 50,
    })
    pq.write_table(table, tmp_path / "part-0.parquet")
    return tmp_path


@pytest.fixture
def query(parquet_dir):
    q = LakehouseQuery(
        s3_endpoint=None,  # local mode — reads from filesystem paths
        access_key=None,
        secret_key=None,
    )
    q.register_local_path("coco_test", str(parquet_dir / "*.parquet"))
    yield q
    q.close()


def test_query_all_returns_arrow(query):
    result = query.execute("SELECT * FROM coco_test")
    assert isinstance(result, QueryResult)
    assert result.num_rows == 50


def test_query_filter(query):
    result = query.execute("SELECT * FROM coco_test WHERE image_id < 10")
    assert result.num_rows == 10


def test_query_to_pandas(query):
    result = query.execute("SELECT image_id, caption FROM coco_test LIMIT 5")
    df = result.to_pandas()
    assert len(df) == 5
    assert "caption" in df.columns


def test_query_to_arrow(query):
    result = query.execute("SELECT * FROM coco_test LIMIT 3")
    table = result.to_arrow()
    assert isinstance(table, pa.Table)
    assert table.num_rows == 3


def test_query_sample(query):
    result = query.sample("coco_test", n=10)
    assert result.num_rows == 10


def test_query_caption_keyword(query):
    result = query.execute(
        "SELECT * FROM coco_test WHERE caption LIKE '%object number 1%'"
    )
    # Matches "object number 1", "object number 10", "object number 11" ... "object number 19"
    assert result.num_rows >= 1

Step 2: Run to verify they fail

uv run pytest tests/test_query.py -v

Expected: ImportError.

Step 3: Implement lakehouse/query.py

from __future__ import annotations

from typing import Optional

import duckdb
import pyarrow as pa
import pandas as pd


class QueryResult:
    def __init__(self, relation: duckdb.DuckDBPyRelation) -> None:
        self._rel = relation
        self._arrow: Optional[pa.Table] = None

    def _materialise(self) -> pa.Table:
        if self._arrow is None:
            self._arrow = self._rel.arrow()
        return self._arrow

    @property
    def num_rows(self) -> int:
        return len(self._materialise())

    def to_arrow(self) -> pa.Table:
        return self._materialise()

    def to_pandas(self) -> pd.DataFrame:
        return self._materialise().to_pandas()


class LakehouseQuery:
    def __init__(
        self,
        s3_endpoint: Optional[str],
        access_key: Optional[str],
        secret_key: Optional[str],
    ) -> None:
        self._conn = duckdb.connect()
        if s3_endpoint:
            self._conn.execute("INSTALL httpfs; LOAD httpfs;")
            self._conn.execute(f"SET s3_endpoint='{s3_endpoint}';")
            self._conn.execute(f"SET s3_access_key_id='{access_key}';")
            self._conn.execute(f"SET s3_secret_access_key='{secret_key}';")
            self._conn.execute("SET s3_use_ssl=false;")
            self._conn.execute("SET s3_url_style='path';")

    def register_local_path(self, name: str, glob_path: str) -> None:
        """Register a local or S3 glob pattern as a named view."""
        self._conn.execute(
            f"CREATE OR REPLACE VIEW {name} AS SELECT * FROM read_parquet('{glob_path}')"
        )

    def execute(self, sql: str) -> QueryResult:
        return QueryResult(self._conn.sql(sql))

    def sample(self, table_or_view: str, n: int, seed: Optional[int] = None) -> QueryResult:
        seed_clause = f"REPEATABLE ({seed})" if seed is not None else ""
        sql = f"SELECT * FROM {table_or_view} USING SAMPLE {n} ROWS {seed_clause}"
        return QueryResult(self._conn.sql(sql))

    def close(self) -> None:
        self._conn.close()

Step 4: Run tests and verify they pass

uv run pytest tests/test_query.py -v

Expected: 6 tests pass.

Step 5: Commit

git add lakehouse/query.py tests/test_query.py
git commit -m "feat: implement LakehouseQuery with typed Arrow/pandas results"

Task 6: Stream — HF IterableDataset Egress

Files:

  • Modify: lakehouse/stream.py
  • Modify: tests/test_stream.py

What this builds

as_iterable_dataset() wraps a LakehouseQuery result in an HF IterableDataset, enabling training code to consume lakehouse data with the standard datasets API. Uses fetch_arrow_reader() for zero-copy streaming.

Step 1: Write failing tests in tests/test_stream.py

import pytest
import io
import pyarrow as pa
import pyarrow.parquet as pq
from datasets import IterableDataset
from lakehouse.query import LakehouseQuery
from lakehouse.stream import as_iterable_dataset, stream_batches


@pytest.fixture
def query_with_data(tmp_path):
    table = pa.table({
        "image_id": list(range(100)),
        "caption": [f"caption {i}" for i in range(100)],
        "width": [320] * 100,
    })
    pq.write_table(table, tmp_path / "data.parquet")
    q = LakehouseQuery(s3_endpoint=None, access_key=None, secret_key=None)
    q.register_local_path("ds", str(tmp_path / "*.parquet"))
    yield q
    q.close()


def test_as_iterable_dataset_returns_correct_type(query_with_data):
    ds = as_iterable_dataset(query_with_data, "SELECT * FROM ds")
    assert isinstance(ds, IterableDataset)


def test_iterable_dataset_yields_all_rows(query_with_data):
    ds = as_iterable_dataset(query_with_data, "SELECT * FROM ds")
    rows = list(ds)
    assert len(rows) == 100


def test_iterable_dataset_has_correct_fields(query_with_data):
    ds = as_iterable_dataset(query_with_data, "SELECT * FROM ds LIMIT 1")
    row = next(iter(ds))
    assert "image_id" in row
    assert "caption" in row


def test_iterable_dataset_respects_limit(query_with_data):
    ds = as_iterable_dataset(query_with_data, "SELECT * FROM ds LIMIT 10")
    rows = list(ds)
    assert len(rows) == 10


def test_stream_batches_yields_arrow_tables(query_with_data):
    batches = list(stream_batches(query_with_data, "SELECT * FROM ds", batch_size=32))
    total = sum(b.num_rows for b in batches)
    assert total == 100
    assert all(isinstance(b, pa.RecordBatch) for b in batches)


def test_epoch_reshuffle_differs(query_with_data):
    sql = "SELECT * FROM ds ORDER BY random()"
    epoch1 = [row["image_id"] for row in as_iterable_dataset(query_with_data, sql)]
    epoch2 = [row["image_id"] for row in as_iterable_dataset(query_with_data, sql)]
    # With high probability, random order differs between epochs
    assert epoch1 != epoch2

Step 2: Run to verify they fail

uv run pytest tests/test_stream.py -v

Expected: ImportError.

Step 3: Implement lakehouse/stream.py

from __future__ import annotations

from typing import Iterator

import pyarrow as pa
from datasets import IterableDataset

from lakehouse.query import LakehouseQuery


def stream_batches(
    query: LakehouseQuery,
    sql: str,
    batch_size: int = 256,
) -> Iterator[pa.RecordBatch]:
    """Yield Arrow RecordBatches from a DuckDB query. Zero-copy, never materialises full result."""
    reader = query._conn.execute(sql).fetch_arrow_reader(batch_size)
    yield from reader


def as_iterable_dataset(
    query: LakehouseQuery,
    sql: str,
    batch_size: int = 256,
) -> IterableDataset:
    """
    Wrap a DuckDB SQL query as an HF IterableDataset.

    Training code using this is identical to load_dataset(..., streaming=True).
    Data stays in MinIO; HF Hub is not involved at runtime.

    To reshuffle between epochs, change the SQL (e.g. append 'ORDER BY random()').
    """
    def _generator():
        for batch in stream_batches(query, sql, batch_size):
            yield from batch.to_pylist()

    return IterableDataset.from_generator(_generator)

Step 4: Run tests and verify they pass

uv run pytest tests/test_stream.py -v

Expected: 6 tests pass. Note: test_epoch_reshuffle_differs has a tiny probability of false failure (two identical random permutations of 100 items). Re-run if it fails once.

Step 5: Commit

git add lakehouse/stream.py tests/test_stream.py
git commit -m "feat: implement streaming egress via HF IterableDataset and Arrow reader"

Task 7: Tools — Pydantic-AI Agent Tool Definitions

Files:

  • Modify: lakehouse/tools.py
  • Modify: tests/test_tools.py

What this builds

build_lakehouse_agent() returns a configured Pydantic-AI Agent with lakehouse query, sample, and quality-check tools registered. The same Pydantic models that define tool schemas will later generate the MCP server schema (Phase 2).

Step 1: Write failing tests in tests/test_tools.py

import pytest
import pyarrow as pa
import pyarrow.parquet as pq
from unittest.mock import AsyncMock, patch, MagicMock
from lakehouse.query import LakehouseQuery
from lakehouse.tools import build_lakehouse_agent, QualityReport


@pytest.fixture
def query_fixture(tmp_path):
    table = pa.table({
        "image_id": list(range(20)),
        "caption": [f"a cat sitting on a mat {i}" for i in range(20)],
        "width": [640] * 20,
        "height": [480] * 20,
    })
    pq.write_table(table, tmp_path / "data.parquet")
    q = LakehouseQuery(s3_endpoint=None, access_key=None, secret_key=None)
    q.register_local_path("coco", str(tmp_path / "*.parquet"))
    yield q
    q.close()


def test_quality_report_fields():
    report = QualityReport(
        experiment_id="exp-001",
        total_rows=1000,
        null_caption_count=2,
        mean_caption_length=45.3,
        passed=True,
    )
    assert report.passed is True
    assert report.null_caption_count == 2


def test_build_lakehouse_agent_returns_agent(query_fixture):
    from pydantic_ai import Agent
    agent = build_lakehouse_agent(query_fixture, view_name="coco")
    assert isinstance(agent, Agent)


@pytest.mark.asyncio
async def test_agent_can_call_quality_check(query_fixture):
    agent = build_lakehouse_agent(query_fixture, view_name="coco")
    with patch("pydantic_ai.Agent.run", new_callable=AsyncMock) as mock_run:
        mock_result = MagicMock()
        mock_result.data = "Quality check passed: 20 rows, 0 nulls"
        mock_run.return_value = mock_result
        result = await agent.run("Run a quality check on coco")
        assert mock_run.called

Step 2: Run to verify they fail

uv run pytest tests/test_tools.py -v

Expected: ImportError.

Step 3: Add pytest-asyncio to dependencies

Add to pyproject.toml:

    "pytest-asyncio>=0.23.0",

Run: uv sync

Also add pytest.ini or pyproject.toml section:

[tool.pytest.ini_options]
asyncio_mode = "auto"

Step 4: Implement lakehouse/tools.py

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

from pydantic import BaseModel
from pydantic_ai import Agent, RunContext

from lakehouse.query import LakehouseQuery


class QualityReport(BaseModel):
    experiment_id: str
    total_rows: int
    null_caption_count: int
    mean_caption_length: float
    passed: bool


class SampleRequest(BaseModel):
    view_name: str
    n: int = 10
    seed: Optional[int] = None


def build_lakehouse_agent(
    query: LakehouseQuery,
    view_name: str,
    model: str = "claude-sonnet-4-6",
) -> Agent:
    """
    Build a Pydantic-AI Agent with lakehouse tools registered.

    The same Pydantic models here generate the MCP server tool schemas in Phase 2.
    All tools are synchronous wrappers — the Agent handles async scheduling.
    """
    agent: Agent[LakehouseQuery, str] = Agent(
        model,
        deps_type=LakehouseQuery,
        result_type=str,
        system_prompt=(
            "You are a lakehouse orchestration agent. "
            "Use the available tools to query, sample, and validate datasets."
        ),
    )

    @agent.tool
    def query_dataset(ctx: RunContext[LakehouseQuery], sql: str) -> dict:
        """Execute a SQL query against the lakehouse and return summary statistics."""
        result = ctx.deps.execute(sql)
        table = result.to_arrow()
        return {
            "num_rows": table.num_rows,
            "columns": table.schema.names,
            "preview": table.slice(0, 3).to_pydict(),
        }

    @agent.tool
    def sample_dataset(ctx: RunContext[LakehouseQuery], req: SampleRequest) -> dict:
        """Sample N rows from a lakehouse view, optionally with a fixed seed."""
        result = ctx.deps.sample(req.view_name, n=req.n, seed=req.seed)
        return result.to_arrow().to_pydict()

    @agent.tool
    def quality_check(
        ctx: RunContext[LakehouseQuery],
        experiment_id: str,
        caption_column: str = "caption",
    ) -> QualityReport:
        """
        Run a basic quality check on the active dataset view.
        Returns a QualityReport with null counts and caption length statistics.
        """
        total = ctx.deps.execute(f"SELECT count(*) FROM {view_name}").to_arrow()[0][0].as_py()
        null_count = ctx.deps.execute(
            f"SELECT count(*) FROM {view_name} WHERE {caption_column} IS NULL"
        ).to_arrow()[0][0].as_py()
        mean_len = ctx.deps.execute(
            f"SELECT avg(length({caption_column})) FROM {view_name} WHERE {caption_column} IS NOT NULL"
        ).to_arrow()[0][0].as_py() or 0.0

        return QualityReport(
            experiment_id=experiment_id,
            total_rows=total,
            null_caption_count=null_count,
            mean_caption_length=round(mean_len, 2),
            passed=(null_count == 0 and total > 0),
        )

    return agent

Step 5: Run tests and verify they pass

uv run pytest tests/test_tools.py -v

Expected: 3 tests pass.

Step 6: Commit

git add lakehouse/tools.py tests/test_tools.py pyproject.toml
git commit -m "feat: implement Pydantic-AI agent with query/sample/quality_check tools"

Task 8: Visualize — Rerun and W&B Routing

Files:

  • Modify: lakehouse/visualize.py
  • Modify: tests/test_visualize.py

What this builds

visualize(data, backend="auto") inspects the Arrow schema and routes to Rerun (spatial columns present) or W&B (scalar time-series). Tests mock both backends so no running Rerun viewer or W&B account is needed.

Step 1: Write failing tests in tests/test_visualize.py

import pytest
import pyarrow as pa
from unittest.mock import patch, MagicMock, call
from lakehouse.visualize import visualize, detect_backend, Backend


def test_detect_backend_spatial_goes_to_rerun():
    schema = pa.schema([
        ("frame_id", pa.int64()),
        ("timestamp", pa.float64()),
        ("image_bytes", pa.binary()),
        ("caption", pa.string()),
    ])
    assert detect_backend(schema) == Backend.RERUN


def test_detect_backend_scalar_goes_to_wandb():
    schema = pa.schema([
        ("step", pa.int64()),
        ("reward", pa.float64()),
        ("loss", pa.float32()),
    ])
    assert detect_backend(schema) == Backend.WANDB


def test_detect_backend_caption_only_goes_to_rerun():
    schema = pa.schema([
        ("image_id", pa.int64()),
        ("caption", pa.string()),
        ("width", pa.int32()),
        ("height", pa.int32()),
    ])
    assert detect_backend(schema) == Backend.RERUN


def test_visualize_routes_to_rerun(tmp_path):
    table = pa.table({
        "image_id": [0, 1],
        "caption": ["cat", "dog"],
        "width": [640, 640],
        "height": [480, 480],
    })
    with patch("lakehouse.visualize._log_to_rerun") as mock_rerun:
        visualize(table, backend="auto")
    mock_rerun.assert_called_once_with(table)


def test_visualize_routes_to_wandb():
    table = pa.table({
        "step": [0, 1, 2],
        "reward": [0.1, 0.5, 0.8],
        "loss": [1.0, 0.7, 0.4],
    })
    with patch("lakehouse.visualize._log_to_wandb") as mock_wb:
        visualize(table, backend="auto")
    mock_wb.assert_called_once_with(table)


def test_visualize_explicit_backend_overrides_auto():
    table = pa.table({"step": [0], "reward": [1.0]})
    with patch("lakehouse.visualize._log_to_rerun") as mock_rerun:
        visualize(table, backend="rerun")
    mock_rerun.assert_called_once()

Step 2: Run to verify they fail

uv run pytest tests/test_visualize.py -v

Expected: ImportError.

Step 3: Implement lakehouse/visualize.py

from __future__ import annotations

from enum import Enum
from typing import Literal

import pyarrow as pa

# Spatial column names that indicate Rerun is the right backend
_SPATIAL_COLUMNS = frozenset({
    "image_bytes", "image_path", "frame_id", "point_cloud",
    "trajectory", "transform", "pose", "bbox", "embedding",
    "caption", "width", "height",
})

# Scalar metric column names that indicate W&B is the right backend
_METRIC_COLUMNS = frozenset({
    "reward", "loss", "accuracy", "step", "epoch",
    "success_rate", "value", "metric",
})


class Backend(str, Enum):
    RERUN = "rerun"
    WANDB = "wandb"


def detect_backend(schema: pa.Schema) -> Backend:
    """
    Inspect column names to determine the appropriate visualisation backend.
    Rerun: spatial, temporal, or multimodal data (images, captions, poses).
    W&B: scalar time-series (reward, loss, accuracy).
    When ambiguous, prefer Rerun.
    """
    col_names = set(schema.names)
    if col_names & _SPATIAL_COLUMNS:
        return Backend.RERUN
    if col_names & _METRIC_COLUMNS:
        return Backend.WANDB
    return Backend.RERUN  # default


def _log_to_rerun(table: pa.Table) -> None:
    """Log an Arrow table to a Rerun recording stream."""
    import rerun as rr

    rr.init("lakehouse", spawn=True)
    for i, row in enumerate(table.to_pylist()):
        timestamp = row.get("timestamp", float(i))
        rr.set_time_seconds("time", timestamp)
        entity = f"sample/{row.get('image_id', i)}"

        if "image_bytes" in row and row["image_bytes"] is not None:
            rr.log(entity, rr.Image(row["image_bytes"]))
        if "caption" in row:
            rr.log(f"{entity}/caption", rr.TextLog(str(row["caption"])))
        if "bbox" in row and row["bbox"] is not None:
            rr.log(f"{entity}/bbox", rr.Boxes2D(array=row["bbox"]))


def _log_to_wandb(table: pa.Table) -> None:
    """Log scalar columns from an Arrow table to W&B as time-series metrics."""
    import wandb

    df = table.to_pandas()
    step_col = "step" if "step" in df.columns else None
    for _, row in df.iterrows():
        metrics = {k: v for k, v in row.items() if k != step_col}
        step = int(row[step_col]) if step_col else None
        wandb.log(metrics, step=step)


def visualize(
    data: pa.Table,
    backend: Literal["auto", "rerun", "wandb"] = "auto",
) -> None:
    """
    Visualise an Arrow table in the appropriate backend.

    backend="auto": inspect schema columns to choose Rerun or W&B.
    backend="rerun": force Rerun regardless of schema.
    backend="wandb": force W&B regardless of schema.
    """
    if backend == "auto":
        chosen = detect_backend(data.schema)
    else:
        chosen = Backend(backend)

    if chosen == Backend.RERUN:
        _log_to_rerun(data)
    else:
        _log_to_wandb(data)

Step 4: Run tests and verify they pass

uv run pytest tests/test_visualize.py -v

Expected: 6 tests pass.

Step 5: Commit

git add lakehouse/visualize.py tests/test_visualize.py
git commit -m "feat: implement visualize() routing to Rerun or W&B by schema inspection"

Task 9: Full Unit Test Suite

Step 1: Run all unit tests together

uv run pytest tests/test_catalog.py tests/test_sync.py tests/test_query.py tests/test_stream.py tests/test_tools.py tests/test_visualize.py -v

Expected: all tests pass (approx 28 tests).

Step 2: If any failures, fix before continuing

Do not proceed to Task 10 with failing tests.

Step 3: Commit any fixes

git add -p
git commit -m "fix: resolve test failures before integration demo"

Task 10: COCO Demo Script

Files:

  • Create: experiments/coco_demo.py
  • Create: experiments/__init__.py (empty — makes experiments/ importable, which the notebook relies on)

What this builds

An end-to-end runnable script that exercises all lakehouse layers in sequence. Requires Docker Compose services running.

Step 1: Create experiments/coco_demo.py

#!/usr/bin/env python3
"""
Experiment #0: COCO-Caption lakehouse demo.

Requires:
    docker-compose up -d   (MinIO on :29000, PostgreSQL on :55432)
    uv run python experiments/coco_demo.py
"""

import os
import asyncio
import tempfile

from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel

load_dotenv()
console = Console()

S3_ENDPOINT = os.getenv("S3_ENDPOINT", "http://localhost:29000")
S3_KEY = os.getenv("DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID", "minio")
S3_SECRET = os.getenv("DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY", "minio123")
HF_REPO = "lmms-lab/COCO-Caption"
MAX_SAMPLES = 1000
EXPERIMENT_ID = "coco-v1"
BUCKET = "warehouse"
S3_PREFIX = f"experiments/{EXPERIMENT_ID}/sim_0"


def step1_sync():
    console.print(Panel("[bold]Step 1: HF Hub → MinIO sync[/bold]"))
    from lakehouse.sync import sync_from_hf

    result = sync_from_hf(
        hf_repo=HF_REPO,
        bucket=BUCKET,
        s3_prefix=S3_PREFIX,
        endpoint_url=S3_ENDPOINT,
        access_key=S3_KEY,
        secret_key=S3_SECRET,
        max_samples=MAX_SAMPLES,
    )
    console.print(f" Uploaded {result.files_uploaded} file(s), {result.rows_total} rows")
    console.print(f" S3 prefix: {result.s3_prefix}")
    return result


def step2_catalog(tmp_path: str):
    console.print(Panel("[bold]Step 2: Register in DuckLake catalog[/bold]"))
    from lakehouse.catalog import LakehouseCatalog, Experiment, SimulationRun

    cat = LakehouseCatalog(
        catalog_path=f"{tmp_path}/coco-v1.ducklake",
        data_path=f"{tmp_path}/data",
    )
    cat.register_experiment(Experiment(
        experiment_id=EXPERIMENT_ID,
        project="lakehouse-demo",
        description="COCO-Caption 1k sample demo",
    ))
    cat.register_run(SimulationRun(
        run_id="run-coco-001",
        experiment_id=EXPERIMENT_ID,
        sim_index=0,
        config={"max_samples": MAX_SAMPLES, "hf_repo": HF_REPO},
        s3_prefix=f"s3://{BUCKET}/{S3_PREFIX}/",
    ))
    console.print(f" Registered experiment: {EXPERIMENT_ID}")
    return cat


def step3_wandb(s3_prefix: str):
    console.print(Panel("[bold]Step 3: W&B init + Reference Artifact[/bold]"))
    try:
        import wandb

        run = wandb.init(
            project="lakehouse-demo",
            group=EXPERIMENT_ID,
            name="run-coco-001",
            config={"max_samples": MAX_SAMPLES, "hf_repo": HF_REPO},
            mode=os.getenv("WANDB_MODE", "disabled"),  # set to "online" to enable
        )
        artifact = wandb.Artifact(name="coco-caption-1k", type="dataset")
        artifact.add_reference(s3_prefix)
        wandb.log_artifact(artifact)
        wandb.log({"samples_ingested": MAX_SAMPLES})
        console.print(f" W&B run: {run.name} (mode={os.getenv('WANDB_MODE', 'disabled')})")
        return run
    except Exception as e:
        console.print(f" [yellow]W&B skipped: {e}[/yellow]")
        return None


def step4_query():
    console.print(Panel("[bold]Step 4: DuckDB query[/bold]"))
    from lakehouse.query import LakehouseQuery

    q = LakehouseQuery(
        s3_endpoint=S3_ENDPOINT.replace("http://", ""),
        access_key=S3_KEY,
        secret_key=S3_SECRET,
    )
    q.register_local_path(
        "coco",
        f"s3://{BUCKET}/{S3_PREFIX}/*.parquet",
    )
    result = q.execute("SELECT * FROM coco WHERE caption LIKE '%cat%' LIMIT 5")
    console.print(f" Rows matching 'cat': {result.num_rows}")
    console.print(result.to_pandas()[["image_id", "caption"]].to_string(index=False))
    return q


def step5_stream(query):
    console.print(Panel("[bold]Step 5: HF IterableDataset streaming egress[/bold]"))
    from lakehouse.stream import as_iterable_dataset

    ds = as_iterable_dataset(query, "SELECT * FROM coco")
    count = sum(1 for _ in ds)
    console.print(f" Streamed {count} rows via IterableDataset")


def step6_shuffle(query):
    console.print(Panel("[bold]Step 6: Epoch reshuffling[/bold]"))
    from lakehouse.stream import as_iterable_dataset

    ids_e1 = [r["image_id"] for r in as_iterable_dataset(query, "SELECT * FROM coco ORDER BY random()")]
    ids_e2 = [r["image_id"] for r in as_iterable_dataset(query, "SELECT * FROM coco ORDER BY random()")]
    same = ids_e1 == ids_e2
    console.print(f" Epoch 1 == Epoch 2: {same} (should be False for good shuffle)")


def step7_rerun(query):
    console.print(Panel("[bold]Step 7: Rerun visualisation (10 samples)[/bold]"))
    from lakehouse.visualize import visualize

    result = query.execute("SELECT * FROM coco LIMIT 10")
    try:
        visualize(result.to_arrow(), backend="rerun")
        console.print(" Logged 10 samples to Rerun")
    except Exception as e:
        console.print(f" [yellow]Rerun skipped (no viewer running): {e}[/yellow]")


def step8_wandb_charts(query, wandb_run):
    console.print(Panel("[bold]Step 8: W&B data quality charts[/bold]"))
    try:
        import wandb

        if wandb_run is None:
            console.print(" [yellow]Skipped — W&B not initialised[/yellow]")
            return
        result = query.execute(
            "SELECT length(caption) AS cap_len, width, height FROM coco"
        )
        df = result.to_pandas()
        wandb.log({
            "caption_length_mean": df["cap_len"].mean(),
            "caption_length_std": df["cap_len"].std(),
        })
        console.print(f" Caption length: mean={df['cap_len'].mean():.1f}, std={df['cap_len'].std():.1f}")
    except Exception as e:
        console.print(f" [yellow]Skipped: {e}[/yellow]")


async def step9_pydantic_agent(query):
    console.print(Panel("[bold]Step 9: Pydantic-AI quality-check agent[/bold]"))
    from lakehouse.tools import build_lakehouse_agent

    agent = build_lakehouse_agent(query, view_name="coco")
    try:
        result = await agent.run(
            "Run a quality check on the coco dataset for experiment coco-v1",
            deps=query,
        )
        console.print(f" Agent result: {result.data}")
    except Exception as e:
        console.print(f" [yellow]Agent skipped (no API key?): {e}[/yellow]")


async def main():
    console.print(Panel("[bold green]COCO-Caption Lakehouse Demo — Experiment #0[/bold green]"))

    with tempfile.TemporaryDirectory() as tmp:
        sync_result = step1_sync()
        cat = step2_catalog(tmp)
        wb_run = step3_wandb(sync_result.s3_prefix)
        query = step4_query()
        step5_stream(query)
        step6_shuffle(query)
        step7_rerun(query)
        step8_wandb_charts(query, wb_run)
        await step9_pydantic_agent(query)
        cat.complete_run("run-coco-001")
        cat.close()
        query.close()

    if wb_run:
        import wandb
        wandb.finish()

    console.print(Panel("[bold green]Demo complete.[/bold green]"))


if __name__ == "__main__":
    asyncio.run(main())

Step 2: Run the script (requires Docker Compose up)

uv run python experiments/coco_demo.py

Expected: all 9 steps print with either output or a yellow skipped notice. No unhandled exceptions.
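Step 6's epoch reshuffling relies only on `ORDER BY random()` returning a fresh permutation on every query. The mechanism can be sketched with stdlib `sqlite3` standing in for the DuckDB path the demo uses:

```python
import sqlite3

# Each "epoch" re-runs ORDER BY RANDOM(), so the row order differs
# between passes while the row set stays identical.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE coco (image_id INTEGER)")
con.executemany("INSERT INTO coco VALUES (?)", [(i,) for i in range(100)])

def epoch_ids():
    return [r[0] for r in con.execute("SELECT image_id FROM coco ORDER BY RANDOM()")]

ids_e1, ids_e2 = epoch_ids(), epoch_ids()
print(sorted(ids_e1) == sorted(ids_e2))  # True: same rows every epoch
print(ids_e1 == ids_e2)                  # almost certainly False: fresh order
```

Because nothing is materialised between epochs, reshuffling costs one extra scan per pass rather than a shuffled copy of the dataset.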

Step 3: Commit

git add experiments/coco_demo.py experiments/__init__.py
git commit -m "feat: add COCO-Caption demo script (Experiment #0)"

Task 11: COCO Demo Notebook

Files:

  • Create: notebooks/coco_demo.ipynb

Step 1: Create the notebook

Run in the repo root:

uv run jupyter nbconvert --to notebook --execute --output notebooks/coco_demo.ipynb \
--ExecutePreprocessor.kernel_name=python3 \
--ExecutePreprocessor.timeout=600 \
/dev/stdin <<'EOF'
{
"cells": [
{"cell_type":"markdown","source":["# COCO-Caption Lakehouse Demo\n\nExperiment #0 — walks through every layer of the Modal Lakehouse architecture.\n\nSee design doc: `docs/plans/2026-02-22-coco-lakehouse-demo-design.md`"],"metadata":{}},
{"cell_type":"code","source":["import os\nos.environ.setdefault('WANDB_MODE', 'disabled')\nfrom experiments.coco_demo import *"],"outputs":[],"metadata":{},"execution_count":null},
{"cell_type":"markdown","source":["## Step 1 — HF Hub → MinIO Sync"],"metadata":{}},
{"cell_type":"code","source":["sync_result = step1_sync()\nprint(sync_result)"],"outputs":[],"metadata":{},"execution_count":null},
{"cell_type":"markdown","source":["## Step 2 — DuckLake Catalog Registration"],"metadata":{}},
{"cell_type":"code","source":["import tempfile\n_tmp = tempfile.mkdtemp()\ncat = step2_catalog(_tmp)\nexp = cat.get_experiment(EXPERIMENT_ID)\nprint(exp)"],"outputs":[],"metadata":{},"execution_count":null},
{"cell_type":"markdown","source":["## Step 4 — DuckDB Query"],"metadata":{}},
{"cell_type":"code","source":["query = step4_query()"],"outputs":[],"metadata":{},"execution_count":null},
{"cell_type":"markdown","source":["## Step 5 — Streaming Egress via HF IterableDataset"],"metadata":{}},
{"cell_type":"code","source":["from lakehouse.stream import as_iterable_dataset\nds = as_iterable_dataset(query, 'SELECT * FROM coco LIMIT 5')\nfor row in ds:\n print(row['image_id'], row['caption'][:60])"],"outputs":[],"metadata":{},"execution_count":null},
{"cell_type":"markdown","source":["## Step 6 — Epoch Reshuffling"],"metadata":{}},
{"cell_type":"code","source":["step6_shuffle(query)"],"outputs":[],"metadata":{},"execution_count":null},
{"cell_type":"markdown","source":["## Step 8 — Data Quality Summary"],"metadata":{}},
{"cell_type":"code","source":["result = query.execute('SELECT length(caption) AS cap_len FROM coco')\ndf = result.to_pandas()\ndf['cap_len'].describe()"],"outputs":[],"metadata":{},"execution_count":null},
{"cell_type":"markdown","source":["## Cleanup"],"metadata":{}},
{"cell_type":"code","source":["cat.complete_run('run-coco-001')\ncat.close()\nquery.close()\nprint('Done')"],"outputs":[],"metadata":{},"execution_count":null}
],
"metadata": {
"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"},
"language_info": {"name": "python", "version": "3.12.0"}
},
"nbformat": 4,
"nbformat_minor": 5
}
EOF

Alternatively, create the notebook manually in JupyterLab and run cells in order. The key requirement is that all cells execute without error.
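A third option: since a .ipynb file is plain nbformat-4 JSON, the skeleton can be generated with stdlib `json` and then executed with nbconvert as in Step 2. A sketch (only the first two cells shown; extend with the remaining cell sources from the heredoc above):

```python
import json
import os

# Cell helpers producing the fields nbformat 4 requires per cell type.
def md(src):
    return {"cell_type": "markdown", "source": src, "metadata": {}}

def code(src):
    return {"cell_type": "code", "source": src, "metadata": {},
            "outputs": [], "execution_count": None}

nb = {
    "cells": [
        md("# COCO-Caption Lakehouse Demo"),
        code("import os\nos.environ.setdefault('WANDB_MODE', 'disabled')"),
    ],
    "metadata": {
        "kernelspec": {"display_name": "Python 3", "language": "python",
                       "name": "python3"},
        "language_info": {"name": "python"},
    },
    "nbformat": 4,
    "nbformat_minor": 5,
}

os.makedirs("notebooks", exist_ok=True)
with open("notebooks/coco_demo.ipynb", "w") as f:
    json.dump(nb, f, indent=1)
```

This sidesteps shell quoting entirely and keeps the cell sources editable as Python strings.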

Step 2: Verify notebook runs clean

uv run jupyter nbconvert --to notebook --execute notebooks/coco_demo.ipynb --output notebooks/coco_demo.ipynb

Expected: no CellExecutionError.

Step 3: Commit

git add notebooks/coco_demo.ipynb
git commit -m "feat: add COCO-Caption demo notebook (Experiment #0)"

Task 12: Final Integration Test

Step 1: Run the full test suite

uv run pytest tests/ -v --tb=short

Expected: all tests pass.

Step 2: Run the demo script end-to-end

docker-compose up -d
uv run python experiments/coco_demo.py

Expected: 9 steps complete, no unhandled exceptions.

Step 3: Final commit

git add .
git commit -m "feat: complete COCO-Caption lakehouse demo (Experiment #0)

- lakehouse/ package: catalog, sync, query, stream, tools, visualize
- HF IterableDataset streaming egress via DuckDB Arrow reader
- Pydantic-AI agent with quality-check, query, sample tools
- W&B loose coupling via Reference Artifacts
- Rerun visualisation routing by schema inspection
- experiments/coco_demo.py + notebooks/coco_demo.ipynb"

What Is NOT in This Plan (Deferred to Phase 2+)

| Feature | Phase |
| --- | --- |
| MCP server (`mcp_server.py`) | 2 |
| Zenoh ingest subscriber | 3 |
| K > 1 parallel simulators | 4 |
| NATS/JetStream control events | 4 |
| Ray Data distributed egress | 5 |
| `load_dataset("lakehouse", ...)` custom `DatasetBuilder` | 5 |