Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure mediakind #797

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -24,48 +24,24 @@ service {
}

# Azure Media Service Config
azure {
location = "centralindia"
tenant = "tenant"
subscription_id = "subscription id "

login {
endpoint="https://login.microsoftonline.com"
}

azure_mediakind{
project_name="subscriptionname"
auth_token="authToken"
account_name="media servcie accountname"
api {
endpoint="https://management.azure.com"
version = "2018-07-01"
endpoint="https://api.mk.io/api"
}

account_name = "account name"
resource_group_name = "group name"

transform {
default = "media_transform_default"
hls = "media_transform_hls"
}

stream {
base_url = "https://sunbirdspikemedia-inct.streaming.media.azure.net"
base_url = "https://ep-default-mkservicepoc.japaneast.streaming.mediakind.com"
endpoint_name = "default"
protocol = "Hls"
policy_name = "Predefined_ClearStreamingOnly"
}

token {
client_key = "client key"
client_secret = "client secret"
}
}

azure_tenant="tenant"
azure_subscription_id="subscription id"
azure_account_name="account name"
azure_resource_group_name="group name"
azure_token_client_key="client key"
azure_token_client_secret="client secret"

# CSP Name. e.g: aws or azure
media_service_type="oci"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.sunbird.job.videostream.helpers

object AzureRequestBody {

val create_asset = " {\"properties\": {\"description\": \"assetDescription\",\"alternateId\" : \"assetId\"}}"
val create_asset = " {\"properties\": {\"storageAccountName\": \"assetStorageAccountName\",\"description\": \"assetDescription\",\"alternateId\" : \"assetId\"}}"
val submit_job = "{\"properties\": {\"input\": {\"@odata.type\": \"#Microsoft.Media.JobInputHttp\",\"baseUri\": \"baseInputUrl\",\"files\": [\"inputVideoFile\"]},\"outputs\": [{\"@odata.type\": \"#Microsoft.Media.JobOutputAsset\",\"assetName\": \"assetId\"}]}}"
val create_stream_locator="{\"properties\":{\"assetName\": \"assetId\",\"streamingPolicyName\": \"policyName\"}}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,7 @@ import scala.collection.immutable.HashMap
abstract class AzureMediaService extends IMediaService {

private var API_ACCESS_TOKEN: String = ""

private def getToken()(implicit config: VideoStreamGeneratorConfig, httpUtil: HttpUtil): String = {
val tenant = config.getSystemConfig("azure.tenant")
val clientKey = config.getSystemConfig("azure.token.client_key")
val clientSecret = config.getSystemConfig("azure.token.client_secret")
val loginUrl = config.getConfig("azure.login.endpoint") + "/" + tenant + "/oauth2/token"

val data = Map[String, String](
"grant_type" -> "client_credentials",
"client_id" -> clientKey,
"client_secret" -> clientSecret,
"resource" -> "https://management.core.windows.net/"
)

val header = Map[String, String](
"Content-Type" -> "application/x-www-form-urlencoded",
"Keep-Alive" -> "true"
)

val response:MediaResponse = Response.getResponse(httpUtil.post_map(loginUrl, data, header))
if(response.responseCode == "OK"){
response.result("access_token").asInstanceOf[String]
} else {
throw new Exception("Error while getting the azure access token::"+JSONUtil.serialize(response))
}
}


protected def getJobDetails(jobId: String)(implicit config: VideoStreamGeneratorConfig, httpUtil: HttpUtil): MediaResponse = {
val url = getApiUrl("job").replace("jobIdentifier", jobId)
val response:MediaResponse = Response.getResponse(httpUtil.get(url, getDefaultHeader()))
Expand All @@ -51,9 +25,11 @@ abstract class AzureMediaService extends IMediaService {
}

protected def createAsset(assetId: String, jobId: String)(implicit config: VideoStreamGeneratorConfig, httpUtil: HttpUtil): MediaResponse = {
val accountName: String = config.getConfig("azure_mediakind.account_name")
val url = getApiUrl("asset").replace("assetId", assetId)
val requestBody = AzureRequestBody.create_asset.replace("assetId", assetId)
.replace("assetDescription", "Output Asset for " + jobId)
.replace("assetStorageAccountName", accountName)
val response:MediaResponse = Response.getResponse(httpUtil.put(url, requestBody, getDefaultHeader()))
if(response.responseCode == "OK"){
response
Expand All @@ -64,7 +40,7 @@ abstract class AzureMediaService extends IMediaService {

protected def createStreamingLocator(streamingLocatorName: String, assetName: String)(implicit config: VideoStreamGeneratorConfig, httpUtil: HttpUtil): MediaResponse = {
val url = getApiUrl("stream_locator").replace("streamingLocatorName", streamingLocatorName)
val streamingPolicyName = config.getConfig("azure.stream.policy_name")
val streamingPolicyName = config.getConfig("azure_mediakind.stream.policy_name")
val reqBody = AzureRequestBody.create_stream_locator.replace("assetId", assetName).replace("policyName", streamingPolicyName)
Response.getResponse(httpUtil.put(url, reqBody, getDefaultHeader()))
}
Expand All @@ -90,42 +66,34 @@ abstract class AzureMediaService extends IMediaService {
}

protected def getApiUrl(apiName: String)(implicit config: VideoStreamGeneratorConfig, httpUtil: HttpUtil): String = {
val subscriptionId: String = config.getSystemConfig("azure.subscription_id")
val resourceGroupName: String = config.getSystemConfig("azure.resource_group_name")
val accountName: String = config.getSystemConfig("azure.account_name")
val apiVersion: String = config.getConfig("azure.api.version")
val transformName: String = config.getConfig("azure.transform.default")
val transformName: String = config.getConfig("azure_mediakind.transform.default")
val projectName:String = config.getConfig("azure_mediakind.project_name")

val baseUrl: String = new StringBuilder().append(config.getConfig("azure.api.endpoint")+"/subscriptions/")
.append(subscriptionId)
.append("/resourceGroups/")
.append(resourceGroupName)
.append("/providers/Microsoft.Media/mediaServices/")
.append(accountName).mkString
val baseUrl: String = new StringBuilder().append(config.getConfig("azure_mediakind.api.endpoint")+"/ams/")
.append(projectName).mkString


apiName.toLowerCase() match {
case "asset" => baseUrl + "/assets/assetId?api-version=" + apiVersion
case "job" => baseUrl + "/transforms/" + transformName + "/jobs/jobIdentifier?api-version=" + apiVersion
case "stream_locator" => baseUrl + "/streamingLocators/streamingLocatorName?api-version=" + apiVersion
case "list_paths" => baseUrl + "/streamingLocators/streamingLocatorName/listPaths?api-version=" + apiVersion
case "asset" => baseUrl + "/assets/assetId"
case "job" => baseUrl + "/transforms/" + transformName + "/jobs/jobIdentifier"
case "stream_locator" => baseUrl + "/streamingLocators/streamingLocatorName"
case "list_paths" => baseUrl + "/streamingLocators/streamingLocatorName/listPaths"
case _ => throw new MediaServiceException("ERR_INVALID_API_NAME", "Please Provide Valid Media Service API Name")
}
}

protected def getDefaultHeader()(implicit config: VideoStreamGeneratorConfig, httpUtil: HttpUtil): Map[String, String] = {
val accessToken = if (StringUtils.isNotBlank(API_ACCESS_TOKEN)) API_ACCESS_TOKEN else getToken()
val authToken = "Bearer " + accessToken
val authToken = config.getConfig("azure_mediakind.auth_token")
HashMap[String, String](
"Content-Type" -> "application/json",
"Accept" -> "application/json",
"Authorization" -> authToken
"content-type" -> "application/json",
"accept" -> "application/json",
"x-mkio-token" -> authToken
)
}

protected def prepareStreamingUrl(streamLocatorName: String, jobId: String)(implicit config: VideoStreamGeneratorConfig, httpUtil: HttpUtil): Map[String, AnyRef] = {
val streamType = config.getConfig("azure.stream.protocol")
val streamHost = config.getConfig("azure.stream.base_url")
val streamType = config.getConfig("azure_mediakind.stream.protocol")
val streamHost = config.getConfig("azure_mediakind.stream.base_url")
var url = ""
val listPathResponse = getStreamUrls(streamLocatorName)
if (listPathResponse.responseCode.equalsIgnoreCase("OK")) {
Expand All @@ -146,4 +114,4 @@ abstract class AzureMediaService extends IMediaService {
HashMap[String, AnyRef]("streamUrl" -> streamUrl)
}
}
}
}
21 changes: 19 additions & 2 deletions video-stream-generator/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ threshold.batch.write.size = 4
}
}

azure_mediakind{
project_name="subscriptionname"
auth_token="authToken"
account_name="media servcie accountname"
api {
endpoint="https://api.mk.io/api"
}
transform {
default = "media_transform_default"
}
stream {
base_url = "https://ep-default-mkservicepoc.japaneast.streaming.mediakind.com"
endpoint_name = "default"
protocol = "Hls"
policy_name = "Predefined_ClearStreamingOnly"
}
}

azure_tenant="test_tenant"
azure_subscription_id="test_id"
azure_account_name="test_account_name"
Expand All @@ -70,5 +88,4 @@ azure_token_client_secret="test_client_secret"
elasticsearch.service.endpoint="test_service_endpoint"
elasticsearch.index.compositesearch.name="test_compositesearch_name"

media_service_type="azure"

media_service_type="azure"
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class VideoStreamGeneratorTaskTestSpec extends BaseTestSpec {
ignore should "submit a job" in {
when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new VideoStreamGeneratorMapSource)

when(mockHttpUtil.post_map(contains("/oauth2/token"), any[Map[String, AnyRef]](), any[Map[String, String]]())).thenReturn(HTTPResponse(200, accessTokenResp))
when(mockHttpUtil.put(contains("/providers/Microsoft.Media/mediaServices/"+jobConfig.getSystemConfig("azure.account.name")+"/assets/asset-"), anyString(), any())).thenReturn(HTTPResponse(200, assetJson))
// when(mockHttpUtil.post_map(contains("/oauth2/token"), any[Map[String, AnyRef]](), any[Map[String, String]]())).thenReturn(HTTPResponse(200, accessTokenResp))
when(mockHttpUtil.put(contains(jobConfig.getConfig("azure_mediakind.project_name")+"/assets/asset-"), anyString(), any())).thenReturn(HTTPResponse(200, assetJson))
when(mockHttpUtil.put(contains("transforms/media_transform_default/jobs"), anyString(), any())).thenReturn(HTTPResponse(200, submitJobJson))
when(mockHttpUtil.get(contains("transforms/media_transform_default/jobs"), any())).thenReturn(HTTPResponse(200, getJobJson))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@ class VideoStreamServiceTestSpec extends BaseTestSpec {
}

"VideoStreamService" should "submit job request" in {
when(mockHttpUtil.post_map(contains("/oauth2/token"), any[Map[String, AnyRef]](), any[Map[String, String]]())).thenReturn(HTTPResponse(200, accessTokenResp))
when(mockHttpUtil.put(contains("/providers/Microsoft.Media/mediaServices/"+jobConfig.getSystemConfig("azure.account.name")+"/assets/asset-"), anyString(), any())).thenReturn(HTTPResponse(200, assetJson))
// when(mockHttpUtil.post_map(contains("/oauth2/token"), any[Map[String, AnyRef]](), any[Map[String, String]]())).thenReturn(HTTPResponse(200, accessTokenResp))
when(mockHttpUtil.put(contains(jobConfig.getConfig("azure_mediakind.project_name")+"/assets/asset-"), anyString(), any())).thenReturn(HTTPResponse(200, assetJson))
when(mockHttpUtil.put(contains("transforms/media_transform_default/jobs"), anyString(), any())).thenReturn(HTTPResponse(200, submitJobJson))
when(mockHttpUtil.get(contains("transforms/media_transform_default/jobs"), any())).thenReturn(HTTPResponse(200, getJobJson))

when(mockHttpUtil.post(contains("/streamingLocators/sl-do_3126597193576939521910_1605816926271/listPaths?api-version="), any(), any())).thenReturn(HTTPResponse(200, getStreamUrlJson))
when(mockHttpUtil.put(contains("/streamingLocators/sl-do_3126597193576939521910_1605816926271?api-version="), any(), any())).thenReturn(HTTPResponse(400, getJobJson))
when(mockHttpUtil.get(contains("/streamingLocators/sl-do_3126597193576939521910_1605816926271?api-version="), any())).thenReturn(HTTPResponse(200, getStreamLocatorJson))
when(mockHttpUtil.post(contains("/streamingLocators/sl-do_3126597193576939521910_1605816926271/listPaths"), any(), any())).thenReturn(HTTPResponse(200, getStreamUrlJson))
when(mockHttpUtil.put(contains("/streamingLocators/sl-do_3126597193576939521910_1605816926271"), any(), any())).thenReturn(HTTPResponse(400, getJobJson))
when(mockHttpUtil.get(contains("/streamingLocators/sl-do_3126597193576939521910_1605816926271"), any())).thenReturn(HTTPResponse(200, getStreamLocatorJson))
when(mockHttpUtil.patch(contains(jobConfig.contentV4Update), any(), any())).thenReturn(HTTPResponse(200, getJobJson))
doNothing().when(mockMetrics).incCounter(any())

Expand Down
Loading