Skip to content

Commit

Permalink
Azure Storage: Azure storage support #3253 (#3254)
Browse files Browse the repository at this point in the history
* Setting up module for Azure storage #3253

---------

Co-authored-by: Shubham Girdhar <[email protected]>
  • Loading branch information
sfali and girdharshubham authored Oct 3, 2024
1 parent 6ef874c commit 4049dca
Show file tree
Hide file tree
Showing 51 changed files with 5,548 additions and 0 deletions.
46 changes: 46 additions & 0 deletions azure-storage/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
alpakka {
azure-storage {
api-version = "2024-11-04"
api-version = ${?AZURE_STORAGE_API_VERSION}
signing-algorithm = "HmacSHA256"

# for local testing via emulator
# endpoint-url = ""

#azure-credentials
credentials {
# valid values are anon (annonymous), SharedKey, and sas
authorization-type = anon
authorization-type = ${?AZURE_STORAGE_AUTHORIZATION_TYPE}

# required for all authorization types
account-name = ""
account-name = ${?AZURE_STORAGE_ACCOUNT_NAME}

# Account key is required for SharedKey or SharedKeyLite authorization
account-key = none
account-key = ${?AZURE_STORAGE_ACCOUNT_KEY}

# SAS token for sas authorization
sas-token = ""
sas-token = ${?AZURE_STORAGE_SAS_TOKEN}
}
#azure-credentials

# Default settings corresponding to automatic retry of requests in an Azure Blob Storage stream.
retry-settings {
# The maximum number of additional attempts (following transient errors) that will be made to process a given
# request before giving up.
max-retries = 3

# The minimum delay between request retries.
min-backoff = 200ms

# The maximum delay between request retries.
max-backoff = 10s

# Random jitter factor applied to retry delay calculation.
random-factor = 0.0
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage

import akka.stream.Attributes
import akka.stream.Attributes.Attribute

/**
* Akka Stream attributes that are used when materializing AzureStorage stream blueprints.
*/
object StorageAttributes {

/**
* Settings to use for the Azure Blob Storage stream
*/
def settings(settings: StorageSettings): Attributes = Attributes(StorageSettingsValue(settings))

/**
* Config path which will be used to resolve required AzureStorage settings
*/
def settingsPath(path: String): Attributes = Attributes(StorageSettingsPath(path))

/**
* Default settings
*/
def defaultSettings: Attributes = Attributes(StorageSettingsPath.Default)
}

final class StorageSettingsPath private (val path: String) extends Attribute

object StorageSettingsPath {
val Default: StorageSettingsPath = StorageSettingsPath(StorageSettings.ConfigPath)

def apply(path: String) = new StorageSettingsPath(path)
}

final class StorageSettingsValue private (val settings: StorageSettings) extends Attribute

object StorageSettingsValue {
def apply(settings: StorageSettings) = new StorageSettingsValue(settings)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage

import akka.http.scaladsl.model.StatusCode

import scala.util.{Failure, Success, Try}
import scala.xml.{Elem, NodeSeq, XML}

final case class StorageException(statusCode: StatusCode,
errorCode: String,
errorMessage: String,
resourceName: Option[String],
resourceValue: Option[String],
reason: Option[String])
extends RuntimeException(errorMessage) {

override def toString: String =
s"""StorageException(
|statusCode=$statusCode,
| errorCode=$errorCode,
| errorMessage=$errorMessage,
| resourceName=$resourceName,
| resourceValue=$resourceValue,
| reason=$reason
|)""".stripMargin.replaceAll(System.lineSeparator(), "")
}

object StorageException {
def apply(statusCode: StatusCode,
errorCode: String,
errorMessage: String,
resourceName: Option[String],
resourceValue: Option[String],
reason: Option[String]): StorageException =
new StorageException(statusCode, errorCode, errorMessage, resourceName, resourceValue, reason)

def apply(response: String, statusCode: StatusCode): StorageException = {
def getOptionalValue(xmlResponse: Elem, elementName: String, fallBackElementName: Option[String]) = {
val element = xmlResponse \ elementName
val node =
if (element.nonEmpty) element
else if (fallBackElementName.nonEmpty) xmlResponse \ fallBackElementName.get
else NodeSeq.Empty

emptyStringToOption(node.text)
}

Try {
val utf8_bom = "\uFEFF"
val sanitizedResponse = if (response.startsWith(utf8_bom)) response.substring(1) else response
val xmlResponse = XML.loadString(sanitizedResponse)
StorageException(
statusCode = statusCode,
errorCode = (xmlResponse \ "Code").text,
errorMessage = (xmlResponse \ "Message").text,
resourceName = getOptionalValue(xmlResponse, "QueryParameterName", Some("HeaderName")),
resourceValue = getOptionalValue(xmlResponse, "QueryParameterValue", Some("HeaderValue")),
reason = getOptionalValue(xmlResponse, "Reason", Some("AuthenticationErrorDetail"))
)
} match {
case Failure(ex) =>
val errorMessage = emptyStringToOption(ex.getMessage)
StorageException(
statusCode = statusCode,
errorCode = errorMessage.getOrElse("null"),
errorMessage = emptyStringToOption(response).orElse(errorMessage).getOrElse("null"),
resourceName = None,
resourceValue = None,
reason = None
)
case Success(value) => value
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage

import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}

/**
* Manages one [[StorageSettings]] per `ActorSystem`.
*/
final class StorageExt private (sys: ExtendedActorSystem) extends Extension {
val settings: StorageSettings = settings(StorageSettings.ConfigPath)

def settings(prefix: String): StorageSettings = StorageSettings(sys.settings.config.getConfig(prefix))
}

object StorageExt extends ExtensionId[StorageExt] with ExtensionIdProvider {
override def lookup: StorageExt.type = StorageExt
override def createExtension(system: ExtendedActorSystem) = new StorageExt(system)

/**
* Java API.
* Get the Storage extension with the classic actors API.
*/
override def get(system: ActorSystem): StorageExt = super.apply(system)

/**
* Java API.
* Get the Storage extension with the new actors API.
*/
override def get(system: ClassicActorSystemProvider): StorageExt = super.apply(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage
package headers

import akka.annotation.InternalApi
import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.headers.RawHeader

import java.security.MessageDigest
import java.util.{Base64, Objects}

sealed abstract class ServerSideEncryption {
@InternalApi private[storage] def headers: Seq[HttpHeader]
}

object ServerSideEncryption {
def customerKey(key: String, hash: Option[String]): ServerSideEncryption = new CustomerKey(key, hash)
def customerKey(key: String): ServerSideEncryption = customerKey(key, None)
}

final class CustomerKey private[headers] (val key: String, val hash: Option[String] = None)
extends ServerSideEncryption {
override private[storage] def headers: Seq[HttpHeader] = Seq(
RawHeader("x-ms-encryption-algorithm", "AES256"),
RawHeader("x-ms-encryption-key", key),
RawHeader("x-ms-encryption-key-sha256", hash.getOrElse(createHash))
)

override def equals(obj: Any): Boolean =
obj match {
case other: CustomerKey => key == other.key && hash == other.hash
case _ => false
}

override def hashCode(): Int = Objects.hash(key, hash)

override def toString: String =
s"""ServerSideEncryption.CustomerKeys(
|key=$key,
| hash=$hash
|)
|""".stripMargin.replaceAll(System.lineSeparator(), "")

private def createHash = {
val messageDigest = MessageDigest.getInstance("SHA-256")
val decodedKey = messageDigest.digest(Base64.getDecoder.decode(key))
Base64.getEncoder.encodeToString(decodedKey)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage
package headers

import akka.annotation.InternalApi
import akka.http.scaladsl.model.{ContentType, HttpHeader}
import akka.http.scaladsl.model.headers.{CustomHeader, RawHeader}

private[storage] case class CustomContentTypeHeader(contentType: ContentType) extends CustomHeader {
override def name(): String = "Content-Type"

override def value(): String = contentType.value

override def renderInRequests(): Boolean = true

override def renderInResponses(): Boolean = true
}

private[storage] case class CustomContentLengthHeader(contentLength: Long) extends CustomHeader {
override def name(): String = "Content-Length"

override def value(): String = contentLength.toString

override def renderInRequests(): Boolean = true

override def renderInResponses(): Boolean = true
}

private[storage] case class BlobTypeHeader(blobType: String) {
@InternalApi private[storage] def header: HttpHeader = RawHeader(BlobTypeHeaderKey, blobType)
}

object BlobTypeHeader {
private[storage] val BlockBlobHeader = new BlobTypeHeader(BlockBlobType)
private[storage] val PageBlobHeader = new BlobTypeHeader(PageBlobType)
private[storage] val AppendBlobHeader = new BlobTypeHeader(AppendBlobType)
}

private[storage] case class RangeWriteTypeHeader(headerName: String, writeType: String) {
@InternalApi private[storage] def header: HttpHeader = RawHeader(headerName, writeType)
}

object RangeWriteTypeHeader {
private[storage] val UpdateFileHeader = new RangeWriteTypeHeader(FileWriteTypeHeaderKey, "update")
private[storage] val ClearFileHeader = new RangeWriteTypeHeader(FileWriteTypeHeaderKey, "clear")
private[storage] val UpdatePageHeader = new RangeWriteTypeHeader(PageWriteTypeHeaderKey, "update")
private[storage] val ClearPageHeader = new RangeWriteTypeHeader(PageWriteTypeHeaderKey, "clear")
}
Loading

0 comments on commit 4049dca

Please sign in to comment.