Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import software.amazon.awssdk.transfer.s3.model.FileDownload;

import javax.annotation.concurrent.ThreadSafe;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -68,6 +70,7 @@
* consolidate S3 URI handling across the codebase.
*/
@Internal
@ThreadSafe
class NativeS3BulkCopyHelper {

private static final Logger LOG = LoggerFactory.getLogger(NativeS3BulkCopyHelper.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import software.amazon.awssdk.services.s3.model.S3Object;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -93,6 +94,7 @@
* return errors
* </ul>
*/
@ThreadSafe
class NativeS3FileSystem extends FileSystem
implements EntropyInjectingFileSystem, PathsCopyingFileSystem, AutoCloseableAsync {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.net.URI;
Expand All @@ -54,6 +55,7 @@
* @see org.apache.flink.core.fs.FileSystemFactory
*/
@Experimental
@ThreadSafe
public class NativeS3FileSystemFactory implements FileSystemFactory {

private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystemFactory.class);
Expand Down Expand Up @@ -298,8 +300,8 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
+ "When not set, the default chain is used: delegation tokens -> "
+ "static credentials (if configured) -> DefaultCredentialsProvider.");

@Nullable private Configuration flinkConfig;
@Nullable private BucketConfigProvider bucketConfigProvider;
@Nullable private volatile Configuration flinkConfig;
@Nullable private volatile BucketConfigProvider bucketConfigProvider;
Comment on lines +303 to +304
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without volatile, the JVM can legally:

  • Cache flinkConfig in a CPU register on thread B's core
  • Never re-read it from main memory
  • Thread B sees null even though thread A wrote a real Configuration


@Override
public String getScheme() {
Expand Down Expand Up @@ -343,8 +345,9 @@ public FileSystem create(URI fsUri) throws IOException {
if (StringUtils.isNullOrWhitespaceOnly(bucketName)) {
throw new IOException("Invalid S3 URI: missing or empty bucket name in URI: " + fsUri);
}
if (bucketConfigProvider != null) {
S3BucketConfig overrides = bucketConfigProvider.getBucketConfig(bucketName);
BucketConfigProvider provider = this.bucketConfigProvider;
if (provider != null) {
S3BucketConfig overrides = provider.getBucketConfig(bucketName);
if (overrides != null) {
LOG.debug(
"Applying bucket-specific configuration for bucket '{}': {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.io.BufferedInputStream;
import java.io.EOFException;
Expand All @@ -41,7 +42,11 @@
* work is deferred to the next {@link #read()} call via {@link #lazySeek()}, so multiple seeks
* between reads coalesce. When the seek is forward and within {@code readBufferSize}, bytes are
* skipped in-buffer instead of reopening the HTTP connection.
*
* <p><b>Thread Safety:</b> The lock guards all mutable state so that {@link #close()} can be safely
* invoked from another thread (e.g. during task cancellation).
*/
@ThreadSafe
class NativeS3InputStream extends FSDataInputStream {

private static final Logger LOG = LoggerFactory.getLogger(NativeS3InputStream.class);
Expand Down Expand Up @@ -78,7 +83,7 @@ class NativeS3InputStream extends FSDataInputStream {
private long streamPos;

@GuardedBy("lock")
private volatile boolean closed;
private boolean closed;

public NativeS3InputStream(
S3Client s3Client, String bucketName, String key, long contentLength) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
Expand All @@ -41,6 +44,7 @@
* can be safely invoked from another thread (e.g. during task cancellation) per {@link
* org.apache.flink.core.fs.FSDataOutputStream} contract.
*/
@ThreadSafe
class NativeS3OutputStream extends FSDataOutputStream {

private static final int BUFFER_SIZE = 64 * 1024;
Expand All @@ -54,9 +58,11 @@ class NativeS3OutputStream extends FSDataOutputStream {

private final ReentrantLock lock = new ReentrantLock();

@GuardedBy("lock")
private long position;

/** Flag to ensure upload happens exactly once. */
@GuardedBy("lock")
private boolean fileUploaded = false;

public NativeS3OutputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import software.amazon.awssdk.utils.SdkAutoCloseable;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
Expand All @@ -70,6 +72,7 @@
* atomic operations ({@link AtomicBoolean}).
*/
@Internal
@ThreadSafe
class S3ClientProvider implements AutoCloseableAsync {

private static final Logger LOG = LoggerFactory.getLogger(S3ClientProvider.class);
Expand Down Expand Up @@ -299,6 +302,8 @@ public static Builder builder() {
return new Builder();
}

/** Mutable builder for {@link S3ClientProvider}. Not safe for concurrent use. */
@NotThreadSafe
public static class Builder {
private String accessKey;
private String secretKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.services.sts.model.Credentials;

import javax.annotation.concurrent.ThreadSafe;

@Internal
@ThreadSafe
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gaborgsomogyi :
if we say thread safe , from what thread we are protecting against ?
if not then we dont care .

If there are multiple thread involved. list them !

public class DynamicTemporaryAWSCredentialsProvider implements AwsCredentialsProvider {

private static final Logger LOG =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@
import software.amazon.awssdk.services.sts.model.GetSessionTokenRequest;
import software.amazon.awssdk.services.sts.model.GetSessionTokenResponse;

import javax.annotation.concurrent.ThreadSafe;

import java.util.Optional;

/** Provider for AWS S3 delegation tokens using STS session credentials. */
@Internal
@ThreadSafe
public class NativeS3DelegationTokenProvider implements DelegationTokenProvider {

private static final Logger LOG =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import software.amazon.awssdk.services.sts.model.Credentials;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
* Receiver for AWS S3 delegation tokens that stores credentials for use by the file system.
Expand All @@ -48,6 +49,7 @@
* </ul>
*/
@Internal
@ThreadSafe
public class NativeS3DelegationTokenReceiver implements DelegationTokenReceiver {

private static final Logger LOG =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.util.stream.Collectors;

Expand All @@ -39,6 +41,7 @@
* <p>The "empty parts" check is a defensive measure against programming errors - in normal
* operation, a multipart upload should always have at least one part before committing.
*/
@ThreadSafe
class NativeS3Committer implements RecoverableFsDataOutputStream.Committer {

private final NativeS3ObjectOperations s3AccessHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -93,6 +96,7 @@
* https://bucket.s3.amazonaws.com/key}) are not currently supported.
*/
@Internal
@ThreadSafe
public class NativeS3ObjectOperations {

private static final Logger LOG = LoggerFactory.getLogger(NativeS3ObjectOperations.class);
Expand Down Expand Up @@ -446,6 +450,7 @@ public static String extractBucketName(Path path) {
return path.toUri().getHost();
}

@Immutable
public static class UploadPartResult {
private final int partNumber;
private final String eTag;
Expand All @@ -464,6 +469,7 @@ public String getETag() {
}
}

@Immutable
public static class PutObjectResult {
private final String eTag;

Expand All @@ -476,6 +482,7 @@ public String getETag() {
}
}

@Immutable
public static class CompleteMultipartUploadResult {
private final String bucketName;
private final String key;
Expand Down Expand Up @@ -507,6 +514,7 @@ public String getLocation() {
}
}

@Immutable
public static class ObjectMetadata {
private final long contentLength;
private final String eTag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Recoverable writer for S3 using multipart uploads for exactly-once semantics. */
@Experimental
@ThreadSafe
public class NativeS3RecoverableWriter implements RecoverableWriter, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(NativeS3RecoverableWriter.class);
Expand Down