From 094f500badde1629168dc2422dd6fbf2345ec7c7 Mon Sep 17 00:00:00 2001
From: Timothy Tschampel
Date: Wed, 5 Jun 2024 13:38:47 -0700
Subject: [PATCH 1/4] adding NanoTDF support

---
 README.md                                     |   8 +-
 .../io/opentdf/nifi/AbstractTDFProcessor.java |  38 +++++-
 .../io/opentdf/nifi/AbstractToProcessor.java  |  76 +++++++++++
 .../io/opentdf/nifi/ConvertFromNanoTDF.java   |  40 ++++++
 .../java/io/opentdf/nifi/ConvertFromZTDF.java |  48 +++----
 .../io/opentdf/nifi/ConvertToNanoTDF.java     |  85 ++++++++++++
 .../java/io/opentdf/nifi/ConvertToZTDF.java   |  92 +++----------
 .../org.apache.nifi.processor.Processor       |   4 +-
 .../opentdf/nifi/ConvertFromNanoTDFTest.java  | 118 ++++++++++++++++
 .../io/opentdf/nifi/ConvertFromZTDFTest.java  |  15 +--
 .../io/opentdf/nifi/ConvertToNanoTDFTest.java | 126 ++++++++++++++++++
 .../io/opentdf/nifi/ConvertToZTDFTest.java    |  16 +--
 .../src/test/java/io/opentdf/nifi/Utils.java  |  28 ++++
 13 files changed, 558 insertions(+), 136 deletions(-)
 create mode 100644 nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
 create mode 100644 nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java
 create mode 100644 nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java
 create mode 100644 nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromNanoTDFTest.java
 create mode 100644 nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToNanoTDFTest.java
 create mode 100644 nifi-tdf-processors/src/test/java/io/opentdf/nifi/Utils.java

diff --git a/README.md b/README.md
index e68c156..33c93b7 100644
--- a/README.md
+++ b/README.md
@@ -3,8 +3,12 @@ Integration of the [OpenTDF Platform](https://github.com/opentdf/platform) into
 Components:
 * "Zero Trust Data Format" (ZTDF) Processors:
-  * [ConvertToZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java): A NiFi processor that converts FlowFile content to TDF format. Does not currently support assertions
-  * [ConvertFromZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java): A NiFi processor that converts TDF formatted FlowFile content to it's plaintext representation
+  * [ConvertToZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java): A NiFi processor that converts FlowFile content to ZTDF format. Does not currently support assertions
+  * [ConvertFromZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java): A NiFi processor that converts ZTDF-formatted FlowFile content to its plaintext representation
+* NanoTDF Processors ([See NanoTDF Specification](https://github.com/opentdf/spec/tree/main/schema/nanotdf#readme)):
+  * [ConvertToNanoTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java): A NiFi processor that converts FlowFile content to NanoTDF format. Does not currently support assertions
+  * [ConvertFromNanoTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java): A NiFi processor that converts NanoTDF-formatted FlowFile content to its plaintext representation
+
 * Controller Services:
   * [OpenTDFControllerService](./nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/OpenTDFControllerService.java): A NiFi controller service providing OpenTDF Platform Configuration

diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java
index 36266e9..666daad 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java
@@ -1,7 +1,9 @@
 package io.opentdf.nifi;
 
+import io.opentdf.platform.sdk.NanoTDF;
 import io.opentdf.platform.sdk.SDK;
 import io.opentdf.platform.sdk.SDKBuilder;
+import io.opentdf.platform.sdk.TDF;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -11,6 +13,7 @@
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
@@ -19,9 +22,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
 /**
  * Common helper processor
@@ -133,4 +134,35 @@ byte[] readEntireFlowFile(FlowFile flowFile, ProcessSession processSession) {
         processSession.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
         return buffer;
     }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+        List<FlowFile> flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger());
+        if (!flowFiles.isEmpty()) {
+            processFlowFiles(processContext, processSession, flowFiles);
+        }
+    }
+
+    /**
+     * Process the FlowFiles pulled from the process session, up to the configured pull size
+     * @param processContext NiFi process context
+     * @param processSession NiFi process session
+     * @param flowFiles List of FlowFiles from the process session, up to the pull size limit
+     * @throws ProcessException if processing fails
+     */
+    abstract void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException;
+
+    TDF getTDF() {
+        return new TDF();
+    }
+
+    NanoTDF getNanoTDF() {
+        return new NanoTDF();
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
+    }
 }

diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
new file mode 100644
index 0000000..6c21e4c
--- /dev/null
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
@@ -0,0 +1,76 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.Config;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Common utilities for a processor converting content to one of the TDF formats
+ */
+public abstract class AbstractToProcessor extends AbstractTDFProcessor {
+    static final String KAS_URL_ATTRIBUTE = "kas_url";
+    static final String TDF_ATTRIBUTE = "tdf_attribute";
+
+    public static final PropertyDescriptor KAS_URL = new org.apache.nifi.components.PropertyDescriptor.Builder()
+            .name("KAS URL")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .description("The KAS URL to use for encryption; used as the default when the kas_url attribute is not present in the FlowFile")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE, KAS_URL));
+    }
+
+    /**
+     * Get the KAS URLs from the kas_url FlowFile attribute, falling back to the processor's KAS URL property;
+     * the value is a comma-separated list
+     * @param flowFile the FlowFile being processed
+     * @param processContext the NiFi process context
+     * @return the list of KAS URLs
+     * @throws Exception if no KAS URL is provided
+     */
+    List<String> getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception {
+        String kasUrlAttribute = flowFile.getAttribute(KAS_URL_ATTRIBUTE);
+        //check kas url
+        if (!processContext.getProperty(KAS_URL).isSet() && kasUrlAttribute == null) {
+            throw new Exception("no " + KAS_URL_ATTRIBUTE + " flowfile attribute and no default KAS URL configured");
+        }
+        String kasUrlValues = kasUrlAttribute != null ? kasUrlAttribute : getPropertyValue(processContext.getProperty(KAS_URL)).getValue();
+        List<String> kasUrls = Arrays.stream(kasUrlValues.split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toList());
+        if (kasUrls.isEmpty()) {
+            throw new Exception("no KAS URLs provided");
+        }
+        return kasUrls;
+    }
+
+    List<Config.KASInfo> getKASInfoFromKASURLs(List<String> kasUrls) {
+        return kasUrls.stream().map(x -> { var ki = new Config.KASInfo(); ki.URL = x; return ki; }).collect(Collectors.toList());
+    }
+
+    /**
+     * Get the data attributes for a FlowFile from the tdf_attribute attribute value
+     * @param flowFile the FlowFile being processed
+     * @return the set of data attributes
+     * @throws Exception if no data attributes are provided
+     */
+    Set<String> getDataAttributes(FlowFile flowFile) throws Exception {
+        Set<String> dataAttributes = Arrays.stream((flowFile.getAttribute(TDF_ATTRIBUTE) == null ? "" :
+                flowFile.getAttribute(TDF_ATTRIBUTE)).split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());
+        if (dataAttributes.isEmpty()) {
+            throw new Exception("no data attributes provided via " + TDF_ATTRIBUTE + " flowfile attribute");
+        }
+        return dataAttributes;
+    }
+}

diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java
new file mode 100644
index 0000000..63f4d6a
--- /dev/null
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java
@@ -0,0 +1,40 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.SDK;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+@CapabilityDescription("Decrypts NanoTDF flow file content")
+@Tags({"NanoTDF", "OpenTDF", "Decrypt", "Data Centric Security"})
+public class ConvertFromNanoTDF extends AbstractTDFProcessor {
+
+    @Override
+    public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
+        SDK sdk = getTDFSDK(processContext);
+        for (FlowFile flowFile : flowFiles) {
+            try {
+                byte[] nanoTDFBytes = readEntireFlowFile(flowFile, processSession);
+                FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
+                    try {
+                        getNanoTDF().readNanoTDF(ByteBuffer.wrap(nanoTDFBytes), outputStream, sdk.getServices().kas());
+                    } catch (Exception e) {
+                        getLogger().error("error decrypting NanoTDF", e);
+                        throw new IOException(e);
+                    }
+                });
+                processSession.transfer(updatedFlowFile, REL_SUCCESS);
+            } catch (Exception e) {
+                getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
+                processSession.transfer(flowFile, REL_FAILURE);
+            }
+        }
+    }
+}
\ No newline at end of file

diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
index 374f2d6..8a58a66 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
@@ -1,7 +1,6 @@
 package io.opentdf.nifi;
 
 import io.opentdf.platform.sdk.SDK;
-import io.opentdf.platform.sdk.TDF;
 import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -23,38 +22,25 @@ public class ConvertFromZTDF extends AbstractTDFProcessor {
 
     @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
-    }
-
-
-    @Override
-    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
-        List<FlowFile> flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger());
-        if (!flowFiles.isEmpty()) {
-            SDK sdk = getTDFSDK(processContext);
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    try (SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readEntireFlowFile(flowFile, processSession))) {
-                        FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
-                            try {
-                                getTDF().loadTDF(seekableByteChannel, outputStream, sdk.getServices().kas());
-                            } catch (Exception e) {
-                                getLogger().error("error decrypting ZTDF", e);
-                                throw new IOException(e);
-                            }
-                        });
-                        processSession.transfer(updatedFlowFile, REL_SUCCESS);
-                    }
-                } catch (Exception e) {
-                    getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
-                    processSession.transfer(flowFile, REL_FAILURE);
+    void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
+        SDK sdk = getTDFSDK(processContext);
+        for (FlowFile flowFile : flowFiles) {
+            try {
+                try (SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readEntireFlowFile(flowFile, processSession))) {
+                    FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
+                        try {
+                            getTDF().loadTDF(seekableByteChannel, outputStream, sdk.getServices().kas());
+                        } catch (Exception e) {
+                            getLogger().error("error decrypting ZTDF", e);
+                            throw new IOException(e);
+                        }
+                    });
+                    processSession.transfer(updatedFlowFile, REL_SUCCESS);
+                }
+            } catch (Exception e) {
+                getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
+                processSession.transfer(flowFile, REL_FAILURE);
             }
         }
     }
-
-    TDF getTDF() {
-        return new TDF();
-    }
 }

diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java
new file mode 100644
index 0000000..8ce0e55
--- /dev/null
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java
@@ -0,0 +1,85 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.Config;
+import io.opentdf.platform.sdk.SDK;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@CapabilityDescription("Transforms flow file content into a NanoTDF")
+@Tags({"NanoTDF", "OpenTDF", "Encrypt", "Data Centric Security"})
+@ReadsAttributes(value = {
+        @ReadsAttribute(attribute = "kas_url", description = "The Key Access Server (KAS) URL used for TDF creation. This overrides " +
+                "the KAS URL property of this processor."),
+        @ReadsAttribute(attribute = "tdf_attribute", description = "A comma-separated list of data attributes added " +
+                "to the created TDF Data Policy. e.g. http://example.org/attr/foo/value/bar,http://example.org/attr/foo/value/bar2")
+})
+public class ConvertToNanoTDF extends AbstractToProcessor {
+
+    public static final Relationship REL_FLOWFILE_EXCEEDS_NANO_SIZE = new Relationship.Builder()
+            .name("exceeds_size_limit")
+            .description("NanoTDF creation failed due to the content size exceeding the max NanoTDF size threshold")
+            .build();
+
+    // maximum FlowFile content size, in bytes, that this processor will convert to a NanoTDF
+    static final long MAX_SIZE = 16777218;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_FLOWFILE_EXCEEDS_NANO_SIZE));
+    }
+
+    @Override
+    public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
+        SDK sdk = getTDFSDK(processContext);
+        for (final FlowFile flowFile : flowFiles) {
+            try {
+                var kasInfoList = getKASInfoFromKASURLs(getKasUrl(flowFile, processContext));
+                Set<String> dataAttributes = getDataAttributes(flowFile);
+                Config.NanoTDFConfig config = Config.newNanoTDFConfig(
+                        Config.withNanoKasInformation(kasInfoList.toArray(new Config.KASInfo[0])),
+                        Config.witDataAttributes(dataAttributes.toArray(new String[0]))
+                );
+
+                if (flowFile.getSize() > MAX_SIZE) {
+                    getLogger().error(flowFile.getId() + ": error converting plain text to NanoTDF; content length of " + flowFile.getSize() + " > " + MAX_SIZE);
+                    processSession.transfer(flowFile, REL_FLOWFILE_EXCEEDS_NANO_SIZE);
+                } else {
+                    //write NanoTDF to FlowFile
+                    FlowFile updatedFlowFile = processSession.write(flowFile, (inputStream, outputStream) -> {
+                                try {
+                                    byte[] bytes = new byte[(int) flowFile.getSize()];
+                                    StreamUtils.fillBuffer(inputStream, bytes);
+                                    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+                                    getNanoTDF().createNanoTDF(byteBuffer, outputStream, config, sdk.getServices().kas());
+                                } catch (Exception e) {
+                                    getLogger().error("error creating NanoTDF", e);
+                                    throw new IOException(e);
+                                }
+                            }
+                    );
+                    processSession.transfer(updatedFlowFile, REL_SUCCESS);
+                }
+            } catch (Exception e) {
+                getLogger().error(flowFile.getId() + ": error converting plain text to NanoTDF", e);
+                processSession.transfer(flowFile, REL_FAILURE);
+            }
+        }
+    }
+
+}

diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
index 9cc698a..f20d925 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
@@ -3,25 +3,16 @@
 import io.opentdf.platform.sdk.Config;
 import io.opentdf.platform.sdk.Config.TDFConfig;
 import io.opentdf.platform.sdk.SDK;
-import io.opentdf.platform.sdk.TDF;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -34,71 +25,32 @@
 @ReadsAttribute(attribute = "tdf_attribute", description = "A comma separated list of data attributes added " +
         "to created TDF Data Policy. e.g. http://example.org/attr/foo/value/bar,http://example.org/attr/foo/value/bar2")
 })
-public class ConvertToZTDF extends AbstractTDFProcessor {
-    static final String KAS_URL_ATTRIBUTE = "kas_url";
-    static final String TDF_ATTRIBUTE = "tdf_attribute";
-
-    public static final PropertyDescriptor KAS_URL = new org.apache.nifi.components.PropertyDescriptor.Builder()
-            .name("KAS URL")
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .description("The KAS Url to use for encryption; this is a default if the kas_url attribute is not present in the flow file")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE, KAS_URL));
-    }
+public class ConvertToZTDF extends AbstractToProcessor {
 
     @Override
-    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
-        List<FlowFile> flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger());
-        if (!flowFiles.isEmpty()) {
-            SDK sdk = getTDFSDK(processContext);
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    //check kas url
-                    String kasUrlAttribute = flowFile.getAttribute(KAS_URL_ATTRIBUTE);
-                    if (!processContext.getProperty(KAS_URL).isSet() && kasUrlAttribute == null) {
-                        throw new Exception("no " + KAS_URL_ATTRIBUTE + " flowfile attribute and no default KAS URL configured");
-                    } else {
-                        String kasUrl = kasUrlAttribute != null ? kasUrlAttribute : getPropertyValue(processContext.getProperty(KAS_URL)).getValue();
-                        var kasInfo = new Config.KASInfo();
-                        kasInfo.URL = kasUrl;
-                        Set<String> dataAttributes = Arrays.stream((flowFile.getAttribute(TDF_ATTRIBUTE) == null ? "" :
-                                flowFile.getAttribute(TDF_ATTRIBUTE)).split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());
-                        if (dataAttributes.isEmpty()) {
-                            throw new Exception("no data attributes provided via " + TDF_ATTRIBUTE + " flowfile attribute");
-                        } else {
-                            TDFConfig config = Config.newTDFConfig(Config.withKasInformation(kasInfo),
-                                    Config.withDataAttributes(dataAttributes.toArray(new String[0])));
-                            //write tdf to flowfile
-                            final long size = flowFile.getSize();
-                            FlowFile updatedFlowFile = processSession.write(flowFile, new StreamCallback() {
-                                @Override
-                                public void process(InputStream inputStream, OutputStream outputStream) throws IOException {
-                                    try {
-                                        getTDF().createTDF(inputStream, outputStream, config, sdk.getServices().kas());
-                                    } catch (Exception e) {
-                                        getLogger().error("error creating tdf", e);
-                                        throw new IOException(e);
-                                    }
-                                }
-                            }
-                            );
-                            processSession.transfer(updatedFlowFile, REL_SUCCESS);
-                        }
-                    }
-                } catch (Exception e) {
-                    getLogger().error(flowFile.getId() + ": error converting plain text to TDF", e);
-                    processSession.transfer(flowFile, REL_FAILURE);
-                }
+    void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
+        SDK sdk = getTDFSDK(processContext);
+        for (final FlowFile flowFile : flowFiles) {
+            try {
+                var kasInfoList = getKASInfoFromKASURLs(getKasUrl(flowFile, processContext));
+                Set<String> dataAttributes = getDataAttributes(flowFile);
+                TDFConfig config = Config.newTDFConfig(Config.withKasInformation(kasInfoList.toArray(new Config.KASInfo[0])),
+                        Config.withDataAttributes(dataAttributes.toArray(new String[0])));
+                //write ZTDF to FlowFile
+                FlowFile updatedFlowFile = processSession.write(flowFile, (inputStream, outputStream) -> {
+                            try {
+                                getTDF().createTDF(inputStream, outputStream, config, sdk.getServices().kas());
+                            } catch (Exception e) {
+                                getLogger().error("error creating ZTDF", e);
+                                throw new IOException(e);
+                            }
+                        }
+                );
+                processSession.transfer(updatedFlowFile, REL_SUCCESS);
+            } catch (Exception e) {
+                getLogger().error(flowFile.getId() + ": error converting plain text to ZTDF", e);
+                processSession.transfer(flowFile, REL_FAILURE);
             }
         }
     }
-
-    TDF getTDF() {
-        return new TDF();
-    }
 }

diff --git a/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 138c940..1411d19 100644
--- a/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -1,2 +1,4 @@
 io.opentdf.nifi.ConvertFromZTDF
-io.opentdf.nifi.ConvertToZTDF
\ No newline at end of file
+io.opentdf.nifi.ConvertToZTDF
+io.opentdf.nifi.ConvertFromNanoTDF
+io.opentdf.nifi.ConvertToNanoTDF
\ No newline at end of file

diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromNanoTDFTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromNanoTDFTest.java
new file mode 100644
index 0000000..0a7cd8e
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromNanoTDFTest.java
@@ -0,0 +1,118 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.NanoTDF;
+import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.SDKBuilder;
+import nl.altindag.ssl.util.KeyStoreUtils;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+class ConvertFromNanoTDFTest {
+
+    SDK mockSDK;
+    NanoTDF mockNanoTDF;
+
+    @BeforeEach
+    void setup() {
+        mockSDK = mock(SDK.class);
+        mockNanoTDF = mock(NanoTDF.class);
+    }
+
+    @Test
+    public void testConvertFromNanoTDF() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(MockRunner.class);
+        SDKBuilder mockSDKBuilder = mock(SDKBuilder.class);
+        ((MockRunner) runner.getProcessor()).mockNanoTDF = mockNanoTDF;
+        ((MockRunner) runner.getProcessor()).mockSDKBuilder = mockSDKBuilder;
+        Utils.setupTDFControllerService(runner);
+
+        //add ssl context
+        SSLContextService sslContextService = mock(SSLContextService.class);
+        File trustStoreFile = Files.createTempFile("trust", "jks").toFile();
+        final String TRUST_STORE_PATH = trustStoreFile.getAbsolutePath();
+        final String TRUST_STORE_PASSWORD = "foo";
+        when(sslContextService.validate(any())).thenReturn(Collections.emptyList());
+        when(sslContextService.getTrustStoreFile()).thenReturn(TRUST_STORE_PATH);
+        when(sslContextService.getTrustStorePassword()).thenReturn(TRUST_STORE_PASSWORD);
+
+        try (FileOutputStream fos = new FileOutputStream(trustStoreFile)) {
+            KeyStoreUtils.createKeyStore().store(fos, TRUST_STORE_PASSWORD.toCharArray());
+        }
+        when(sslContextService.getIdentifier()).thenReturn(AbstractTDFProcessor.SSL_CONTEXT_SERVICE.getName());
+        runner.addControllerService(AbstractTDFProcessor.SSL_CONTEXT_SERVICE.getName(), sslContextService, new HashMap<>());
+        runner.enableControllerService(sslContextService);
+        runner.setProperty(AbstractTDFProcessor.SSL_CONTEXT_SERVICE, AbstractTDFProcessor.SSL_CONTEXT_SERVICE.getName());
+
+        runner.assertValid();
+
+        SDK.Services mockServices = mock(SDK.Services.class);
+        SDK.KAS mockKAS = mock(SDK.KAS.class);
+        when(mockSDK.getServices()).thenReturn(mockServices);
+        when(mockServices.kas()).thenReturn(mockKAS);
+        when(mockSDKBuilder.platformEndpoint("http://platform")).thenReturn(mockSDKBuilder);
+        when(mockSDKBuilder.clientSecret("my-client", "123-456")).thenReturn(mockSDKBuilder);
+        when(mockSDKBuilder.sslFactoryFromKeyStore(TRUST_STORE_PATH, TRUST_STORE_PASSWORD)).thenReturn(mockSDKBuilder);
+        when(mockSDKBuilder.build()).thenReturn(mockSDK);
+
+        ArgumentCaptor<ByteBuffer> byteBufferCapture = ArgumentCaptor.forClass(ByteBuffer.class);
+        ArgumentCaptor<OutputStream> outputStreamArgumentCaptor = ArgumentCaptor.forClass(OutputStream.class);
+        ArgumentCaptor<SDK.KAS> kasArgumentCaptor = ArgumentCaptor.forClass(SDK.KAS.class);
+
+        doAnswer(invocationOnMock -> {
+            ByteBuffer byteBuffer = invocationOnMock.getArgument(0);
+            OutputStream outputStream = invocationOnMock.getArgument(1);
+            SDK.KAS kas = invocationOnMock.getArgument(2);
+            outputStream.write(("Decrypted:" + new String(byteBuffer.array())).getBytes());
+            assertNotNull(kas, "KAS is not null");
+            assertSame(mockKAS, kas, "Expected KAS passed in");
+            return null;
+        }).when(mockNanoTDF).readNanoTDF(byteBufferCapture.capture(),
+                outputStreamArgumentCaptor.capture(),
+                kasArgumentCaptor.capture());
+        MockFlowFile messageOne = runner.enqueue("message one".getBytes());
+        MockFlowFile messageTwo = runner.enqueue("message two".getBytes());
+        runner.run(1);
+        List<MockFlowFile> flowFileList =
+                runner.getFlowFilesForRelationship(ConvertFromNanoTDF.REL_SUCCESS);
+        assertEquals(2, flowFileList.size(), "Two successful flow files");
+        assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageOne.getAttribute("filename")))
+                .filter(x -> x.getContent().equals("Decrypted:message one")).count());
+        assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageTwo.getAttribute("filename")))
+                .filter(x -> x.getContent().equals("Decrypted:message two")).count());
+    }
+
+    public static class MockRunner extends ConvertFromNanoTDF {
+        NanoTDF mockNanoTDF;
+        SDKBuilder mockSDKBuilder;
+
+        @Override
+        SDKBuilder createSDKBuilder() {
+            return mockSDKBuilder;
+        }
+
+        @Override
+        NanoTDF getNanoTDF() {
+            return mockNanoTDF;
+        }
+    }
+}
\ No newline at end of file

diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromZTDFTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromZTDFTest.java
index 2daf927..afe7c62 100644
--- a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromZTDFTest.java
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromZTDFTest.java
@@ -40,26 +40,13 @@ void setup() {
         mockTDF = mock(TDF.class);
     }
 
-    void setupTDFControllerService(TestRunner runner) throws Exception {
-        SimpleOpenTDFControllerService tdfControllerService = new SimpleOpenTDFControllerService();
-        Map<String, String> controllerPropertyMap = new HashMap<>();
-        controllerPropertyMap.put(PLATFORM_ENDPOINT.getName(), "http://platform");
-        controllerPropertyMap.put(CLIENT_ID.getName(), "my-client");
-        controllerPropertyMap.put(CLIENT_SECRET.getName(), "123-456");
-        controllerPropertyMap.put(USE_PLAINTEXT.getName(), "false");
-        runner.addControllerService(OPENTDF_CONFIG_SERVICE.getName(), tdfControllerService, controllerPropertyMap);
-        runner.enableControllerService(tdfControllerService);
-        runner.assertValid(tdfControllerService);
-        runner.setProperty(OPENTDF_CONFIG_SERVICE.getName(), OPENTDF_CONFIG_SERVICE.getName());
-    }
-
     @Test
     public void testConvertFromTDF() throws Exception {
         TestRunner runner = TestRunners.newTestRunner(MockRunner.class);
         SDKBuilder mockSDKBuilder = mock(SDKBuilder.class);
         ((MockRunner) runner.getProcessor()).mockTDF = mockTDF;
         ((MockRunner) runner.getProcessor()).mockSDKBuilder = mockSDKBuilder;
-        setupTDFControllerService(runner);
+        Utils.setupTDFControllerService(runner);
 
         //add ssl context
         SSLContextService sslContextService = mock(SSLContextService.class);

diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToNanoTDFTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToNanoTDFTest.java
new file mode 100644
index 0000000..7bac3ed
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToNanoTDFTest.java
@@ -0,0 +1,126 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.Config;
+import io.opentdf.platform.sdk.NanoTDF;
+import io.opentdf.platform.sdk.SDK;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ConvertToNanoTDFTest {
+    SDK mockSDK;
+    NanoTDF mockNanoTDF;
+
+    @BeforeEach
+    void setup() {
+        mockSDK = mock(SDK.class);
+        mockNanoTDF = mock(NanoTDF.class);
+    }
+
+    @Test
+    void testToNano() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(ConvertToNanoTDFTest.MockRunner.class);
+        ((MockRunner) runner.getProcessor()).mockSDK = mockSDK;
+        ((MockRunner) runner.getProcessor()).mockNanoTDF = mockNanoTDF;
+        runner.setProperty(ConvertToNanoTDF.KAS_URL, "https://kas1");
+        Utils.setupTDFControllerService(runner);
+        runner.assertValid();
+
+        SDK.Services mockServices = mock(SDK.Services.class);
+        SDK.KAS mockKAS = mock(SDK.KAS.class);
+        when(mockSDK.getServices()).thenReturn(mockServices);
+        when(mockServices.kas()).thenReturn(mockKAS);
+
+        ArgumentCaptor<ByteBuffer> byteBufferArgumentCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
+        ArgumentCaptor<OutputStream> outputStreamArgumentCaptor = ArgumentCaptor.forClass(OutputStream.class);
+        ArgumentCaptor<SDK.KAS> kasArgumentCaptor = ArgumentCaptor.forClass(SDK.KAS.class);
+        ArgumentCaptor<Config.NanoTDFConfig> configArgumentCaptor = ArgumentCaptor.forClass(Config.NanoTDFConfig.class);
+
+        doAnswer(invocationOnMock -> {
+            ByteBuffer byteBuffer = invocationOnMock.getArgument(0);
+            OutputStream outputStream = invocationOnMock.getArgument(1);
+            Config.NanoTDFConfig config = invocationOnMock.getArgument(2);
+            SDK.KAS kas = invocationOnMock.getArgument(3);
+            byte[] b = byteBuffer.array();
+            outputStream.write(("TDF:" + new String(b)).getBytes());
+            assertNotNull(kas, "KAS is not null");
+            assertSame(mockKAS, kas, "Expected KAS passed in");
+            if (new String(b).equals("message two")) {
+                assertEquals(2, config.attributes.size());
+                assertTrue(config.attributes.containsAll(Arrays.asList("https://example.org/attr/one/value/a", "https://example.org/attr/one/value/b")));
+            } else {
+                assertEquals(1, config.attributes.size());
+                assertTrue(config.attributes.contains("https://example.org/attr/one/value/c"));
+            }
+            return null;
+        }).when(mockNanoTDF).createNanoTDF(byteBufferArgumentCaptor.capture(),
+                outputStreamArgumentCaptor.capture(),
+                configArgumentCaptor.capture(),
+                kasArgumentCaptor.capture());
+
+        //message one has no attributes
+        MockFlowFile messageOne = runner.enqueue("message one".getBytes());
+        //message two has attributes
+        MockFlowFile messageTwo = runner.enqueue("message two".getBytes(), Map.of(ConvertToNanoTDF.TDF_ATTRIBUTE,
+                "https://example.org/attr/one/value/a,https://example.org/attr/one/value/b"));
+        //message three has attributes and kas url override
+        MockFlowFile messageThree = runner.enqueue("message three".getBytes(), Map.of(ConvertToNanoTDF.TDF_ATTRIBUTE,
+                "https://example.org/attr/one/value/c", ConvertToNanoTDF.KAS_URL_ATTRIBUTE, "https://kas2"));
+
+        //message four has content length > max length
+        char[] chars = new char[(int) (ConvertToNanoTDF.MAX_SIZE + 10)];
+        Arrays.fill(chars, 'a');
+        MockFlowFile messageFour = runner.enqueue(new String(chars).getBytes(), Map.of(ConvertToNanoTDF.TDF_ATTRIBUTE,
+                "https://example.org/attr/one/value/c", ConvertToNanoTDF.KAS_URL_ATTRIBUTE, "https://kas2"));
+
+        runner.run(1);
+        List<MockFlowFile> flowFileList =
+                runner.getFlowFilesForRelationship(ConvertToNanoTDF.REL_SUCCESS);
+        assertEquals(2, flowFileList.size(), "Two flowfiles for success relationship");
+        assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageTwo.getAttribute("filename")))
+                .filter(x -> x.getContent().equals("TDF:message two")).count());
+        assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageThree.getAttribute("filename")))
+                .filter(x -> x.getContent().equals("TDF:message three")).count());
+
+        flowFileList =
+                runner.getFlowFilesForRelationship(ConvertToNanoTDF.REL_FAILURE);
+        assertEquals(1, flowFileList.size(), "One flowfile for failure relationship");
+        assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageOne.getAttribute("filename")))
+                .filter(x -> x.getContent().equals("message one")).count());
+
+        flowFileList =
+                runner.getFlowFilesForRelationship(ConvertToNanoTDF.REL_FLOWFILE_EXCEEDS_NANO_SIZE);
+        assertEquals(1, flowFileList.size(), "One flowfile for exceeds-size relationship");
+        assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename")
+                .equals(messageFour.getAttribute("filename"))).count());
+    }
+
+    public static class MockRunner extends ConvertToNanoTDF {
+        SDK mockSDK;
+        NanoTDF mockNanoTDF;
+
+        @Override
+        SDK getTDFSDK(ProcessContext processContext) {
+            return mockSDK;
+        }
+
+        @Override
+        NanoTDF getNanoTDF() {
+            return mockNanoTDF;
+        }
+    }
+}
\ No newline at end of file

diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToZTDFTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToZTDFTest.java
index 6e9fea7..e8893c0 100644
--- a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToZTDFTest.java
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToZTDFTest.java
@@ -35,27 +35,13 @@ void setup() {
         mockTDF = mock(TDF.class);
     }
 
-    void setupTDFControllerService(TestRunner runner) throws Exception {
-        SimpleOpenTDFControllerService tdfControllerService = new SimpleOpenTDFControllerService();
-        Map<String, String> controllerPropertyMap = new HashMap<>();
-        controllerPropertyMap.put(PLATFORM_ENDPOINT.getName(), "http://platform");
-        controllerPropertyMap.put(CLIENT_ID.getName(), "my-client");
-        controllerPropertyMap.put(CLIENT_SECRET.getName(), "123-456");
-        controllerPropertyMap.put(USE_PLAINTEXT.getName(), "false");
-        runner.addControllerService(OPENTDF_CONFIG_SERVICE.getName(), tdfControllerService, controllerPropertyMap);
-        runner.enableControllerService(tdfControllerService);
-        runner.assertValid(tdfControllerService);
-        runner.setProperty(OPENTDF_CONFIG_SERVICE.getName(), OPENTDF_CONFIG_SERVICE.getName());
-
-    }
-
     @Test
     public void testToTDF() throws Exception {
         TestRunner runner = TestRunners.newTestRunner(MockRunner.class);
         ((MockRunner) runner.getProcessor()).mockSDK = mockSDK;
         ((MockRunner) runner.getProcessor()).mockTDF = mockTDF;
         runner.setProperty(ConvertToZTDF.KAS_URL, "https://kas1");
-        setupTDFControllerService(runner);
+        Utils.setupTDFControllerService(runner);
         runner.assertValid();
 
         SDK.Services mockServices = mock(SDK.Services.class);

diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/Utils.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/Utils.java
new file mode 100644
index 0000000..b505c18
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/Utils.java
@@ -0,0 +1,28 @@
+package io.opentdf.nifi;
+
+import org.apache.nifi.util.TestRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static io.opentdf.nifi.AbstractTDFProcessor.OPENTDF_CONFIG_SERVICE;
+import static io.opentdf.nifi.SimpleOpenTDFControllerService.*;
+
+public class Utils {
+
+    static void setupTDFControllerService(TestRunner runner) throws Exception {
+        SimpleOpenTDFControllerService tdfControllerService = new SimpleOpenTDFControllerService();
+        Map<String, String> controllerPropertyMap = new HashMap<>();
+        controllerPropertyMap.put(PLATFORM_ENDPOINT.getName(), "http://platform");
+        controllerPropertyMap.put(CLIENT_ID.getName(), "my-client");
+        controllerPropertyMap.put(CLIENT_SECRET.getName(), "123-456");
+        controllerPropertyMap.put(USE_PLAINTEXT.getName(), "false");
+        runner.addControllerService(OPENTDF_CONFIG_SERVICE.getName(), tdfControllerService, controllerPropertyMap);
+        runner.enableControllerService(tdfControllerService);
+        runner.assertValid(tdfControllerService);
+        runner.setProperty(OPENTDF_CONFIG_SERVICE.getName(), OPENTDF_CONFIG_SERVICE.getName());
+    }
+
+}

From 756c532f8e52d73a32204538c6c478a3ed03ae39 Mon Sep 17 00:00:00 2001
From: Timothy Tschampel
Date: Wed, 5 Jun 2024 13:51:52 -0700
Subject: [PATCH 2/4] adding NanoTDF support

---
 README.md | 46 ++++++++++++++++++++++++++++++++--------------
 1 file changed, 32 insertions(+), 14 deletions(-)

diff --git a/README.md b/README.md
index 33c93b7..11db22b 100644
--- a/README.md
+++ b/README.md
@@ -13,37 +13,55 @@ Components:
 * Controller Services:
   * [OpenTDFControllerService](./nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/OpenTDFControllerService.java): A NiFi controller service providing OpenTDF Platform Configuration
 
-#### FlowChart: Generic Plaintext to ZTDF Nifi Flow
+#### FlowChart: Generic ZTDF NiFi Flows
 
 ```mermaid
 ---
-title: Generic Plaintext to ZTDF NiFi Flow
+title: Generic ZTDF NiFi Flows
 ---
 flowchart TD
-   a[FlowFile: \nPlaintext content]
+   a[NiFi Processor]
    b["`**UpdateAttribute**`" Add data policy attributes to FlowFile]
    c["`**ConvertToZTDF**`"]
    d["Process ZTDF"]
    e["Handle Error"]
-   a -- success --> b
-   b -- success --> c
+   f[NiFi Processor]
+   g["`**ConvertFromZTDF**`"]
+   h[Process Plaintext]
+   i[Handle Error]
+   a -- success (content = Plaintext) --> b
+   b -- success (content = Plaintext) --> c
    c -- success (content = ZTDF) --> d
    c -- failure --> e
+   f -- success (content = ZTDF) --> g
+   g -- success (content = Plaintext) --> h
+   g -- failure --> i
 ```
 
-#### FlowChart: Generic ZTDF to Plaintext Nifi Flow
+#### FlowChart: Generic NanoTDF NiFi Flows
 
 ```mermaid
 ---
-title: Generic ZTDF to Plaintext Nifi Flow
+title: Generic NanoTDF NiFi Flows
 ---
 flowchart TD
-   a[FlowFile: \nZTDF content]
-   b["`**ConvertFromZTDF**`"]
-   c["Process ZTDF"]
-   d["Handle Error"]
-   a -- success --> b
-   b -- success (content = plaintext) --> c
-   b -- failure --> d
+   a[NiFi Processor]
+   b["`**UpdateAttribute**`" Add data policy attributes to FlowFile]
+   c["`**ConvertToNanoTDF**`"]
+   d["Process NanoTDF"]
+   e["Handle Error"]
+   e2["Handle Max Size Error"]
+   f[NiFi Processor]
+   g["`**ConvertFromNanoTDF**`"]
+   h[Process Plaintext]
+   i[Handle Error]
+   a -- success (content = Plaintext) --> b
+   b -- success (content = Plaintext) --> c
+   c -- success (content = NanoTDF) --> d
+   c -- failure --> e
+   c -- exceeds_size_limit --> e2
+   f -- success (content = NanoTDF) --> g
+   g -- success (content = Plaintext) --> h
+   g -- failure --> i
 ```
 
 # Quick Start - Docker Compose

From 093aead1fe12833f89bb0008c4f2d8feabca9bd2 Mon Sep 17 00:00:00 2001
From: Timothy Tschampel
Date: Thu, 6 Jun 2024 12:27:06 -0700
Subject: [PATCH 3/4] update for upstream java-sdk api breaking change

---
 .../java/io/opentdf/nifi/ConvertFromZTDF.java |  5 ++-
 .../io/opentdf/nifi/ConvertFromZTDFTest.java  | 40 ++++++++++++-------
 2 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
index 8a58a66..fac81cb 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
@@ -1,6 +1,7 @@
 package io.opentdf.nifi;
 
 import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.TDF;
 import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -9,6 +10,7 @@
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.StreamUtils;
 
 import java.io.IOException;
 import java.nio.channels.SeekableByteChannel;
@@ -29,7 +31,8 @@ void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
             try (SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readEntireFlowFile(flowFile, processSession))) {
                 FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
                     try {
-                        getTDF().loadTDF(seekableByteChannel, outputStream, sdk.getServices().kas());
+                        TDF.Reader reader = getTDF().loadTDF(seekableByteChannel, sdk.getServices().kas());
+                        reader.readPayload(outputStream);
                     } catch (Exception e) {
                         getLogger().error("error decrypting ZTDF", e);
                         throw new IOException(e);

diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromZTDFTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromZTDFTest.java
index afe7c62..a1205f3 100644
--- a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromZTDFTest.java
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromZTDFTest.java
@@ -3,7 +3,9 @@
 import io.opentdf.platform.sdk.SDK;
 import io.opentdf.platform.sdk.SDKBuilder;
 import io.opentdf.platform.sdk.TDF;
+import io.opentdf.platform.sdk.TDF.Reader;
 import nl.altindag.ssl.util.KeyStoreUtils;
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
@@ -16,15 +18,12 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import static io.opentdf.nifi.AbstractTDFProcessor.OPENTDF_CONFIG_SERVICE;
-import static io.opentdf.nifi.SimpleOpenTDFControllerService.*;
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.*;
@@ -78,19 +77,29 @@ public void testConvertFromTDF() throws Exception {
         when(mockSDKBuilder.build()).thenReturn(mockSDK);
 
         ArgumentCaptor<SeekableByteChannel> seekableByteChannelArgumentCaptor = ArgumentCaptor.forClass(SeekableByteChannel.class);
-        ArgumentCaptor<OutputStream> outputStreamArgumentCaptor = ArgumentCaptor.forClass(OutputStream.class);
         ArgumentCaptor<SDK.KAS> kasArgumentCaptor = ArgumentCaptor.forClass(SDK.KAS.class);
+        Reader mockReader = mock(Reader.class);
+
+        ArgumentCaptor<OutputStream> outputStreamArgumentCaptor = ArgumentCaptor.forClass(OutputStream.class);
+        List<String> messages = new ArrayList<>();
+
+        final AtomicInteger ai = new AtomicInteger(0);
+        doAnswer(invocationOnMock -> {
+            OutputStream outputStream = invocationOnMock.getArgument(0);
+            outputStream.write(("Decrypted: Decrypted message " + ai.incrementAndGet()).getBytes());
+            return null;
+        }).when(mockReader).readPayload(outputStreamArgumentCaptor.capture());
 
         doAnswer(invocationOnMock -> {
             SeekableInMemoryByteChannel seekableByteChannel = invocationOnMock.getArgument(0);
-            OutputStream outputStream = invocationOnMock.getArgument(1);
-            SDK.KAS kas = invocationOnMock.getArgument(2);
-            outputStream.write(("Decrypted:" + new String(seekableByteChannel.array())).getBytes());
+            ByteBuffer bb = ByteBuffer.allocate((int) seekableByteChannel.size());
+            seekableByteChannel.read(bb);
+            messages.add(new String(bb.array()));
+            SDK.KAS kas = invocationOnMock.getArgument(1);
             assertNotNull(kas, "KAS is not null");
             assertSame(mockKAS, kas, "Expected KAS passed in");
-            return null;
+            return mockReader;
         }).when(mockTDF).loadTDF(seekableByteChannelArgumentCaptor.capture(),
-                outputStreamArgumentCaptor.capture(),
                 kasArgumentCaptor.capture());
         MockFlowFile messageOne = runner.enqueue("message one".getBytes());
         MockFlowFile messageTwo = runner.enqueue("message two".getBytes());
@@ -99,10 +108,13 @@ public void testConvertFromTDF() throws Exception {
             runner.getFlowFilesForRelationship(ConvertFromZTDF.REL_SUCCESS);
         assertEquals(2, flowFileList.size(), "Two successful flow files");
         assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageOne.getAttribute("filename")))
-                .filter(x -> x.getContent().equals("Decrypted:message one")).count());
+                .filter(x -> x.getContent().equals("Decrypted: Decrypted message 1")).count());
         assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageTwo.getAttribute("filename")))
-                .filter(x -> x.getContent().equals("Decrypted:message two")).count());
+                .filter(x -> x.getContent().equals("Decrypted: Decrypted message 2")).count());
 
+        assertEquals(2, seekableByteChannelArgumentCaptor.getAllValues().size());
+        assertTrue(messages.contains("message one"));
+        assertTrue(messages.contains("message two"));
     }
 
     public static class MockRunner extends ConvertFromZTDF {

From efd4500ba62b54f4c36e706a8a157fd67d668d06 Mon Sep 17 00:00:00 2001
From: Timothy Tschampel
Date: Thu, 6 Jun 2024 17:28:02 -0700
Subject: [PATCH 4/4] don't exclude guava from sdk

---
 .../src/test/java/io/opentdf/nifi/NifiIT.java | 46 +++++++++++++++++++
 pom.xml                                       |  6 ---
 2 files changed, 46 insertions(+), 6 deletions(-)
 create mode 100644 nifi-tdf-processors/src/test/java/io/opentdf/nifi/NifiIT.java

diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/NifiIT.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/NifiIT.java
new file mode 100644
index 0000000..ba74b78
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/NifiIT.java
@@ -0,0 +1,46 @@
+package io.opentdf.nifi;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.DescriptorProtos;
+import io.opentdf.platform.policy.AttributeRuleTypeEnum;
+import io.opentdf.platform.policy.Namespace;
+import io.opentdf.platform.policy.attributes.CreateAttributeRequest;
+import io.opentdf.platform.policy.attributes.CreateAttributeResponse;
+import io.opentdf.platform.policy.attributes.CreateAttributeValueRequest;
+import io.opentdf.platform.policy.namespaces.*;
+import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.SDKBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class NifiIT {
+
+    String clientId = System.getenv("CLIENT_ID");
+    String clientSecret = System.getenv("CLIENT_SECRET");
+    String platformEndpoint = System.getenv("PLATFORM_ENDPOINT");
+    String NS = "opentdf.io";
+
+    @Test
+    public void testIt() throws Exception {
+        SDK sdk = SDKBuilder.newBuilder().platformEndpoint(platformEndpoint)
+                .clientSecret(clientId, clientSecret).build();
+        ListenableFuture<ListNamespacesResponse> resp = sdk.getServices().namespaces().listNamespaces(ListNamespacesRequest.newBuilder().build());
+        Optional<Namespace> nsOpt = resp.get().getNamespacesList().stream().filter(x -> x.getName().equals(NS)).findFirst();
+        Namespace namespace = nsOpt.orElse(null);
+        if (namespace == null) {
+            ListenableFuture<CreateNamespaceResponse> createNSRespFuture = sdk.getServices().namespaces().createNamespace(CreateNamespaceRequest.newBuilder().setName(NS).build());
+            CreateNamespaceResponse createNamespaceResponse = createNSRespFuture.get();
+            namespace = createNamespaceResponse.getNamespace();
+            System.out.println("Created namespace " + NS + " " + namespace.getId());
+        }
+        ListenableFuture<CreateAttributeResponse> caRespFuture = sdk.getServices().attributes().createAttribute(CreateAttributeRequest.newBuilder()
+                .setName("intellectualproperty")
+                .setRule(AttributeRuleTypeEnum.ATTRIBUTE_RULE_TYPE_ENUM_HIERARCHY)
+                .addValues("tradesecret").addValues("confidential")
+                .setNamespaceId(namespace.getId()).build());
+        CreateAttributeResponse caResp = caRespFuture.get();
+        System.out.println("Created attribute and values for " + caResp.getAttribute().getFqn() + ", id = " + caResp.getAttribute().getId());
+    }
+}

diff --git a/pom.xml b/pom.xml
index 2c8871f..0244387 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,12 +50,6 @@
             <groupId>io.opentdf.platform</groupId>
             <artifactId>sdk</artifactId>
            <version>0.1.0-SNAPSHOT</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
            <groupId>org.apache.commons