133 changes: 133 additions & 0 deletions AGENTS.md
@@ -0,0 +1,133 @@
# CLAUDE.md

Review comment (Member): Maybe have this file as a symlink to CLAUDE.md?


Estuary is a real-time data platform with:
- Control plane: user-facing catalog management APIs
- Data planes: distributed runtime execution
- Connectors: OCI images integrating external systems

This repo lives at `https://github.com/estuary/flow`

## Repository Overview

Estuary is built with:
- **Rust** (primary language)
  - Third-party sources under `~/.cargo/registry/src/`
- **Go** - integration glue with the Gazette consumer framework
  - Third-party sources under `~/go/pkg/mod/`
- **Protobuf** - communication between control plane, data planes, and connectors
- **Supabase** - migrations are under `supabase/migrations/`
  - pgTAP tests under `supabase/tests/`
- **Docs** - external user-facing product documentation under `site/` (Docusaurus)

## Essential Commands

### Build & Test

Use regular `cargo` and `go` tools to build and test crates.

```bash
# libsqlite3 tag is required for `bindings` and `flowctl-go` packages.
go build -tags libsqlite3 ./go/bindings

# Regenerate checked-in protobuf (required after .proto changes)
mise run build:go-protobufs
mise run build:rust-protobufs

# Run pgTAP SQL Tests
mise run ci:sql-tap

# E2E tests over derivation examples (SLOW)
mise run ci:catalog-test
```

### Development

A development Supabase instance is available:
```bash
# Reset with current migrations as needed
supabase db reset

# Interact directly with dev DB
psql postgresql://postgres:postgres@localhost:5432/postgres -c 'SELECT 1;'
```

## Architecture Overview

### Core Concepts

Users interact with the control plane to manage a catalog of:
- **Captures**: tasks which capture from a user endpoint into target collections
- **Collections**: collections of data with enforced JSON Schema
- **Derivations**: both a collection and a task - the task builds its collection through transformation of other collections
- **Materializations**: tasks which maintain materialized views of source collections in an endpoint
- **Tests**: fixtures of source collection inputs and expected derivation outputs

Collections and tasks have a declarative (JSON/YAML) **model**.
Users refine model changes in **drafts**, which are **published**
to the control plane for verification and testing.
The control plane compiles the user's catalog model into
**built specs** that have extra specifics required by the runtime,
and activates specs into their associated data plane.

### Control-plane components
- **Supabase**: catalog and platform config DB
- **Agent**: APIs and background automation
- **Data-plane controller**: provisions data planes

### Data-plane components
- **Gazette**: brokers serve the journals that back collections
- **Reactors**: runtime written to Gazette consumer framework;
executes tasks and runs connectors as sidecars over gRPC
- **Etcd**: config for gazette and reactors

### Protocols

- `go/protocols/flow/flow.proto` - core types and built specs
- `go/protocols/capture/capture.proto` - protocol for capture tasks
- `go/protocols/derive/derive.proto` - for derivation tasks
- `go/protocols/materialize/materialize.proto` - for materialization tasks

## README.md

Every crate/module should have a README.md with essential context:
- Purpose and fit within the project
- Key types and entry points
- Brief architecture and non-obvious details

A README.md is ONLY a roadmap for expert developers,
orienting them where to look next.

Keep READMEs current - update with code changes.

## Development Guidelines

### Implementation
- Use `var myVar = ...` in Go. Do NOT use `myVar := ...` (unless required due to shadowing)
- Write comments that document "why" - rationale, broader context, and non-obvious detail
- Do NOT write comments which describe the obvious behavior of code.
Don't write `// Get credentials` before a call `getCredentials()`
- Use early-return over nested conditionals
- Use at least one level of name qualification for third-party types and functions.
For example, `axum::Router::new()` instead of `use axum::Router; Router::new()`.
Types / functions should be unqualified ONLY if they're in the current module.
- Prefer pure functions that take and act over POD states.
AVOID structures that mix complex state and impl behaviors, where possible.
The exception is state machines: structs and enums that encapsulate fine-grain
POD state into higher-order transitions that are easier to reason about.
DO seek to decompose problems into state machines.
- Avoid trivial impl routines which could be inlined by the caller.
Indirection is harder to read; routines must buy us something.
- Decompose IO and POD processing into separate routines where possible.
Routines should gravitate towards IO or processing, and not mix both.
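
As a rough illustration of several of the guidelines above (a pure transition function over POD state, a small state machine, early returns, and IO kept in its own routine), here is a minimal std-only Rust sketch. Every name in it (`Retry`, `Outcome`, `step`, `run`) is hypothetical and does not come from this repository.

```rust
// Hypothetical example: none of these names exist in the Estuary codebase.

/// POD state machine for a retry loop. Callers reason about three
/// higher-order states rather than loose counters and timers.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Retry {
    Idle,
    Waiting { attempt: u32, delay_ms: u64 },
    GaveUp { attempts: u32 },
}

/// Result of a single attempt, as plain data.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Outcome {
    Success,
    Failure,
}

/// Pure transition: maps the prior state and an outcome to the next state.
/// It performs no IO, so it can be tested without any setup.
pub fn step(state: Retry, outcome: Outcome, max_attempts: u32) -> Retry {
    // Early-return instead of nesting conditionals.
    if outcome == Outcome::Success {
        return Retry::Idle;
    }
    let attempt = match state {
        Retry::Idle => 1,
        Retry::Waiting { attempt, .. } => attempt + 1,
        Retry::GaveUp { attempts } => return Retry::GaveUp { attempts },
    };
    if attempt >= max_attempts {
        return Retry::GaveUp { attempts: attempt };
    }
    // Exponential backoff: 100ms, 200ms, 400ms, ...
    Retry::Waiting { attempt, delay_ms: 100 << (attempt - 1) }
}

/// IO routine: performs attempts and sleeps, delegating every decision to `step`.
pub fn run<F>(mut attempt_io: F, max_attempts: u32) -> Retry
where
    F: FnMut() -> Outcome,
{
    let mut state = Retry::Idle;
    loop {
        state = step(state, attempt_io(), max_attempts);
        let delay_ms = match state {
            Retry::Waiting { delay_ms, .. } => delay_ms,
            Retry::Idle | Retry::GaveUp { .. } => return state,
        };
        std::thread::sleep(std::time::Duration::from_millis(delay_ms));
    }
}
```

Because `step` is pure, its behavior can be pinned down with plain assertions (or a snapshot), while `run` stays a thin IO shell around it.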

### Testing
- Prefer snapshots over fine-grain assertions (`insta` / `cupaloy`)

### Errors
- Wrap errors with context (`anyhow::Context` / `fmt.Errorf`)
- Return errors up the stack rather than logging
- Panic on impossible states (do NOT add spurious error handling)
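
The error guidelines above name `anyhow::Context` for Rust. The sketch below is a std-only stand-in for that idea, written so it compiles without dependencies; it is not the `anyhow` API. `ContextError`, `parse_port`, and `largest` are hypothetical names chosen for illustration.

```rust
// Std-only stand-in for wrapping errors with context.
// All names here are hypothetical and not part of the Estuary codebase.

/// An underlying error annotated with what the caller was trying to do.
#[derive(Debug)]
pub struct ContextError {
    context: String,
    source: Box<dyn std::error::Error + 'static>,
}

impl std::fmt::Display for ContextError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.context, self.source)
    }
}

impl std::error::Error for ContextError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&*self.source)
    }
}

/// Parse a port number. On failure the error is returned up the stack
/// with added context, rather than being logged here.
pub fn parse_port(field: &str, raw: &str) -> Result<u16, ContextError> {
    raw.trim().parse::<u16>().map_err(|err| ContextError {
        context: format!("parsing {field} value {raw:?} as a port"),
        source: Box::new(err),
    })
}

/// Largest element of a slice that the caller guarantees is non-empty.
pub fn largest(values: &[u16]) -> u16 {
    // An empty slice is a caller bug rather than a runtime condition,
    // so this panics instead of threading a spurious error through.
    *values.iter().max().expect("values must be non-empty")
}
```

With `anyhow`, the body of `parse_port` would typically collapse to a `.context(...)` or `.with_context(...)` call on the `Result`; the hand-rolled type above only exists to keep the sketch dependency-free.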

### Logging
- Structured logging with context (`tracing` / `logrus`)
- Avoid verbose logging in hot paths
10 changes: 9 additions & 1 deletion CLAUDE.md
@@ -106,11 +106,19 @@ Keep READMEs current - update with code changes.
 - Write comments that document "why" - rationale, broader context, and non-obvious detail
 - Do NOT write comments which describe the obvious behavior of code.
 Don't write `// Get credentials` before a call `getCredentials()`
-- Prefer functional approaches. Try to avoid mutation.
 - Use early-return over nested conditionals
 - Use at least one level of name qualification for third-party types and functions.
 For example, `axum::Router::new()` instead of `use axum::Router; Router::new()`.
 Types / functions should be unqualified ONLY if they're in the current module.
+- Prefer pure functions that take and act over POD states.
+AVOID structures that mix complex state and impl behaviors, where possible.
+The exception is state machines: structs and enums that encapsulate fine-grain
+POD state into higher-order transitions that are easier to reason about.
+DO seek to decompose problems into state machines.
+- Avoid trivial impl routines which could be inlined by the caller.
+Indirection is harder to read; routines must buy us something.
+- Decompose IO and POD processing into separate routines where possible.
+Routines should gravitate towards IO or processing, and not mix both.

 ### Testing
 - Prefer snapshots over fine-grain assertions (`insta` / `cupaloy`)
67 changes: 67 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions crates/doc/src/combine/mod.rs
@@ -159,6 +159,13 @@ impl Accumulator {
 Ok(self.memtable.as_ref().unwrap())
 }

+/// Segment ranges of the spill file written by this Accumulator so far.
+/// Ranges are append-only, non-overlapping, and monotonic within an
+/// Accumulator's lifetime.
+pub fn ranges(&self) -> &[std::ops::Range<u64>] {
+self.spill.segment_ranges()
+}
+
 /// Map this combine Accumulator into a Drainer, which will drain directly
 /// from the inner MemTable (if no spill occurred) or from an inner SpillDrainer.
 pub fn into_drainer(self) -> Result<Drainer, Error> {
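
The doc comment added on `ranges()` in `crates/doc/src/combine/mod.rs` states that ranges are append-only, non-overlapping, and monotonic. One way a test might check the last two properties is sketched below. `ranges_are_monotonic` is a hypothetical helper, not part of this diff, and it assumes "monotonic" means each range starts at or after the end of its predecessor.

```rust
/// Hypothetical check, not part of the diff: every range is well-formed
/// (start <= end) and begins at or after the end of the preceding range.
pub fn ranges_are_monotonic(ranges: &[std::ops::Range<u64>]) -> bool {
    ranges.iter().all(|range| range.start <= range.end)
        && ranges.windows(2).all(|pair| pair[0].end <= pair[1].start)
}
```

The append-only property could then be checked separately, by confirming that an earlier result of `ranges()` is a prefix of a later one.
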
2 changes: 0 additions & 2 deletions crates/flow-client-next/src/workflows/user_collection_auth.rs
@@ -44,8 +44,6 @@ pub fn new_journal_client(
 ///
 /// The partition template prefix of certain legacy collections were created
 /// without a "/{generation_id}/" suffix -- these are supported as well.
-///
-/// Panics if the partition template prefix is not in the expected format.
 pub fn new_journal_client_factory(
 api_client: crate::rest::Client,
 capability: models::Capability,
2 changes: 2 additions & 0 deletions crates/flowctl/Cargo.toml
@@ -33,7 +33,9 @@ proto-grpc = { path = "../proto-grpc", features = [
 "capture_client",
 "shuffle_client",
 ] }
+publisher = { path = "../publisher" }
 runtime = { path = "../runtime" }
+runtime-next = { path = "../runtime-next" }
 shuffle = { path = "../shuffle" }
 simd-doc = { path = "../simd-doc" }
 sources = { path = "../sources" }
6 changes: 3 additions & 3 deletions crates/flowctl/src/raw/materialize_fixture.rs
@@ -76,7 +76,7 @@ pub async fn do_materialize_fixture(
 ..Default::default()
 });
 emit(Request {
-acknowledge: Some(request::Acknowledge {}),
+acknowledge: Some(request::Acknowledge::default()),
 ..Default::default()
 });

@@ -147,7 +147,7 @@ pub async fn do_materialize_fixture(
 emit(load)
 }
 emit(Request {
-flush: Some(request::Flush {}),
+flush: Some(request::Flush::default()),
 ..Default::default()
 });
 for store in stores {
@@ -171,7 +171,7 @@ pub async fn do_materialize_fixture(
 ..Default::default()
 });
 emit(Request {
-acknowledge: Some(request::Acknowledge {}),
+acknowledge: Some(request::Acknowledge::default()),
 ..Default::default()
 });
 }
9 changes: 4 additions & 5 deletions crates/flowctl/src/raw/mod.rs
@@ -15,8 +15,8 @@ mod alerts;
 mod discover;
 mod materialize_fixture;
 mod oauth;
+mod preview_next;
 mod shards;
-mod shuffle;
 mod spec;

 #[derive(Debug, clap::Args)]
@@ -75,9 +75,8 @@ pub enum Command {
 /// Print environment variables for working with a given data-plane
 /// and prefix using Gazette's `gazctl`.
 GazctlEnv(GazctlEnv),
-
-#[clap(hide = true)]
-Shuffle(shuffle::Shuffle),
+/// Locally run and preview a materialization using the V2 runtime.
+PreviewNext(preview_next::Preview),
 }

 #[derive(Debug, clap::Args)]
@@ -241,7 +240,7 @@ impl Advanced {
 Command::BearerLogs(bearer_logs) => bearer_logs.run(ctx).await,
 Command::ListShards(selector) => shards::do_list_shards(ctx, selector).await,
 Command::GazctlEnv(gazctl_env) => gazctl_env.run(ctx).await,
-Command::Shuffle(shuffle) => shuffle.run(ctx).await,
+Command::PreviewNext(preview) => preview.run(ctx).await,
 }
 }
 }