diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystem.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystem.groovy index 873a9c93d7..eb7dc0ef76 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystem.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystem.groovy @@ -27,6 +27,8 @@ import java.nio.file.PathMatcher import java.nio.file.WatchService import java.nio.file.attribute.UserPrincipalLookupService import java.time.Duration +import java.time.temporal.ChronoUnit +import java.util.function.Predicate import com.azure.core.util.polling.SyncPoller import com.azure.storage.blob.BlobServiceClient @@ -35,9 +37,16 @@ import com.azure.storage.blob.models.BlobCopyInfo import com.azure.storage.blob.models.BlobItem import com.azure.storage.blob.models.BlobStorageException import com.azure.storage.blob.models.ListBlobsOptions +import dev.failsafe.Failsafe +import dev.failsafe.RetryPolicy +import dev.failsafe.event.EventListener +import dev.failsafe.event.ExecutionAttemptedEvent +import dev.failsafe.function.CheckedSupplier import groovy.transform.CompileStatic +import groovy.transform.Memoized import groovy.transform.PackageScope import groovy.util.logging.Slf4j +import nextflow.cloud.azure.config.AzConfig /** * Implements a file system for Azure Blob Storage service * @@ -69,7 +78,7 @@ class AzFileSystem extends FileSystem { @PackageScope AzFileSystem() {} @PackageScope - AzFileSystem(AzFileSystemProvider provider, BlobServiceClient storageClient, String bucket ) { + AzFileSystem(AzFileSystemProvider provider, BlobServiceClient storageClient, String bucket) { this.provider = provider this.containerName = bucket this.storageClient = storageClient @@ -109,6 +118,10 @@ class AzFileSystem extends FileSystem { } private Iterable listContainers() { + return apply(()-> listContainers0()) + } + + private Iterable listContainers0() { final containers = new ArrayList() storageClient .listBlobContainers() @@ -359,18 +372,18 @@ class AzFileSystem extends FileSystem { boolean exists = false boolean isDirectory = false - def opts = new ListBlobsOptions() + final opts = new ListBlobsOptions() .setPrefix(path.blobName()) .setMaxResultsPerPage(10) try { - def values = path.containerClient().listBlobs(opts,null).iterator() + final values = apply(()-> path.containerClient().listBlobs(opts,null).iterator()) final char SLASH = '/' final String name = path.blobName() int count=0 - while( values.hasNext() ) { - BlobItem blob = values.next() + while( apply(()-> values.hasNext()) ) { + BlobItem blob = apply(()-> values.next()) if( blob.name == name ) exists = true else if( blob.name.startsWith(name) && blob.name.charAt(name.length())==SLASH ) { @@ -425,7 +438,7 @@ class AzFileSystem extends FileSystem { } @PackageScope - AzFileAttributes readAttributes(AzPath path) { + AzFileAttributes readAttributes(AzPath path) { final cache = path.attributesCache() if( cache ) return cache @@ -496,5 +509,42 @@ class AzFileSystem extends FileSystem { return false } } - + + + /** + * Creates a retry policy using the configuration specified by {@link nextflow.cloud.azure.config.AzRetryConfig} + * + * @param cond A predicate that determines when a retry should be triggered + * @return The {@link dev.failsafe.RetryPolicy} instance + */ + @Memoized + protected RetryPolicy retryPolicy(Predicate cond) { + final cfg = AzConfig.getConfig().retryConfig() + final listener = new EventListener>() { + @Override + void accept(ExecutionAttemptedEvent event) throws Throwable { + log.debug("Azure I/O exception - attempt: ${event.attemptCount}; cause: ${event.lastFailure?.message}") + } + } + return RetryPolicy.builder() + .handleIf(cond) + .withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS) + .withMaxAttempts(cfg.maxAttempts) + .withJitter(cfg.jitter) + .onRetry(listener) + .build() + } + + /** + * Carry out the invocation of the specified action using a retry policy + * when {@code TooManyRequests} Azure Batch error is returned + * + * @param action A {@link dev.failsafe.function.CheckedSupplier} instance modeling the action to be performed in a safe manner + * @return The result of the supplied action + */ + protected T apply(CheckedSupplier action) { + final cond = (e -> e instanceof IOException) as Predicate + final policy = retryPolicy(cond) + return Failsafe.with(policy).get(action) + } } diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystemProvider.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystemProvider.groovy index 53a5e9ed19..f8fd7ab5b7 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystemProvider.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystemProvider.groovy @@ -86,7 +86,7 @@ class AzFileSystemProvider extends FileSystemProvider { return this.accountKey } - static private AzPath asAzPath(Path path ) { + static private AzPath asAzPath(Path path) { if( path !instanceof AzPath ) throw new IllegalArgumentException("Not a valid Azure blob storage path object: `$path` [${path?.class?.name?:'-'}]" ) return (AzPath)path diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/nio/AzNioTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/nio/AzNioTest.groovy index e4cf83fbff..8a923bfea2 100644 --- a/plugins/nf-azure/src/test/nextflow/cloud/azure/nio/AzNioTest.groovy +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/nio/AzNioTest.groovy @@ -17,6 +17,8 @@ import java.nio.file.attribute.BasicFileAttributes import com.azure.storage.blob.BlobServiceClient import com.azure.storage.blob.BlobServiceClientBuilder import com.azure.storage.common.StorageSharedKeyCredential +import nextflow.Global +import nextflow.Session import nextflow.exception.AbortOperationException import nextflow.trace.TraceHelper import spock.lang.IgnoreIf @@ -43,8 +45,13 @@ class AzNioTest extends Specification implements AzBaseSpec { def credential = new StorageSharedKeyCredential(accountName, accountKey); def endpoint = String.format(Locale.ROOT, "https://%s.blob.core.windows.net", accountName); storageClient = new BlobServiceClientBuilder().endpoint(endpoint).credential(credential).buildClient(); + and: + Global.session = Mock(Session) } + def cleanupSpec() { + Global.session = null + } def 'should create a blob' () { given: