diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java index 4faba16211..5a59b61c2a 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java @@ -56,310 +56,311 @@ */ public class S3Client { - private static final Logger log = LoggerFactory.getLogger(S3Client.class); - - private software.amazon.awssdk.services.s3.S3Client client; - - // Semaphore to limit concurrent client connections when using virtual threads. - private Semaphore semaphore; - - private ObjectCannedACL cannedAcl; - - private String kmsKeyId; - - private ServerSideEncryption storageEncryption; - - private ExtendedS3TransferManager transferManager; - - private ExecutorService transferPool; - - private Integer transferManagerThreads = 10; - - private Boolean isRequesterPaysEnabled = false; - - private String callerAccount; - - private AwsClientFactory factory; - - private Properties props; - - private boolean global; - - public S3Client(AwsClientFactory factory, Properties props, boolean global) { - S3SyncClientConfiguration clientConfig = S3SyncClientConfiguration.create(props); - this.factory = factory; - this.props = props; - this.global = global; - this.client = factory.getS3Client(clientConfig, global); - this.semaphore = Threads.useVirtual() ? new Semaphore(clientConfig.getMaxConnections()) : null; - this.callerAccount = fetchCallerAccount(); - } - - /** - * Perform an action that requires the S3 semaphore to limit concurrent connections. - * - * @param action - */ - private T runWithPermit(Supplier action) { - try { - if (semaphore != null) semaphore.acquire(); - try { - return action.get(); - } finally { - if (semaphore != null) semaphore.release(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while acquiring S3 client semaphore", e); - } - } - - /** - * AmazonS3Client#getS3AccountOwner() is not available in SDK v2. - * The STSClient#getCallerIdentity returns the account, but it does not include the canonical ID required for ACLs. - * - * This function and the fetchCallerAccount() emulate the old behavior retrieving the canonicalId can only be - * retrieved if the user owns a bucket. - */ - public String getCallerAccount() { - return callerAccount; - } - - private String fetchCallerAccount(){ - try { - List buckets = runWithPermit(() -> client.listBuckets(ListBucketsRequest.builder().maxBuckets(1).build()).buckets()); - if (buckets == null || buckets.isEmpty()) - return null; - return getBucketAcl(buckets.get(0).name()).owner().id(); - }catch (Throwable e){ - log.debug("Unable to fetch caller account - {} ", e.getMessage()); - return null; - } - } - - /** - * @see software.amazon.awssdk.services.s3.S3Client#listBuckets() - */ - public List listBuckets() throws IOException { + private static final Logger log = LoggerFactory.getLogger(S3Client.class); + + private software.amazon.awssdk.services.s3.S3Client client; + + // Semaphore to limit concurrent client connections when using virtual threads. + private Semaphore semaphore; + + private ObjectCannedACL cannedAcl; + + private String kmsKeyId; + + private ServerSideEncryption storageEncryption; + + private ExtendedS3TransferManager transferManager; + + private ExecutorService transferPool; + + private Integer transferManagerThreads = 10; + + private Boolean isRequesterPaysEnabled = false; + + private String callerAccount; + + private AwsClientFactory factory; + + private Properties props; + + private boolean global; + + public S3Client(AwsClientFactory factory, Properties props, boolean global) { + S3SyncClientConfiguration clientConfig = S3SyncClientConfiguration.create(props); + this.factory = factory; + this.props = props; + this.global = global; + this.client = factory.getS3Client(clientConfig, global); + this.semaphore = Threads.useVirtual() ? new Semaphore(clientConfig.getMaxConnections()) : null; + this.callerAccount = fetchCallerAccount(); + } + + /** + * Perform an action that requires the S3 semaphore to limit concurrent connections. + * + * @param action + */ + private T runWithPermit(Supplier action) { + try { + if (semaphore != null) semaphore.acquire(); + try { + return action.get(); + } finally { + if (semaphore != null) semaphore.release(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while acquiring S3 client semaphore", e); + } + } + + /** + * AmazonS3Client#getS3AccountOwner() is not available in SDK v2. + * The STSClient#getCallerIdentity returns the account, but it does not include the canonical ID required for ACLs. + * + * This function and the fetchCallerAccount() emulate the old behavior retrieving the canonicalId can only be + * retrieved if the user owns a bucket. + */ + public String getCallerAccount() { + return callerAccount; + } + + private String fetchCallerAccount() { + try { + List buckets = runWithPermit(() -> client.listBuckets(ListBucketsRequest.builder().maxBuckets(1).build()).buckets()); + if (buckets == null || buckets.isEmpty()) + return null; + return getBucketAcl(buckets.get(0).name()).owner().id(); + } catch (Throwable e) { + log.debug("Unable to fetch caller account - {} ", e.getMessage()); + return null; + } + } + + /** + * @see software.amazon.awssdk.services.s3.S3Client#listBuckets() + */ + public List listBuckets() throws IOException { try { return runWithPermit(() -> client.listBuckets().buckets()); } catch (SdkException e) { - throw convertAwsException(e, "listBuckets", null, null); + throw convertAwsException(e, "listBuckets", null, null); } - } + } - /** - * @see software.amazon.awssdk.services.s3.S3Client#listObjects(ListObjectsRequest) - */ - public ListObjectsResponse listObjects(ListObjectsRequest request) throws IOException { - try { + /** + * @see software.amazon.awssdk.services.s3.S3Client#listObjects(ListObjectsRequest) + */ + public ListObjectsResponse listObjects(ListObjectsRequest request) throws IOException { + try { return runWithPermit(() -> client.listObjects(request)); - } catch (SdkException e){ + } catch (SdkException e) { throw convertAwsException(e, "listObject", request.bucket(), request.prefix()); } - } - - /** - * Convert an AWS SDK exception into the most appropriate {@link IOException} subtype - * so callers can handle it via standard NIO semantics. - * - * The original {@code SdkException} is always attached as the cause for diagnostics. - */ - // package-private for testing - static IOException convertAwsException(SdkException e, String method, String bucket, String key) { - final String s3path = (key != null && !key.isEmpty()) - ? "s3://" + bucket + "/" + key - : "s3://" + (bucket != null ? bucket : ""); - final String message = String.format("Exception calling %s for %s", method, s3path); - - if (e instanceof NoSuchBucketException || e instanceof NoSuchKeyException) { - final NoSuchFileException nsfe = new NoSuchFileException(s3path); - nsfe.initCause(e); - return nsfe; - } - - if (e instanceof AwsServiceException) { - final int code = ((AwsServiceException) e).statusCode(); - if (code == 404) { - final NoSuchFileException nsfe = new NoSuchFileException(s3path); - nsfe.initCause(e); - return nsfe; - } - if (code == 401 || code == 403) { - final AccessDeniedException ade = new AccessDeniedException(s3path, null, e.getMessage()); - ade.initCause(e); - return ade; - } - } - - return new IOException(message, e); - } - - /** - * @see software.amazon.awssdk.services.s3.S3Client#getObject - */ - public ResponseInputStream getObject(String bucketName, String key) throws IOException { - GetObjectRequest.Builder reqBuilder = GetObjectRequest.builder().bucket(bucketName).key(key); - if( this.isRequesterPaysEnabled ) - reqBuilder.requestPayer(RequestPayer.REQUESTER); + } + + /** + * Convert an AWS SDK exception into the most appropriate {@link IOException} subtype + * so callers can handle it via standard NIO semantics. + * + * The original {@code SdkException} is always attached as the cause for diagnostics. + */ + // package-private for testing + static IOException convertAwsException(SdkException e, String method, String bucket, String key) { + final String s3path = (key != null && !key.isEmpty()) + ? "s3://" + bucket + "/" + key + : "s3://" + (bucket != null ? bucket : ""); + final String message = String.format("Exception calling %s for %s", method, s3path); + + if (e instanceof NoSuchBucketException || e instanceof NoSuchKeyException) { + final NoSuchFileException nsfe = new NoSuchFileException(s3path); + nsfe.initCause(e); + return nsfe; + } + + if (e instanceof AwsServiceException) { + final int code = ((AwsServiceException) e).statusCode(); + if (code == 404) { + final NoSuchFileException nsfe = new NoSuchFileException(s3path); + nsfe.initCause(e); + return nsfe; + } + if (code == 401 || code == 403) { + final AccessDeniedException ade = new AccessDeniedException(s3path, null, e.getMessage()); + ade.initCause(e); + return ade; + } + } + + return new IOException(message, e); + } + + /** + * @see software.amazon.awssdk.services.s3.S3Client#getObject + */ + public ResponseInputStream getObject(String bucketName, String key) throws IOException { + GetObjectRequest.Builder reqBuilder = GetObjectRequest.builder().bucket(bucketName).key(key); + if( this.isRequesterPaysEnabled ) + reqBuilder.requestPayer(RequestPayer.REQUESTER); try { return runWithPermit(() -> client.getObject(reqBuilder.build())); } catch (SdkException e) { throw convertAwsException(e, "getObject", bucketName, key); } - } - - /** - * @see software.amazon.awssdk.services.s3.S3Client#putObject - */ - public PutObjectResponse putObject(String bucket, String key, File file) throws IOException { - PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucket).key(key); - if( cannedAcl != null ) { - log.trace("Setting canned ACL={}; bucket={}; key={}", cannedAcl, bucket, key); - builder.acl(cannedAcl); - } - try { + } + + /** + * @see software.amazon.awssdk.services.s3.S3Client#putObject + */ + public PutObjectResponse putObject(String bucket, String key, File file) throws IOException { + PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucket).key(key); + if( cannedAcl != null ) { + log.trace("Setting canned ACL={}; bucket={}; key={}", cannedAcl, bucket, key); + builder.acl(cannedAcl); + } + try { return runWithPermit(() -> client.putObject(builder.build(), file.toPath())); } catch (SdkException e) { throw convertAwsException(e, "putObject", bucket, key); } - } - - private PutObjectRequest preparePutObjectRequest(PutObjectRequest.Builder reqBuilder, List tags, String contentType, String storageClass) { - if( cannedAcl != null ) { - reqBuilder.acl(cannedAcl); - } - if( tags != null && !tags.isEmpty()) { - reqBuilder.tagging(Tagging.builder().tagSet(tags).build()); - } - if( kmsKeyId != null ) { - reqBuilder.ssekmsKeyId(kmsKeyId); - } - if( storageEncryption!=null ) { - reqBuilder.serverSideEncryption(storageEncryption); - } - if( contentType!=null ) { - reqBuilder.contentType(contentType); - } - if( storageClass!=null ) { - reqBuilder.storageClass(storageClass); - } - return reqBuilder.build(); - } - - /** - * @see software.amazon.awssdk.services.s3.S3Client#putObject - */ - public PutObjectResponse putObject(String bucket, String keyName, InputStream inputStream, List tags, String contentType, long contentLength) throws IOException { - PutObjectRequest.Builder reqBuilder = PutObjectRequest.builder() - .bucket(bucket) - .key(keyName); - if( cannedAcl != null ) { - reqBuilder.acl(cannedAcl); - } - if( tags != null && !tags.isEmpty()) { - reqBuilder.tagging(Tagging.builder().tagSet(tags).build()); - } - if( kmsKeyId != null ) { - reqBuilder.ssekmsKeyId(kmsKeyId); - } - if( storageEncryption!=null ) { - reqBuilder.serverSideEncryption(storageEncryption); - } - if( contentType!=null ) { - reqBuilder.contentType(contentType); - } - PutObjectRequest req = reqBuilder.build(); - if( log.isTraceEnabled() ) { - log.trace("S3 PutObject request {}", req); - } - try { - return runWithPermit(() -> client.putObject(req, RequestBody.fromInputStream(inputStream, contentLength))); - } catch (SdkException e) { - throw convertAwsException(e, "putObject", bucket, keyName); - } - } - - /** - * @see software.amazon.awssdk.services.s3.S3Client#deleteObject - */ - public void deleteObject(String bucket, String key) throws IOException { + } + + private PutObjectRequest preparePutObjectRequest(PutObjectRequest.Builder reqBuilder, List tags, String contentType, String storageClass) { + if( cannedAcl != null ) { + reqBuilder.acl(cannedAcl); + } + if( tags != null && !tags.isEmpty()) { + reqBuilder.tagging(Tagging.builder().tagSet(tags).build()); + } + if( kmsKeyId != null ) { + reqBuilder.ssekmsKeyId(kmsKeyId); + } + if( storageEncryption!=null ) { + reqBuilder.serverSideEncryption(storageEncryption); + } + if( contentType!=null ) { + reqBuilder.contentType(contentType); + } + if( storageClass!=null ) { + reqBuilder.storageClass(storageClass); + } + return reqBuilder.build(); + } + + /** + * @see software.amazon.awssdk.services.s3.S3Client#putObject + */ + public PutObjectResponse putObject(String bucket, String keyName, InputStream inputStream, List tags, String contentType, long contentLength) throws IOException { + PutObjectRequest.Builder reqBuilder = PutObjectRequest.builder() + .bucket(bucket) + .key(keyName); + if( cannedAcl != null ) { + reqBuilder.acl(cannedAcl); + } + if( tags != null && !tags.isEmpty()) { + reqBuilder.tagging(Tagging.builder().tagSet(tags).build()); + } + if( kmsKeyId != null ) { + reqBuilder.ssekmsKeyId(kmsKeyId); + } + if( storageEncryption!=null ) { + reqBuilder.serverSideEncryption(storageEncryption); + } + if( contentType!=null ) { + reqBuilder.contentType(contentType); + } + PutObjectRequest req = reqBuilder.build(); + if( log.isTraceEnabled() ) { + log.trace("S3 PutObject request {}", req); + } + try { + return runWithPermit(() -> client.putObject(req, RequestBody.fromInputStream(inputStream, contentLength))); + } catch (SdkException e) { + throw convertAwsException(e, "putObject", bucket, keyName); + } + } + + /** + * @see software.amazon.awssdk.services.s3.S3Client#deleteObject + */ + public void deleteObject(String bucket, String key) throws IOException { try { runWithPermit(() -> client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build())); } catch (SdkException e) { throw convertAwsException(e, "deleteObject", bucket, key); } - } + } - /** - * @see software.amazon.awssdk.services.s3.S3Client#getBucketAcl - */ - public AccessControlPolicy getBucketAcl(String bucket) throws IOException { + /** + * @see software.amazon.awssdk.services.s3.S3Client#getBucketAcl + */ + public AccessControlPolicy getBucketAcl(String bucket) throws IOException { try { GetBucketAclResponse response = runWithPermit(() -> client.getBucketAcl(GetBucketAclRequest.builder().bucket(bucket).build())); return AccessControlPolicy.builder().grants(response.grants()).owner(response.owner()).build(); } catch (SdkException e) { throw convertAwsException(e, "getBucketAcl", bucket, null); } - } - - public void setCannedAcl(String acl) { - if( acl==null ) - return; - this.cannedAcl = AwsHelper.parseS3Acl(acl); - log.debug("Setting S3 canned ACL={} [{}]", this.cannedAcl, acl); - } - - public void setKmsKeyId(String kmsKeyId) { - if( kmsKeyId==null ) - return; - this.kmsKeyId = kmsKeyId; - log.debug("Setting S3 SSE kms Id={}", kmsKeyId); - } - - public void setStorageEncryption(String alg) { - if( alg == null ) - return; - this.storageEncryption = ServerSideEncryption.fromValue(alg); - log.debug("Setting S3 SSE storage encryption algorithm={}", alg); - } - - public void setRequesterPaysEnabled(String requesterPaysEnabled) { - if( requesterPaysEnabled == null ) - return; - this.isRequesterPaysEnabled = Boolean.valueOf(requesterPaysEnabled); - log.debug("Setting S3 requester pays enabled={}", isRequesterPaysEnabled); - } - - public ObjectCannedACL getCannedAcl() { - return cannedAcl; - } - - public software.amazon.awssdk.services.s3.S3Client getClient() { - return client; - } - - /** - * @see software.amazon.awssdk.services.s3.S3Client#getObjectAcl - */ - public AccessControlPolicy getObjectAcl(String bucketName, String key) throws IOException { + } + + public void setCannedAcl(String acl) { + if( acl==null ) + return; + this.cannedAcl = AwsHelper.parseS3Acl(acl); + log.debug("Setting S3 canned ACL={} [{}]", this.cannedAcl, acl); + } + + public void setKmsKeyId(String kmsKeyId) { + if( kmsKeyId==null ) + return; + this.kmsKeyId = kmsKeyId; + log.debug("Setting S3 SSE kms Id={}", kmsKeyId); + } + + public void setStorageEncryption(String alg) { + if( alg == null ) + return; + this.storageEncryption = ServerSideEncryption.fromValue(alg); + log.debug("Setting S3 SSE storage encryption algorithm={}", alg); + } + + public void setRequesterPaysEnabled(String requesterPaysEnabled) { + if( requesterPaysEnabled == null ) + return; + this.isRequesterPaysEnabled = Boolean.valueOf(requesterPaysEnabled); + log.debug("Setting S3 requester pays enabled={}", isRequesterPaysEnabled); + } + + public ObjectCannedACL getCannedAcl() { + return cannedAcl; + } + + public software.amazon.awssdk.services.s3.S3Client getClient() { + return client; + } + + /** + * @see software.amazon.awssdk.services.s3.S3Client#getObjectAcl + */ + public AccessControlPolicy getObjectAcl(String bucketName, String key) throws IOException { try { GetObjectAclResponse response = runWithPermit(() -> client.getObjectAcl(GetObjectAclRequest.builder().bucket(bucketName).key(key).build())); return AccessControlPolicy.builder().grants(response.grants()).owner(response.owner()).build(); } catch (SdkException e) { throw convertAwsException(e, "getObjectAcl", bucketName, key); } - } - /** - * @see software.amazon.awssdk.services.s3.S3Client#headObject - */ - public HeadObjectResponse getObjectMetadata(String bucketName, String key) throws IOException { + } + + /** + * @see software.amazon.awssdk.services.s3.S3Client#headObject + */ + public HeadObjectResponse getObjectMetadata(String bucketName, String key) throws IOException { try { return runWithPermit(() -> client.headObject(HeadObjectRequest.builder().bucket(bucketName).key(key).build())); } catch (SdkException e) { throw convertAwsException(e, "getObjectMetadata", bucketName, key); } - } + } /** * @see software.amazon.awssdk.services.s3.S3Client#headBucket @@ -372,59 +373,59 @@ public HeadBucketResponse getBucketMetadata(String bucketName) throws IOExceptio } } - public List getObjectTags(String bucketName, String key) throws IOException { + public List getObjectTags(String bucketName, String key) throws IOException { try { return runWithPermit(() -> client.getObjectTagging(GetObjectTaggingRequest.builder().bucket(bucketName).key(key).build()).tagSet()); } catch (SdkException e) { throw convertAwsException(e, "getObjectTags", bucketName, key); } - } + } - public String getObjectKmsKeyId(String bucketName, String key) throws IOException { - return getObjectMetadata(bucketName, key).ssekmsKeyId(); - } + public String getObjectKmsKeyId(String bucketName, String key) throws IOException { + return getObjectMetadata(bucketName, key).ssekmsKeyId(); + } - /** - * @see software.amazon.awssdk.services.s3.S3Client#listObjectsV2Paginator - */ - public ListObjectsV2Iterable listObjectsV2Paginator(ListObjectsV2Request request) throws IOException { + /** + * @see software.amazon.awssdk.services.s3.S3Client#listObjectsV2Paginator + */ + public ListObjectsV2Iterable listObjectsV2Paginator(ListObjectsV2Request request) throws IOException { try { return runWithPermit(() -> client.listObjectsV2Paginator(request)); } catch (SdkException e) { throw convertAwsException(e, "listObjects", request.bucket(), request.prefix()); } - } + } - // ===== transfer manager section ===== + // ===== transfer manager section ===== - synchronized ExtendedS3TransferManager transferManager() { - if( transferManager == null ) { - transferPool = ThreadPoolManager.create("S3TransferManager"); - var delegate = S3TransferManager.builder() - .s3Client(factory.getS3AsyncClient(S3AsyncClientConfiguration.create(props), global)) - .executor(transferPool) - .build(); + synchronized ExtendedS3TransferManager transferManager() { + if( transferManager == null ) { + transferPool = ThreadPoolManager.create("S3TransferManager"); + var delegate = S3TransferManager.builder() + .s3Client(factory.getS3AsyncClient(S3AsyncClientConfiguration.create(props), global)) + .executor(transferPool) + .build(); transferManager = new ExtendedS3TransferManager(delegate, props); - } - return transferManager; - } + } + return transferManager; + } public void downloadFile(S3Path source, File target, long size) throws IOException { - try{ + try { DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder() .getObjectRequest(b -> b.bucket(source.getBucket()).key(source.getKey())) .destination(target) .build(); - transferManager().downloadFile(downloadFileRequest,size).completionFuture().get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException(String.format("S3 download file: s3://%s/%s cancelled", source.getBucket(), source.getKey())); - } catch (ExecutionException e) { - String msg = String.format("Exception thrown downloading S3 object s3://%s/%s", source.getBucket(), source.getKey()); - throw new IOException(msg, e.getCause()); - } - } + transferManager().downloadFile(downloadFileRequest,size).completionFuture().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(String.format("S3 download file: s3://%s/%s cancelled", source.getBucket(), source.getKey())); + } catch (ExecutionException e) { + String msg = String.format("Exception thrown downloading S3 object s3://%s/%s", source.getBucket(), source.getKey()); + throw new IOException(msg, e.getCause()); + } + } private static void createDirectory(Path dir) throws IOException { try { @@ -498,7 +499,7 @@ private void cleanupQueuedDownloads(Queue allDownloads, Int IOException firstException = null; while(!allDownloads.isEmpty()) { OngoingFileDownload current = allDownloads.poll(); - try{ + try { current.download.completionFuture().get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); @@ -547,77 +548,77 @@ public void uploadFile(File source, S3Path target) throws IOException { String msg = String.format("Exception thrown uploading S3 object s3://%s/%s", target.getBucket(), target.getKey()); throw new IOException(msg, e.getCause()); } - } - - private Consumer transformUploadRequest(List tags) { - return builder -> builder.putObjectRequest(updateBuilder(builder.build().putObjectRequest().toBuilder(), tags).build()); - } - - private PutObjectRequest.Builder updateBuilder(PutObjectRequest.Builder porBuilder, List tags) { - - if( cannedAcl != null ) - porBuilder.acl(cannedAcl); - if( storageEncryption != null ) - porBuilder.serverSideEncryption(storageEncryption); - if( kmsKeyId != null ) - porBuilder.ssekmsKeyId(kmsKeyId); - if( tags != null && !tags.isEmpty() ) - porBuilder.tagging(Tagging.builder().tagSet(tags).build()); - return porBuilder; - } - - public void uploadDirectory(File source, S3Path target) throws IOException { - UploadDirectoryRequest request = UploadDirectoryRequest.builder() - .bucket(target.getBucket()) - .s3Prefix(target.getKey()) - .source(source.toPath()) - .uploadFileRequestTransformer(transformUploadRequest(target.getTagsList())) - .build(); - - try { - CompletedDirectoryUpload completed = transferManager().uploadDirectory(request).completionFuture().get(); - if (!completed.failedTransfers().isEmpty()) { - log.debug("S3 upload directory: s3://{}/{} failed transfers", target.getBucket(), target.getKey()); - throw new IOException("Some transfers in S3 upload directory: s3://"+ target.getBucket() +"/"+ target.getKey() +" has failed - Transfers: " + completed.failedTransfers() ); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException(String.format("S3 upload directory: s3://%s/%s interrupted", target.getBucket(), target.getKey())); - } catch (ExecutionException e) { - String msg = String.format("Exception thrown uploading S3 object s3://%s/%s", target.getBucket(), target.getKey()); - throw new IOException(msg, e.getCause()); - } - } + } + + private Consumer transformUploadRequest(List tags) { + return builder -> builder.putObjectRequest(updateBuilder(builder.build().putObjectRequest().toBuilder(), tags).build()); + } + + private PutObjectRequest.Builder updateBuilder(PutObjectRequest.Builder porBuilder, List tags) { + + if( cannedAcl != null ) + porBuilder.acl(cannedAcl); + if( storageEncryption != null ) + porBuilder.serverSideEncryption(storageEncryption); + if( kmsKeyId != null ) + porBuilder.ssekmsKeyId(kmsKeyId); + if( tags != null && !tags.isEmpty() ) + porBuilder.tagging(Tagging.builder().tagSet(tags).build()); + return porBuilder; + } + + public void uploadDirectory(File source, S3Path target) throws IOException { + UploadDirectoryRequest request = UploadDirectoryRequest.builder() + .bucket(target.getBucket()) + .s3Prefix(target.getKey()) + .source(source.toPath()) + .uploadFileRequestTransformer(transformUploadRequest(target.getTagsList())) + .build(); + + try { + CompletedDirectoryUpload completed = transferManager().uploadDirectory(request).completionFuture().get(); + if (!completed.failedTransfers().isEmpty()) { + log.debug("S3 upload directory: s3://{}/{} failed transfers", target.getBucket(), target.getKey()); + throw new IOException("Some transfers in S3 upload directory: s3://"+ target.getBucket() +"/"+ target.getKey() +" has failed - Transfers: " + completed.failedTransfers() ); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(String.format("S3 upload directory: s3://%s/%s interrupted", target.getBucket(), target.getKey())); + } catch (ExecutionException e) { + String msg = String.format("Exception thrown uploading S3 object s3://%s/%s", target.getBucket(), target.getKey()); + throw new IOException(msg, e.getCause()); + } + } public void copyFile(CopyObjectRequest.Builder reqBuilder, List tags, String contentType, String storageClass) throws IOException { - if( tags !=null && !tags.isEmpty()) { - log.debug("Setting tags: {}", tags); + if( tags !=null && !tags.isEmpty()) { + log.debug("Setting tags: {}", tags); reqBuilder.taggingDirective(TaggingDirective.REPLACE); reqBuilder.tagging(Tagging.builder().tagSet(tags).build()); - } - if( cannedAcl != null ) { - reqBuilder.acl(cannedAcl); - } - if( storageEncryption != null ) { - reqBuilder.serverSideEncryption(storageEncryption); - } - if( kmsKeyId !=null ) { + } + if( cannedAcl != null ) { + reqBuilder.acl(cannedAcl); + } + if( storageEncryption != null ) { + reqBuilder.serverSideEncryption(storageEncryption); + } + if( kmsKeyId !=null ) { reqBuilder.ssekmsKeyId(kmsKeyId); - } - if( contentType!=null ) { - reqBuilder.metadataDirective(MetadataDirective.REPLACE); + } + if( contentType!=null ) { + reqBuilder.metadataDirective(MetadataDirective.REPLACE); reqBuilder.contentType(contentType); - } - if( storageClass!=null ) { - reqBuilder.storageClass(storageClass); - } + } + if( storageClass!=null ) { + reqBuilder.storageClass(storageClass); + } CopyObjectRequest req = reqBuilder.build(); - if( log.isTraceEnabled() ) { - log.trace("S3 CopyObject request {}", req); - } - CopyRequest copyRequest = CopyRequest.builder().copyObjectRequest(req).build(); + if( log.isTraceEnabled() ) { + log.trace("S3 CopyObject request {}", req); + } + CopyRequest copyRequest = CopyRequest.builder().copyObjectRequest(req).build(); try { - transferManager().copy(copyRequest).completionFuture().get(); + transferManager().copy(copyRequest).completionFuture().get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new InterruptedIOException(String.format("S3 copy s3://%s/%s to s3://%s/%s interrupted", req.sourceBucket(), req.sourceKey(), req.destinationBucket(), req.destinationKey())); @@ -626,7 +627,7 @@ public void copyFile(CopyObjectRequest.Builder reqBuilder, List tags, Strin throw new IOException(msg, e.getCause()); } - } + } static class OngoingFileDownload { String bucket; diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileAttributes.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileAttributes.java index 963b089da9..bc3cefa43b 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileAttributes.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileAttributes.java @@ -23,70 +23,70 @@ public class S3FileAttributes implements BasicFileAttributes { - private final FileTime lastModifiedTime; - private final long size; - private final boolean directory; - private final boolean regularFile; - private final String key; - - public S3FileAttributes(String key, FileTime lastModifiedTime, long size, - boolean isDirectory, boolean isRegularFile) { - this.key = key; - this.lastModifiedTime = lastModifiedTime; - this.size = size; - directory = isDirectory; - regularFile = isRegularFile; - } - - @Override - public FileTime lastModifiedTime() { - return lastModifiedTime; - } - - @Override - public FileTime lastAccessTime() { - return lastModifiedTime; - } - - @Override - public FileTime creationTime() { - return lastModifiedTime; - } - - @Override - public boolean isRegularFile() { - return regularFile; - } - - @Override - public boolean isDirectory() { - return directory; - } - - @Override - public boolean isSymbolicLink() { - return false; - } - - @Override - public boolean isOther() { - return false; - } - - @Override - public long size() { - return size; - } - - @Override - public Object fileKey() { - return key; - } - - @Override - public String toString() { - return format( - "[%s: lastModified=%s, size=%s, isDirectory=%s, isRegularFile=%s]", - key, lastModifiedTime, size, directory, regularFile); - } + private final FileTime lastModifiedTime; + private final long size; + private final boolean directory; + private final boolean regularFile; + private final String key; + + public S3FileAttributes(String key, FileTime lastModifiedTime, long size, + boolean isDirectory, boolean isRegularFile) { + this.key = key; + this.lastModifiedTime = lastModifiedTime; + this.size = size; + directory = isDirectory; + regularFile = isRegularFile; + } + + @Override + public FileTime lastModifiedTime() { + return lastModifiedTime; + } + + @Override + public FileTime lastAccessTime() { + return lastModifiedTime; + } + + @Override + public FileTime creationTime() { + return lastModifiedTime; + } + + @Override + public boolean isRegularFile() { + return regularFile; + } + + @Override + public boolean isDirectory() { + return directory; + } + + @Override + public boolean isSymbolicLink() { + return false; + } + + @Override + public boolean isOther() { + return false; + } + + @Override + public long size() { + return size; + } + + @Override + public Object fileKey() { + return key; + } + + @Override + public String toString() { + return format( + "[%s: lastModified=%s, size=%s, isDirectory=%s, isRegularFile=%s]", + key, lastModifiedTime, size, directory, regularFile); + } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystem.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystem.java index d83fb440b5..222ba91638 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystem.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystem.java @@ -35,53 +35,53 @@ public class S3FileSystem extends FileSystem { - private final S3FileSystemProvider provider; - private final S3Client client; - private final String endpoint; - private final String bucketName; - - private final Properties properties; - - public S3FileSystem(S3FileSystemProvider provider, S3Client client, URI uri, Properties props) { - this.provider = provider; - this.client = client; - this.endpoint = uri.getHost(); - this.bucketName = S3Path.bucketName(uri); - this.properties = props; - } - - @Override - public FileSystemProvider provider() { - return provider; - } - - public Properties properties() { - return properties; - } - - @Override - public void close() { - this.provider.fileSystems.remove(bucketName); - } - - @Override - public boolean isOpen() { - return this.provider.fileSystems.containsKey(bucketName); - } - - @Override - public boolean isReadOnly() { - return false; - } - - @Override - public String getSeparator() { - return S3Path.PATH_SEPARATOR; - } - - @Override - public Iterable getRootDirectories() { - ImmutableList.Builder builder = ImmutableList.builder(); + private final S3FileSystemProvider provider; + private final S3Client client; + private final String endpoint; + private final String bucketName; + + private final Properties properties; + + public S3FileSystem(S3FileSystemProvider provider, S3Client client, URI uri, Properties props) { + this.provider = provider; + this.client = client; + this.endpoint = uri.getHost(); + this.bucketName = S3Path.bucketName(uri); + this.properties = props; + } + + @Override + public FileSystemProvider provider() { + return provider; + } + + public Properties properties() { + return properties; + } + + @Override + public void close() { + this.provider.fileSystems.remove(bucketName); + } + + @Override + public boolean isOpen() { + return this.provider.fileSystems.containsKey(bucketName); + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public String getSeparator() { + return S3Path.PATH_SEPARATOR; + } + + @Override + public Iterable getRootDirectories() { + ImmutableList.Builder builder = ImmutableList.builder(); try { for (Bucket bucket : client.listBuckets()) { builder.add(new S3Path(this, bucket.name())); @@ -90,58 +90,58 @@ public Iterable getRootDirectories() { throw new UncheckedIOException(e); } - return builder.build(); - } - - @Override - public Iterable getFileStores() { - return ImmutableList.of(); - } - - @Override - public Set supportedFileAttributeViews() { - return ImmutableSet.of("basic"); - } - - @Override - public Path getPath(String first, String... more) { - if (more.length == 0) { - return new S3Path(this, first); - } - - return new S3Path(this, first, more); - } - - @Override - public PathMatcher getPathMatcher(String syntaxAndPattern) { - throw new UnsupportedOperationException(); - } - - @Override - public UserPrincipalLookupService getUserPrincipalLookupService() { - throw new UnsupportedOperationException(); - } - - @Override - public WatchService newWatchService() throws IOException { - throw new UnsupportedOperationException(); - } - - public S3Client getClient() { - return client; - } - - /** - * get the endpoint associated with this fileSystem. - * - * @see http://docs.aws.amazon.com/general/latest/gr/rande.html - * @return string - */ - public String getEndpoint() { - return endpoint; - } - - public String getBucketName() { - return bucketName; - } + return builder.build(); + } + + @Override + public Iterable getFileStores() { + return ImmutableList.of(); + } + + @Override + public Set supportedFileAttributeViews() { + return ImmutableSet.of("basic"); + } + + @Override + public Path getPath(String first, String... more) { + if (more.length == 0) { + return new S3Path(this, first); + } + + return new S3Path(this, first, more); + } + + @Override + public PathMatcher getPathMatcher(String syntaxAndPattern) { + throw new UnsupportedOperationException(); + } + + @Override + public UserPrincipalLookupService getUserPrincipalLookupService() { + throw new UnsupportedOperationException(); + } + + @Override + public WatchService newWatchService() throws IOException { + throw new UnsupportedOperationException(); + } + + public S3Client getClient() { + return client; + } + + /** + * get the endpoint associated with this fileSystem. + * + * @see http://docs.aws.amazon.com/general/latest/gr/rande.html + * @return string + */ + public String getEndpoint() { + return endpoint; + } + + public String getBucketName() { + return bucketName; + } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java index 8a30e4b856..dfc1c9bf0d 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java @@ -111,57 +111,57 @@ */ public class S3FileSystemProvider extends FileSystemProvider implements FileSystemTransferAware { - private static final Logger log = LoggerFactory.getLogger(S3FileSystemProvider.class); + private static final Logger log = LoggerFactory.getLogger(S3FileSystemProvider.class); - final Map fileSystems = new HashMap<>(); + final Map fileSystems = new HashMap<>(); private final S3ObjectSummaryLookup s3ObjectSummaryLookup = new S3ObjectSummaryLookup(); - @Override - public String getScheme() { - return "s3"; - } - - @Override - public FileSystem newFileSystem(URI uri, Map env) throws IOException { - Preconditions.checkNotNull(uri, "uri is null"); - Preconditions.checkArgument(uri.getScheme().equals("s3"), "uri scheme must be 's3': '%s'", uri); - - final String bucketName = S3Path.bucketName(uri); - synchronized (fileSystems) { - if( fileSystems.containsKey(bucketName)) - throw new FileSystemAlreadyExistsException("S3 filesystem already exists. Use getFileSystem() instead"); - - final AwsConfig awsConfig = new AwsConfig(env); - // - final S3FileSystem result = createFileSystem(uri, awsConfig); - fileSystems.put(bucketName, result); - return result; - } - } - - @Override - public FileSystem getFileSystem(URI uri) { - final String bucketName = S3Path.bucketName(uri); - final FileSystem fileSystem = this.fileSystems.get(bucketName); - - if (fileSystem == null) { - throw new FileSystemNotFoundException("S3 filesystem not yet created. Use newFileSystem() instead"); - } - - return fileSystem; - } - - /** - * Deviation from spec: throws FileSystemNotFoundException if FileSystem - * hasn't yet been initialized. Call newFileSystem() first. - * Need credentials. Maybe set credentials after? how? - */ - @Override - public Path getPath(URI uri) { - Preconditions.checkArgument(uri.getScheme().equals(getScheme()),"URI scheme must be %s", getScheme()); - return getFileSystem(uri).getPath(uri.getPath()); - } + @Override + public String getScheme() { + return "s3"; + } + + @Override + public FileSystem newFileSystem(URI uri, Map env) throws IOException { + Preconditions.checkNotNull(uri, "uri is null"); + Preconditions.checkArgument(uri.getScheme().equals("s3"), "uri scheme must be 's3': '%s'", uri); + + final String bucketName = S3Path.bucketName(uri); + synchronized (fileSystems) { + if( fileSystems.containsKey(bucketName)) + throw new FileSystemAlreadyExistsException("S3 filesystem already exists. Use getFileSystem() instead"); + + final AwsConfig awsConfig = new AwsConfig(env); + // + final S3FileSystem result = createFileSystem(uri, awsConfig); + fileSystems.put(bucketName, result); + return result; + } + } + + @Override + public FileSystem getFileSystem(URI uri) { + final String bucketName = S3Path.bucketName(uri); + final FileSystem fileSystem = this.fileSystems.get(bucketName); + + if (fileSystem == null) { + throw new FileSystemNotFoundException("S3 filesystem not yet created. Use newFileSystem() instead"); + } + + return fileSystem; + } + + /** + * Deviation from spec: throws FileSystemNotFoundException if FileSystem + * hasn't yet been initialized. Call newFileSystem() first. + * Need credentials. Maybe set credentials after? how? + */ + @Override + public Path getPath(URI uri) { + Preconditions.checkArgument(uri.getScheme().equals(getScheme()),"URI scheme must be %s", getScheme()); + return getFileSystem(uri).getPath(uri.getPath()); + } @Override public DirectoryStream newDirectoryStream(Path dir, DirectoryStream.Filter filter) throws IOException { @@ -182,19 +182,19 @@ public Iterator iterator() { }; } - @Override - public InputStream newInputStream(Path path, OpenOption... options) - throws IOException { - Preconditions.checkArgument(options.length == 0, - "OpenOptions not yet supported: %s", - ImmutableList.copyOf(options)); // TODO + @Override + public InputStream newInputStream(Path path, OpenOption... options) + throws IOException { + Preconditions.checkArgument(options.length == 0, + "OpenOptions not yet supported: %s", + ImmutableList.copyOf(options)); // TODO - Preconditions.checkArgument(path instanceof S3Path, - "path must be an instance of %s", S3Path.class.getName()); - S3Path s3Path = (S3Path) path; + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + S3Path s3Path = (S3Path) path; - Preconditions.checkArgument(!s3Path.getKey().equals(""), - "cannot create InputStream for root directory: %s", FilesEx.toUriString(s3Path)); + Preconditions.checkArgument(!s3Path.getKey().equals(""), + "cannot create InputStream for root directory: %s", FilesEx.toUriString(s3Path)); final ResponseInputStream result = s3Path .getFileSystem() @@ -202,213 +202,213 @@ public InputStream newInputStream(Path path, OpenOption... options) .getObject(s3Path.getBucket(), s3Path.getKey()); if (result == null) - throw new IOException(String.format("The specified path is a directory: %s", FilesEx.toUriString(s3Path))); - - // Wrap the response stream so that close() aborts the underlying HTTP connection - // instead of draining the remaining bytes. Apache HTTP client's ContentLengthInputStream.close() - // reads to end-of-stream to release the connection back to the pool, which for a large S3 - // object (e.g. a multi-GB FASTQ) can block the caller for many minutes. Callers of - // newInputStream() typically do not consume the whole object, so abort() is the correct - // semantics here. - return new FilterInputStream(result) { - @Override - public void close() { - result.abort(); - } + throw new IOException(String.format("The specified path is a directory: %s", FilesEx.toUriString(s3Path))); + + // Wrap the response stream so that close() aborts the underlying HTTP connection + // instead of draining the remaining bytes. Apache HTTP client's ContentLengthInputStream.close() + // reads to end-of-stream to release the connection back to the pool, which for a large S3 + // object (e.g. a multi-GB FASTQ) can block the caller for many minutes. Callers of + // newInputStream() typically do not consume the whole object, so abort() is the correct + // semantics here. + return new FilterInputStream(result) { + @Override + public void close() { + result.abort(); + } // Just-used for testing - void abort(){ + void abort() { result.abort(); } - }; - } - - @Override - public OutputStream newOutputStream(final Path path, final OpenOption... options) throws IOException { - Preconditions.checkArgument(path instanceof S3Path, "path must be an instance of %s", S3Path.class.getName()); - S3Path s3Path = (S3Path)path; - - // validate options - if (options.length > 0) { - Set opts = new LinkedHashSet<>(Arrays.asList(options)); - - // cannot handle APPEND here -> use newByteChannel() implementation - if (opts.contains(StandardOpenOption.APPEND)) { - return super.newOutputStream(path, options); - } - - if (opts.contains(StandardOpenOption.READ)) { - throw new IllegalArgumentException("READ not allowed"); - } - - boolean create = opts.remove(StandardOpenOption.CREATE); - boolean createNew = opts.remove(StandardOpenOption.CREATE_NEW); - boolean truncateExisting = opts.remove(StandardOpenOption.TRUNCATE_EXISTING); - - // remove irrelevant/ignored options - opts.remove(StandardOpenOption.WRITE); - opts.remove(StandardOpenOption.SPARSE); - - if (!opts.isEmpty()) { - throw new UnsupportedOperationException(opts.iterator().next() + " not supported"); - } - - if (!(create && truncateExisting)) { - if (exists(s3Path)) { - if (createNew || !truncateExisting) { - throw new FileAlreadyExistsException(FilesEx.toUriString(s3Path)); - } - } else { - if (!createNew && !create) { - throw new NoSuchFileException(FilesEx.toUriString(s3Path)); - } - } - } - } - - return createUploaderOutputStream(s3Path); - } - - @Override - public boolean canUpload(Path source, Path target) { - return FileSystems.getDefault().equals(source.getFileSystem()) && target instanceof S3Path; - } - - @Override - public boolean canDownload(Path source, Path target) { - return source instanceof S3Path && FileSystems.getDefault().equals(target.getFileSystem()); - } - - @Override - public void download(Path remoteFile, Path localDestination, CopyOption... options) throws IOException { - final S3Path source = (S3Path)remoteFile; - - final CopyOptions opts = CopyOptions.parse(options); - // delete target if it exists and REPLACE_EXISTING is specified - if (opts.replaceExisting()) { - FileHelper.deletePath(localDestination); - } - else if (Files.exists(localDestination)) - throw new FileAlreadyExistsException(localDestination.toString()); - - // Read S3 file attributes (metadata) for the source path, returns Optional.empty() if file doesn't exist - final Optional attrs = readAttr1(source); - // Extract directory status from attributes, defaulting to false if no attributes found - final boolean isDir = attrs.map(S3FileAttributes::isDirectory).orElse(false); - // Get file size only for non-directories (directories have size 0), defaulting to 0L if no attributes - final long size = attrs.filter(a -> !a.isDirectory()).map(S3FileAttributes::size).orElse(0L); - final String type = isDir ? "directory": "file"; - final S3Client s3Client = source.getFileSystem().getClient(); - log.debug("S3 download {} from={} to={} size={}", type, FilesEx.toUriString(source), localDestination, size); - if( isDir ) { - s3Client.downloadDirectory(source, localDestination.toFile()); - } + }; + } + + @Override + public OutputStream newOutputStream(final Path path, final OpenOption... options) throws IOException { + Preconditions.checkArgument(path instanceof S3Path, "path must be an instance of %s", S3Path.class.getName()); + S3Path s3Path = (S3Path)path; + + // validate options + if (options.length > 0) { + Set opts = new LinkedHashSet<>(Arrays.asList(options)); + + // cannot handle APPEND here -> use newByteChannel() implementation + if (opts.contains(StandardOpenOption.APPEND)) { + return super.newOutputStream(path, options); + } + + if (opts.contains(StandardOpenOption.READ)) { + throw new IllegalArgumentException("READ not allowed"); + } + + boolean create = opts.remove(StandardOpenOption.CREATE); + boolean createNew = opts.remove(StandardOpenOption.CREATE_NEW); + boolean truncateExisting = opts.remove(StandardOpenOption.TRUNCATE_EXISTING); + + // remove irrelevant/ignored options + opts.remove(StandardOpenOption.WRITE); + opts.remove(StandardOpenOption.SPARSE); + + if (!opts.isEmpty()) { + throw new UnsupportedOperationException(opts.iterator().next() + " not supported"); + } + + if (!(create && truncateExisting)) { + if (exists(s3Path)) { + if (createNew || !truncateExisting) { + throw new FileAlreadyExistsException(FilesEx.toUriString(s3Path)); + } + } else { + if (!createNew && !create) { + throw new NoSuchFileException(FilesEx.toUriString(s3Path)); + } + } + } + } + + return createUploaderOutputStream(s3Path); + } + + @Override + public boolean canUpload(Path source, Path target) { + return FileSystems.getDefault().equals(source.getFileSystem()) && target instanceof S3Path; + } + + @Override + public boolean canDownload(Path source, Path target) { + return source instanceof S3Path && FileSystems.getDefault().equals(target.getFileSystem()); + } + + @Override + public void download(Path remoteFile, Path localDestination, CopyOption... options) throws IOException { + final S3Path source = (S3Path)remoteFile; + + final CopyOptions opts = CopyOptions.parse(options); + // delete target if it exists and REPLACE_EXISTING is specified + if (opts.replaceExisting()) { + FileHelper.deletePath(localDestination); + } + else if (Files.exists(localDestination)) + throw new FileAlreadyExistsException(localDestination.toString()); + + // Read S3 file attributes (metadata) for the source path, returns Optional.empty() if file doesn't exist + final Optional attrs = readAttr1(source); + // Extract directory status from attributes, defaulting to false if no attributes found + final boolean isDir = attrs.map(S3FileAttributes::isDirectory).orElse(false); + // Get file size only for non-directories (directories have size 0), defaulting to 0L if no attributes + final long size = attrs.filter(a -> !a.isDirectory()).map(S3FileAttributes::size).orElse(0L); + final String type = isDir ? "directory": "file"; + final S3Client s3Client = source.getFileSystem().getClient(); + log.debug("S3 download {} from={} to={} size={}", type, FilesEx.toUriString(source), localDestination, size); + if( isDir ) { + s3Client.downloadDirectory(source, localDestination.toFile()); + } else if( size > 0 ) { s3Client.downloadFile(source, localDestination.toFile(), size); } - else { + else { Files.deleteIfExists(localDestination); Files.createFile(localDestination); } - } - - @Override - public void upload(Path localFile, Path remoteDestination, CopyOption... options) throws IOException { - final S3Path target = (S3Path) remoteDestination; - - CopyOptions opts = CopyOptions.parse(options); - LinkOption[] linkOptions = (opts.followLinks()) ? new LinkOption[0] : new LinkOption[] { LinkOption.NOFOLLOW_LINKS }; - - // attributes of source file - if (Files.readAttributes(localFile, BasicFileAttributes.class, linkOptions).isSymbolicLink()) - throw new IOException("Uploading of symbolic links not supported - offending path: " + localFile); - - final Optional attrs = readAttr1(target); - final boolean exits = attrs.isPresent(); - - // delete target if it exists and REPLACE_EXISTING is specified - if (opts.replaceExisting()) { - FileHelper.deletePath(target); - } - else if ( exits ) - throw new FileAlreadyExistsException(target.toString()); - - final boolean isDir = Files.isDirectory(localFile); - final String type = isDir ? "directory": "file"; - log.debug("S3 upload {} from={} to={}", type, localFile, FilesEx.toUriString(target)); - final S3Client s3Client = target.getFileSystem().getClient(); - if( isDir ) { - s3Client.uploadDirectory(localFile.toFile(), target); - } - else { - s3Client.uploadFile(localFile.toFile(), target); - } - } - - private S3OutputStream createUploaderOutputStream( S3Path fileToUpload ) { - S3Client s3 = fileToUpload.getFileSystem().getClient(); - Properties props = fileToUpload.getFileSystem().properties(); - - final String storageClass = fileToUpload.getStorageClass()!=null ? fileToUpload.getStorageClass() : props.getProperty("upload_storage_class"); - final S3MultipartOptions opts = props != null ? new S3MultipartOptions(props) : new S3MultipartOptions(); - final S3ObjectId objectId = fileToUpload.toS3ObjectId(); - S3OutputStream stream = new S3OutputStream(s3.getClient(), objectId, opts) - .setCannedAcl(s3.getCannedAcl()) - .setStorageClass(storageClass) - .setStorageEncryption(props.getProperty("storage_encryption")) - .setKmsKeyId(props.getProperty("storage_kms_key_id")) - .setContentType(fileToUpload.getContentType()) - .setTags(fileToUpload.getTagsList()); - return stream; - } - - @Override - public SeekableByteChannel newByteChannel(Path path, - Set options, FileAttribute... attrs) - throws IOException { - Preconditions.checkArgument(path instanceof S3Path, - "path must be an instance of %s", S3Path.class.getName()); - final S3Path s3Path = (S3Path) path; - // we resolve to a file inside the temp folder with the s3path name + } + + @Override + public void upload(Path localFile, Path remoteDestination, CopyOption... options) throws IOException { + final S3Path target = (S3Path) remoteDestination; + + CopyOptions opts = CopyOptions.parse(options); + LinkOption[] linkOptions = (opts.followLinks()) ? new LinkOption[0] : new LinkOption[] { LinkOption.NOFOLLOW_LINKS }; + + // attributes of source file + if (Files.readAttributes(localFile, BasicFileAttributes.class, linkOptions).isSymbolicLink()) + throw new IOException("Uploading of symbolic links not supported - offending path: " + localFile); + + final Optional attrs = readAttr1(target); + final boolean exits = attrs.isPresent(); + + // delete target if it exists and REPLACE_EXISTING is specified + if (opts.replaceExisting()) { + FileHelper.deletePath(target); + } + else if ( exits ) + throw new FileAlreadyExistsException(target.toString()); + + final boolean isDir = Files.isDirectory(localFile); + final String type = isDir ? "directory": "file"; + log.debug("S3 upload {} from={} to={}", type, localFile, FilesEx.toUriString(target)); + final S3Client s3Client = target.getFileSystem().getClient(); + if( isDir ) { + s3Client.uploadDirectory(localFile.toFile(), target); + } + else { + s3Client.uploadFile(localFile.toFile(), target); + } + } + + private S3OutputStream createUploaderOutputStream( S3Path fileToUpload ) { + S3Client s3 = fileToUpload.getFileSystem().getClient(); + Properties props = fileToUpload.getFileSystem().properties(); + + final String storageClass = fileToUpload.getStorageClass()!=null ? fileToUpload.getStorageClass() : props.getProperty("upload_storage_class"); + final S3MultipartOptions opts = props != null ? new S3MultipartOptions(props) : new S3MultipartOptions(); + final S3ObjectId objectId = fileToUpload.toS3ObjectId(); + S3OutputStream stream = new S3OutputStream(s3.getClient(), objectId, opts) + .setCannedAcl(s3.getCannedAcl()) + .setStorageClass(storageClass) + .setStorageEncryption(props.getProperty("storage_encryption")) + .setKmsKeyId(props.getProperty("storage_kms_key_id")) + .setContentType(fileToUpload.getContentType()) + .setTags(fileToUpload.getTagsList()); + return stream; + } + + @Override + public SeekableByteChannel newByteChannel(Path path, + Set options, FileAttribute... attrs) + throws IOException { + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + final S3Path s3Path = (S3Path) path; + // we resolve to a file inside the temp folder with the s3path name final Path tempFile = createTempDir().resolve(path.getFileName().toString()); - try { - InputStream is = s3Path.getFileSystem().getClient() - .getObject(s3Path.getBucket(), s3Path.getKey()); - - if (is == null) - throw new IOException(String.format("The specified path is a directory: %s", path)); - - Files.write(tempFile, IOUtils.toByteArray(is)); - } - catch (NoSuchFileException e) { - // When opening for CREATE/CREATE_NEW the remote object is allowed to not exist yet - // — the temp file will be created and uploaded on close. For any other open mode - // propagate the original exception so the caller sees the real s3:// path. - if (!options.contains(StandardOpenOption.CREATE) && !options.contains(StandardOpenOption.CREATE_NEW)) { - throw e; - } - log.trace("S3 object does not exist yet, will be created on close: {}", FilesEx.toUriString(s3Path)); - } + try { + InputStream is = s3Path.getFileSystem().getClient() + .getObject(s3Path.getBucket(), s3Path.getKey()); + + if (is == null) + throw new IOException(String.format("The specified path is a directory: %s", path)); + + Files.write(tempFile, IOUtils.toByteArray(is)); + } + catch (NoSuchFileException e) { + // When opening for CREATE/CREATE_NEW the remote object is allowed to not exist yet + // — the temp file will be created and uploaded on close. For any other open mode + // propagate the original exception so the caller sees the real s3:// path. + if (!options.contains(StandardOpenOption.CREATE) && !options.contains(StandardOpenOption.CREATE_NEW)) { + throw e; + } + log.trace("S3 object does not exist yet, will be created on close: {}", FilesEx.toUriString(s3Path)); + } // and we can use the File SeekableByteChannel implementation - final SeekableByteChannel seekable = Files .newByteChannel(tempFile, options); - final List tags = ((S3Path) path).getTagsList(); - final String contentType = ((S3Path) path).getContentType(); + final SeekableByteChannel seekable = Files .newByteChannel(tempFile, options); + final List tags = ((S3Path) path).getTagsList(); + final String contentType = ((S3Path) path).getContentType(); - return new SeekableByteChannel() { - @Override - public boolean isOpen() { - return seekable.isOpen(); - } + return new SeekableByteChannel() { + @Override + public boolean isOpen() { + return seekable.isOpen(); + } - @Override - public void close() throws IOException { + @Override + public void close() throws IOException { if (!seekable.isOpen()) { return; } - seekable.close(); - // upload the content where the seekable ends (close) + seekable.close(); + // upload the content where the seekable ends (close) if (Files.exists(tempFile)) { try (InputStream stream = Files.newInputStream(tempFile)) { /* @@ -426,139 +426,139 @@ public void close() throws IOException { s3Path.getFileSystem(). getClient().deleteObject(s3Path.getBucket(), s3Path.getKey()); } - // and delete the temp dir + // and delete the temp dir Files.deleteIfExists(tempFile); Files.deleteIfExists(tempFile.getParent()); - } - - @Override - public int write(ByteBuffer src) throws IOException { - return seekable.write(src); - } - - @Override - public SeekableByteChannel truncate(long size) throws IOException { - return seekable.truncate(size); - } - - @Override - public long size() throws IOException { - return seekable.size(); - } - - @Override - public int read(ByteBuffer dst) throws IOException { - return seekable.read(dst); - } - - @Override - public SeekableByteChannel position(long newPosition) - throws IOException { - return seekable.position(newPosition); - } - - @Override - public long position() throws IOException { - return seekable.position(); - } - }; - } - - /** - * Deviations from spec: Does not perform atomic check-and-create. Since a - * directory is just an S3 object, all directories in the hierarchy are - * created or it already existed. - */ - @Override - public void createDirectory(Path dir, FileAttribute... attrs) - throws IOException { - - // FIXME: throw exception if the same key already exists at amazon s3 - - S3Path s3Path = (S3Path) dir; - - Preconditions.checkArgument(attrs.length == 0, - "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO - - // Creating a bucket is not supported - if (s3Path.getKey().isEmpty()) { - throw new UnsupportedOperationException("Creating a bucket is not supported"); - } - - List tags = s3Path.getTagsList(); - - String keyName = s3Path.getKey() - + (s3Path.getKey().endsWith("/") ? "" : "/"); - - s3Path.getFileSystem() - .getClient() - .putObject(s3Path.getBucket(), keyName, new ByteArrayInputStream(new byte[0]), tags, null, 0); - } - - @Override - public void delete(Path path) throws IOException { - Preconditions.checkArgument(path instanceof S3Path, - "path must be an instance of %s", S3Path.class.getName()); - - S3Path s3Path = (S3Path) path; - - if (Files.notExists(path)){ + } + + @Override + public int write(ByteBuffer src) throws IOException { + return seekable.write(src); + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + return seekable.truncate(size); + } + + @Override + public long size() throws IOException { + return seekable.size(); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return seekable.read(dst); + } + + @Override + public SeekableByteChannel position(long newPosition) + throws IOException { + return seekable.position(newPosition); + } + + @Override + public long position() throws IOException { + return seekable.position(); + } + }; + } + + /** + * Deviations from spec: Does not perform atomic check-and-create. Since a + * directory is just an S3 object, all directories in the hierarchy are + * created or it already existed. + */ + @Override + public void createDirectory(Path dir, FileAttribute... attrs) + throws IOException { + + // FIXME: throw exception if the same key already exists at amazon s3 + + S3Path s3Path = (S3Path) dir; + + Preconditions.checkArgument(attrs.length == 0, + "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO + + // Creating a bucket is not supported + if (s3Path.getKey().isEmpty()) { + throw new UnsupportedOperationException("Creating a bucket is not supported"); + } + + List tags = s3Path.getTagsList(); + + String keyName = s3Path.getKey() + + (s3Path.getKey().endsWith("/") ? "" : "/"); + + s3Path.getFileSystem() + .getClient() + .putObject(s3Path.getBucket(), keyName, new ByteArrayInputStream(new byte[0]), tags, null, 0); + } + + @Override + public void delete(Path path) throws IOException { + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + + S3Path s3Path = (S3Path) path; + + if (Files.notExists(path)) { throw new NoSuchFileException("the path: " + FilesEx.toUriString(s3Path) + " does not exist"); } - // Deleting a bucket is not supported + // Deleting a bucket is not supported if (s3Path.getKey().isEmpty()) { throw new UnsupportedOperationException("Deleting a bucket is not supported"); } - // NOTE: S3 directories are virtual (marker objects or implied key prefixes), - // so we do not check for emptiness before deleting. Enforcing POSIX-like - // DirectoryNotEmptyException semantics on S3 is unreliable due to eventual - // consistency and unnecessary because deleting a directory marker does not - // affect its children. - - // we delete the two objects (sometimes exists the key '/' and sometimes not) - s3Path.getFileSystem().getClient() - .deleteObject(s3Path.getBucket(), s3Path.getKey()); - s3Path.getFileSystem().getClient() - .deleteObject(s3Path.getBucket(), s3Path.getKey() + "/"); - } - - @Override - public void copy(Path source, Path target, CopyOption... options) - throws IOException { - Preconditions.checkArgument(source instanceof S3Path, - "source must be an instance of %s", S3Path.class.getName()); - Preconditions.checkArgument(target instanceof S3Path, - "target must be an instance of %s", S3Path.class.getName()); - - if (isSameFile(source, target)) { - return; - } - - S3Path s3Source = (S3Path) source; - S3Path s3Target = (S3Path) target; - /* - * Preconditions.checkArgument(!s3Source.isDirectory(), - * "copying directories is not yet supported: %s", source); // TODO - * Preconditions.checkArgument(!s3Target.isDirectory(), - * "copying directories is not yet supported: %s", target); // TODO - */ - ImmutableSet actualOptions = ImmutableSet.copyOf(options); - verifySupportedOptions(EnumSet.of(StandardCopyOption.REPLACE_EXISTING), - actualOptions); - - if (!actualOptions.contains(StandardCopyOption.REPLACE_EXISTING)) { - if (exists(s3Target)) { - throw new FileAlreadyExistsException(format( - "target already exists: %s", FilesEx.toUriString(s3Target))); - } - } - - S3Client client = s3Source.getFileSystem() .getClient(); - final List tags = ((S3Path) target).getTagsList(); - final String contentType = ((S3Path) target).getContentType(); - final String storageClass = ((S3Path) target).getStorageClass(); + // NOTE: S3 directories are virtual (marker objects or implied key prefixes), + // so we do not check for emptiness before deleting. Enforcing POSIX-like + // DirectoryNotEmptyException semantics on S3 is unreliable due to eventual + // consistency and unnecessary because deleting a directory marker does not + // affect its children. + + // we delete the two objects (sometimes exists the key '/' and sometimes not) + s3Path.getFileSystem().getClient() + .deleteObject(s3Path.getBucket(), s3Path.getKey()); + s3Path.getFileSystem().getClient() + .deleteObject(s3Path.getBucket(), s3Path.getKey() + "/"); + } + + @Override + public void copy(Path source, Path target, CopyOption... options) + throws IOException { + Preconditions.checkArgument(source instanceof S3Path, + "source must be an instance of %s", S3Path.class.getName()); + Preconditions.checkArgument(target instanceof S3Path, + "target must be an instance of %s", S3Path.class.getName()); + + if (isSameFile(source, target)) { + return; + } + + S3Path s3Source = (S3Path) source; + S3Path s3Target = (S3Path) target; + /* + * Preconditions.checkArgument(!s3Source.isDirectory(), + * "copying directories is not yet supported: %s", source); // TODO + * Preconditions.checkArgument(!s3Target.isDirectory(), + * "copying directories is not yet supported: %s", target); // TODO + */ + ImmutableSet actualOptions = ImmutableSet.copyOf(options); + verifySupportedOptions(EnumSet.of(StandardCopyOption.REPLACE_EXISTING), + actualOptions); + + if (!actualOptions.contains(StandardCopyOption.REPLACE_EXISTING)) { + if (exists(s3Target)) { + throw new FileAlreadyExistsException(format( + "target already exists: %s", FilesEx.toUriString(s3Target))); + } + } + + S3Client client = s3Source.getFileSystem() .getClient(); + final List tags = ((S3Path) target).getTagsList(); + final String contentType = ((S3Path) target).getContentType(); + final String storageClass = ((S3Path) target).getStorageClass(); //TransferManager alternative CopyObjectRequest.Builder reqBuilder = CopyObjectRequest.builder() @@ -566,84 +566,84 @@ public void copy(Path source, Path target, CopyOption... options) .sourceKey(s3Source.getKey()) .destinationBucket(s3Target.getBucket()) .destinationKey(s3Target.getKey()); - log.trace("Copy file via copy object - source: source={}, target={}, tags={}, storageClass={}", s3Source, s3Target, tags, storageClass); - client.copyFile(reqBuilder, tags, contentType, storageClass); - } - - - @Override - public void move(Path source, Path target, CopyOption... options) throws IOException { - for( CopyOption it : options ) { - if( it==StandardCopyOption.ATOMIC_MOVE ) - throw new IllegalArgumentException("Atomic move not supported by S3 file system provider"); - } - copy(source,target,options); - delete(source); - } - - @Override - public boolean isSameFile(Path path1, Path path2) throws IOException { - return path1.isAbsolute() && path2.isAbsolute() && path1.equals(path2); - } - - @Override - public boolean isHidden(Path path) throws IOException { - return false; - } - - @Override - public FileStore getFileStore(Path path) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void checkAccess(Path path, AccessMode... modes) throws IOException { - S3Path s3Path = (S3Path) path; - Preconditions.checkArgument(s3Path.isAbsolute(), - "path must be absolute: %s", s3Path); - - S3Client client = s3Path.getFileSystem().getClient(); - - if( modes==null || modes.length==0 ) { - // when no modes are given, the method is invoked - // by `Files.exists` method, therefore just use summary lookup - s3ObjectSummaryLookup.lookup((S3Path)path); - return; - } - - // get ACL and check if the file exists as a side-effect - AccessControlPolicy acl = getAccessControl(s3Path); + log.trace("Copy file via copy object - source: source={}, target={}, tags={}, storageClass={}", s3Source, s3Target, tags, storageClass); + client.copyFile(reqBuilder, tags, contentType, storageClass); + } + + + @Override + public void move(Path source, Path target, CopyOption... options) throws IOException { + for( CopyOption it : options ) { + if( it==StandardCopyOption.ATOMIC_MOVE ) + throw new IllegalArgumentException("Atomic move not supported by S3 file system provider"); + } + copy(source,target,options); + delete(source); + } + + @Override + public boolean isSameFile(Path path1, Path path2) throws IOException { + return path1.isAbsolute() && path2.isAbsolute() && path1.equals(path2); + } + + @Override + public boolean isHidden(Path path) throws IOException { + return false; + } + + @Override + public FileStore getFileStore(Path path) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void checkAccess(Path path, AccessMode... modes) throws IOException { + S3Path s3Path = (S3Path) path; + Preconditions.checkArgument(s3Path.isAbsolute(), + "path must be absolute: %s", s3Path); + + S3Client client = s3Path.getFileSystem().getClient(); + + if( modes==null || modes.length==0 ) { + // when no modes are given, the method is invoked + // by `Files.exists` method, therefore just use summary lookup + s3ObjectSummaryLookup.lookup((S3Path)path); + return; + } + + // get ACL and check if the file exists as a side-effect + AccessControlPolicy acl = getAccessControl(s3Path); String caller = client.getCallerAccount(); - for (AccessMode accessMode : modes) { - switch (accessMode) { - case EXECUTE: - throw new AccessDeniedException(s3Path.toString(), null, - "file is not executable"); - case READ: - if (caller == null) { + for (AccessMode accessMode : modes) { + switch (accessMode) { + case EXECUTE: + throw new AccessDeniedException(s3Path.toString(), null, + "file is not executable"); + case READ: + if (caller == null) { //if we cannot get the user's canonical ID, try read the object; s3ObjectSummaryLookup.lookup((S3Path) path); } else if (!hasPermissions(acl, caller, - EnumSet.of(Permission.FULL_CONTROL, Permission.READ))) { - throw new AccessDeniedException(s3Path.toString(), null, - "file is not readable"); - } - break; - case WRITE: - if (caller == null) { + EnumSet.of(Permission.FULL_CONTROL, Permission.READ))) { + throw new AccessDeniedException(s3Path.toString(), null, + "file is not readable"); + } + break; + case WRITE: + if (caller == null) { log.warn("User's Canonical Id cannot be retrieved. We can not check the access."); } else if (!hasPermissions(acl, caller, - EnumSet.of(Permission.FULL_CONTROL, Permission.WRITE))) { - throw new AccessDeniedException(s3Path.toString(), null, - format("bucket '%s' is not writable", - s3Path.getBucket())); - } - break; - } - } - } + EnumSet.of(Permission.FULL_CONTROL, Permission.WRITE))) { + throw new AccessDeniedException(s3Path.toString(), null, + format("bucket '%s' is not writable", + s3Path.getBucket())); + } + break; + } + } + } /** * check if the param acl has the same owner than the parameter owner and @@ -653,198 +653,198 @@ else if (!hasPermissions(acl, caller, * @param permissions almost one * @return */ - private boolean hasPermissions(AccessControlPolicy acl, String owner, - EnumSet permissions) { - boolean result = false; - for (Grant grant : acl.grants()) { - if (grant.grantee().id().equals(owner) - && permissions.contains(grant.permission())) { - result = true; - break; - } - } - return result; - } - - @Override - public V getFileAttributeView(Path path, Class type, LinkOption... options) { - Preconditions.checkArgument(path instanceof S3Path, - "path must be an instance of %s", S3Path.class.getName()); - S3Path s3Path = (S3Path) path; - if (type.isAssignableFrom(BasicFileAttributeView.class)) { - try { - return (V) new S3FileAttributesView(readAttr0(s3Path)); - } - catch (IOException e) { - throw new RuntimeException("Unable read attributes for file: " + FilesEx.toUriString(s3Path), e); - } - } - log.trace("Unsupported S3 file system provider file attribute view: " + type.getName()); + private boolean hasPermissions(AccessControlPolicy acl, String owner, + EnumSet permissions) { + boolean result = false; + for (Grant grant : acl.grants()) { + if (grant.grantee().id().equals(owner) + && permissions.contains(grant.permission())) { + result = true; + break; + } + } + return result; + } + + @Override + public V getFileAttributeView(Path path, Class type, LinkOption... options) { + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + S3Path s3Path = (S3Path) path; + if (type.isAssignableFrom(BasicFileAttributeView.class)) { + try { + return (V) new S3FileAttributesView(readAttr0(s3Path)); + } + catch (IOException e) { + throw new RuntimeException("Unable read attributes for file: " + FilesEx.toUriString(s3Path), e); + } + } + log.trace("Unsupported S3 file system provider file attribute view: " + type.getName()); return null; - } - - - @Override - public A readAttributes(Path path, Class type, LinkOption... options) throws IOException { - Preconditions.checkArgument(path instanceof S3Path, - "path must be an instance of %s", S3Path.class.getName()); - S3Path s3Path = (S3Path) path; - if (type.isAssignableFrom(BasicFileAttributes.class)) { - return (A) ("".equals(s3Path.getKey()) - // the root bucket is implicitly a directory - ? new S3FileAttributes("/", null, 0, true, false) - // read the target path attributes - : readAttr0(s3Path)); - } - // not support attribute class - throw new UnsupportedOperationException(format("only %s supported", BasicFileAttributes.class)); - } - - private Optional readAttr1(S3Path s3Path) throws IOException { - try { - return Optional.of(readAttr0(s3Path)); - } - catch (NoSuchFileException e) { - return Optional.empty(); - } - } - - private S3FileAttributes readAttr0(S3Path s3Path) throws IOException { - S3Object objectSummary = s3ObjectSummaryLookup.lookup(s3Path); - - // parse the data to BasicFileAttributes. - FileTime lastModifiedTime = null; - if( objectSummary.lastModified() != null ) { - lastModifiedTime = FileTime.from(objectSummary.lastModified().toEpochMilli(), TimeUnit.MILLISECONDS); - } - - long size = objectSummary.size(); - boolean directory = false; - boolean regularFile = false; - String key = objectSummary.key(); - // check if is a directory and the key of this directory exists in amazon s3 - if (objectSummary.key().equals(s3Path.getKey() + "/") && objectSummary.key().endsWith("/")) { - directory = true; - } - // is a directory but does not exist in amazon s3 - else if ((!objectSummary.key().equals(s3Path.getKey()) || "".equals(s3Path.getKey())) && objectSummary.key().startsWith(s3Path.getKey())){ - directory = true; - // no metadata, we fake one - size = 0; - // delete extra part - key = s3Path.getKey() + "/"; - } - // is a file: - else { - regularFile = true; - } - - return new S3FileAttributes(key, lastModifiedTime, size, directory, regularFile); - } - - @Override - public Map readAttributes(Path path, String attributes, LinkOption... options) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void setAttribute(Path path, String attribute, Object value, - LinkOption... options) throws IOException { - throw new UnsupportedOperationException(); - } - - protected S3FileSystem createFileSystem(URI uri, AwsConfig awsConfig) { - // try to load amazon props - Properties props = loadAmazonProperties(); - // add properties for legacy compatibility - props.putAll(awsConfig.getS3LegacyProperties()); - - final String bucketName = S3Path.bucketName(uri); + } + + + @Override + public A readAttributes(Path path, Class type, LinkOption... options) throws IOException { + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + S3Path s3Path = (S3Path) path; + if (type.isAssignableFrom(BasicFileAttributes.class)) { + return (A) ("".equals(s3Path.getKey()) + // the root bucket is implicitly a directory + ? new S3FileAttributes("/", null, 0, true, false) + // read the target path attributes + : readAttr0(s3Path)); + } + // not support attribute class + throw new UnsupportedOperationException(format("only %s supported", BasicFileAttributes.class)); + } + + private Optional readAttr1(S3Path s3Path) throws IOException { + try { + return Optional.of(readAttr0(s3Path)); + } + catch (NoSuchFileException e) { + return Optional.empty(); + } + } + + private S3FileAttributes readAttr0(S3Path s3Path) throws IOException { + S3Object objectSummary = s3ObjectSummaryLookup.lookup(s3Path); + + // parse the data to BasicFileAttributes. + FileTime lastModifiedTime = null; + if( objectSummary.lastModified() != null ) { + lastModifiedTime = FileTime.from(objectSummary.lastModified().toEpochMilli(), TimeUnit.MILLISECONDS); + } + + long size = objectSummary.size(); + boolean directory = false; + boolean regularFile = false; + String key = objectSummary.key(); + // check if is a directory and the key of this directory exists in amazon s3 + if (objectSummary.key().equals(s3Path.getKey() + "/") && objectSummary.key().endsWith("/")) { + directory = true; + } + // is a directory but does not exist in amazon s3 + else if ((!objectSummary.key().equals(s3Path.getKey()) || "".equals(s3Path.getKey())) && objectSummary.key().startsWith(s3Path.getKey())) { + directory = true; + // no metadata, we fake one + size = 0; + // delete extra part + key = s3Path.getKey() + "/"; + } + // is a file: + else { + regularFile = true; + } + + return new S3FileAttributes(key, lastModifiedTime, size, directory, regularFile); + } + + @Override + public Map readAttributes(Path path, String attributes, LinkOption... options) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void setAttribute(Path path, String attribute, Object value, + LinkOption... options) throws IOException { + throw new UnsupportedOperationException(); + } + + protected S3FileSystem createFileSystem(URI uri, AwsConfig awsConfig) { + // try to load amazon props + Properties props = loadAmazonProperties(); + // add properties for legacy compatibility + props.putAll(awsConfig.getS3LegacyProperties()); + + final String bucketName = S3Path.bucketName(uri); // do not use `global` flag for custom endpoint because // when enabling that flag, it overrides S3 endpoints with AWS global endpoint // see https://github.com/nextflow-io/nextflow/pull/5779 - final boolean global = bucketName!=null && !awsConfig.getS3Config().isCustomEndpoint(); - final AwsClientFactory factory = new AwsClientFactory(awsConfig, awsConfig.resolveS3Region()); - final S3Client client = new S3Client(factory, props, global); - - // set the client acl - client.setCannedAcl(getProp(props, "s_3_acl", "s3_acl", "s3acl", "s3Acl")); - client.setStorageEncryption(props.getProperty("storage_encryption")); - client.setKmsKeyId(props.getProperty("storage_kms_key_id")); + final boolean global = bucketName!=null && !awsConfig.getS3Config().isCustomEndpoint(); + final AwsClientFactory factory = new AwsClientFactory(awsConfig, awsConfig.resolveS3Region()); + final S3Client client = new S3Client(factory, props, global); + + // set the client acl + client.setCannedAcl(getProp(props, "s_3_acl", "s3_acl", "s3acl", "s3Acl")); + client.setStorageEncryption(props.getProperty("storage_encryption")); + client.setKmsKeyId(props.getProperty("storage_kms_key_id")); client.setRequesterPaysEnabled(props.getProperty("requester_pays")); - if( props.getProperty("glacier_auto_retrieval") != null ) - log.warn("Glacier auto-retrieval is no longer supported, config option `aws.client.glacierAutoRetrieval` will be ignored"); - - return new S3FileSystem(this, client, uri, props); - } - - protected String getProp(Properties props, String... keys) { - for( String k : keys ) { - if( props.containsKey(k) ) { - return props.getProperty(k); - } - } - return null; - } - - /** - * find /amazon.properties in the classpath - * @return Properties amazon.properties - */ - protected Properties loadAmazonProperties() { - Properties props = new Properties(); - // http://www.javaworld.com/javaworld/javaqa/2003-06/01-qa-0606-load.html - // http://www.javaworld.com/javaqa/2003-08/01-qa-0808-property.html - try(InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream("amazon.properties")){ - if (in != null){ - props.load(in); - } - - } catch (IOException e) {} - - return props; - } - - // ~~~ - - private void verifySupportedOptions(Set allowedOptions, - Set actualOptions) { - Sets.SetView unsupported = difference(actualOptions, - allowedOptions); - Preconditions.checkArgument(unsupported.isEmpty(), - "the following options are not supported: %s", unsupported); - } - /** - * check that the paths exists or not - * @param path S3Path - * @return true if exists - */ - private boolean exists(S3Path path) { - try { + if( props.getProperty("glacier_auto_retrieval") != null ) + log.warn("Glacier auto-retrieval is no longer supported, config option `aws.client.glacierAutoRetrieval` will be ignored"); + + return new S3FileSystem(this, client, uri, props); + } + + protected String getProp(Properties props, String... keys) { + for( String k : keys ) { + if( props.containsKey(k) ) { + return props.getProperty(k); + } + } + return null; + } + + /** + * find /amazon.properties in the classpath + * @return Properties amazon.properties + */ + protected Properties loadAmazonProperties() { + Properties props = new Properties(); + // http://www.javaworld.com/javaworld/javaqa/2003-06/01-qa-0606-load.html + // http://www.javaworld.com/javaqa/2003-08/01-qa-0808-property.html + try(InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream("amazon.properties")) { + if (in != null) { + props.load(in); + } + + } catch (IOException e) {} + + return props; + } + + // ~~~ + + private void verifySupportedOptions(Set allowedOptions, + Set actualOptions) { + Sets.SetView unsupported = difference(actualOptions, + allowedOptions); + Preconditions.checkArgument(unsupported.isEmpty(), + "the following options are not supported: %s", unsupported); + } + /** + * check that the paths exists or not + * @param path S3Path + * @return true if exists + */ + private boolean exists(S3Path path) { + try { s3ObjectSummaryLookup.lookup(path); - return true; - } + return true; + } catch (IOException e) { - return false; - } - } + return false; + } + } - /** - * Get the Control List, if the path does not exist + /** + * Get the Control List, if the path does not exist * (because the path is a directory and this key isn't created at amazon s3) * then return the ACL of the first child. * - * @param path {@link S3Path} - * @return AccessControlList - * @throws IOException if error getting access control - */ - private AccessControlPolicy getAccessControl(S3Path path) throws IOException{ + * @param path {@link S3Path} + * @return AccessControlList + * @throws IOException if error getting access control + */ + private AccessControlPolicy getAccessControl(S3Path path) throws IOException { String key = path.getKey(); if (key == null || key.isEmpty()) return path.getFileSystem().getClient().getBucketAcl(path.getBucket()); return path.getFileSystem().getClient().getObjectAcl(path.getBucket(), key); - } + } /** * create a temporal directory to create streams diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java index a4bcc9e73f..c0f00f3805 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java @@ -49,50 +49,50 @@ public class S3Path implements Path, TagAwareFile { - public static final String PATH_SEPARATOR = "/"; - /** - * bucket name - */ - private final String bucket; - /** - * Parts without bucket name. - */ - private final List parts; - /** - * actual filesystem - */ - private S3FileSystem fileSystem; - - private S3Object object; - - private Map tags; - - private String contentType; - - private String storageClass; - - /** - * path must be a string of the form "/{bucket}", "/{bucket}/{key}" or just - * "{key}". - * Examples: - *
    - *
  • "/{bucket}//{value}" good, empty key paths are ignored
  • - *
  • "//{key}" error, missing bucket
  • - *
  • "/" error, missing bucket
  • - *
- * - */ - public S3Path(S3FileSystem fileSystem, String path) { - - this(fileSystem, path, ""); - } + public static final String PATH_SEPARATOR = "/"; + /** + * bucket name + */ + private final String bucket; + /** + * Parts without bucket name. + */ + private final List parts; + /** + * actual filesystem + */ + private S3FileSystem fileSystem; + + private S3Object object; + + private Map tags; + + private String contentType; + + private String storageClass; + + /** + * path must be a string of the form "/{bucket}", "/{bucket}/{key}" or just + * "{key}". + * Examples: + *
    + *
  • "/{bucket}//{value}" good, empty key paths are ignored
  • + *
  • "//{key}" error, missing bucket
  • + *
  • "/" error, missing bucket
  • + *
+ * + */ + public S3Path(S3FileSystem fileSystem, String path) { + + this(fileSystem, path, ""); + } /** * Build an S3Path from path segments. '/' are stripped from each segment. * @param first should be star with a '/' and the first element is the bucket * @param more directories and files */ - public S3Path(S3FileSystem fileSystem, String first, + public S3Path(S3FileSystem fileSystem, String first, String ... more) { String bucket = null; @@ -128,10 +128,10 @@ public S3Path(S3FileSystem fileSystem, String first, parts.addAll(moreSplitted); - this.bucket = bucket; - this.parts = KeyParts.parse(parts); - this.fileSystem = fileSystem; - } + this.bucket = bucket; + this.parts = KeyParts.parse(parts); + this.fileSystem = fileSystem; + } private S3Path(S3FileSystem fileSystem, String bucket, Iterable keys){ @@ -141,472 +141,474 @@ private S3Path(S3FileSystem fileSystem, String bucket, } - public String getBucket() { - return bucket; - } - /** - * key for amazon without final slash. - * note: the final slash need to be added to save a directory (Amazon s3 spec) - */ - public String getKey() { - if (parts.isEmpty()) { - return ""; - } - - ImmutableList.Builder builder = ImmutableList - . builder().addAll(parts); - - return Joiner.on(PATH_SEPARATOR).join(builder.build()); - } - - public S3ObjectId toS3ObjectId() { - return new S3ObjectId(bucket, getKey()); - } - - @Override - public S3FileSystem getFileSystem() { - return this.fileSystem; - } - - @Override - public boolean isAbsolute() { - return bucket != null; - } - - @Override - public Path getRoot() { - if (isAbsolute()) { - return new S3Path(fileSystem, bucket, ImmutableList. of()); - } - - return null; - } - - @Override - public Path getFileName() { - if (!parts.isEmpty()) { - return new S3Path(fileSystem, null, parts.subList(parts.size() - 1, - parts.size())); - } + public String getBucket() { + return bucket; + } + + /** + * key for amazon without final slash. + * note: the final slash need to be added to save a directory (Amazon s3 spec) + */ + public String getKey() { + if (parts.isEmpty()) { + return ""; + } + + ImmutableList.Builder builder = ImmutableList + . builder().addAll(parts); + + return Joiner.on(PATH_SEPARATOR).join(builder.build()); + } + + public S3ObjectId toS3ObjectId() { + return new S3ObjectId(bucket, getKey()); + } + + @Override + public S3FileSystem getFileSystem() { + return this.fileSystem; + } + + @Override + public boolean isAbsolute() { + return bucket != null; + } + + @Override + public Path getRoot() { + if (isAbsolute()) { + return new S3Path(fileSystem, bucket, ImmutableList. of()); + } + + return null; + } + + @Override + public Path getFileName() { + if (!parts.isEmpty()) { + return new S3Path(fileSystem, null, parts.subList(parts.size() - 1, + parts.size())); + } else { // bucket dont have fileName return null; } - } - - @Override - public Path getParent() { - // bucket is not present in the parts - if (parts.isEmpty()) { - return null; - } - - if (parts.size() == 1 && (bucket == null || bucket.isEmpty())){ - return null; - } - - return new S3Path(fileSystem, bucket, - parts.subList(0, parts.size() - 1)); - } - - @Override - public int getNameCount() { - return parts.size(); - } - - @Override - public Path getName(int index) { - return new S3Path(fileSystem, null, parts.subList(index, index + 1)); - } - - @Override - public Path subpath(int beginIndex, int endIndex) { - return new S3Path(fileSystem, null, parts.subList(beginIndex, endIndex)); - } - - @Override - public boolean startsWith(Path other) { - - if (other.getNameCount() > this.getNameCount()){ - return false; - } - - if (!(other instanceof S3Path)){ - return false; - } - - S3Path path = (S3Path) other; - - if (path.parts.size() == 0 && path.bucket == null && - (this.parts.size() != 0 || this.bucket != null)){ - return false; - } - - if ((path.getBucket() != null && !path.getBucket().equals(this.getBucket())) || - (path.getBucket() == null && this.getBucket() != null)){ - return false; - } - - for (int i = 0; i < path.parts.size() ; i++){ - if (!path.parts.get(i).equals(this.parts.get(i))){ - return false; - } - } - return true; - } - - @Override - public boolean startsWith(String path) { - S3Path other = new S3Path(this.fileSystem, path); - return this.startsWith(other); - } - - @Override - public boolean endsWith(Path other) { - if (other.getNameCount() > this.getNameCount()){ - return false; - } - // empty - if (other.getNameCount() == 0 && - this.getNameCount() != 0){ - return false; - } - - if (!(other instanceof S3Path)){ - return false; - } - - S3Path path = (S3Path) other; - - if ((path.getBucket() != null && !path.getBucket().equals(this.getBucket())) || - (path.getBucket() != null && this.getBucket() == null)){ - return false; - } - - // check subkeys - - int i = path.parts.size() - 1; - int j = this.parts.size() - 1; - for (; i >= 0 && j >= 0 ;){ - - if (!path.parts.get(i).equals(this.parts.get(j))){ - return false; - } - i--; - j--; - } - return true; - } - - @Override - public boolean endsWith(String other) { - return this.endsWith(new S3Path(this.fileSystem, other)); - } - - @Override - public Path normalize() { - if( parts==null || parts.size()==0 ) - return this; - - return new S3Path(fileSystem, bucket, normalize0(parts)); - } - - private Iterable normalize0(List parts) { - final String s0 = Path.of(String.join(PATH_SEPARATOR, parts)).normalize().toString(); - return Lists.newArrayList(Splitter.on(PATH_SEPARATOR).split(s0)); - } - - @Override - public Path resolve(Path other) { - Preconditions.checkArgument(other instanceof S3Path, - "other must be an instance of %s", S3Path.class.getName()); - - S3Path s3Path = (S3Path) other; - - if (s3Path.isAbsolute()) { - return s3Path; - } - - if (s3Path.parts.isEmpty()) { // other is relative and empty - return this; - } - - return new S3Path(fileSystem, bucket, concat(parts, s3Path.parts)); - } - - @Override - public Path resolve(String other) { - return resolve(new S3Path(this.getFileSystem(), other)); - } - - @Override - public Path resolveSibling(Path other) { - Preconditions.checkArgument(other instanceof S3Path, - "other must be an instance of %s", S3Path.class.getName()); - - S3Path s3Path = (S3Path) other; - - Path parent = getParent(); - - if (parent == null || s3Path.isAbsolute()) { - return s3Path; - } - - if (s3Path.parts.isEmpty()) { // other is relative and empty - return parent; - } - - return new S3Path(fileSystem, bucket, concat( - parts.subList(0, parts.size() - 1), s3Path.parts)); - } - - @Override - public Path resolveSibling(String other) { - return resolveSibling(new S3Path(this.getFileSystem(), other)); - } - - @Override - public Path relativize(Path other) { - Preconditions.checkArgument(other instanceof S3Path, - "other must be an instance of %s", S3Path.class.getName()); - S3Path s3Path = (S3Path) other; - - if (this.equals(other)) { - return new S3Path(this.getFileSystem(), ""); - } - - Preconditions.checkArgument(isAbsolute(), - "Path is already relative: %s", this); - Preconditions.checkArgument(s3Path.isAbsolute(), - "Cannot relativize against a relative path: %s", s3Path); - Preconditions.checkArgument(bucket.equals(s3Path.getBucket()), - "Cannot relativize paths with different buckets: '%s', '%s'", - this, other); - - Preconditions.checkArgument(parts.size() <= s3Path.parts.size(), - "Cannot relativize against a parent path: '%s', '%s'", - this, other); - - - int startPart = 0; - for (int i = 0; i resultParts = new ArrayList<>(); - for (int i = startPart; i < s3Path.parts.size(); i++){ - resultParts.add(s3Path.parts.get(i)); - } - - return new S3Path(fileSystem, null, resultParts); - } - - @Override - public URI toUri() { - StringBuilder builder = new StringBuilder(); - builder.append("s3://"); - if (fileSystem.getEndpoint() != null) { - builder.append(fileSystem.getEndpoint()); - } - builder.append("/"); - builder.append(bucket); - builder.append(PATH_SEPARATOR); - builder.append(Joiner.on(PATH_SEPARATOR).join(parts)); - return URI.create(builder.toString()); - } - - @Override - public Path toAbsolutePath() { - if (isAbsolute()) { - return this; - } - - throw new IllegalStateException(format( - "Relative path cannot be made absolute: %s", this)); - } - - @Override - public Path toRealPath(LinkOption... options) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public File toFile() { - throw new UnsupportedOperationException(); - } - - @Override - public WatchKey register(WatchService watcher, WatchEvent.Kind[] events, - WatchEvent.Modifier... modifiers) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public WatchKey register(WatchService watcher, WatchEvent.Kind... events) - throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - ImmutableList.Builder builder = ImmutableList.builder(); - - for (Iterator iterator = parts.iterator(); iterator.hasNext();) { - String part = iterator.next(); - builder.add(new S3Path(fileSystem, null, ImmutableList.of(part))); - } - - return builder.build().iterator(); - } - - @Override - public int compareTo(Path other) { - return toString().compareTo(other.toString()); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - - if (isAbsolute()) { - builder.append(PATH_SEPARATOR); - builder.append(bucket); - builder.append(PATH_SEPARATOR); - } - - builder.append(Joiner.on(PATH_SEPARATOR).join(parts)); - - return builder.toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - S3Path paths = (S3Path) o; - - if (bucket != null ? !bucket.equals(paths.bucket) - : paths.bucket != null) { - return false; - } - if (!parts.equals(paths.parts)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = bucket != null ? bucket.hashCode() : 0; - result = 31 * result + parts.hashCode(); - return result; - } - - /** - * This method returns the cached {@link S3Object} instance if this path has been created - * while iterating a directory structures by the {@link S3Iterator}. - *
- * After calling this method the cached object is reset, so any following method invocation will return {@code null}. - * This is necessary to discard the object meta-data and force to reload file attributes when required. - * - * @return The cached {@link S3Object} for this path if any. - */ - public S3Object fetchObject() { - S3Object result = object; - object = null; - return result; - } - - // note: package scope to limit the access to this setter - void setObjectSummary(S3Object objectSummary) { - this.object = objectSummary; - } - - @Override - public void setTags(Map tags) { - this.tags = tags; - } - - @Override - public void setContentType(String type) { - this.contentType = type; - } - - @Override - public void setStorageClass(String storageClass) { - this.storageClass = storageClass; - } - - public List getTagsList() { - // nothing found, just return - if( tags==null ) - return Collections.emptyList(); - // create a list of Tag out of the Map - List result = new ArrayList<>(); - for( Map.Entry entry : tags.entrySet()) { - result.add( Tag.builder().key(entry.getKey()).value(entry.getValue()).build() ); - } - return result; - } - - public String getContentType() { - return contentType; - } - - public String getStorageClass() { - return storageClass; - } - - // ~ helpers methods - - private static Function strip(final String ... strs) { - return new Function() { - public String apply(String input) { - String res = input; - for (String str : strs) { - res = res.replace(str, ""); - } - return res; - } - }; - } - - private static Predicate notEmpty() { - return new Predicate() { - @Override - public boolean apply(@Nullable String input) { - return input != null && !input.isEmpty(); - } - }; - } - /* - * delete redundant "/" and empty parts - */ - private abstract static class KeyParts{ - - private static ImmutableList parse(List parts) { - return ImmutableList.copyOf(filter(transform(parts, strip("/")), notEmpty())); - } - - private static ImmutableList parse(Iterable parts) { - return ImmutableList.copyOf(filter(transform(parts, strip("/")), notEmpty())); - } - } - - public static String bucketName(URI uri) { - final String path = uri.getPath(); - if( path==null || !path.startsWith("/") ) - throw new IllegalArgumentException("Invalid S3 path: " + uri); - final String[] parts = path.split("/"); - // note the element 0 contains the slash char - return parts.length>1 ? parts[1] : null; - } + } + + @Override + public Path getParent() { + // bucket is not present in the parts + if (parts.isEmpty()) { + return null; + } + + if (parts.size() == 1 && (bucket == null || bucket.isEmpty())){ + return null; + } + + return new S3Path(fileSystem, bucket, + parts.subList(0, parts.size() - 1)); + } + + @Override + public int getNameCount() { + return parts.size(); + } + + @Override + public Path getName(int index) { + return new S3Path(fileSystem, null, parts.subList(index, index + 1)); + } + + @Override + public Path subpath(int beginIndex, int endIndex) { + return new S3Path(fileSystem, null, parts.subList(beginIndex, endIndex)); + } + + @Override + public boolean startsWith(Path other) { + + if (other.getNameCount() > this.getNameCount()){ + return false; + } + + if (!(other instanceof S3Path)){ + return false; + } + + S3Path path = (S3Path) other; + + if (path.parts.size() == 0 && path.bucket == null && + (this.parts.size() != 0 || this.bucket != null)){ + return false; + } + + if ((path.getBucket() != null && !path.getBucket().equals(this.getBucket())) || + (path.getBucket() == null && this.getBucket() != null)){ + return false; + } + + for (int i = 0; i < path.parts.size() ; i++){ + if (!path.parts.get(i).equals(this.parts.get(i))){ + return false; + } + } + return true; + } + + @Override + public boolean startsWith(String path) { + S3Path other = new S3Path(this.fileSystem, path); + return this.startsWith(other); + } + + @Override + public boolean endsWith(Path other) { + if (other.getNameCount() > this.getNameCount()){ + return false; + } + // empty + if (other.getNameCount() == 0 && + this.getNameCount() != 0){ + return false; + } + + if (!(other instanceof S3Path)){ + return false; + } + + S3Path path = (S3Path) other; + + if ((path.getBucket() != null && !path.getBucket().equals(this.getBucket())) || + (path.getBucket() != null && this.getBucket() == null)){ + return false; + } + + // check subkeys + + int i = path.parts.size() - 1; + int j = this.parts.size() - 1; + for (; i >= 0 && j >= 0 ;){ + + if (!path.parts.get(i).equals(this.parts.get(j))){ + return false; + } + i--; + j--; + } + return true; + } + + @Override + public boolean endsWith(String other) { + return this.endsWith(new S3Path(this.fileSystem, other)); + } + + @Override + public Path normalize() { + if( parts==null || parts.size()==0 ) + return this; + + return new S3Path(fileSystem, bucket, normalize0(parts)); + } + + private Iterable normalize0(List parts) { + final String s0 = Path.of(String.join(PATH_SEPARATOR, parts)).normalize().toString(); + return Lists.newArrayList(Splitter.on(PATH_SEPARATOR).split(s0)); + } + + @Override + public Path resolve(Path other) { + Preconditions.checkArgument(other instanceof S3Path, + "other must be an instance of %s", S3Path.class.getName()); + + S3Path s3Path = (S3Path) other; + + if (s3Path.isAbsolute()) { + return s3Path; + } + + if (s3Path.parts.isEmpty()) { // other is relative and empty + return this; + } + + return new S3Path(fileSystem, bucket, concat(parts, s3Path.parts)); + } + + @Override + public Path resolve(String other) { + return resolve(new S3Path(this.getFileSystem(), other)); + } + + @Override + public Path resolveSibling(Path other) { + Preconditions.checkArgument(other instanceof S3Path, + "other must be an instance of %s", S3Path.class.getName()); + + S3Path s3Path = (S3Path) other; + + Path parent = getParent(); + + if (parent == null || s3Path.isAbsolute()) { + return s3Path; + } + + if (s3Path.parts.isEmpty()) { // other is relative and empty + return parent; + } + + return new S3Path(fileSystem, bucket, concat( + parts.subList(0, parts.size() - 1), s3Path.parts)); + } + + @Override + public Path resolveSibling(String other) { + return resolveSibling(new S3Path(this.getFileSystem(), other)); + } + + @Override + public Path relativize(Path other) { + Preconditions.checkArgument(other instanceof S3Path, + "other must be an instance of %s", S3Path.class.getName()); + S3Path s3Path = (S3Path) other; + + if (this.equals(other)) { + return new S3Path(this.getFileSystem(), ""); + } + + Preconditions.checkArgument(isAbsolute(), + "Path is already relative: %s", this); + Preconditions.checkArgument(s3Path.isAbsolute(), + "Cannot relativize against a relative path: %s", s3Path); + Preconditions.checkArgument(bucket.equals(s3Path.getBucket()), + "Cannot relativize paths with different buckets: '%s', '%s'", + this, other); + + Preconditions.checkArgument(parts.size() <= s3Path.parts.size(), + "Cannot relativize against a parent path: '%s', '%s'", + this, other); + + + int startPart = 0; + for (int i = 0; i resultParts = new ArrayList<>(); + for (int i = startPart; i < s3Path.parts.size(); i++){ + resultParts.add(s3Path.parts.get(i)); + } + + return new S3Path(fileSystem, null, resultParts); + } + + @Override + public URI toUri() { + StringBuilder builder = new StringBuilder(); + builder.append("s3://"); + if (fileSystem.getEndpoint() != null) { + builder.append(fileSystem.getEndpoint()); + } + builder.append("/"); + builder.append(bucket); + builder.append(PATH_SEPARATOR); + builder.append(Joiner.on(PATH_SEPARATOR).join(parts)); + return URI.create(builder.toString()); + } + + @Override + public Path toAbsolutePath() { + if (isAbsolute()) { + return this; + } + + throw new IllegalStateException(format( + "Relative path cannot be made absolute: %s", this)); + } + + @Override + public Path toRealPath(LinkOption... options) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public File toFile() { + throw new UnsupportedOperationException(); + } + + @Override + public WatchKey register(WatchService watcher, WatchEvent.Kind[] events, + WatchEvent.Modifier... modifiers) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public WatchKey register(WatchService watcher, WatchEvent.Kind... events) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + ImmutableList.Builder builder = ImmutableList.builder(); + + for (Iterator iterator = parts.iterator(); iterator.hasNext();) { + String part = iterator.next(); + builder.add(new S3Path(fileSystem, null, ImmutableList.of(part))); + } + + return builder.build().iterator(); + } + + @Override + public int compareTo(Path other) { + return toString().compareTo(other.toString()); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + if (isAbsolute()) { + builder.append(PATH_SEPARATOR); + builder.append(bucket); + builder.append(PATH_SEPARATOR); + } + + builder.append(Joiner.on(PATH_SEPARATOR).join(parts)); + + return builder.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + S3Path paths = (S3Path) o; + + if (bucket != null ? !bucket.equals(paths.bucket) + : paths.bucket != null) { + return false; + } + if (!parts.equals(paths.parts)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = bucket != null ? bucket.hashCode() : 0; + result = 31 * result + parts.hashCode(); + return result; + } + + /** + * This method returns the cached {@link S3Object} instance if this path has been created + * while iterating a directory structures by the {@link S3Iterator}. + *
+ * After calling this method the cached object is reset, so any following method invocation will return {@code null}. + * This is necessary to discard the object meta-data and force to reload file attributes when required. + * + * @return The cached {@link S3Object} for this path if any. + */ + public S3Object fetchObject() { + S3Object result = object; + object = null; + return result; + } + + // note: package scope to limit the access to this setter + void setObjectSummary(S3Object objectSummary) { + this.object = objectSummary; + } + + @Override + public void setTags(Map tags) { + this.tags = tags; + } + + @Override + public void setContentType(String type) { + this.contentType = type; + } + + @Override + public void setStorageClass(String storageClass) { + this.storageClass = storageClass; + } + + public List getTagsList() { + // nothing found, just return + if( tags==null ) + return Collections.emptyList(); + // create a list of Tag out of the Map + List result = new ArrayList<>(); + for( Map.Entry entry : tags.entrySet()) { + result.add( Tag.builder().key(entry.getKey()).value(entry.getValue()).build() ); + } + return result; + } + + public String getContentType() { + return contentType; + } + + public String getStorageClass() { + return storageClass; + } + + // ~ helpers methods + + private static Function strip(final String ... strs) { + return new Function() { + public String apply(String input) { + String res = input; + for (String str : strs) { + res = res.replace(str, ""); + } + return res; + } + }; + } + + private static Predicate notEmpty() { + return new Predicate() { + @Override + public boolean apply(@Nullable String input) { + return input != null && !input.isEmpty(); + } + }; + } + + /* + * delete redundant "/" and empty parts + */ + private abstract static class KeyParts { + + private static ImmutableList parse(List parts) { + return ImmutableList.copyOf(filter(transform(parts, strip("/")), notEmpty())); + } + + private static ImmutableList parse(Iterable parts) { + return ImmutableList.copyOf(filter(transform(parts, strip("/")), notEmpty())); + } + } + + public static String bucketName(URI uri) { + final String path = uri.getPath(); + if( path==null || !path.startsWith("/") ) + throw new IllegalArgumentException("Invalid S3 path: " + uri); + final String[] parts = path.split("/"); + // note the element 0 contains the slash char + return parts.length>1 ? parts[1] : null; + } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3AsyncClientConfiguration.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3AsyncClientConfiguration.java index ceea07677d..2836acb69a 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3AsyncClientConfiguration.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3AsyncClientConfiguration.java @@ -21,7 +21,6 @@ import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration; import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration; - import java.time.Duration; import java.util.Properties; @@ -30,7 +29,7 @@ * * @author Jorge Ejarque */ -public class S3AsyncClientConfiguration extends S3ClientConfiguration{ +public class S3AsyncClientConfiguration extends S3ClientConfiguration { private static final long DEFAULT_SOCKET_TIMEOUT_MS = 30_000L; @@ -41,51 +40,51 @@ public class S3AsyncClientConfiguration extends S3ClientConfiguration{ private Double targetThroughputInGbps; private Long maxNativeMemoryInBytes; - private S3CrtHttpConfiguration.Builder crtHttpConfiguration(){ + private S3CrtHttpConfiguration.Builder crtHttpConfiguration() { if( this.crtHttpConfiguration == null) this.crtHttpConfiguration = S3CrtHttpConfiguration.builder(); return this.crtHttpConfiguration; } - private MultipartConfiguration.Builder multipartBuilder(){ + private MultipartConfiguration.Builder multipartBuilder() { if( this.multiPartBuilder == null) this.multiPartBuilder = MultipartConfiguration.builder(); return this.multiPartBuilder; } - public S3CrtHttpConfiguration getCrtHttpConfiguration(){ + public S3CrtHttpConfiguration getCrtHttpConfiguration() { if ( this.crtHttpConfiguration == null ) return null; return this.crtHttpConfiguration.build(); } - public MultipartConfiguration getMultipartConfiguration(){ + public MultipartConfiguration getMultipartConfiguration() { if( this.multiPartBuilder == null ) return null; return this.multiPartBuilder.build(); } - private S3AsyncClientConfiguration(){ + private S3AsyncClientConfiguration() { super(); } - public S3CrtRetryConfiguration getCrtRetryConfiguration(){ + public S3CrtRetryConfiguration getCrtRetryConfiguration() { return this.crtRetryConfiguration; } - public Integer getMaxConcurrency(){ + public Integer getMaxConcurrency() { return this.maxConcurrency; } - public Double getTargetThroughputInGbps(){ + public Double getTargetThroughputInGbps() { return this.targetThroughputInGbps; } - public Long getMaxNativeMemoryInBytes(){ + public Long getMaxNativeMemoryInBytes() { return this.maxNativeMemoryInBytes; } - private void setAsyncConfiguration(Properties props){ + private void setAsyncConfiguration(Properties props) { if( props.containsKey("max_error_retry")) { log.trace("AWS client config - max_error_retry: {}", props.getProperty("max_error_retry")); @@ -161,11 +160,11 @@ private void setAsyncConfiguration(Properties props){ } public static S3AsyncClientConfiguration create(Properties props) { - S3AsyncClientConfiguration config = new S3AsyncClientConfiguration(); - if( props != null ){ + S3AsyncClientConfiguration config = new S3AsyncClientConfiguration(); + if( props != null ) { config.setClientOverrideConfiguration(props); config.setAsyncConfiguration(props); } - return config; - } + return config; + } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3ClientConfiguration.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3ClientConfiguration.java index 975f429a51..e0156acb99 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3ClientConfiguration.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3ClientConfiguration.java @@ -40,46 +40,45 @@ public class S3ClientConfiguration { private ClientOverrideConfiguration.Builder cocBuilder; - private ClientOverrideConfiguration.Builder cocBuilder(){ + private ClientOverrideConfiguration.Builder cocBuilder() { if( this.cocBuilder == null ) this.cocBuilder = ClientOverrideConfiguration.builder(); return this.cocBuilder; } - public ClientOverrideConfiguration getClientOverrideConfiguration(){ + public ClientOverrideConfiguration getClientOverrideConfiguration() { if( cocBuilder == null ) return null; return cocBuilder.build(); } - protected S3ClientConfiguration(){} + protected S3ClientConfiguration() {} protected final void setClientOverrideConfiguration(Properties props) { - if( props == null ) - return; + if( props == null ) + return; - if( props.containsKey("max_error_retry")) { - log.trace("AWS client config - max_error_retry: {}", props.getProperty("max_error_retry")); - cocBuilder().retryStrategy(StandardRetryStrategy.builder().maxAttempts((Integer.parseInt(props.getProperty("max_error_retry")) + 1 )).build()); - } + if( props.containsKey("max_error_retry")) { + log.trace("AWS client config - max_error_retry: {}", props.getProperty("max_error_retry")); + cocBuilder().retryStrategy(StandardRetryStrategy.builder().maxAttempts((Integer.parseInt(props.getProperty("max_error_retry")) + 1 )).build()); + } - if( props.containsKey("protocol")) { - log.warn("AWS client config 'protocol' doesn't exist in AWS SDK V2"); - } + if( props.containsKey("protocol")) { + log.warn("AWS client config 'protocol' doesn't exist in AWS SDK V2"); + } - if ( props.containsKey("signer_override")) { - log.warn("AWS client config 'signerOverride' is not supported in AWS SDK V2. This option will be ignored."); + if ( props.containsKey("signer_override")) { + log.warn("AWS client config 'signerOverride' is not supported in AWS SDK V2. This option will be ignored."); - } + } - if( props.containsKey("socket_send_buffer_size_hints") || props.containsKey("socket_recv_buffer_size_hints") ) { - log.warn("AWS client config - 'socket_send_buffer_size_hints' and 'socket_recv_buffer_size_hints' do not exist in AWS SDK V2" ); - } + if( props.containsKey("socket_send_buffer_size_hints") || props.containsKey("socket_recv_buffer_size_hints") ) { + log.warn("AWS client config - 'socket_send_buffer_size_hints' and 'socket_recv_buffer_size_hints' do not exist in AWS SDK V2" ); + } - if( props.containsKey("user_agent")) { - log.warn("AWS client config 'user_agent' is not supported in AWS SDK V2. This option will be ignored."); - } - } + if( props.containsKey("user_agent")) { + log.warn("AWS client config 'user_agent' is not supported in AWS SDK V2. This option will be ignored."); + } + } } -