diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy index 7ab4ba539d..f023fab4b5 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy @@ -17,6 +17,8 @@ package nextflow.cloud.azure.batch import java.math.RoundingMode +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.nio.file.Path import java.time.Duration import java.time.Instant @@ -93,6 +95,8 @@ import nextflow.util.CacheHelper import nextflow.util.MemoryUnit import nextflow.util.MustacheTemplateEngine import nextflow.util.Rnd +import reactor.core.publisher.Flux + /** * Implements Azure Batch operations for Nextflow executor * @@ -696,7 +700,7 @@ class AzBatchService implements Closeable { def pool = getPool(spec.poolId) if( !pool ) { if( config.batch().canCreatePool() ) { - createPool(spec) + safeCreatePool(spec) } else { throw new IllegalArgumentException("Can't find Azure Batch pool '$spec.poolId' - Make sure it exists or set `allowPoolCreation=true` in the nextflow config file") @@ -856,6 +860,27 @@ class AzBatchService implements Closeable { apply(() -> client.createPool(poolParams)) } + protected void safeCreatePool(AzVmPoolSpec spec) { + try { + createPool(spec) + } + catch (HttpResponseException e) { + if (e.response.statusCode == 409 && toString(e.response.body)?.contains("PoolExists")) { + log.debug "[AZURE BATCH] Pool '${spec.poolId}' already exists (ignoring creation request)" + return + } + throw e + } + } + + protected String toString(Flux body) { + body + .map(byteBuffer -> StandardCharsets.UTF_8.decode(byteBuffer).toString()) + .collectList() // Collects all strings into a List + .map(list -> String.join("", list)) // Joins the list into a single string + .block() + } + protected String scaleFormula(AzPoolOpts opts) { final target = opts.lowPriority ? 'TargetLowPriorityNodes' : 'TargetDedicatedNodes' // https://docs.microsoft.com/en-us/azure/batch/batch-automatic-scaling diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy index 7923da46fc..d4a00bba5e 100644 --- a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy @@ -1,11 +1,15 @@ package nextflow.cloud.azure.batch +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.time.Instant import java.time.temporal.ChronoUnit import java.util.function.Predicate import com.azure.compute.batch.models.BatchPool import com.azure.compute.batch.models.ElevationLevel +import com.azure.core.exception.HttpResponseException +import com.azure.core.http.HttpResponse import com.azure.identity.ManagedIdentityCredential import com.google.common.hash.HashCode import nextflow.Global @@ -22,6 +26,7 @@ import nextflow.processor.TaskProcessor import nextflow.processor.TaskRun import nextflow.util.Duration import nextflow.util.MemoryUnit +import reactor.core.publisher.Flux import spock.lang.Specification import spock.lang.Unroll /** @@ -786,6 +791,7 @@ class AzBatchServiceTest extends Specification { [managedIdentity: [clientId: 'client-123']] | 'client-123' } + def 'should cache job id' () { given: def exec = Mock(AzBatchExecutor) @@ -836,4 +842,59 @@ class AzBatchServiceTest extends Specification { and: result == 'job3' } + + def 'should test safeCreatePool' () { + given: + def exec = Mock(AzBatchExecutor) + def service = Spy(new AzBatchService(exec)) + def spec = Mock(AzVmPoolSpec) { + getPoolId() >> 'test-pool' + } + + when: 'pool is created successfully' + service.safeCreatePool(spec) + + then: 'createPool is called once' + 1 * service.createPool(spec) >> null + and: + noExceptionThrown() + + when: 'pool already exists (409 with PoolExists)' + service.safeCreatePool(spec) + + then: 'exception is caught and debug message is logged' + 1 * service.createPool(spec) >> { + def response = Mock(HttpResponse) { + getStatusCode() >> 409 + getBody() >> { + Flux.just(ByteBuffer.wrap('{"error":{"code":"PoolExists"}}'.getBytes(StandardCharsets.UTF_8))) + } + } + throw new HttpResponseException("Pool already exists", response) + } + and: + noExceptionThrown() + + when: 'different HttpResponseException occurs' + service.safeCreatePool(spec) + + then: 'exception is rethrown' + 1 * service.createPool(spec) >> { + def response = Mock(HttpResponse) { + getStatusCode() >> 400 + getBody() >> { + Flux.just(ByteBuffer.wrap('{"error":{"code":"BadRequest"}}'.getBytes(StandardCharsets.UTF_8))) + } + } + throw new HttpResponseException("Bad request", response) + } + thrown(HttpResponseException) + + when: 'a different exception occurs' + service.safeCreatePool(spec) + + then: 'exception is not caught' + 1 * service.createPool(spec) >> { throw new IllegalArgumentException("Some other error") } + thrown(IllegalArgumentException) + } }