6 changes: 6 additions & 0 deletions .gitignore
Expand Up @@ -99,5 +99,11 @@ Icon?
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# Python
__pycache__/
*.pyc
.venv/
batch-operations/python/psycopg2/.venv

# TypeScript build outputs
dist/
114 changes: 114 additions & 0 deletions java/batch_operations/README.md
@@ -0,0 +1,114 @@
# Batch Operations with pgJDBC

## Overview

This code example demonstrates how to perform batch DELETE and UPDATE operations in Amazon Aurora DSQL
when working with datasets exceeding the 3,000-row transaction mutation limit. The example uses
[pgJDBC](https://jdbc.postgresql.org/) with the
[Aurora DSQL JDBC Connector](https://github.com/awslabs/aurora-dsql-java-connector) for automatic
IAM authentication.

Two patterns are provided:

- **Sequential**: A single-threaded loop that processes rows in configurable-size batches (default 1,000),
committing each batch as a separate transaction.
- **Parallel**: Multiple worker threads each process a disjoint partition of the dataset concurrently using
`hashtext()` partitioning, with each worker running its own batch loop.

Both patterns include OCC (Optimistic Concurrency Control) retry logic with exponential backoff.
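The retry logic shared by both patterns can be sketched as below. This is a minimal illustration, not the demo's actual code: the class, method names, and constants here are made up, and a `Supplier` stands in for the real JDBC call. In real pgJDBC code you would catch `SQLException` and retry only when `getSQLState()` returns `"40001"`, the SQLSTATE that PostgreSQL-compatible engines raise for a serialization (OCC) failure.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

public class OccRetrySketch {
    static final int MAX_RETRIES = 5;
    static final long BASE_DELAY_MS = 100;
    static final long MAX_DELAY_MS = 5_000;

    /** Exponential backoff with jitter: BASE * 2^attempt, capped at MAX. */
    static long backoffDelayMillis(int attempt) {
        long exp = Math.min(MAX_DELAY_MS, BASE_DELAY_MS * (1L << attempt));
        return ThreadLocalRandom.current().nextLong(exp / 2, exp + 1);
    }

    /**
     * Retries work up to MAX_RETRIES times, sleeping between attempts.
     * In real pgJDBC code, catch SQLException and retry only when
     * getSQLState().equals("40001"), i.e. a serialization (OCC) failure.
     */
    static <T> T withRetries(Supplier<T> work) throws InterruptedException {
        for (int attempt = 0; ; attempt++) {
            try {
                return work.get();
            } catch (RuntimeException e) {
                if (attempt >= MAX_RETRIES) throw e;
                Thread.sleep(backoffDelayMillis(attempt));
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a transient conflict on the first two attempts, then success.
        int[] tries = {0};
        String result = withRetries(() -> {
            if (tries[0]++ < 2) throw new RuntimeException("simulated OCC conflict");
            return "committed";
        });
        System.out.println(result);  // prints "committed" after two retries
    }
}
```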

## About the code example

Aurora DSQL limits each transaction to 3,000 row mutations. To DELETE or UPDATE more than 3,000 rows,
you must split the work into batches, each committed as a separate transaction.
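The batching requirement reduces to a simple loop: run one capped statement per transaction, commit, and stop when a batch affects fewer rows than the cap. Below is a minimal sketch of that control flow with a functional stand-in for the JDBC call; the `runInBatches` helper is illustrative, and in the real demo each invocation would execute something like `DELETE FROM batch_test WHERE id IN (SELECT id FROM batch_test WHERE category = ? LIMIT ?)` and commit.

```java
import java.util.function.LongUnaryOperator;

public class SequentialBatchSketch {
    /**
     * Runs one capped statement per transaction until a batch affects
     * fewer rows than the cap. executeBatch stands in for a JDBC
     * PreparedStatement that deletes (or updates) up to batchSize rows,
     * commits, and returns the number of rows affected.
     */
    static long runInBatches(long batchSize, LongUnaryOperator executeBatch) {
        long total = 0;
        while (true) {
            long affected = executeBatch.applyAsLong(batchSize);  // one transaction
            total += affected;
            if (affected < batchSize) break;  // last (possibly empty) batch
        }
        return total;
    }

    public static void main(String[] args) {
        // Simulate draining the 25,000-row test dataset in batches of 1,000.
        long[] remaining = {25_000};
        long deleted = runInBatches(1_000, n -> {
            long affected = Math.min(n, remaining[0]);
            remaining[0] -= affected;
            return affected;
        });
        // 25 full batches plus one final empty batch that ends the loop.
        System.out.println(deleted);  // prints 25000
    }
}
```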

The parallel pattern partitions rows across worker threads using
`abs(hashtext(CAST(id AS text))) % num_workers = worker_id`, ensuring workers operate on disjoint sets
of rows and avoid OCC conflicts with each other.
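That predicate can be generated per worker as sketched below; the helper name is illustrative, not taken from the demo source. Because `abs(hashtext(...)) % num_workers` assigns every id to exactly one worker, the per-worker predicates are mutually exclusive and together cover the whole table.

```java
public class PartitionPredicateSketch {
    /** Builds the disjoint-partition predicate for one worker (0-based). */
    static String workerPredicate(String idColumn, int numWorkers, int workerId) {
        if (workerId < 0 || workerId >= numWorkers) {
            throw new IllegalArgumentException("workerId out of range");
        }
        return "abs(hashtext(CAST(" + idColumn + " AS text))) % "
                + numWorkers + " = " + workerId;
    }

    public static void main(String[] args) {
        int numWorkers = 4;
        for (int w = 0; w < numWorkers; w++) {
            // Each worker would append this to its own batch loop, e.g.:
            //   DELETE FROM batch_test WHERE category = 'food' AND <predicate> ...
            System.out.println(workerPredicate("id", numWorkers, w));
        }
    }
}
```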

⚠️ **Important**

- Running this code might result in charges to your AWS account.
- Each batch commits as a separate transaction, so a failure midway leaves the dataset partially
  modified. Design your operations to be idempotent where possible.

## Prerequisites

- You must have an AWS account, and have your default credentials and AWS Region configured as described
in the [Globally configuring AWS SDKs and tools](https://docs.aws.amazon.com/sdkref/latest/guide/creds-config-files.html) guide.
- Java Development Kit (JDK) 17 or later.
- Gradle (the wrapper is included in this project).
- You must have an Aurora DSQL cluster. For information about creating a cluster, see the
[Getting started with Aurora DSQL](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/getting-started.html) guide.

## Set up the test table

Before running the examples, create and populate the test table. Aurora DSQL uses IAM authentication,
so you need to generate a fresh auth token each time you connect with `psql`:

```bash
export CLUSTER_ENDPOINT="<your-cluster-endpoint>"
export CLUSTER_REGION="<your-region>"

# Generate a fresh auth token (expires in 3600 seconds)
export PGPASSWORD=$(aws dsql generate-db-connect-admin-auth-token \
--hostname $CLUSTER_ENDPOINT \
--region $CLUSTER_REGION \
--expires-in 3600)

psql "host=$CLUSTER_ENDPOINT dbname=postgres user=admin sslmode=verify-full sslrootcert=system" \
-f batch_test_setup.sql
```

## Run the example

Set environment variables for your cluster:

```bash
# e.g. "admin"
export CLUSTER_USER="admin"

# e.g. "foo0bar1baz2quux3quuux4.dsql.us-east-1.on.aws"
export CLUSTER_ENDPOINT="<your-cluster-endpoint>"
```

Build and run:

```bash
./gradlew run --args="--endpoint $CLUSTER_ENDPOINT --user $CLUSTER_USER"
```

### Command-line options

| Option | Default | Description |
|--------|---------|-------------|
| `--endpoint` | (required) | Aurora DSQL cluster endpoint |
| `--user` | `admin` | Database user |
| `--batch-size` | `1000` | Rows per batch transaction (must be < 3000) |
| `--num-workers` | `4` | Number of parallel worker threads |
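A hypothetical sketch of how these options might be parsed and validated. The demo's actual `Main` is not shown here, so the parsing style and error messages below are assumptions; only the option names and defaults come from the table above.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {
    /** Parses "--flag value" pairs, applying the documented defaults. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> opts = new HashMap<>();
        opts.put("--user", "admin");
        opts.put("--batch-size", "1000");
        opts.put("--num-workers", "4");
        for (int i = 0; i + 1 < args.length; i += 2) {
            opts.put(args[i], args[i + 1]);
        }
        if (!opts.containsKey("--endpoint")) {
            throw new IllegalArgumentException("--endpoint is required");
        }
        int batchSize = Integer.parseInt(opts.get("--batch-size"));
        if (batchSize >= 3000) {
            throw new IllegalArgumentException("--batch-size must be < 3000");
        }
        return opts;
    }

    public static void main(String[] args) {
        Map<String, String> opts = parse(new String[] {
            "--endpoint", "example.dsql.us-east-1.on.aws", "--num-workers", "8"});
        System.out.println(opts.get("--num-workers"));  // prints 8
    }
}
```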

## Clean up

After running the demo, drop the test table to avoid unnecessary storage charges:

```bash
# Generate a fresh token if the previous one expired
export PGPASSWORD=$(aws dsql generate-db-connect-admin-auth-token \
--hostname $CLUSTER_ENDPOINT \
--region $CLUSTER_REGION \
--expires-in 3600)

psql "host=$CLUSTER_ENDPOINT dbname=postgres user=admin sslmode=verify-full sslrootcert=system" \
-c "DROP TABLE IF EXISTS batch_test;"
```

## Additional resources

- [Amazon Aurora DSQL Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/)
- [Aurora DSQL JDBC Connector](https://github.com/awslabs/aurora-dsql-java-connector)
- [pgJDBC Documentation](https://jdbc.postgresql.org/documentation/)

---

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
214 changes: 214 additions & 0 deletions java/batch_operations/batch_test_setup.sql
@@ -0,0 +1,214 @@
-- =============================================================================
-- Sample table setup for Aurora DSQL batch operations
--
-- This script creates a test table and populates it with 25,000 rows of sample
-- data distributed across 5 categories (~5,000 rows each). Each category is
-- used by a different demo operation:
--
-- electronics → Sequential batch DELETE
-- clothing → Sequential batch UPDATE
-- food → Parallel batch DELETE
-- books → Parallel batch UPDATE
-- toys → (unused, remains in table)
--
-- Aurora DSQL limits each transaction to 3,000 row mutations, so we insert
-- in batches of 1,000 rows. Run each INSERT as a separate transaction.
-- =============================================================================

-- Drop the table if it already exists
DROP TABLE IF EXISTS batch_test;

-- Create the sample table.
-- Uses a UUID primary key with gen_random_uuid() to minimize OCC contention,
-- following the Aurora DSQL best practice of using randomly distributed keys.
CREATE TABLE batch_test (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
category VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'active',
value NUMERIC(10,2),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create an index on the category column.
CREATE INDEX ASYNC idx_batch_test_category ON batch_test (category);

-- =============================================================================
-- Populate the table with 25,000 rows of test data (25 batches of 1,000).
-- Each INSERT is 1,000 rows — run each as a separate transaction.
-- =============================================================================

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);

INSERT INTO batch_test (category, status, value)
SELECT
(ARRAY['electronics', 'clothing', 'food', 'books', 'toys'])[floor(random() * 5 + 1)],
'active',
round((random() * 1000)::numeric, 2)
FROM generate_series(1, 1000);
40 changes: 40 additions & 0 deletions java/batch_operations/build.gradle
@@ -0,0 +1,40 @@
plugins {
id 'java'
id 'application'
}

group = 'com.example.dsql'
version = '1.0.0'

java {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}

repositories {
mavenCentral()
}

dependencies {
implementation 'org.postgresql:postgresql:42.7.10'
implementation 'com.zaxxer:HikariCP:7.0.2'
implementation 'software.amazon.dsql:aurora-dsql-jdbc-connector:1.4.0'
implementation 'software.amazon.awssdk:dsql:2.42.14'

testImplementation platform('org.junit:junit-bom:5.11.4')
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

application {
mainClass = 'com.example.dsql.batch_operations.Main'
}

tasks.named('test') {
useJUnitPlatform()

testLogging {
events 'passed', 'skipped', 'failed', 'standardOut', 'standardError'
exceptionFormat = 'FULL'
}
}
> **Contributor:** What's this jar file for? Should we be shipping it?
Binary file not shown.
@@ -0,0 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.0-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists