Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .semaphore/publish-test-pypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ blocks:
- name: Verify
commands:
- checkout
- export MISE_PYTHON_GITHUB_ATTESTATIONS=false
- sem-version python 3.11
- export CK_VERSION=$(python -c "import tomllib; print(tomllib.load(open('pyproject.toml','rb'))['project']['version'])")
- tools/test-released-wheels.sh $CK_VERSION test
Expand Down
10 changes: 1 addition & 9 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,7 @@ execution_time_limit:
global_job_config:
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.14.2
# TODO KIP-932: temporary — build librdkafka from master instead of the
# pinned LIBRDKAFKA_VERSION (v2.14.2). The share consumer changes are merged
# to librdkafka master but not in any released version yet, so CI would fail
# against v2.14.2. This blocks merging this branch to master.
# Fix: release librdkafka v2.15.0-RC1, bump LIBRDKAFKA_VERSION to it (same
# release flow as PR #2257), then remove this LIBRDKAFKA_BRANCH override.
- name: LIBRDKAFKA_BRANCH
value: master
value: v2.15.0-RC1
prologue:
commands:
- checkout
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

## v2.xx.x

### Features

- New optional install `confluent-kafka[oauthbearer-aws]` provides AWS IAM-based
OAUTHBEARER authentication via AWS STS `GetWebIdentityToken`. Activate by setting `sasl.oauthbearer.method=oidc`,
`sasl.oauthbearer.metadata.authentication.type=aws_iam`, and
`sasl.oauthbearer.config="region=...,audience=..."`. See the
[AWS IAM OAUTHBEARER guide](docs/oauthbearer-aws.md) for full configuration, and
[`examples/oauth_oidc_ccloud_aws_iam.py`](examples/oauth_oidc_ccloud_aws_iam.py)
for a worked example.

### Fixes

- Fix Encryption fails when expanded union types have two references to the same record (#2262)
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ pip install "confluent-kafka[protobuf,schemaregistry]" # Protobuf

# With Data Contract rules (includes CSFLE support)
pip install "confluent-kafka[avro,schemaregistry,rules]"

# With AWS IAM OAUTHBEARER authentication (mints JWTs via AWS STS GetWebIdentityToken)
pip install "confluent-kafka[oauthbearer-aws]"
```

**Note:** Pre-built Linux wheels do not include SASL Kerberos/GSSAPI support. For Kerberos, see the source installation instructions in [INSTALL.md](INSTALL.md).
Expand Down Expand Up @@ -304,6 +307,20 @@ When using Data Contract rules (including CSFLE) add the `rules`extra, e.g.:
pip install "confluent-kafka[avro,schemaregistry,rules]"
```

To authenticate to a Kafka cluster using AWS IAM (when running on EC2, EKS, ECS,
Fargate, or Lambda with an IAM role attached), add the `oauthbearer-aws` extra:

```bash
pip install "confluent-kafka[oauthbearer-aws]"
```

Activation is config-only — set `sasl.oauthbearer.method=oidc`,
`sasl.oauthbearer.metadata.authentication.type=aws_iam`, and
`sasl.oauthbearer.config="region=...,audience=..."`. The client mints fresh
JWTs via AWS STS on every token refresh — no static credentials, no Python-side
imports. See [`examples/oauth_oidc_ccloud_aws_iam.py`](examples/oauth_oidc_ccloud_aws_iam.py)
for a worked example.

**Install from source**

For source install, see the *Install from source* section in [INSTALL.md](INSTALL.md).
Expand Down
3 changes: 3 additions & 0 deletions docs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ rst2md:
$(PANDOC) --wrap=none --from=rst --to=gfm --output=kip-848-migration-guide.md kip-848-migration-guide.rst
@echo
@echo "KIP-848 Migration Guide Markdown file generated: kip-848-migration-guide.md"
$(PANDOC) --wrap=none --from=rst --to=gfm --output=oauthbearer-aws.md oauthbearer-aws.rst
@echo
@echo "AWS IAM OAUTHBEARER guide Markdown file generated: oauthbearer-aws.md"

html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
Expand Down
11 changes: 10 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Guides
- :ref:`Configuration Guide <pythonclient_configuration>`
- :ref:`Transactional API <pythonclient_transactional>`
- :ref:`KIP-848 Migration Guide <pythonclient_migration_kip848>`
- :ref:`AWS IAM OAUTHBEARER <pythonclient_oauthbearer_aws>`

Client API
- :ref:`Producer <pythonclient_producer>`
Expand Down Expand Up @@ -1098,4 +1099,12 @@ https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
`KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>`_ - Migration Guide
==================================================================================================================================================

.. include:: kip-848-migration-guide.rst
.. include:: kip-848-migration-guide.rst


.. _pythonclient_oauthbearer_aws:

AWS IAM OAUTHBEARER Authentication
==================================

.. include:: oauthbearer-aws.rst
134 changes: 134 additions & 0 deletions docs/oauthbearer-aws.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
AWS IAM-based authentication for Kafka clients via the **OAUTHBEARER** SASL mechanism, delivered as the optional `confluent-kafka[oauthbearer-aws]` install extra. When the application runs on AWS compute (EC2, EKS, ECS, Fargate, or Lambda) with an IAM role attached, the client mints short-lived JSON Web Tokens through AWS STS `GetWebIdentityToken` (via `boto3`) and hands them to the broker as the SASL bearer credential — refreshing them automatically before they expire.

**Activation is config-only.** Install the extra and set the three configuration keys below; no application code changes are required at the integration site. Applications that do not install the extra pull **zero AWS dependencies**.

# Installation

``` bash
pip install 'confluent-kafka[oauthbearer-aws]'
```

This adds `boto3` to the dependency graph. Without the extra, nothing AWS is imported or installed.

# Quick start

The minimum configuration is the two required `sasl.oauthbearer.config` keys — `region` and `audience` — alongside the standard SASL/OAUTHBEARER settings:

``` python
from confluent_kafka import Consumer

consumer = Consumer({
'bootstrap.servers': 'pkc-xxxx.aws.confluent.cloud:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',

'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.metadata.authentication.type': 'aws_iam',
'sasl.oauthbearer.config': 'region=us-east-1,audience=https://confluent.cloud/oidc',

'group.id': 'my-group',
'auto.offset.reset': 'earliest',
})
consumer.subscribe(['my-topic'])
```

The same SASL keys apply unchanged to `Producer` and `AdminClient`.

# All options

Every supported key, set on a single comma-separated `sasl.oauthbearer.config` string, with SASL extensions supplied through the separate `sasl.oauthbearer.extensions` key:

``` python
conf = {
'bootstrap.servers': 'pkc-xxxx.aws.confluent.cloud:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',

'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.metadata.authentication.type': 'aws_iam',
'sasl.oauthbearer.config': (
'region=us-east-1,'
'audience=https://confluent.cloud/oidc,'
'duration_seconds=900,'
'signing_algorithm=ES384,'
'sts_endpoint=https://sts-fips.us-east-1.amazonaws.com,'
'aws_debug=console,'
'tag_team=platform,'
'tag_environment=prod'
),
'sasl.oauthbearer.extensions': (
'logicalCluster=lkc-abc,'
'identityPoolId=pool-xyz'
),
}
```

# Configuration

`sasl.oauthbearer.config` is a single string of comma-separated `key=value` pairs, e.g. `region=us-east-1,audience=https://confluent.cloud/oidc`. Leading and trailing whitespace around each pair is ignored, but do **not** put spaces directly around the `=` — `region = us-east-1` is read as a key named `region` *with a trailing space* and rejected. Each pair splits on its first `=`, so a value may itself contain `=` (no escaping needed); to include a literal comma in a value, escape it with a backslash (`\,`). Any key not listed below is rejected with a descriptive error.

## Required

| Key | Description |
|----|----|
| `region` | AWS region used for the STS `GetWebIdentityToken` call (e.g. `us-east-1`). |
| `audience` | The OIDC audience the broker expects; must match the audience configured on the IAM role's trust relationship (e.g. `https://confluent.cloud/oidc`). |

## Optional

| Key | Default | Description |
|----|----|----|
| `duration_seconds` | `300` | Requested token lifetime in seconds (60–3600). The client auto-refreshes the token at roughly 80% of this value. |
| `signing_algorithm` | `ES384` | JWT signature algorithm: `ES384` or `RS256`. |
| `sts_endpoint` | SDK default | Override the STS endpoint URL — e.g. a FIPS or VPC (PrivateLink) endpoint. |
| `aws_debug` | `none` | AWS SDK diagnostic logging: `none` or `console`. |
| `tag_<name>` | — | Adds a custom claim to the minted token (repeatable; up to 50). For example `tag_team=platform` adds a `team` tag with value `platform`. |

## Signing algorithm

`ES384` (the default) produces compact ECDSA signatures and is recommended for most deployments. Use `RS256` only when an RSA-based signature is specifically required.

## AWS SDK diagnostic logging

`aws_debug` controls AWS SDK log output, which is useful when validating the credential chain or diagnosing STS failures:

- `none` (default) — the client does not touch AWS SDK logging.
- `console` — enables verbose `botocore` logging to standard error.

``` text
# Quiet (production)
region=us-east-1,audience=https://confluent.cloud/oidc

# Verbose AWS SDK output while debugging
region=us-east-1,audience=https://confluent.cloud/oidc,aws_debug=console
```

# SASL extensions

SASL/OAUTHBEARER extensions (for example, a Confluent Cloud logical cluster and identity pool) are configured through the separate `sasl.oauthbearer.extensions` key — **not** inside `sasl.oauthbearer.config` — as a comma-separated list of `key=value` pairs:

``` python
'sasl.oauthbearer.extensions': 'logicalCluster=lkc-abc,identityPoolId=pool-xyz'
```

# Prerequisites

- The application runs with AWS credentials resolvable by `boto3`'s default credential chain — an EC2 instance profile, EKS IRSA, an ECS/Fargate task role, a Lambda execution role, environment variables, or a shared credentials file.
- The IAM role is permitted to mint a web identity token via STS `GetWebIdentityToken` for the requested `audience`.
- The `audience` value matches the audience configured on the IAM role's trust relationship and the broker-side identity provider.

# Common pitfalls

- **\`\`sasl.oauthbearer.method=oidc\`\` is mandatory.** When `sasl.oauthbearer.metadata.authentication.type=aws_iam` is set, omitting or changing the method raises a `ValueError` at client construction.
- **Set \`\`security.protocol=SASL_SSL\`\` and \`\`sasl.mechanisms=OAUTHBEARER\`\`.** Without them the OAUTHBEARER token-refresh path never engages and authentication silently does not happen.
- **\`\`sasl.oauthbearer.config\`\` must be present and non-empty** — it carries the required `region` and `audience`. A missing or empty value raises a `ValueError` naming the missing keys.
- **Install the extra, not \`\`boto3\`\` directly.** If the marker is set but the extra is not installed, client construction raises an `ImportError` pointing to `pip install 'confluent-kafka[oauthbearer-aws]'`.
- **\`\`audience\`\` mismatches fail at STS.** If the configured `audience` does not match the IAM role's trust relationship, the `GetWebIdentityToken` call is rejected and no token is issued.

# Requirements

- The `oauthbearer-aws` extra installs `boto3` (a version recent enough to expose the STS `GetWebIdentityToken` API).

# Further reading

A complete, runnable producer/consumer/admin example is available at [examples/oauth_oidc_ccloud_aws_iam.py](https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_oidc_ccloud_aws_iam.py).
Loading