Skip to content
This repository was archived by the owner on Mar 10, 2025. It is now read-only.

Resolve #363 - Feature/token resolver #372

Open
wants to merge 5 commits into
base: 2.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ limitations under the License.
<exclude>com.microsoft.azure.cosmosdb.internal.query.metrics.*</exclude>
<exclude>com.microsoft.azure.cosmosdb.internal.query.orderbyquery.*</exclude>
<exclude>com.microsoft.azure.cosmosdb.internal.routing.*</exclude>
<exclude>com.microsoft.azure.cosmosdb.TokenResolver</exclude>
<exclude>com.microsoft.azure.cosmosdb.CosmosResourceType</exclude>
</excludes>
</relocation>
<relocation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,20 @@ import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.internal._
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBWriteStreamRetryPolicy
import org.apache.spark.sql.Row

import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import java.util.concurrent.ConcurrentHashMap

import com.microsoft.azure.cosmosdb.spark.util.CosmosUtils

case class AsyncClientConfiguration(host: String,
key: String,
connectionPolicy: ConnectionPolicy,
consistencyLevel: ConsistencyLevel)
consistencyLevel: ConsistencyLevel,
tokenResolver: CosmosDBTokenResolver)

object AsyncCosmosDBConnection {
private lazy val clients: ConcurrentHashMap[Config, AsyncDocumentClient] = {
Expand Down Expand Up @@ -103,13 +104,26 @@ object AsyncCosmosDBConnection {
val consistencyLevel = ConsistencyLevel.valueOf(config.get[String](CosmosDBConfig.ConsistencyLevel)
.getOrElse(CosmosDBConfig.DefaultConsistencyLevel))

val resourceToken = config.getOrElse[String](CosmosDBConfig.ResourceToken, "")
var resourceKey: String = null

// Check Resource Token and Token Resolver
var tokenResolver: CosmosDBTokenResolver = null
val tokenResolverClassName = config.getOrElse[String](CosmosDBConfig.TokenResolver, "")

if (!tokenResolverClassName.isEmpty) {
tokenResolver = CosmosUtils.getTokenResolverFromClassName(tokenResolverClassName)
tokenResolver.initialize(config)
} else {
val resourceToken = config.getOrElse[String](CosmosDBConfig.ResourceToken, "")
resourceKey = config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken)
}

AsyncClientConfiguration(
config.get[String](CosmosDBConfig.Endpoint).get,
config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken),
resourceKey,
connectionPolicy,
consistencyLevel
consistencyLevel,
tokenResolver
)
}

Expand All @@ -126,6 +140,7 @@ object AsyncCosmosDBConnection {
.withMasterKeyOrResourceToken(clientConfig.key)
.withConnectionPolicy(clientConfig.connectionPolicy)
.withConsistencyLevel(clientConfig.consistencyLevel)
.withTokenResolver(clientConfig.tokenResolver)
.build()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ package com.microsoft.azure.cosmosdb.spark
import java.lang.management.ManagementFactory
import java.util.{Timer, TimerTask}

import com.microsoft.azure.cosmosdb.{CosmosResourceType}
import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.documentdb
import com.microsoft.azure.cosmosdb.spark.util.CosmosUtils
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor
import com.microsoft.azure.documentdb.internal._
Expand All @@ -43,7 +44,8 @@ case class ClientConfiguration(host: String,
key: String,
connectionPolicy: ConnectionPolicy,
consistencyLevel: ConsistencyLevel,
resourceLink: String)
resourceLink: String,
tokenResolver: CosmosDBTokenResolver)

object CosmosDBConnection extends CosmosDBLoggingTrait {
// For verification purpose
Expand Down Expand Up @@ -518,19 +520,31 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
val consistencyLevel = ConsistencyLevel.valueOf(config.get[String](CosmosDBConfig.ConsistencyLevel)
.getOrElse(CosmosDBConfig.DefaultConsistencyLevel))

//Check if resource token exists
val resourceToken = config.getOrElse[String](CosmosDBConfig.ResourceToken, "")
var resourceLink: String = ""
if(!resourceToken.isEmpty) {
resourceLink = s"dbs/${config.get[String](CosmosDBConfig.Database).get}/colls/${config.get[String](CosmosDBConfig.Collection).get}"
// check Token Resolver before checking resource token
var resourceLink = s"dbs/${config.get[String](CosmosDBConfig.Database).get}/colls/${config.get[String](CosmosDBConfig.Collection).get}"
var resourceToken = config.getOrElse(CosmosDBConfig.ResourceToken, "")

var tokenResolver: CosmosDBTokenResolver = null
val tokenResolverClassName = config.getOrElse[String](CosmosDBConfig.TokenResolver, "")

if (!tokenResolverClassName.isEmpty) {
tokenResolver = CosmosUtils.getTokenResolverFromClassName(tokenResolverClassName)
tokenResolver.initialize(config)
resourceToken = tokenResolver.getAuthorizationToken("GET", resourceLink, CosmosResourceType.DocumentCollection, config.asOptions)
}

if(resourceToken.isEmpty) {
resourceLink = ""
}

ClientConfiguration(
config.get[String](CosmosDBConfig.Endpoint).get,
config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken),
connectionPolicy,
consistencyLevel,
resourceLink)
resourceLink,
tokenResolver
)
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.microsoft.azure.cosmosdb.spark

import com.microsoft.azure.cosmosdb.spark.config.Config
import com.microsoft.azure.cosmosdb.TokenResolver

trait CosmosDBTokenResolver extends TokenResolver {
def initialize(config: Config): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ object CosmosDBConfig {
val Collection = "collection"
val Masterkey = "masterkey"
val ResourceToken = "resourcetoken"
val TokenResolver = "tokenresolver"

val PreferredRegionsList = "preferredregions"
val ConsistencyLevel = "consistencylevel"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.microsoft.azure.cosmosdb.spark.util

import com.microsoft.azure.cosmosdb.spark.CosmosDBTokenResolver

object CosmosUtils extends Serializable {

def getTokenResolverFromClassName(className: String, constructorArgs: AnyRef*): CosmosDBTokenResolver = {
val argsClassSeq = constructorArgs.map(e => e.getClass)
Class.forName(className).getDeclaredConstructor(argsClassSeq:_*).newInstance(constructorArgs:_*).asInstanceOf[CosmosDBTokenResolver]
}

}