feat(new source): add iggy source and sink #25414
Conversation
Wires up the optional iggy dependency, sources-iggy / sinks-iggy feature flags, the iggy-integration-tests group, and adds the gates to the sources-logs / sinks-logs / integration-tests groupings. No code yet.
Adds a new `iggy` sink that publishes events to a topic on the Iggy message streaming platform via the iggy 0.10 SDK. The sink connects through Iggy's connection-string format (`iggy+tcp://...`, `iggy+quic://...`, etc.) which carries the credentials, then creates the configured stream and topic on init when they do not already exist. The sink follows the existing Tower-based pipeline used by other streaming sinks (see `src/sinks/nats/`): events flow through a RequestBuilder that encodes them with the configured codec, then a Tower Service wrapping a single `Arc<IggyProducer>` calls `producer.send()` in Direct mode. Healthcheck pings the broker. This commit only ships the sink — the source and the integration tests for both will follow as separate commits.
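As an aside for reviewers, transport selection is carried by the `iggy+<transport>` scheme prefix of the connection string. A minimal, self-contained Rust sketch of that extraction (illustrative only — the real iggy SDK does the full parse, including credentials and per-transport options):

```rust
/// Extract the transport ("tcp", "quic", "http", ...) from an
/// Iggy-style connection string such as `iggy+tcp://user:pass@host:port`.
/// Illustrative sketch only; not the iggy SDK's actual parser.
fn transport_of(conn: &str) -> Option<&str> {
    // Split off the scheme before "://", then require the "iggy+" prefix.
    let (scheme, _rest) = conn.split_once("://")?;
    scheme.strip_prefix("iggy+")
}
```

For example, `transport_of("iggy+tcp://iggy:iggy@127.0.0.1:8090")` yields `Some("tcp")`, while a bare `tcp://...` scheme yields `None`.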
Adds a new `iggy` source that consumes messages from a topic on the
Iggy message streaming platform via the iggy 0.10 SDK. Connection,
authentication, and transport selection (TCP, QUIC, HTTP, WebSocket)
all flow through Iggy's connection-string format.
Two consumption modes are supported:
- When `partition` is set, the source attaches as a standalone
consumer pinned to that partition.
- Otherwise it joins (and creates if missing) a consumer group named
after `consumer_name` so that partitions are balanced across
multiple Vector instances.
Each emitted event is annotated with the originating Iggy stream,
topic, partition ID, and message offset under the `iggy` source
metadata namespace; the legacy keys (`stream`, `topic`) are
configurable via `stream_key_field` / `topic_key_field`.
The consumer is built with the SDK's default `AutoCommit` policy
(commit on a 1s interval / on poll), and the source loop selects on
`tokio::select!` so that the broker is drained gracefully when a
shutdown signal is received.
Adds round-trip integration tests for both the iggy sink and source, gated behind the existing iggy-integration-tests feature flag. The sink tests cover two flows: iggy_publish_round_trip publishes 10 random events through the sink and reads them back through a separate consumer to confirm payload fidelity, and iggy_creates_stream_and_topic_on_connect confirms that the auto-create paths in connect_and_init actually materialise the stream and topic and that a second connect_and_init against the same names is idempotent. The source test iggy_consume_round_trip pre-publishes three messages directly through the SDK, builds the source, and verifies all three arrive on the channel in order. The broker URL is read from IGGY_ADDRESS and defaults to iggy+tcp://iggy:iggy@127.0.0.1:8090.

Verifier helpers had to import the StreamClient and TopicClient traits explicitly -- Iggy keeps those off its prelude so they only become available to callers that actually need broker introspection. To keep the test setup clean, connect_and_init on IggySinkConfig was bumped from private to pub(super) so the integration tests can drive the full handshake without re-implementing it.
Adds the hand-written and auto-generated CUE documentation for both
iggy components plus their shared service descriptor:
- urls.cue gains the iggy + iggy_connection_string entries.
- services/iggy.cue describes the Iggy broker as a service.
- components/iggy.cue is the shared collect/send + how_it_works
block (modeled on the existing components/nats.cue pattern).
- components/{sinks,sources}/iggy.cue are the per-component
handwritten CUE files that wire the title, classes, features
block, and per-component output schema to the generated config.
- components/{sinks,sources}/generated/iggy.cue are the auto-
generated configuration schemas, produced by
`make generate-component-docs` from the
`#[configurable_component]` annotations on
`IggySinkConfig` and `IggySourceConfig`.
The generation step was run with
`cargo build --no-default-features --features sinks-iggy,sources-iggy`
to keep the build inside this sandbox tractable; any further
regeneration in CI will reseed the global
`reference/generated/configuration.cue` with the full component
matrix on top of these files.
Adds the docker-compose, vdev test config, and CI workflow plumbing
so that 'cargo vdev int test iggy' (and the corresponding GitHub
Actions integration job) runs the iggy sink + source integration
tests against a real broker.
- tests/integration/iggy/config/compose.yaml stands up a single
apache/iggy container exposing TCP on the default port 8090.
- tests/integration/iggy/config/test.yaml selects the
iggy-integration-tests feature, restricts the test runner to
'::iggy::', sets IGGY_ADDRESS to point at the compose service,
and lists the source paths that should re-trigger this job in
CI.
- .github/workflows/changes.yml: forward an 'iggy' filter output
so the integration matrix can gate on whether iggy-touching
paths changed.
- .github/workflows/integration.yml: register 'iggy' in the
services matrix so the runner actually picks up the test.
Verified that 'cargo vdev int ci-paths' emits the expected paths
filter for iggy.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 47d5759ad9
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
The source previously advertised `can_acknowledge()` while the Iggy consumer ran with the SDK's default auto-commit, which stores the consumer offset as soon as messages are polled. A crash or a sink rejection after polling would therefore skip those messages on restart.

When end-to-end acknowledgements are enabled, the consumer is now built with `AutoCommit::Disabled` and each polled message is tracked through an `OrderedFinalizer`; the consumer offset is only stored on the server once the corresponding events have been delivered downstream. When acknowledgements are disabled the previous auto-commit behaviour is kept.

Offsets are committed eagerly as acknowledgements arrive (roughly once per half batch), with the `commit_interval_secs` timer as a backstop for sparse traffic and shutdown. The Iggy SDK polls relative to the server-stored consumer offset, so keeping it close to the consumed position avoids repeatedly re-fetching and discarding the same window.

Updates the source's delivery guarantee to at-least-once, adds an `iggy_offset_update` error internal event, and adds an `iggy_consumer_committed_offset` gauge (per stream/topic/partition) reporting the offset most recently committed to the server.
💡 Codex Review
Reviewed commit: 8f771613e5
With end-to-end acknowledgements enabled the consumer runs with `AutoCommit::Disabled`, so the source is solely responsible for storing consumer offsets. Two cases could lose a delivered offset and replay already-acknowledged messages on restart:
- On a graceful shutdown the source committed only the offsets already copied into `pending_offsets` and then exited while the `OrderedFinalizer` still held receivers for batches sent downstream; acknowledgements arriving after the shutdown signal were never observed. The source now drops the finalizer handle and drains the acknowledgement stream (bounded by the new `drain_timeout_secs` option, default 5s) before committing the final offsets.
- `commit_offsets` drained every pending offset before calling `store_offset`, so a transient commit failure dropped the only record of those offsets. Failed offsets are now kept pending so a later timer tick or the shutdown drain retries them.
The `offset` metadata field was set to the SDK's `current_offset` (the partition's high-water offset for the poll) rather than the individual message's `header.offset`, so all messages from one poll could share the same `offset` value, which breaks consumers that use it for deduplication or audit trails.
💡 Codex Review
Reviewed commit: 9d8f4e251c
…nfig `generate_sink_config` in the iggy sink integration tests builds an `IggySinkConfig` with an exhaustive struct literal but never sets the `batch` field, so any build with the `iggy-integration-tests` feature (pulled in by `all-integration-tests`) failed with a missing-field error before the iggy tests could run.
Performance:
- Bind stream/topic as &str and stream_key/topic_key as Option<&OwnedValuePath> (Copy) once at function entry rather than re-deriving them inside the per-event map closure on every decoded event.
- Move four owned_value_path! calls (each allocates a Vec-backed OwnedValuePath) out of the per-event closure to function entry; the closure borrows references.

Clarity:
- Rename _client to _keep_alive_client to document that the IggyClient is held solely to keep the underlying connection alive while the consumer runs.
- Emit a warning when consumer.next() returns None unexpectedly, giving operators visibility if the Iggy SDK terminates the consumer stream.
…lValuePath outputs() was calling .clone() on the full OptionalValuePath struct before accessing .path. Clone only the inner Option<OwnedValuePath> directly.
Previously request_builder() collected payloads into a Vec then computed uncompressed_byte_size with a separate .iter().map(|p| p.len()).sum() pass. Accumulate the size during the encoding filter_map to eliminate the extra iteration over the payload buffer.
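The single-pass accumulation can be sketched like this (a stand-in encoder that fails on empty input; not the actual Vector RequestBuilder code):

```rust
/// Illustrative single-pass request building: the uncompressed byte size
/// is accumulated inside the same filter_map that encodes, instead of a
/// second `.iter().map(|p| p.len()).sum()` pass over the payload Vec.
/// The "encoder" here is a stand-in that rejects empty events.
fn build_payloads(events: Vec<String>) -> (Vec<Vec<u8>>, usize) {
    let mut uncompressed_byte_size = 0usize;
    let payloads: Vec<Vec<u8>> = events
        .into_iter()
        .filter_map(|event| {
            if event.is_empty() {
                return None; // encode failure: skipped and not counted
            }
            let bytes = event.into_bytes();
            uncompressed_byte_size += bytes.len();
            Some(bytes)
        })
        .collect();
    (payloads, uncompressed_byte_size)
}
```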
💡 Codex Review
Reviewed commit: 85d5d39387
The integration test config (`test.yaml`) connects with `iggy+tcp://iggy:iggy@iggy:8090`, but the iggy service is started without `IGGY_ROOT_USERNAME`/`IGGY_ROOT_PASSWORD` (or the default-credentials flag). Recent Iggy images generate a random root password on first start when none are provided, so CI would fail authentication before the new source/sink components ever ran. Set the matching `IGGY_ROOT_USERNAME=iggy` and `IGGY_ROOT_PASSWORD=iggy` environment variables on the iggy service.
Lock in the per-partition acknowledgement invariants that Codex flagged repeatedly during PR review:
- Delivered acks set `pending` to the running maximum and never lower it.
- A rejection installs the lowest unacknowledged offset as a fence and caps `pending` at `fence - 1`.
- An offset-0 rejection blocks all commits.
- A subsequent Delivered ack for a fenced offset clears it and advances `pending` to the next fence (or `max_delivered` when no fence remains).
- Rejections that do not move the fence return false to the caller.
- `lag()` returns `pending - committed` and zero when no pending offset is recorded.
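A self-contained Rust sketch of these invariants, simplified to fence the rejected offset directly (types and method names are illustrative, not Vector's actual per-partition state):

```rust
use std::collections::BTreeSet;

/// Simplified model of the per-partition acknowledgement state described
/// above; a sketch, not the real PartitionState.
#[derive(Default)]
struct PartitionState {
    /// Highest offset currently safe to commit (None while an offset-0
    /// rejection blocks all commits, or before the first delivery).
    pending: Option<u64>,
    /// Running maximum of delivered offsets.
    max_delivered: Option<u64>,
    /// Rejected offsets; the lowest acts as the fence.
    fences: BTreeSet<u64>,
    /// Offset last committed to the server.
    committed: u64,
}

impl PartitionState {
    fn record_delivered(&mut self, offset: u64) {
        self.max_delivered = Some(self.max_delivered.map_or(offset, |m| m.max(offset)));
        // A Delivered ack for a fenced offset clears that fence.
        self.fences.remove(&offset);
        self.recompute_pending();
    }

    /// Returns false when the rejection does not move the fence.
    fn record_rejected(&mut self, offset: u64) -> bool {
        let moved = match self.fences.iter().next() {
            Some(&lowest) => offset < lowest,
            None => true,
        };
        self.fences.insert(offset);
        self.recompute_pending();
        moved
    }

    fn recompute_pending(&mut self) {
        let fence = self.fences.iter().next().copied();
        self.pending = match (fence, self.max_delivered) {
            (Some(0), _) => None,                     // offset-0 rejection blocks commits
            (Some(f), Some(max)) => Some(max.min(f - 1)), // cap at fence - 1
            (Some(_), None) => None,
            (None, max) => max,                       // no fence: the running maximum
        };
    }

    fn lag(&self) -> u64 {
        self.pending.map_or(0, |p| p.saturating_sub(self.committed))
    }
}
```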
…lper The `consumer.next()` arm in `run_iggy_source` had grown to roughly 130 lines that decoded the polled Iggy message, enriched each log event with source metadata, forwarded the batch downstream, and lazily registered the message with a per-partition finalizer. Inlined like that the high-level select! loop was hard to skim and the per-message logic had no way to be exercised in isolation. Pull the body out as `process_received_message`. The logging context (log namespace, stream/topic strings, metadata paths) is bundled into a `MessageMetadata` borrow and the three per-partition mutable maps into an `AckTracker` with a `register` method that lazily creates the per-partition finalizer and ack stream on first sight of a partition. The select! arm shrinks to building the tracker and dispatching to the helper; behaviour is unchanged.
Two safety hardening changes against future SDK upgrades and edge cases in encoder failures.

`IggyRetryLogic::is_retriable_error` previously enumerated the non-retriable SDK errors and defaulted unknown variants to retriable. When iggy adds a new fatal error (a schema mismatch, an oversized payload, an unsupported protocol feature) it would silently land in the retry loop until someone audited the upgrade. Invert the match to an explicit allowlist of transient transport/timeout errors; unknown variants now default to non-retriable, which is the safer direction.

`request_builder` now returns `Option<IggyRequest>` and the sink filters `None` out before reaching the driver. When every event in a batch fails encoding their finalizers are already marked `Errored` individually, but dispatching the resulting empty request still caused the driver to emit `Delivered` telemetry for the original event count even though nothing was sent.
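The inverted match can be illustrated with a toy error enum (these variants are illustrative, not the iggy SDK's actual error type):

```rust
/// Hypothetical mirror of the allowlist-style retry decision: enumerate
/// the known transient transport/timeout errors explicitly and treat
/// everything else, including variants added by future SDK upgrades,
/// as non-retriable.
enum SendError {
    Timeout,
    Disconnected,
    InvalidPayload,
    Unrecognized,
}

fn is_retriable(err: &SendError) -> bool {
    // Allowlist: unknown variants fall through to non-retriable,
    // the safer default direction.
    matches!(err, SendError::Timeout | SendError::Disconnected)
}
```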
💡 Codex Review
Reviewed commit: 3fc698db1c
Iggy 0.10.0's poll_next immediately re-schedules a poll on empty responses unless poll_interval is set on the consumer builder. With no value, an idle or low-volume topic causes the source to poll the broker at network round-trip speed. Expose poll_interval_ms (default 100ms) and pass it to the consumer/consumer_group builder.
💡 Codex Review
Reviewed commit: 9d8232203a
Preserves publish order for consumers that rely on Iggy's stream ordering. Matches the NATS and Redis sinks, which default to Concurrency::None for the same reason.
… frames On a non-continuable decode error the previous code wrapped the shared BatchNotifier in an EventFinalizer set to Rejected, which retroactively rejected events from earlier frames in the same payload that had already been sent downstream. Those events would be re-delivered on restart even though they reached the sink successfully (silent duplication). Additionally, the rejected offset entered the PartitionState fence set where it could never be cleared: the Iggy SDK yields each offset exactly once per consumer lifetime, so record_delivered could never fire for that offset. The partition stalled permanently at fence-1 with no escape hatch.

Fix both issues together:
- Drop the BatchNotifier normally on decode failure so earlier-frame events keep whatever status they earn naturally downstream.
- Introduce ProcessOutcome::DecodeFailed and return it instead of registering the receiver. The run_iggy_source loop then calls store_offset directly to advance past the poison message and keep the partition making progress.
- Remove the dead was_fenced return from record_delivered (the SDK yields each offset once, so redelivery of a fenced offset cannot occur within a consumer lifetime).
- Remove the dead redelivery-logging branch in record_ack.
- Delete the three unit tests that relied on bool-returning record_delivered or asserted the unreachable redelivery path.
Call disconnect() on the Iggy client in both source and sink on shutdown so the SDK heartbeat task exits cleanly rather than leaking until process exit. In the source, disconnect is called after the consumer-group leave path completes; in the sink, the client is stored on IggySink and disconnected in run_inner before returning. Remove per-partition state from the partitions map when a consumer-group rebalance revokes a partition, preventing stale PartitionState entries from issuing store_offset on every commit tick for an unowned partition. Add a comment on the single-partition shutdown branch explaining why consumer.shutdown() is intentionally not called there.
… bounds The Some(Err) arm in the consumer poll loop emitted IggyReadError and immediately re-entered the select. For SDK error variants that do not sleep internally the source would spin at full speed, pegging CPU and flooding the log. Add a 500ms sleep after the error event so the source backs off before the next poll attempt.

batch_length=0 causes the SDK to return empty batches indefinitely with no error, silently stalling the source. commit_interval_secs=0 and drain_timeout_secs=0 cause the .max(1) guard to silently override the configured value. Validate all three fields at build time with a clear error message rather than silently clamping or stalling.
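The build-time checks could look roughly like this (the struct, field names, and error strings are hypothetical, not Vector's actual IggySourceConfig):

```rust
/// Hypothetical shape of the build-time validation described above.
struct SourceOpts {
    batch_length: u32,
    commit_interval_secs: u64,
    drain_timeout_secs: u64,
}

fn validate(opts: &SourceOpts) -> Result<(), String> {
    if opts.batch_length == 0 {
        // The SDK returns empty batches forever for batch_length=0.
        return Err("batch_length must be at least 1".into());
    }
    if opts.commit_interval_secs == 0 {
        return Err("commit_interval_secs must be at least 1".into());
    }
    if opts.drain_timeout_secs == 0 {
        return Err("drain_timeout_secs must be at least 1".into());
    }
    Ok(())
}
```

Rejecting at build time surfaces the misconfiguration in `vector validate` instead of a silently stalled or clamped runtime.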
💡 Codex Review
Reviewed commit: b5b5a62c18
IggyOffsetUpdated was emitted before decode or delivery, so the gauge it set (iggy_consumer_offset) tracked the polled offset, not the delivered one. Sitting next to iggy_consumer_committed_offset in dashboards the names were easy to misread. Rename the event struct to IggyOffsetPolled, the GaugeName variant to IggyConsumerPolledOffset, and the metric string to iggy_consumer_polled_offset so the naming matches the actual semantics. Replace offset as i64 with i64::try_from(offset).unwrap_or(i64::MAX) to avoid silent wrapping on offsets above 2^63 (practically unreachable but now correct). Replace partition_id as i64 with the infallible i64::from(partition_id) since u32 always fits in i64. Add a debug_assert in record_ack verifying that acks arrive in offset order. OrderedFinalizer guarantees this per partition; the assert documents the invariant at the call site and will catch ordering bugs in tests if the guarantee is ever broken.
Move byte_size.add_event and event count/size tracking to after a successful encode so events that fail encoding are not counted in the request telemetry. Capture tags, json_size, and size_of before the encoder consumes the event, then apply them only on Ok(()). RequestMetadataBuilder is now built from the surviving-encode counts via new() rather than from_events() on the full pre-filter slice. Remove the unreachable if !messages.is_empty() guard in service.rs. request_builder() returns None when payloads is empty, so the Service::call path never receives an empty messages vec.
… partitions Hoist a single BytesMut outside the encoding loop and use buf.split().freeze() to extract each encoded frame, avoiding a fresh heap allocation per event.

Chain .send_retries(None, None) on the IggyProducerBuilder in connect_and_init so the SDK does not apply its own 3x1s retry loop. The Tower retry layer (IggyRetryLogic) is now the sole retry policy for the sink, eliminating silent retry amplification.

Validate partitions >= 1 in SinkConfig::build before connecting; a zero-partition topic creation fails at the broker with an unhelpful error, so reject it early with a clear message.

Move sources-iggy and sinks-iggy to their correct lexicographic positions in Cargo.toml.
IggySink::new now takes (config, Arc<IggyClient>, Arc<IggyProducer>) after the client-lifecycle change, but publish_and_check was still passing (config, producer) and discarding the client, causing the iggy-integration-tests build to fail.
…n acks enabled When end-to-end acknowledgements are enabled, a decode failure mid-payload may leave earlier frames from the same message already in-flight downstream. The previous code called store_offset immediately on DecodeFailed, advancing the broker offset before those events had settled. If Vector exited or a sink rejected them after the commit, those events would be lost without replay. When acknowledgements are enabled and a decode failure occurs, register the batch receiver through the normal ack path instead of skipping straight to store_offset. The broker offset then only advances once the in-flight events finalize downstream. An empty batch resolves as Delivered immediately via the BatchNotifier drop path and advances the offset on the next commit tick. When acknowledgements are disabled the previous immediate store_offset behaviour is preserved since there is no ack tracking needed.
…uble-count The IggyService per-call future emitted IggySendError inside an inspect_err around producer.send(...). Tower constructs a fresh future for every retry attempt, so each transient failure that later succeeded was counted as a dropped batch -- a 100-event batch that recovered on the fourth try reported 300 dropped events even though zero were lost. Propagate the error to the Driver instead, which emits CallError (component_errors_total + ComponentEventsDropped::<UNINTENTIONAL>) exactly once on the terminal failure. Drop the now-unused IggySendError struct.
When out.send_batch fails inside process_received_message, the function returns Err(()) and the main select-loop arm previously did "return Err(())" directly. That skipped the post-loop cleanup -- drain, final commit_offsets, leave_consumer_group, and keep_alive_client.disconnect() -- leaving the TCP session open until the broker timed it out and delaying any consumer-group rebalance. Any acks that finalized after the channel closed but before the connection timed out also went uncommitted. Track the closed-channel exit in a local flag and break out of the loop instead, so the existing drain + commit + leave-group + disconnect block runs unconditionally. Map the flag back to Err(()) at the end so the caller still sees the failure.
The three offset-derived gauges (iggy_consumer_lag_messages, iggy_consumer_polled_offset, iggy_consumer_committed_offset) cast u64 directly to f64. f64 stores integers exactly only up to 2^53, so offsets above that drift in the metric. Apply the same saturating idiom already used for the event-metadata "offset" field (i64::try_from(offset).unwrap_or(i64::MAX) as f64) so all u64-to-narrow conversions on offsets behave consistently.
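The saturating idiom is a one-liner; a sketch of the shared helper shape (the function name is illustrative):

```rust
/// Saturating u64 -> f64 conversion for offset-derived gauges: clamp to
/// i64 first so values above 2^63 do not wrap, matching the idiom already
/// used for the event-metadata "offset" field.
fn offset_gauge_value(offset: u64) -> f64 {
    i64::try_from(offset).unwrap_or(i64::MAX) as f64
}
```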
The other three intervals (batch_length, commit_interval_secs, drain_timeout_secs) are already rejected as 0 in IggySourceConfig::build. poll_interval_ms had a default of 100 ms but no bounds check, so a user who explicitly set it to 0 would reintroduce the busy-loop the default exists to prevent. Add the matching one-line guard.
A fixed 500 ms sleep on Some(Err(_)) prevented the original tight spin but left the source emitting an IggyReadError log line and counter increment every 500 ms (twice per second) for the duration of any longer outage. Switch to crate::common::backoff::ExponentialBackoff (already used by the websocket source and several sinks) with a 500 ms base that doubles up to a 30 s cap, and reset it on the first successful poll so transient hiccups do not penalise subsequent reads.
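A minimal stand-in for that backoff policy (a sketch with the stated 500 ms base and 30 s cap, not the actual crate::common::backoff::ExponentialBackoff):

```rust
use std::time::Duration;

/// Sketch of the exponential backoff described above: 500 ms base
/// doubling to a 30 s cap, reset on the first successful poll.
struct Backoff {
    current: Duration,
}

impl Backoff {
    const BASE: Duration = Duration::from_millis(500);
    const CAP: Duration = Duration::from_secs(30);

    fn new() -> Self {
        Self { current: Self::BASE }
    }

    /// Delay to sleep before the next poll attempt; doubles up to the cap.
    fn next_delay(&mut self) -> Duration {
        let delay = self.current;
        self.current = (self.current * 2).min(Self::CAP);
        delay
    }

    /// Called on a successful poll so transient hiccups do not penalise
    /// subsequent reads.
    fn reset(&mut self) {
        self.current = Self::BASE;
    }
}
```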
The drain branch broke silently on deadline expiry: an operator looking at the logs after a slow shutdown could not tell whether the final commit covered every in-flight event or whether some had been abandoned. Add a warn! with the configured timeout and the number of partitions still pending so the "best-effort commit" path is visible.

The consumer.next() => None arm was a plain warn!. For a long-lived consumer that means the SDK's internal reconnection logic gave up, which is an operational incident, not a notice. Promote it to a new IggyConsumerStreamEnded internal event that emits error! plus the component_errors_total counter with a dedicated error_code, matching how the other reader-side errors are reported.
commit_offsets iterated partitions and awaited consumer.store_offset one at a time. The whole tokio::select! is blocked for the duration, so with N partitions and a slow broker the source froze for N * latency every commit tick -- backing new polls up in the SDK queue, accumulating per-partition ack streams (defeating the per-partition finalizer fix in f44da98), and delaying observation of the shutdown signal. store_offset takes &self, so the calls can run together. Take all pending offsets up front, fan them out with futures::future::join_all, and apply the results back to the partition map after the await finishes. Sub-millisecond commits are unchanged; a degraded broker now stalls the loop for max(latency) instead of sum(latency).
Force-pushed from b5b5a62 to 126df30.
After a consumer-group rebalance revokes a partition the broker returns IggyError::NotResolvedConsumer for any subsequent store_offset on it, but the per-partition entries in partitions, finalizers, and ack_streams stuck around in the source. Every commit tick re-attempted the store and re-emitted IggyOffsetUpdateError, drowning the log on a long-lived consumer that sees frequent rebalances. fc31a96 took the safer half of the fix by not re-queuing the pending offset; this completes it. Match NotResolvedConsumer specifically in commit_offsets's error arm and drop the partition from all three maps. Other error variants still emit IggyOffsetUpdateError unchanged. The shutdown drain now uses finalizers.clear() instead of drop(finalizers) so the final commit_offsets call can still borrow the map; clearing has the same effect on the sender ends.
@hubcio Thank you very much for that review run - I believe I've now resolved everything, on top of doing a self-review and clean-up.
Summary
Adds a Vector sink/source for Apache Iggy (https://iggy.apache.org/), a persistent message streaming platform.
Vector configuration
Sink:
Source:
How did you test this PR?
NixOS integration testing framework.
Change Type
Is this a breaking change?
Does this PR include user facing changes?
If not, a maintainer may add the no-changelog label to this PR.

References
Notes
- Please tag @vectordotdev/vector to reach out to us regarding this PR.
- Some of these checks can be run locally via the pre-push hook, please see this template:
  - `make fmt`
  - `make check-clippy` (if there are failures it's possible some of them can be fixed with `make clippy-fix`)
  - `make test`
- Instead of force pushes, use `git merge origin master` and `git push`.
- If this PR changes dependencies (`Cargo.lock`), please run `make build-licenses` to regenerate the license inventory and commit the changes (if any). More details on the dd-rust-license-tool.