Skip to content

Commit

Permalink
New interface & factory for AzureHTTPClient providers (#1700)
Browse files Browse the repository at this point in the history
* New interface for AzureHTTPClient providers

* Added license headers

* Updated method spec definitions

* Updated spec headers doc

* Updated config property in readme file

* Modified doc of method

* Fixed checkstyle failures
  • Loading branch information
khandelwal-ayush authored Aug 22, 2024
1 parent 3cb57c2 commit 2a94cad
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ Configs for producing to [Azure Blob Storage](https://azure.microsoft.com/en-us/
|systems.**_system-name_**.azureblob.proxy.hostname| |if proxy.use is true then host name of proxy.|
|systems.**_system-name_**.azureblob.proxy.port| |if proxy.use is true then port of proxy.|
|systems.**_system-name_**.azureblob.writer.factory.class|`org.apache.samza.system.`<br>`azureblob.avro.`<br>`AzureBlobAvroWriterFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.producer.AzureBlobWriter` impl for the system producer.<br><br>The default writer creates blobs that are of type AVRO and require the messages sent to a blob to be AVRO records. The blobs created by the default writer are of type [Block Blobs](https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs#about-block-blobs).|
|systems.**_system-name_**.azureblob.azureclientbuilder.factory.class|`org.apache.samza.system.`<br>`azureblob.AzureBlobClientBuilderFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.BlobClientBuilderFactory` impl used to create the `org.apache.samza.system.azureblob.BlobClientBuilder` that builds the Azure Blob Storage client for the system producer.|
|systems.**_system-name_**.azureblob.compression.type|"none"|type of compression to be used before uploading blocks. Can be "none" or "gzip".|
|systems.**_system-name_**.azureblob.maxFlushThresholdSize|10485760 (10 MB)|max size of the uncompressed block to be uploaded in bytes. Maximum size allowed by Azure is 100MB.|
|systems.**_system-name_**.azureblob.maxBlobSize|Long.MAX_VALUE (unlimited)|max size of the uncompressed blob in bytes.<br>If default value then size is unlimited capped only by Azure BlockBlob size of 4.75 TB (100 MB per block X 50,000 blocks).|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* configs given to the SystemProducer - such as which authentication method to use, whether to use proxy to authenticate,
* and so on.
*/
public final class AzureBlobClientBuilder {
public final class AzureBlobClientBuilder implements BlobClientBuilder {
private final String systemName;
private final String azureUrlFormat;
private final AzureBlobConfig azureBlobConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.azureblob;

/**
 * Default {@link BlobClientBuilderFactory}: each call hands back a freshly constructed
 * {@link AzureBlobClientBuilder} built from the arguments it receives.
 */
public class AzureBlobClientBuilderFactory implements BlobClientBuilderFactory {
  /**
   * Creates a new {@link AzureBlobClientBuilder}.
   * @param systemName passed through unchanged to the {@link AzureBlobClientBuilder} constructor
   * @param azureUrlFormat passed through unchanged to the {@link AzureBlobClientBuilder} constructor
   * @param azureBlobConfig passed through unchanged to the {@link AzureBlobClientBuilder} constructor
   * @return New instance of {@link AzureBlobClientBuilder}
   */
  @Override
  public BlobClientBuilder getBlobClientBuilder(String systemName, String azureUrlFormat,
      AzureBlobConfig azureBlobConfig) {
    final BlobClientBuilder builder = new AzureBlobClientBuilder(systemName, azureUrlFormat, azureBlobConfig);
    return builder;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class AzureBlobConfig extends MapConfig {
// fully qualified class name of the AzureBlobWriterFactory impl for the producer system
public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "writer.factory.class";
public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory";
public static final String SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "azureclientbuilder.factory.class";
public static final String SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.AzureBlobClientBuilderFactory";

public static final String SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "useTokenCredentialAuthentication";
private static final boolean SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION_DEFAULT = false;
Expand Down Expand Up @@ -172,6 +174,11 @@ public String getAzureBlobWriterFactoryClassName(String systemName) {
return get(String.format(SYSTEM_WRITER_FACTORY_CLASS_NAME, systemName), SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT);
}

/**
 * Looks up the class name of the blob client builder factory configured for a system.
 * @param systemName name of the system, substituted into the config key format
 * @return result of the lookup, with SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT supplied as the default
 */
public String getAzureBlobClientBuilderFactoryClassName(String systemName) {
  final String configKey = String.format(SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME, systemName);
  return get(configKey, SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT);
}

/**
 * Looks up the max flush threshold size configured for a system.
 * @param systemName name of the system, substituted into the config key format
 * @return result of the lookup, with SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT supplied as the default
 */
public int getMaxFlushThresholdSize(String systemName) {
  final String configKey = String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName);
  return getInt(configKey, SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.azureblob;

import com.azure.storage.blob.BlobServiceAsyncClient;


/**
* Creates a {@link BlobServiceAsyncClient}. The implementation controls construction of the underlying client.
*
* <p>Thread safety: org.apache.samza.system.azureblob.producer.AzureBlobSystemProducer by default allows only
* one thread to create the client. Anyone implementing their own system producer must ensure that client creation
* through this interface is thread safe, and any implementation of this interface should be thread safe as well.
*
* <p>Lifecycle: AzureBlobSystemProducer closes the client safely when stop() is called. Anyone using this
* interface to create their own client is responsible for closing it.
*/
public interface BlobClientBuilder {
/**
* Creates a client for uploading to Azure Blob Storage.
* @return New instance of {@link BlobServiceAsyncClient}
*/
BlobServiceAsyncClient getBlobServiceAsyncClient();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.azureblob;

/**
* Factory that constructs new instances of {@link BlobClientBuilder}.
* The implementation controls construction of the underlying instance.
*
* <p>The implementation class is selected through the config key
* {@link AzureBlobConfig#SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME}. AzureBlobSystemProducer instantiates it
* reflectively via {@code Class.forName(className).newInstance()}, so an implementation needs an accessible
* no-argument constructor.
*/
public interface BlobClientBuilderFactory {
/**
* Creates a new instance of {@link BlobClientBuilder}.
* @param systemName Name of the system for which the blob client builder is being created
* @param azureUrlFormat The format of the Azure URL, which should conform to Azure's URL formatting requirements.
* @param azureBlobConfig The configuration settings for Azure Blob, encapsulated in an {@link AzureBlobConfig} object.
* This includes metadata details for Azure Blob configs.
* @return New instance of {@link BlobClientBuilder}
*/
BlobClientBuilder getBlobClientBuilder(String systemName, String azureUrlFormat, AzureBlobConfig azureBlobConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemProducerException;
import org.apache.samza.system.azureblob.AzureBlobClientBuilder;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.BlobClientBuilderFactory;
import org.apache.samza.system.azureblob.compression.CompressionFactory;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -116,6 +116,7 @@ public class AzureBlobSystemProducer implements SystemProducer {
// Map of writers indexed first by sourceName and then by (streamName, partitionName) or just streamName if partition key does not exist.
private final Map<String, Map<String, AzureBlobWriter>> writerMap;
private final AzureBlobWriterFactory writerFactory;
private final BlobClientBuilderFactory clientFactory;

private final int blockFlushThresholdSize;
private final long flushTimeoutMs;
Expand Down Expand Up @@ -143,6 +144,13 @@ public AzureBlobSystemProducer(String systemName, AzureBlobConfig config, Metric
this.systemName = systemName;
this.config = config;

String clientFactoryClassName = this.config.getAzureBlobClientBuilderFactoryClassName(this.systemName);
try {
this.clientFactory = (BlobClientBuilderFactory) Class.forName(clientFactoryClassName).newInstance();
} catch (Exception e) {
throw new SystemProducerException("Could not create blob client factory with name " + clientFactoryClassName, e);
}

String writerFactoryClassName = this.config.getAzureBlobWriterFactoryClassName(this.systemName);
try {
this.writerFactory = (AzureBlobWriterFactory) Class.forName(writerFactoryClassName).newInstance();
Expand Down Expand Up @@ -344,8 +352,8 @@ public void flush(String source) {
@VisibleForTesting
void setupAzureContainer() {
try {
BlobServiceAsyncClient storageClient = new AzureBlobClientBuilder(systemName, AZURE_URL, config)
.getBlobServiceAsyncClient();
BlobServiceAsyncClient storageClient =
clientFactory.getBlobClientBuilder(systemName, AZURE_URL, config).getBlobServiceAsyncClient();
validateFlushThresholdSizeSupported(storageClient);

containerAsyncClient = storageClient.getBlobContainerAsyncClient(systemName);
Expand Down

0 comments on commit 2a94cad

Please sign in to comment.