diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java index 5fde18815d391..c145fb5202af5 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java @@ -30,6 +30,8 @@ import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; import software.amazon.awssdk.transfer.s3.model.FileDownload; +import javax.annotation.concurrent.ThreadSafe; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -68,6 +70,7 @@ * consolidate S3 URI handling across the codebase. */ @Internal +@ThreadSafe class NativeS3BulkCopyHelper { private static final Logger LOG = LoggerFactory.getLogger(NativeS3BulkCopyHelper.class); diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java index 0ec9fb226242e..8ea92a99e1e62 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java @@ -49,6 +49,7 @@ import software.amazon.awssdk.services.s3.model.S3Object; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; import java.io.FileNotFoundException; import java.io.IOException; @@ -93,6 +94,7 @@ * return errors * */ +@ThreadSafe class NativeS3FileSystem extends FileSystem implements EntropyInjectingFileSystem, PathsCopyingFileSystem, AutoCloseableAsync { diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index 50c0145be9348..8251b4f42541f 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; import java.net.URI; @@ -54,6 +55,7 @@ * @see org.apache.flink.core.fs.FileSystemFactory */ @Experimental +@ThreadSafe public class NativeS3FileSystemFactory implements FileSystemFactory { private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystemFactory.class); @@ -298,8 +300,8 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { + "When not set, the default chain is used: delegation tokens -> " + "static credentials (if configured) -> DefaultCredentialsProvider."); - @Nullable private Configuration flinkConfig; - @Nullable private BucketConfigProvider bucketConfigProvider; + @Nullable private volatile Configuration flinkConfig; + @Nullable private volatile BucketConfigProvider bucketConfigProvider; @Override public String getScheme() { @@ -343,8 +345,9 @@ public FileSystem create(URI fsUri) throws IOException { if (StringUtils.isNullOrWhitespaceOnly(bucketName)) { throw new IOException("Invalid S3 URI: missing or empty bucket name in URI: " + fsUri); } - if (bucketConfigProvider != null) { - S3BucketConfig overrides = bucketConfigProvider.getBucketConfig(bucketName); + BucketConfigProvider provider = this.bucketConfigProvider; + if (provider != null) { + S3BucketConfig overrides = provider.getBucketConfig(bucketName); if (overrides != null) { LOG.debug( "Applying bucket-specific configuration for bucket '{}': {}", diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java index dd2a80e4a7037..ea7e647e5f2b6 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java @@ -28,6 +28,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; import java.io.BufferedInputStream; import java.io.EOFException; @@ -41,7 +42,11 @@ * work is deferred to the next {@link #read()} call via {@link #lazySeek()}, so multiple seeks * between reads coalesce. When the seek is forward and within {@code readBufferSize}, bytes are * skipped in-buffer instead of reopening the HTTP connection. + * + *
Thread Safety: The lock guards all mutable state so that {@link #close()} can be safely + * invoked from another thread (e.g. during task cancellation). */ +@ThreadSafe class NativeS3InputStream extends FSDataInputStream { private static final Logger LOG = LoggerFactory.getLogger(NativeS3InputStream.class); @@ -78,7 +83,7 @@ class NativeS3InputStream extends FSDataInputStream { private long streamPos; @GuardedBy("lock") - private volatile boolean closed; + private boolean closed; public NativeS3InputStream( S3Client s3Client, String bucketName, String key, long contentLength) { diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java index d9a123d1b535f..dfdd53b0806db 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java @@ -25,6 +25,9 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -41,6 +44,7 @@ * can be safely invoked from another thread (e.g. during task cancellation) per {@link * org.apache.flink.core.fs.FSDataOutputStream} contract. */ +@ThreadSafe class NativeS3OutputStream extends FSDataOutputStream { private static final int BUFFER_SIZE = 64 * 1024; @@ -54,9 +58,11 @@ class NativeS3OutputStream extends FSDataOutputStream { private final ReentrantLock lock = new ReentrantLock(); + @GuardedBy("lock") private long position; /** Flag to ensure upload happens exactly once. */ + @GuardedBy("lock") private boolean fileUploaded = false; public NativeS3OutputStream( diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java index cd1c918c486f6..f434abfe49855 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java @@ -50,6 +50,8 @@ import software.amazon.awssdk.utils.SdkAutoCloseable; import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -70,6 +72,7 @@ * atomic operations ({@link AtomicBoolean}). */ @Internal +@ThreadSafe class S3ClientProvider implements AutoCloseableAsync { private static final Logger LOG = LoggerFactory.getLogger(S3ClientProvider.class); @@ -299,6 +302,8 @@ public static Builder builder() { return new Builder(); } + /** Mutable builder for {@link S3ClientProvider}. Not safe for concurrent use. */ + @NotThreadSafe public static class Builder { private String accessKey; private String secretKey; diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/DynamicTemporaryAWSCredentialsProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/DynamicTemporaryAWSCredentialsProvider.java index 52c387cd7c620..ea416d4cf41ff 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/DynamicTemporaryAWSCredentialsProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/DynamicTemporaryAWSCredentialsProvider.java @@ -27,7 +27,10 @@ import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.services.sts.model.Credentials; +import javax.annotation.concurrent.ThreadSafe; + @Internal +@ThreadSafe public class DynamicTemporaryAWSCredentialsProvider implements AwsCredentialsProvider { private static final Logger LOG = diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/NativeS3DelegationTokenProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/NativeS3DelegationTokenProvider.java index 323f6fde12cad..4d5a93f014a98 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/NativeS3DelegationTokenProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/NativeS3DelegationTokenProvider.java @@ -35,10 +35,13 @@ import software.amazon.awssdk.services.sts.model.GetSessionTokenRequest; import software.amazon.awssdk.services.sts.model.GetSessionTokenResponse; +import javax.annotation.concurrent.ThreadSafe; + import java.util.Optional; /** Provider for AWS S3 delegation tokens using STS session credentials. */ @Internal +@ThreadSafe public class NativeS3DelegationTokenProvider implements DelegationTokenProvider { private static final Logger LOG = diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/NativeS3DelegationTokenReceiver.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/NativeS3DelegationTokenReceiver.java index b6d98c18a851e..59e6bc93bb5bf 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/NativeS3DelegationTokenReceiver.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/token/NativeS3DelegationTokenReceiver.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.services.sts.model.Credentials; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; /** * Receiver for AWS S3 delegation tokens that stores credentials for use by the file system. @@ -48,6 +49,7 @@ * */ @Internal +@ThreadSafe public class NativeS3DelegationTokenReceiver implements DelegationTokenReceiver { private static final Logger LOG = diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java index 7e93b4e766e9f..89e3e2eba1718 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java @@ -21,6 +21,8 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; +import javax.annotation.concurrent.ThreadSafe; + import java.io.IOException; import java.util.stream.Collectors; @@ -39,6 +41,7 @@ *
The "empty parts" check is a defensive measure against programming errors - in normal * operation, a multipart upload should always have at least one part before committing. */ +@ThreadSafe class NativeS3Committer implements RecoverableFsDataOutputStream.Committer { private final NativeS3ObjectOperations s3AccessHelper; diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java index 14ff1eeab12b8..5256bb8d65a99 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java @@ -51,6 +51,9 @@ import software.amazon.awssdk.transfer.s3.model.FileUpload; import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -93,6 +96,7 @@ * https://bucket.s3.amazonaws.com/key}) are not currently supported. */ @Internal +@ThreadSafe public class NativeS3ObjectOperations { private static final Logger LOG = LoggerFactory.getLogger(NativeS3ObjectOperations.class); @@ -446,6 +450,7 @@ public static String extractBucketName(Path path) { return path.toUri().getHost(); } + @Immutable public static class UploadPartResult { private final int partNumber; private final String eTag; @@ -464,6 +469,7 @@ public String getETag() { } } + @Immutable public static class PutObjectResult { private final String eTag; @@ -476,6 +482,7 @@ public String getETag() { } } + @Immutable public static class CompleteMultipartUploadResult { private final String bucketName; private final String key; @@ -507,6 +514,7 @@ public String getLocation() { } } + @Immutable public static class ObjectMetadata { private final long contentLength; private final String eTag; diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java index 942b1d3e80ba0..0a9a09aa861f3 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java @@ -27,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.ThreadSafe; + import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +36,7 @@ /** Recoverable writer for S3 using multipart uploads for exactly-once semantics. */ @Experimental +@ThreadSafe public class NativeS3RecoverableWriter implements RecoverableWriter, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NativeS3RecoverableWriter.class);