Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nicholas/2023/combine export event #187

Merged
merged 7 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ public async Task Run(
try
{
log.LogInformation(logPrefix() + $"Service Bus queue trigger function processed a message: {resourceCreatedMessage.ToString()}");
//string eventMessage = eventData.EventBody.ToString();
//FhirResourceCreated resourceCreatedMessage = JsonConvert.DeserializeObject<List<FhirResourceCreated>>(eventMessage).Single<FhirResourceCreated>();

//AuthConfig authConfig = AuthConfig.ReadFromEnvironmentVariables();

// GET FHIR RESOURCE SECTION
string requestUrl = $"{configuration["BaseFhirUrl"]}/{resourceCreatedMessage.data.resourceType}/{resourceCreatedMessage.data.resourceFhirId}/_history/{resourceCreatedMessage.data.resourceVersionId}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
<PackageReference Include="Polly" Version="7.2.3" />
<PackageReference Include="Polly.Extensions.Http" Version="3.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CDC.DEX.FHIR.Function.SharedCode\CDC.DEX.FHIR.Function.SharedCode.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Azure.Core;
using Azure.Identity;
using Azure.Storage.Blobs;
using CDC.DEX.FHIR.Function.SharedCode.Models;
using CDC.DEX.FHIR.Function.SharedCode.Util;
using JsonFlatten;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
Expand All @@ -11,6 +13,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;

namespace CDC.DEX.FHIR.Function.ProcessExport
Expand Down Expand Up @@ -41,115 +44,152 @@ public ProcessExport(IHttpClientFactory httpClientFactory, IConfiguration config
/// <param name="log">Function logger</param>
[FunctionName("ProcessExport")]
public async Task Run(
[ServiceBusTrigger("fhirexportqueue", Connection = "FhirServiceBusConnectionString")] string fhirResourceToProcess,
[ServiceBusTrigger("fhireventqueue", Connection = "FhirServiceBusConnectionString")] FhirResourceCreated resourceCreatedMessage,
ILogger log)
{
var exceptions = new List<Exception>();

try
{
log.LogInformation(logPrefix() + $"Service Bus queue trigger function processed a message: {fhirResourceToProcess.ToString()}");
//EVENT SECTION

//FeatureFlagConfig featureFlagConfig = FeatureFlagConfig.ReadFromEnvironmentVariables();
bool flagFhirResourceCreatedExportFunctionFlatten = bool.Parse(configuration["Export:FlattenExport"]);
bool flagFhirResourceCreatedExportFunctionUnbundle = bool.Parse(configuration["Export:UnbundleExport"]);
log.LogInformation(logPrefix() + $"Service Bus queue trigger function processed a message: {resourceCreatedMessage.ToString()}");

string requestUrl = $"{configuration["BaseFhirUrl"]}/{resourceCreatedMessage.data.resourceType}/{resourceCreatedMessage.data.resourceFhirId}/_history/{resourceCreatedMessage.data.resourceVersionId}";

JObject fhirResourceToProcessJObject;

using (HttpClient client = httpClientFactory.CreateClient())
using (var request = new HttpRequestMessage(HttpMethod.Get, requestUrl))
{
// get auth token
string token = await FhirServiceUtils.GetFhirServerToken(configuration, client);

JObject jObject = JObject.Parse(fhirResourceToProcess);
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
request.Headers.Add("Ocp-Apim-Subscription-Key", configuration["OcpApimSubscriptionKey"]);

Dictionary<string, string> filesToWrite = new Dictionary<string, string>();
var response = await client.SendAsync(request);

if (jObject["resourceType"] != null && jObject["resourceType"].Value<string>() == "Bundle" && flagFhirResourceCreatedExportFunctionUnbundle)
{
// is a bundle and we will need to unbundle
List<JObject> unbundledFhirObjects = UnbundleFhirBundle(jObject);
response.EnsureSuccessStatusCode();

foreach (JObject subObject in unbundledFhirObjects)
{
if (flagFhirResourceCreatedExportFunctionFlatten)
{
//flatten
string flattenedJson = FlattenJsonResource(subObject);

string pathToWrite = subObject["resourceType"].Value<string>();
//get profile data for sorting bundles
pathToWrite += "/" + jObject["id"].Value<string>();
pathToWrite += "_" + subObject["id"].Value<string>();
filesToWrite.Add(pathToWrite, flattenedJson.ToString());
}
else
{
//no flatten
string pathToWrite = subObject["resourceType"].Value<string>();
//get profile data for sorting bundles
pathToWrite += "/" + subObject["id"].Value<string>();
filesToWrite.Add(pathToWrite, subObject.ToString());
}
}
}
else
{
// a single entry no need to unbundle
string jsonString = await response.Content.ReadAsStringAsync();

log.LogInformation(logPrefix() + $"FHIR Record details returned from FHIR service: {jsonString}");

fhirResourceToProcessJObject = JObject.Parse(jsonString);

}

//EXPORT SECTION

//log.LogInformation(logPrefix() + $"Service Bus queue trigger function processed a message: {fhirResourceToProcess.ToString()}");

bool flagFhirResourceCreatedExportFunctionFlatten = bool.Parse(configuration["Export:FlattenExport"]);
bool flagFhirResourceCreatedExportFunctionUnbundle = bool.Parse(configuration["Export:UnbundleExport"]);

Dictionary<string, string> filesToWrite = new Dictionary<string, string>();

if (fhirResourceToProcessJObject["resourceType"] != null && fhirResourceToProcessJObject["resourceType"].Value<string>() == "Bundle" && flagFhirResourceCreatedExportFunctionUnbundle)
{
// is a bundle and we will need to unbundle
List<JObject> unbundledFhirObjects = UnbundleFhirBundle(fhirResourceToProcessJObject);

foreach (JObject subObject in unbundledFhirObjects)
{
if (flagFhirResourceCreatedExportFunctionFlatten)
{
//flatten
string flattenedJson = FlattenJsonResource(jObject);
string flattenedJson = FlattenJsonResource(subObject);

string pathToWrite = jObject["resourceType"].Value<string>();
string pathToWrite = subObject["resourceType"].Value<string>();
//get profile data for sorting bundles
if (jObject["resourceType"].Value<string>() == "Bundle")
{
string profilePath = jObject["meta"]["profile"][0].Value<string>();
profilePath = profilePath.Substring(profilePath.LastIndexOf("/"));
pathToWrite += "/" + profilePath;
}
pathToWrite += "/" + jObject["id"].Value<string>();
pathToWrite += "/" + fhirResourceToProcessJObject["id"].Value<string>();
pathToWrite += "_" + subObject["id"].Value<string>();
filesToWrite.Add(pathToWrite, flattenedJson.ToString());
}
else
{
string pathToWrite = jObject["resourceType"].Value<string>();
//no flatten
string pathToWrite = subObject["resourceType"].Value<string>();
//get profile data for sorting bundles
if (jObject["resourceType"].Value<string>() == "Bundle")
{
string profilePath = jObject["meta"]["profile"][0].Value<string>();
profilePath = profilePath.Substring(profilePath.LastIndexOf("/"));
pathToWrite += "/" + profilePath;
}
pathToWrite += "/" + jObject["id"].Value<string>();
filesToWrite.Add(pathToWrite, jObject.ToString());
pathToWrite += "/" + subObject["id"].Value<string>();
filesToWrite.Add(pathToWrite, subObject.ToString());
}
}
}
else
{
// a single entry no need to unbundle

if (flagFhirResourceCreatedExportFunctionFlatten)
{
//flatten
string flattenedJson = FlattenJsonResource(fhirResourceToProcessJObject);

string pathToWrite = fhirResourceToProcessJObject["resourceType"].Value<string>();
//get profile data for sorting bundles
if (fhirResourceToProcessJObject["resourceType"].Value<string>() == "Bundle")
{
string profilePath = fhirResourceToProcessJObject["meta"]["profile"][0].Value<string>();
profilePath = profilePath.Substring(profilePath.LastIndexOf("/"));
pathToWrite += "/" + profilePath;
}
pathToWrite += "/" + fhirResourceToProcessJObject["id"].Value<string>();
filesToWrite.Add(pathToWrite, flattenedJson.ToString());
}
else
{
string pathToWrite = fhirResourceToProcessJObject["resourceType"].Value<string>();
//get profile data for sorting bundles
if (fhirResourceToProcessJObject["resourceType"].Value<string>() == "Bundle")
{
string profilePath = fhirResourceToProcessJObject["meta"]["profile"][0].Value<string>();
profilePath = profilePath.Substring(profilePath.LastIndexOf("/"));
pathToWrite += "/" + profilePath;
}
pathToWrite += "/" + fhirResourceToProcessJObject["id"].Value<string>();
filesToWrite.Add(pathToWrite, fhirResourceToProcessJObject.ToString());
}
}

// END GET FHIR RESOURCE SECTION
// FOR CONNECTATHON ALWAYS MAKE A FLATTEN VERSION, IN A SEPERATE DIRECTORY
string flattenedJsonTemp = FlattenJsonResource(fhirResourceToProcessJObject);

// START WRITING TO DATA LAKE SECTION
string pathToWriteTemp = "Flatten/"+fhirResourceToProcessJObject["resourceType"].Value<string>();
//get profile data for sorting bundles
if (fhirResourceToProcessJObject["resourceType"].Value<string>() == "Bundle")
{
string profilePath = fhirResourceToProcessJObject["meta"]["profile"][0].Value<string>();
profilePath = profilePath.Substring(profilePath.LastIndexOf("/"));
pathToWriteTemp += "/" + profilePath;
}
pathToWriteTemp += "/" + fhirResourceToProcessJObject["id"].Value<string>();
filesToWrite.Add(pathToWriteTemp, flattenedJsonTemp.ToString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bennettn4 will be we writting the flattened file twice if the feature flag is on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, yes
we will need to rework the exact mechanics of writing to the datalake once the live data lake is available to us, it's looking like we will need to push data to it via an API but the datalake team hasn't finalized yet

// CONNECTATHON ADDITIONAL FLATTEN END

string accountName = configuration["Export:DatalakeStorageAccount"];

TokenCredential credential = new DefaultAzureCredential();
// END GET FHIR RESOURCE SECTION

string blobUri = "https://" + accountName + ".blob.core.windows.net";
// START WRITING TO DATA LAKE SECTION

BlobServiceClient blobServiceClient = new BlobServiceClient(new Uri(blobUri), credential);
string accountName = configuration["Export:DatalakeStorageAccount"];

BlobContainerClient blobContainerClient = blobServiceClient.GetBlobContainerClient(configuration["Export:DatalakeBlobContainer"]);
TokenCredential credential = new DefaultAzureCredential();

foreach (var keyValPair in filesToWrite)
{
BlobClient blobClient = blobContainerClient.GetBlobClient($"{keyValPair.Key}.json");
log.LogInformation(logPrefix() + $"Writing data to file {keyValPair.Key}.json: \n {keyValPair.Value}");
await blobClient.UploadAsync(BinaryData.FromString($"{keyValPair.Value}"), true);
}
string blobUri = "https://" + accountName + ".blob.core.windows.net";

// END WRITING TO DATA LAKE SECTION
BlobServiceClient blobServiceClient = new BlobServiceClient(new Uri(blobUri), credential);

}
BlobContainerClient blobContainerClient = blobServiceClient.GetBlobContainerClient(configuration["Export:DatalakeBlobContainer"]);

foreach (var keyValPair in filesToWrite)
{
BlobClient blobClient = blobContainerClient.GetBlobClient($"{keyValPair.Key}.json");
log.LogInformation(logPrefix() + $"Writing data to file {keyValPair.Key}.json: \n {keyValPair.Value}");
await blobClient.UploadAsync(BinaryData.FromString($"{keyValPair.Value}"), true);
}

// END WRITING TO DATA LAKE SECTION

await Task.Yield();
}
Expand Down Expand Up @@ -209,18 +249,6 @@ public string FlattenJsonResource(JObject jsonToFlatten)

return JsonConvert.SerializeObject(flattenedObject);

//StringBuilder contentToWriteToFile = new StringBuilder();
//contentToWriteToFile.Append("{ \n");
//foreach (var keyValPair in flattenedObject)
//{
// contentToWriteToFile.Append($"\"{keyValPair.Key}\":\"{keyValPair.Value}\",");
// //clean newline from strings
// contentToWriteToFile.Replace("\n", " ").Replace("\r", " ");
// //contentToWriteToFile.Append('\n');
//}
//contentToWriteToFile.Append('}');
//
//return contentToWriteToFile.ToString();
}

private string logPrefix()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
https://go.microsoft.com/fwlink/?LinkID=208121.
-->
<Project>
<PropertyGroup>
<WebPublishMethod>ZipDeploy</WebPublishMethod>
<PublishProvider>AzureWebSite</PublishProvider>
<LastUsedBuildConfiguration>Release</LastUsedBuildConfiguration>
<LastUsedPlatform>Any CPU</LastUsedPlatform>
<SiteUrlToLaunchAfterPublish>https://fhir-service-event-export-combined.azurewebsites.net</SiteUrlToLaunchAfterPublish>
<LaunchSiteAfterPublish>false</LaunchSiteAfterPublish>
<ResourceId>/subscriptions/df479416-a3f3-42b4-97ab-0a0a2b788ba3/resourcegroups/cdc-dmi-dev-rg-eastus/providers/Microsoft.Web/sites/fhir-service-event-export-combined</ResourceId>
<UserName>$fhir-service-event-export-combined</UserName>
<_SavePWD>true</_SavePWD>
<PublishUrl>https://fhir-service-event-export-combined.scm.azurewebsites.net/</PublishUrl>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public async Task<IActionResult> Run(
JsonNode data;
string jsonString;

bool flagProcessMessageFunctionSkipValidate = bool.Parse(configuration["FunctionProcessMessage:SkipValidation"]);

data = JsonSerializer.Deserialize<JsonNode>(req.Body);
jsonString = data.ToString();

Expand All @@ -52,9 +54,19 @@ public async Task<IActionResult> Run(

log.LogInformation("ProcessMessage validation done with result: " + validateReportingBundleResult.JsonString);

bool isValid = !validateReportingBundleResult.JsonString.Contains("\"severity\":\"error\"");
//JsonNode validationNode = JsonNode.Parse(validateReportingBundleResult.JsonString);
//bool isValid = validationNode["issue"][0]["diagnostics"].ToString() == "All OK";
bool isValid;
if (flagProcessMessageFunctionSkipValidate)
{
log.LogInformation("Skipping ProcessMessage Validation");
isValid = true;
}
else
{
isValid = !validateReportingBundleResult.JsonString.Contains("\"severity\":\"error\"");
}

ContentResult contentResult = new ContentResult();
contentResult.ContentType = "application/fhir+json";

if (isValid)
{
Expand All @@ -64,11 +76,15 @@ public async Task<IActionResult> Run(

data["entry"][1]["resource"] = JsonNode.Parse(postResult.JsonString);

return new OkObjectResult(data.ToJsonString());
contentResult.Content = data.ToJsonString();
contentResult.StatusCode = 200;
return contentResult;
}
else
{
return new BadRequestObjectResult(validateReportingBundleResult.JsonString);
contentResult.Content = validateReportingBundleResult.JsonString;
contentResult.StatusCode = 400;
return contentResult;
}

}
Expand Down