diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index 6188e90b47..9ccb4192e5 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -264,6 +264,7 @@ Configs for producing to [Azure Blob Storage](https://azure.microsoft.com/en-us/ |systems.**_system-name_**.azureblob.proxy.hostname| |if proxy.use is true then host name of proxy.| |systems.**_system-name_**.azureblob.proxy.port| |if proxy.use is true then port of proxy.| |systems.**_system-name_**.azureblob.writer.factory.class|`org.apache.samza.system.`
`azureblob.avro.`
`AzureBlobAvroWriterFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.producer.AzureBlobWriter` impl for the system producer.

The default writer creates blobs that are of type AVRO and require the messages sent to a blob to be AVRO records. The blobs created by the default writer are of type [Block Blobs](https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs#about-block-blobs).| +|systems.**_system-name_**.azureblob.azureclientbuilder.factory.class | `org.apache.samza.system.`
`azureblob.AzureBlobClientBuilderFactory`|Fully qualified class name of the factory of the `org.apache.samza.system.azureblob.BlobClientBuilder` implementation for the client builder. | |systems.**_system-name_**.azureblob.compression.type|"none"|type of compression to be used before uploading blocks. Can be "none" or "gzip".| |systems.**_system-name_**.azureblob.maxFlushThresholdSize|10485760 (10 MB)|max size of the uncompressed block to be uploaded in bytes. Maximum size allowed by Azure is 100MB.| |systems.**_system-name_**.azureblob.maxBlobSize|Long.MAX_VALUE (unlimited)|max size of the uncompressed blob in bytes.
If default value then size is unlimited capped only by Azure BlockBlob size of 4.75 TB (100 MB per block X 50,000 blocks).| diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java index a8a6eb2df1..479733e315 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java @@ -39,7 +39,7 @@ * configs given to the SystemProducer - such as which authentication method to use, whether to use proxy to authenticate, * and so on. */ -public final class AzureBlobClientBuilder { +public final class AzureBlobClientBuilder implements BlobClientBuilder { private final String systemName; private final String azureUrlFormat; private final AzureBlobConfig azureBlobConfig; diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java new file mode 100644 index 0000000000..7a5120c117 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.azureblob; + +/** + * Default implementation of {@link BlobClientBuilderFactory} that constructs a + * new instance of {@link AzureBlobClientBuilder}. + */ +public class AzureBlobClientBuilderFactory implements BlobClientBuilderFactory { + @Override + public BlobClientBuilder getBlobClientBuilder(String systemName, String azureUrlFormat, + AzureBlobConfig azureBlobConfig) { + return new AzureBlobClientBuilder(systemName, azureUrlFormat, azureBlobConfig); + } +} diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java index 37c6ebcf3b..bedf433703 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java @@ -38,6 +38,8 @@ public class AzureBlobConfig extends MapConfig { // fully qualified class name of the AzureBlobWriter impl for the producer system public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "writer.factory.class"; public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory"; + public static final String SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "azureclientbuilder.factory.class"; + public static final String SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.AzureBlobClientBuilderFactory"; public static final String SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "useTokenCredentialAuthentication"; private static final boolean SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION_DEFAULT = false; @@ -172,6 +174,11 @@ public String getAzureBlobWriterFactoryClassName(String systemName) { return get(String.format(SYSTEM_WRITER_FACTORY_CLASS_NAME, systemName), SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT); } + public String getAzureBlobClientBuilderFactoryClassName(String systemName) { + return get(String.format(SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME, systemName), + SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT); + } + public int getMaxFlushThresholdSize(String systemName) { return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT); } diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java new file mode 100644 index 0000000000..f76c3d9553 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.azureblob; + +import com.azure.storage.blob.BlobServiceAsyncClient; + + +/** + * Create a BlobServiceAsyncClient. Implementation controls construction of underlying client. + * Customers implementing their own System Producer need to ensure thread safety of following impl for generation of client. + * If org.apache.samza.system.azureblob.producer.AzureBlobSystemProducer is used, it by defaults allows only one thread to create the client. + * Please ensure any client implementation of this interface to be thread safe as well. + * AzureBlobSystemProducer also ensures to safely close the client on call of stop(). Please ensure to close clients if using this interface + * to create your own client. + */ +public interface BlobClientBuilder { + /** + * Create a client for uploading to Azure Blob Storage + * @return New instance of {@link BlobServiceAsyncClient} + */ + BlobServiceAsyncClient getBlobServiceAsyncClient(); +} diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java new file mode 100644 index 0000000000..00d4cf94d1 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.azureblob; + +/** + * Constructs a new instance of type {@link BlobClientBuilder}. + * Implementation controls construction of underlying instance. + */ +public interface BlobClientBuilderFactory { + /** + * Create a new instance of {@link BlobClientBuilder} + * @param systemName Name of the system for which the blob client builder is being created + * @param azureUrlFormat The format of the Azure URL, which should conform to Azure's URL formatting requirements. + * @param azureBlobConfig The configuration settings for Azure Blob, encapsulated in an {@link AzureBlobConfig} object. + * This includes metadata details for Azure Blob configs. + * @return New instance of {@link BlobClientBuilder} + */ + BlobClientBuilder getBlobClientBuilder(String systemName, String azureUrlFormat, AzureBlobConfig azureBlobConfig); +} diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java index 2774dba41c..9fd2b3b1e7 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java @@ -44,8 +44,8 @@ import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemProducerException; -import org.apache.samza.system.azureblob.AzureBlobClientBuilder; import org.apache.samza.system.azureblob.AzureBlobConfig; +import org.apache.samza.system.azureblob.BlobClientBuilderFactory; import org.apache.samza.system.azureblob.compression.CompressionFactory; import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory; import org.slf4j.Logger; @@ -116,6 +116,7 @@ public class AzureBlobSystemProducer implements SystemProducer { // Map of writers indexed first by sourceName and then by (streamName, partitionName) or just streamName if partition key does not exist. private final Map> writerMap; private final AzureBlobWriterFactory writerFactory; + private final BlobClientBuilderFactory clientFactory; private final int blockFlushThresholdSize; private final long flushTimeoutMs; @@ -143,6 +144,13 @@ public AzureBlobSystemProducer(String systemName, AzureBlobConfig config, Metric this.systemName = systemName; this.config = config; + String clientFactoryClassName = this.config.getAzureBlobClientBuilderFactoryClassName(this.systemName); + try { + this.clientFactory = (BlobClientBuilderFactory) Class.forName(clientFactoryClassName).newInstance(); + } catch (Exception e) { + throw new SystemProducerException("Could not create blob client factory with name " + clientFactoryClassName, e); + } + String writerFactoryClassName = this.config.getAzureBlobWriterFactoryClassName(this.systemName); try { this.writerFactory = (AzureBlobWriterFactory) Class.forName(writerFactoryClassName).newInstance(); @@ -344,8 +352,8 @@ public void flush(String source) { @VisibleForTesting void setupAzureContainer() { try { - BlobServiceAsyncClient storageClient = new AzureBlobClientBuilder(systemName, AZURE_URL, config) - .getBlobServiceAsyncClient(); + BlobServiceAsyncClient storageClient = + clientFactory.getBlobClientBuilder(systemName, AZURE_URL, config).getBlobServiceAsyncClient(); validateFlushThresholdSizeSupported(storageClient); containerAsyncClient = storageClient.getBlobContainerAsyncClient(systemName);