diff --git a/pom.xml b/pom.xml index d587854..5bcff5c 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,11 @@ 1.7 + + com.azure + azure-core-http-netty + 1.14.2 + org.apache.logging.log4j log4j-api @@ -53,7 +58,7 @@ com.fasterxml.jackson.core jackson-databind - 2.9.10.8 + 2.13.5 org.apache.commons @@ -133,7 +138,20 @@ iot-device-client 1.3.30 - + + + com.azure + azure-sdk-bom + 1.2.23 + pom + import + + + azure-cosmos + com.azure + 4.59.0 + + com.amazonaws @@ -149,7 +167,7 @@ com.fasterxml.jackson.core jackson-core - 2.9.7 + 2.13.5 org.apache.pulsar diff --git a/src/main/java/net/acesinc/data/json/generator/JsonDataGenerator.java b/src/main/java/net/acesinc/data/json/generator/JsonDataGenerator.java index 2f35b8c..973535a 100644 --- a/src/main/java/net/acesinc/data/json/generator/JsonDataGenerator.java +++ b/src/main/java/net/acesinc/data/json/generator/JsonDataGenerator.java @@ -109,6 +109,12 @@ public JsonDataGenerator(String simConfigString) { } break; } + // add a case for ComsoDB + case "cosmosdb": { + log.info("Adding CosmosDB Logger with properties: " + elProps); + loggers.add(new CosmosDBLogger(elProps)); + break; + } } } if (loggers.isEmpty()) { diff --git a/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java b/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java new file mode 100644 index 0000000..067de80 --- /dev/null +++ b/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java @@ -0,0 +1,56 @@ +package net.acesinc.data.json.generator.log; + +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosDatabase; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CosmosDBLogger implements EventLogger { + + private static final Logger log = LogManager.getLogger(CosmosDBLogger.class); + private CosmosClient cosmosClient; + private CosmosDatabase cosmosDatabase; + private CosmosContainer cosmosContainer; + private final ObjectMapper mapper = new ObjectMapper(); + + public CosmosDBLogger(Map props) { + String uri = (String) props.get("uri"); + String key = (String) props.get("key"); + String databaseName = (String) props.get("databaseName"); + String containerName = (String) props.get("containerName"); + + cosmosClient = new CosmosClientBuilder() + .endpoint(uri) + .key(key) + .buildClient(); + + cosmosDatabase = cosmosClient.getDatabase(databaseName); + cosmosContainer = cosmosDatabase.getContainer(containerName); + } + + @Override + public void logEvent(String event, Map producerConfig) { + try { + JsonNode jsonNode = mapper.readTree(event); + CosmosItemResponse item = cosmosContainer.createItem(jsonNode); + log.info("Document added to Cosmos DB with request charge of " + item.getRequestCharge() + " within session " + item.getSessionToken()); + } catch (Exception e) { + log.error("Error inserting JSON data into Cosmos DB", e); + } + } + + @Override + public void shutdown() { + if (cosmosClient != null) { + cosmosClient.close(); + log.info("Cosmos DB client closed successfully"); + } + } +}