diff --git a/.github/workflows/checks.yaml b/.github/workflows/checks.yaml
new file mode 100644
index 0000000..c673e79
--- /dev/null
+++ b/.github/workflows/checks.yaml
@@ -0,0 +1,59 @@
+name: "Checks"
+
+on:
+ pull_request:
+ branches:
+ - main
+ push:
+ branches:
+ - main
+ merge_group:
+ branches:
+ - main
+ types:
+ - checks_requested
+
+permissions:
+ contents: read
+
+jobs:
+ pr:
+ name: Validate PR title
+ if: contains(fromJSON('["pull_request", "pull_request_target"]'), github.event_name)
+ runs-on: ubuntu-22.04
+ permissions:
+ pull-requests: read
+ steps:
+ - uses: amannn/action-semantic-pull-request@e9fabac35e210fea40ca5b14c0da95a099eff26f
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+ mavenverify:
+ runs-on: ubuntu-latest
+ if: always()
+ needs:
+ - pr
+ steps:
+ - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
+ - name: Set up JDK
+ uses: actions/setup-java@5896cecc08fd8a1fbdfaf517e29b571164b031f7
+ with:
+ java-version: "21"
+ distribution: "temurin"
+ server-id: github
+ - name: Maven Verify
+ run: |
+ mvn --batch-mode clean install -DskipTests -s settings.xml
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+
+ ci:
+ needs:
+ - mavenverify
+ - pr
+ runs-on: ubuntu-latest
+ if: always()
+ steps:
+ - if: contains(needs.*.result, 'failure')
+ run: echo "Failed due to ${{ contains(needs.*.result, 'failure') }}" && exit 1
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
new file mode 100644
index 0000000..f807372
--- /dev/null
+++ b/.github/workflows/release.yaml
@@ -0,0 +1,42 @@
+name: Release
+
+on:
+ push:
+ branches:
+ - main
+
+permissions:
+ contents: read
+ packages: write
+
+jobs:
+ release-please:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Generate a token
+ id: generate_token
+ uses: actions/create-github-app-token@f2acddfb5195534d487896a656232b016a682f3c # v1.9.0
+ with:
+ app-id: "${{ secrets.APP_ID }}"
+ private-key: "${{ secrets.AUTOMATION_KEY }}"
+ - uses: google-github-actions/release-please-action@v4
+ with:
+ token: "${{ steps.generate_token.outputs.token }}"
+ config-file: release-please.json
+ manifest-file: .release-please-manifest.json
+ release:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
+ - name: Set up JDK
+ uses: actions/setup-java@5896cecc08fd8a1fbdfaf517e29b571164b031f7
+ with:
+ java-version: "21"
+ distribution: "temurin"
+ server-id: github
+ - name: Publish package
+ run: mvn --batch-mode deploy -s settings.xml
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6f427b5
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/.idea/
+/**/target/
diff --git a/.release-please-manifest.json b/.release-please-manifest.json
new file mode 100644
index 0000000..e18ee07
--- /dev/null
+++ b/.release-please-manifest.json
@@ -0,0 +1,3 @@
+{
+ ".": "0.0.0"
+}
diff --git a/CODEOWNERS b/CODEOWNERS
new file mode 100644
index 0000000..757836b
--- /dev/null
+++ b/CODEOWNERS
@@ -0,0 +1,8 @@
+# CODEOWNERS
+
+* @opentdf/nifi @opentdf/architecture
+
+## High Security Area
+
+CODEOWNERS @opentdf/architecture @opentdf/security
+LICENSE @opentdf/architecture
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..0151878
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,19 @@
+# The Clear BSD License
+
+Copyright 2023 Virtru Corporation
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted (subject to the limitations in the disclaimer below)
+provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+* Neither the name of Virtru Corporation nor the names of its contributors may be used to endorse or promote products derived from this software without
+ specific prior written permission.
+NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..b7d88c3
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,17 @@
+
+.PHONY: compose-package
+compose-package: nar-build
+ @echo "package for docker compose"
+ rm -rf deploy/extensions/*.nar
+ cp nifi-tdf-nar/target/*.nar deploy/extensions
+ cp nifi-tdf-controller-services-api-nar/target/*.nar deploy/extensions
+
+.PHONY: truststore-create
+truststore-create:
+ @echo "Build Truststore from *.crt in ./deploy/truststore"
+ cd ./deploy && ./build_truststore.sh
+
+.PHONY: nar-build
+nar-build:
+ @echo "Build NARs"
+ mvn clean package -s settings.xml
\ No newline at end of file
diff --git a/README.md b/README.md
index 2099ca3..5e99f57 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,20 @@
-# nifi
-OpenTDF NiFi Processors
+# OpenTDF NiFi
+Integration of the [OpenTDF Platform](https://github.com/opentdf/platform) into [NiFi](https://nifi.apache.org/)
+
+Components:
+* ConvertToTDF: A NiFi processor that converts FlowFile content to TDF format
+* ConvertFromTDF: A NiFi processor that converts TDF formatted FlowFile content to its plaintext representation
+* OpenTDFControllerService: A NiFi controller service providing OpenTDF Platform Configuration
+
+
+# Quick Start - Docker Compose
+
+1. Build the NiFi Archives (NARs) and place in the docker compose mounted volumes
+ ```shell
+ make compose-package
+ ```
+1. Start docker compose
+ ```shell
+ docker compose up
+ ```
+1. [Log into NiFi](http://localhost:18080/nifi)
diff --git a/deploy/build_truststore.sh b/deploy/build_truststore.sh
new file mode 100755
index 0000000..fd5bc55
--- /dev/null
+++ b/deploy/build_truststore.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+TRUSTSTORE_PASSWORD=password
+
+certDir="$(pwd)/truststore"
+
+echo "import certs from $certDir"
+
+for filename in $certDir/*.crt; do
+ echo "import $filename into truststore"
+ filelocal=$(basename "$filename")
+ docker run -v $(pwd)/truststore:/keys \
+ openjdk:latest keytool \
+ -import -trustcacerts \
+ -alias $filelocal \
+ -file keys/$filelocal \
+ -destkeystore keys/ca.jks \
+ -noprompt \
+ -deststorepass "$TRUSTSTORE_PASSWORD"
+done
+
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..6595809
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,17 @@
+version: '3'
+services:
+ opentdf-nifi:
+ image: ghcr.io/ttschampel/nifi/nifi-1.25.0-jre17:latest
+ restart: always
+ ulimits:
+ nofile:
+ soft: 2048
+ hard: 4096
+ environment:
+ - NIFI_WEB_HTTP_PORT=8080
+ volumes:
+ - ./deploy/extensions:/opt/nifi/nifi-current/extensions #mount custom NARs
+ - ./deploy/truststore:/opt/nifi/nifi-current/truststore # mounts truststore
+ - ./deploy/custom-libs:/opt/nifi/nifi-current/custom-libs #mount additional libs
+ ports:
+ - 18080:8080/tcp
diff --git a/nifi-tdf-controller-services-api-nar/pom.xml b/nifi-tdf-controller-services-api-nar/pom.xml
new file mode 100644
index 0000000..41775c4
--- /dev/null
+++ b/nifi-tdf-controller-services-api-nar/pom.xml
@@ -0,0 +1,38 @@
+
+
+ 4.0.0
+
+
+ io.opentdf.nifi
+ nifi-pom
+ 0.1.0-SNAPSHOT
+
+ nifi-tdf-controller-services-nar
+ nifi-tdf-controller-services-nar
+ NiFi TDF Controller Service NAR Archive
+
+ true
+
+ nar
+
+
+ ${project.groupId}
+ nifi-tdf-controller-services-api
+ 0.1.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-ssl-context-service-nar
+ ${nifi.version}
+ nar
+
+
+
+
+
+ org.apache.nifi
+ nifi-nar-maven-plugin
+
+
+
+
\ No newline at end of file
diff --git a/nifi-tdf-controller-services-api/pom.xml b/nifi-tdf-controller-services-api/pom.xml
new file mode 100644
index 0000000..cd44687
--- /dev/null
+++ b/nifi-tdf-controller-services-api/pom.xml
@@ -0,0 +1,24 @@
+
+
+ 4.0.0
+
+
+ io.opentdf.nifi
+ nifi-pom
+ 0.1.0-SNAPSHOT
+
+ nifi-tdf-controller-services-api
+ nifi-tdf-controller-services-api
+ TDF NiFi Controller Service API
+ jar
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-utils
+
+
+
diff --git a/nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/Config.java b/nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/Config.java
new file mode 100644
index 0000000..e242a0a
--- /dev/null
+++ b/nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/Config.java
@@ -0,0 +1,53 @@
+package io.opentdf.nifi;
+
+
+public class Config {
+
+ private boolean usePlainText;
+ private String platformEndpoint;
+ private String clientId;
+ private String clientSecret;
+
+ public Config(String platformEndpoint, String clientId, String clientSecret) {
+ this();
+ this.clientId = clientId;
+ this.clientSecret = clientSecret;
+ this.usePlainText = false;
+ }
+
+ public Config() {
+ this.usePlainText = false;
+ }
+
+ public boolean isUsePlainText() {
+ return usePlainText;
+ }
+
+ public void setUsePlainText(boolean usePlainText) {
+ this.usePlainText = usePlainText;
+ }
+
+ public String getPlatformEndpoint() {
+ return platformEndpoint;
+ }
+
+ public void setPlatformEndpoint(String platformEndpoint) {
+ this.platformEndpoint = platformEndpoint;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getClientSecret() {
+ return clientSecret;
+ }
+
+ public void setClientSecret(String clientSecret) {
+ this.clientSecret = clientSecret;
+ }
+}
diff --git a/nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/OpenTDFControllerService.java b/nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/OpenTDFControllerService.java
new file mode 100644
index 0000000..85487fa
--- /dev/null
+++ b/nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/OpenTDFControllerService.java
@@ -0,0 +1,18 @@
+package io.opentdf.nifi;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@Tags({"TDF","OpenTDF", "Configuration"})
+@CapabilityDescription("Provides A Configuration Service for the OpenTDF SDK")
+public interface OpenTDFControllerService extends ControllerService {
+
+ /**
+ * Get the OpenTDF platform configuration held by this controller service
+ * @return the current Config instance
+ * @throws ProcessException if the configuration cannot be provided
+ */
+ public Config getConfig() throws ProcessException;
+}
diff --git a/nifi-tdf-nar/pom.xml b/nifi-tdf-nar/pom.xml
new file mode 100644
index 0000000..fba6e0e
--- /dev/null
+++ b/nifi-tdf-nar/pom.xml
@@ -0,0 +1,38 @@
+
+
+ 4.0.0
+
+
+ io.opentdf.nifi
+ nifi-pom
+ 0.1.0-SNAPSHOT
+
+ nifi-tdf-nar
+ nifi-tdf-nar
+ NiFi TDF Processor NAR Archive
+
+ true
+
+ nar
+
+
+ ${project.groupId}
+ nifi-tdf-processors
+ 0.1.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-ssl-context-service-nar
+ ${nifi.version}
+ nar
+
+
+
+
+
+ org.apache.nifi
+ nifi-nar-maven-plugin
+
+
+
+
\ No newline at end of file
diff --git a/nifi-tdf-processors/pom.xml b/nifi-tdf-processors/pom.xml
new file mode 100644
index 0000000..ce56b47
--- /dev/null
+++ b/nifi-tdf-processors/pom.xml
@@ -0,0 +1,109 @@
+
+
+ 4.0.0
+
+
+ io.opentdf.nifi
+ nifi-pom
+ 0.1.0-SNAPSHOT
+
+ nifi-tdf-processors
+ nifi-tdf-processors
+ TDF Processors for NiFi
+ jar
+
+
+ io.opentdf.platform
+ sdk
+
+
+ ${project.groupId}
+ nifi-tdf-controller-services-api
+ 0.1.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+ ${nifi.version}
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-utils
+
+
+ org.apache.commons
+ commons-compress
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.apache.commons
+ commons-text
+
+
+ org.apache.httpcomponents
+ httpcore
+
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ 2.17.2
+ test
+
+
+ ch.qos.logback
+ logback-core
+ 1.4.14
+ test
+
+
+ ch.qos.logback
+ logback-classic
+ 1.4.14
+ test
+
+
+
+ net.bytebuddy
+ byte-buddy
+ 1.14.17
+ test
+
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+
+
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java
new file mode 100644
index 0000000..36266e9
--- /dev/null
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java
@@ -0,0 +1,136 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.SDKBuilder;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Common helper processor
+ */
+public abstract class AbstractTDFProcessor extends AbstractProcessor {
+
+ public static final PropertyDescriptor FLOWFILE_PULL_SIZE = new org.apache.nifi.components.PropertyDescriptor.Builder()
+ .name("FlowFile queue pull limit")
+ .description("FlowFile queue pull size limit")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("10")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new org.apache.nifi.components.PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("Optional SSL Context Service")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ public static final PropertyDescriptor OPENTDF_CONFIG_SERVICE = new org.apache.nifi.components.PropertyDescriptor.Builder()
+ .name("OpenTDF Config Service")
+ .description("Controller Service providing OpenTDF Platform Configuration")
+ .required(true)
+ .identifiesControllerService(OpenTDFControllerService.class)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("")
+ .build();
+
+ /**
+ * Get a property value by evaluating attribute expressions if present.
+ *
+ * @param propertyValue
+ * @return
+ */
+ PropertyValue getPropertyValue(PropertyValue propertyValue) {
+ return propertyValue.isExpressionLanguagePresent() ? propertyValue.evaluateAttributeExpressions() : propertyValue;
+ }
+
+ private SDK sdk;
+
+ /**
+ * Create a new TDF SDK using the OpenTDFController Service as a source of configuration
+ *
+ * @param processContext
+ * @return
+ */
+ SDK getTDFSDK(ProcessContext processContext) {
+ if (sdk == null) {
+ getLogger().info("SDK - create");
+ OpenTDFControllerService openTDFControllerService = processContext.getProperty(OPENTDF_CONFIG_SERVICE)
+ .asControllerService(OpenTDFControllerService.class);
+ Config config = openTDFControllerService.getConfig();
+
+ SDKBuilder sdkBuilder = createSDKBuilder().platformEndpoint(config.getPlatformEndpoint())
+ .clientSecret(config.getClientId(), config.getClientSecret());
+ if (processContext.getProperty(SSL_CONTEXT_SERVICE).isSet()) {
+ getLogger().info("SDK - use SSLFactory from SSL Context Service truststore");
+ SSLContextService sslContextService = processContext.getProperty(SSL_CONTEXT_SERVICE)
+ .asControllerService(SSLContextService.class);
+ sdkBuilder = sdkBuilder.sslFactoryFromKeyStore(sslContextService.getTrustStoreFile(), sslContextService.getTrustStorePassword());
+ }
+ if (config.isUsePlainText()) {
+ getLogger().info("SDK - use plaintext connection");
+ sdkBuilder = sdkBuilder.useInsecurePlaintextConnection(true);
+ }
+ sdk = sdkBuilder.build();
+ }
+ return this.sdk;
+ }
+
+ //this is really here to allow for easier mocking for testing
+ SDKBuilder createSDKBuilder() {
+ return SDKBuilder.newBuilder();
+ }
+
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ this.sdk = null;
+ }
+
+
+ @Override
+ public Set getRelationships() {
+ return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
+ }
+
+ FlowFile writeContent(FlowFile flowFile, ProcessSession session, InputStream payload) {
+ return session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream outputStream) throws IOException {
+ IOUtils.copy(payload, outputStream);
+ }
+ });
+ }
+
+ byte[] readEntireFlowFile(FlowFile flowFile, ProcessSession processSession) {
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
+ processSession.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
+ return buffer;
+ }
+}
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromTDF.java
new file mode 100644
index 0000000..7d3f139
--- /dev/null
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromTDF.java
@@ -0,0 +1,60 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.TDF;
+import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.nio.channels.SeekableByteChannel;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+
+@CapabilityDescription("Decrypts TDF flow file content")
+@Tags({"TDF", "OpenTDF", "Decrypt", "Data Centric Security"})
+public class ConvertFromTDF extends AbstractTDFProcessor {
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
+ }
+
+
+ @Override
+ public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+ List flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger());
+ if (!flowFiles.isEmpty()) {
+ SDK sdk = getTDFSDK(processContext);
+ for (FlowFile flowFile : flowFiles) {
+ try {
+ try (SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readEntireFlowFile(flowFile, processSession))) {
+ FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
+ try {
+ getTDF().loadTDF(seekableByteChannel, outputStream, sdk.getServices().kas());
+ } catch (Exception e) {
+ getLogger().error("error decrypting tdf", e);
+ throw new IOException(e);
+ }
+ });
+ processSession.transfer(updatedFlowFile, REL_SUCCESS);
+ }
+ } catch (Exception e) {
+ getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
+ processSession.transfer(flowFile, REL_FAILURE);
+ }
+ }
+ }
+ }
+
+ TDF getTDF() {
+ return new TDF();
+ }
+}
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToTDF.java
new file mode 100644
index 0000000..0003b64
--- /dev/null
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToTDF.java
@@ -0,0 +1,104 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.Config;
+import io.opentdf.platform.sdk.Config.TDFConfig;
+import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.TDF;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@CapabilityDescription("Transforms flow file content into a TDF")
+@Tags({"TDF", "OpenTDF", "Encrypt", "Data Centric Security"})
+@ReadsAttributes(value = {
+ @ReadsAttribute(attribute = "kas_url", description = "The Key Access Server (KAS) URL used TDF Creation. This overrides " +
+ "the KAS URL property of this processor."),
+ @ReadsAttribute(attribute = "tdf_attribute", description = "A comma separated list of data attributes added " +
+ "to created TDF Data Policy. e.g. http://example.org/attr/foo/value/bar,http://example.org/attr/foo/value/bar2")
+})
+public class ConvertToTDF extends AbstractTDFProcessor {
+ static final String KAS_URL_ATTRIBUTE = "kas_url";
+ static final String TDF_ATTRIBUTE = "tdf_attribute";
+
+ public static final PropertyDescriptor KAS_URL = new org.apache.nifi.components.PropertyDescriptor.Builder()
+ .name("KAS URL")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .description("The KAS Url to use for encryption; this is a default if the kas_url attribute is not present in the flow file")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE, KAS_URL));
+ }
+
+ @Override
+ public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+ List flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger());
+ if (!flowFiles.isEmpty()) {
+ SDK sdk = getTDFSDK(processContext);
+ for (final FlowFile flowFile : flowFiles) {
+ try {
+ //check kas url
+ String kasUrlAttribute = flowFile.getAttribute(KAS_URL_ATTRIBUTE);
+ if (!processContext.getProperty(KAS_URL).isSet() && kasUrlAttribute == null) {
+ throw new Exception("no " + KAS_URL_ATTRIBUTE + " flowfile attribute and no default KAS URL configured");
+ } else {
+ String kasUrl = kasUrlAttribute != null ? kasUrlAttribute : getPropertyValue(processContext.getProperty(KAS_URL)).getValue();
+ var kasInfo = new Config.KASInfo();
+ kasInfo.URL = kasUrl;
+ Set dataAttributes = Arrays.stream((flowFile.getAttribute(TDF_ATTRIBUTE) == null ? "" :
+ flowFile.getAttribute(TDF_ATTRIBUTE)).split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());
+ if (dataAttributes.isEmpty()) {
+ throw new Exception("no data attributes provided via " + TDF_ATTRIBUTE + " flowfile attribute");
+ } else {
+ TDFConfig config = Config.newTDFConfig(Config.withKasInformation(kasInfo),
+ Config.withDataAttributes(dataAttributes.toArray(new String[0])));
+ //write tdf to flowfile
+ final long size = flowFile.getSize();
+ FlowFile updatedFlowFile = processSession.write(flowFile, new StreamCallback() {
+ @Override
+ public void process(InputStream inputStream, OutputStream outputStream) throws IOException {
+ try {
+ getTDF().createTDF(inputStream, outputStream, config, sdk.getServices().kas());
+ } catch (Exception e) {
+ getLogger().error("error creating tdf", e);
+ throw new IOException(e);
+ }
+ }
+ }
+ );
+ processSession.transfer(updatedFlowFile, REL_SUCCESS);
+ }
+ }
+ } catch (Exception e) {
+ getLogger().error(flowFile.getId() + ": error converting plain text to TDF", e);
+ processSession.transfer(flowFile, REL_FAILURE);
+ }
+ }
+ }
+ }
+
+ TDF getTDF() {
+ return new TDF();
+ }
+}
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/SimpleOpenTDFControllerService.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/SimpleOpenTDFControllerService.java
new file mode 100644
index 0000000..32816af
--- /dev/null
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/SimpleOpenTDFControllerService.java
@@ -0,0 +1,88 @@
+package io.opentdf.nifi;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"TDF", "OpenTDF", "Configuration"})
+@CapabilityDescription("Provides An implementation of the OpenTDFControllerService API for OpenTDF SDK Configuration Parameters")
+public class SimpleOpenTDFControllerService extends AbstractControllerService implements OpenTDFControllerService {
+
+ public static final PropertyDescriptor PLATFORM_ENDPOINT = new PropertyDescriptor.Builder()
+ .name("platform-endpoint")
+ .displayName("OpenTDF Platform ENDPOINT")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(false)
+ .description("OpenTDF Platform ENDPOINT")
+ .build();
+
+ public static final PropertyDescriptor CLIENT_SECRET = new PropertyDescriptor.Builder()
+ .name("clientSecret")
+ .displayName("Client Secret")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .description("OpenTDF Platform Authentication Client Secret")
+ .build();
+
+ public static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
+ .name("clientId")
+ .displayName("Client ID")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(false)
+ .description("OpenTDF Platform Authentication Client ID")
+ .build();
+
+
+ public static final PropertyDescriptor USE_PLAINTEXT = new PropertyDescriptor.Builder()
+ .name("usePlaintext")
+ .displayName("Platform Use Plaintext Connection")
+ .required(true)
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .sensitive(false)
+ .description("Use a plaintext (non-TLS) connection to the OpenTDF Platform")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ Config config = null;
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return Arrays.asList(PLATFORM_ENDPOINT, CLIENT_ID, CLIENT_SECRET, USE_PLAINTEXT);
+ }
+
+ @OnEnabled
+ public void enabled(final ConfigurationContext configurationContext) throws InitializationException {
+ config = new Config();
+ config.setClientId(getPropertyValue(configurationContext.getProperty(CLIENT_ID)).getValue());
+ config.setClientSecret(getPropertyValue(configurationContext.getProperty(CLIENT_SECRET)).getValue());
+ config.setPlatformEndpoint(getPropertyValue(configurationContext.getProperty(PLATFORM_ENDPOINT)).getValue());
+ config.setUsePlainText(getPropertyValue(configurationContext.getProperty(USE_PLAINTEXT)).asBoolean());
+ }
+
+ PropertyValue getPropertyValue(PropertyValue propertyValue) {
+ return propertyValue.isExpressionLanguagePresent() ? propertyValue.evaluateAttributeExpressions() : propertyValue;
+ }
+
+ @Override
+ public Config getConfig() throws ProcessException {
+ return config;
+ }
+}
diff --git a/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..64e7f5d
--- /dev/null
+++ b/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1 @@
+io.opentdf.nifi.SimpleOpenTDFControllerService
\ No newline at end of file
diff --git a/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..94c8308
--- /dev/null
+++ b/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,2 @@
+io.opentdf.nifi.ConvertFromTDF
+io.opentdf.nifi.ConvertToTDF
\ No newline at end of file
diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromTDFTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromTDFTest.java
new file mode 100644
index 0000000..ac85723
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertFromTDFTest.java
@@ -0,0 +1,135 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.SDKBuilder;
+import io.opentdf.platform.sdk.TDF;
+import nl.altindag.ssl.util.KeyStoreUtils;
+import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static io.opentdf.nifi.AbstractTDFProcessor.OPENTDF_CONFIG_SERVICE;
+import static io.opentdf.nifi.SimpleOpenTDFControllerService.*;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+
+class ConvertFromTDFTest {
+
+ SDK mockSDK;
+ TDF mockTDF;
+
+ @BeforeEach
+ void setup() {
+ mockSDK = mock(SDK.class);
+ mockTDF = mock(TDF.class);
+ }
+
+ void setupTDFControllerService(TestRunner runner) throws Exception {
+ SimpleOpenTDFControllerService tdfControllerService = new SimpleOpenTDFControllerService();
+ Map<String, String> controllerPropertyMap = new HashMap<>();
+ controllerPropertyMap.put(PLATFORM_ENDPOINT.getName(), "http://platform");
+ controllerPropertyMap.put(CLIENT_ID.getName(), "my-client");
+ controllerPropertyMap.put(CLIENT_SECRET.getName(), "123-456");
+ controllerPropertyMap.put(USE_PLAINTEXT.getName(), "false");
+ runner.addControllerService(OPENTDF_CONFIG_SERVICE.getName(), tdfControllerService, controllerPropertyMap);
+ runner.enableControllerService(tdfControllerService);
+ runner.assertValid(tdfControllerService);
+ runner.setProperty(OPENTDF_CONFIG_SERVICE.getName(), OPENTDF_CONFIG_SERVICE.getName());
+ }
+
+ @Test
+ public void testConvertFromTDF() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(MockRunner.class);
+ SDKBuilder mockSDKBuilder = mock(SDKBuilder.class);
+ ((MockRunner) runner.getProcessor()).mockTDF = mockTDF;
+ ((MockRunner) runner.getProcessor()).mockSDKBuilder = mockSDKBuilder;
+ setupTDFControllerService(runner);
+
+ //add ssl context
+ SSLContextService sslContextService = mock(SSLContextService.class);
+ File trustStoreFile = Files.createTempFile("trust", "jks").toFile();
+ final String TRUST_STORE_PATH = trustStoreFile.getAbsolutePath();
+ final String TRUST_STORE_PASSWORD = "foo";
+ when(sslContextService.validate(any())).thenReturn(Collections.emptyList());
+ when(sslContextService.getTrustStoreFile()).thenReturn(TRUST_STORE_PATH);
+ when(sslContextService.getTrustStorePassword()).thenReturn(TRUST_STORE_PASSWORD);
+
+ try (FileOutputStream fos = new FileOutputStream(trustStoreFile)) {
+ KeyStoreUtils.createKeyStore().store(fos, TRUST_STORE_PASSWORD.toCharArray());
+ }
+ when(sslContextService.getIdentifier()).thenReturn(AbstractTDFProcessor.SSL_CONTEXT_SERVICE.getName());
+ runner.addControllerService(AbstractTDFProcessor.SSL_CONTEXT_SERVICE.getName(), sslContextService, new HashMap<>());
+ runner.enableControllerService(sslContextService);
+ runner.setProperty(AbstractTDFProcessor.SSL_CONTEXT_SERVICE, AbstractTDFProcessor.SSL_CONTEXT_SERVICE.getName());
+
+
+ runner.assertValid();
+
+ SDK.Services mockServices = mock(SDK.Services.class);
+ SDK.KAS mockKAS = mock(SDK.KAS.class);
+ when(mockSDK.getServices()).thenReturn(mockServices);
+ when(mockServices.kas()).thenReturn(mockKAS);
+ when(mockSDKBuilder.platformEndpoint("http://platform")).thenReturn(mockSDKBuilder);
+ when(mockSDKBuilder.clientSecret("my-client", "123-456")).thenReturn(mockSDKBuilder);
+ when(mockSDKBuilder.sslFactoryFromKeyStore(TRUST_STORE_PATH, TRUST_STORE_PASSWORD)).thenReturn(mockSDKBuilder);
+ when(mockSDKBuilder.build()).thenReturn(mockSDK);
+
+ ArgumentCaptor<SeekableByteChannel> seekableByteChannelArgumentCaptor = ArgumentCaptor.forClass(SeekableByteChannel.class);
+ ArgumentCaptor<OutputStream> outputStreamArgumentCaptor = ArgumentCaptor.forClass(OutputStream.class);
+ ArgumentCaptor<SDK.KAS> kasArgumentCaptor = ArgumentCaptor.forClass(SDK.KAS.class);
+
+ doAnswer(invocationOnMock -> {
+ SeekableInMemoryByteChannel seekableByteChannel = invocationOnMock.getArgument(0);
+ OutputStream outputStream = invocationOnMock.getArgument(1);
+ SDK.KAS kas = invocationOnMock.getArgument(2);
+ outputStream.write(("Decrypted:" + new String(seekableByteChannel.array())).getBytes());
+ assertNotNull(kas, "KAS is not null");
+ assertSame(mockKAS, kas, "Expected KAS passed in");
+ return null;
+ }).when(mockTDF).loadTDF(seekableByteChannelArgumentCaptor.capture(),
+ outputStreamArgumentCaptor.capture(),
+ kasArgumentCaptor.capture());
+ MockFlowFile messageOne = runner.enqueue("message one".getBytes());
+ MockFlowFile messageTwo = runner.enqueue("message two".getBytes());
+ runner.run(1);
+ List<MockFlowFile> flowFileList =
+ runner.getFlowFilesForRelationship(ConvertFromTDF.REL_SUCCESS);
+ assertEquals(2, flowFileList.size(), "Two successful flow files");
+ assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageOne.getAttribute("filename")))
+ .filter(x -> x.getContent().equals("Decrypted:message one")).count());
+ assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageTwo.getAttribute("filename")))
+ .filter(x -> x.getContent().equals("Decrypted:message two")).count());
+
+ }
+
+ public static class MockRunner extends ConvertFromTDF {
+ TDF mockTDF;
+ SDKBuilder mockSDKBuilder;
+
+ @Override
+ SDKBuilder createSDKBuilder() {
+ return mockSDKBuilder;
+ }
+
+ @Override
+ TDF getTDF() {
+ return mockTDF;
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToTDFTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToTDFTest.java
new file mode 100644
index 0000000..2f1c531
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ConvertToTDFTest.java
@@ -0,0 +1,135 @@
+package io.opentdf.nifi;
+
+import io.opentdf.platform.sdk.Config;
+import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.TDF;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static io.opentdf.nifi.AbstractTDFProcessor.OPENTDF_CONFIG_SERVICE;
+import static io.opentdf.nifi.SimpleOpenTDFControllerService.*;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+class ConvertToTDFTest {
+
+ SDK mockSDK;
+ TDF mockTDF;
+
+ @BeforeEach
+ void setup() {
+ mockSDK = mock(SDK.class);
+ mockTDF = mock(TDF.class);
+ }
+
+ void setupTDFControllerService(TestRunner runner) throws Exception {
+ SimpleOpenTDFControllerService tdfControllerService = new SimpleOpenTDFControllerService();
+ Map<String, String> controllerPropertyMap = new HashMap<>();
+ controllerPropertyMap.put(PLATFORM_ENDPOINT.getName(), "http://platform");
+ controllerPropertyMap.put(CLIENT_ID.getName(), "my-client");
+ controllerPropertyMap.put(CLIENT_SECRET.getName(), "123-456");
+ controllerPropertyMap.put(USE_PLAINTEXT.getName(), "false");
+ runner.addControllerService(OPENTDF_CONFIG_SERVICE.getName(), tdfControllerService, controllerPropertyMap);
+ runner.enableControllerService(tdfControllerService);
+ runner.assertValid(tdfControllerService);
+ runner.setProperty(OPENTDF_CONFIG_SERVICE.getName(), OPENTDF_CONFIG_SERVICE.getName());
+
+ }
+
+ @Test
+ public void testToTDF() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(MockRunner.class);
+ ((MockRunner) runner.getProcessor()).mockSDK = mockSDK;
+ ((MockRunner) runner.getProcessor()).mockTDF = mockTDF;
+ runner.setProperty(ConvertToTDF.KAS_URL, "https://kas1");
+ setupTDFControllerService(runner);
+ runner.assertValid();
+
+ SDK.Services mockServices = mock(SDK.Services.class);
+ SDK.KAS mockKAS = mock(SDK.KAS.class);
+ when(mockSDK.getServices()).thenReturn(mockServices);
+ when(mockServices.kas()).thenReturn(mockKAS);
+
+ ArgumentCaptor<InputStream> inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class);
+ ArgumentCaptor<OutputStream> outputStreamArgumentCaptor = ArgumentCaptor.forClass(OutputStream.class);
+ ArgumentCaptor<SDK.KAS> kasArgumentCaptor = ArgumentCaptor.forClass(SDK.KAS.class);
+ ArgumentCaptor<Config.TDFConfig> configArgumentCaptor = ArgumentCaptor.forClass(Config.TDFConfig.class);
+
+ doAnswer(invocationOnMock -> {
+ InputStream inputStream = invocationOnMock.getArgument(0);
+ OutputStream outputStream = invocationOnMock.getArgument(1);
+ Config.TDFConfig config = invocationOnMock.getArgument(2);
+ SDK.KAS kas = invocationOnMock.getArgument(3);
+ byte[] b = IOUtils.toByteArray(inputStream);
+ outputStream.write(("TDF:" + new String(b)).getBytes());
+ assertNotNull(kas, "KAS is not null");
+ assertSame(mockKAS, kas, "Expected KAS passed in");
+ if (new String(b).equals("message two")) {
+ assertEquals(2, config.attributes.size());
+ assertTrue(config.attributes.containsAll(Arrays.asList("https://example.org/attr/one/value/a", "https://example.org/attr/one/value/b")));
+ } else {
+ assertEquals(1, config.attributes.size());
+ assertTrue(config.attributes.contains("https://example.org/attr/one/value/c"));
+ }
+ return null;
+ }).when(mockTDF).createTDF(inputStreamArgumentCaptor.capture(),
+ outputStreamArgumentCaptor.capture(),
+ configArgumentCaptor.capture(),
+ kasArgumentCaptor.capture());
+
+ //message one has no attribute
+ MockFlowFile messageOne = runner.enqueue("message one".getBytes());
+ //message two has attributes
+ MockFlowFile messageTwo = runner.enqueue("message two".getBytes(), Map.of(ConvertToTDF.TDF_ATTRIBUTE,
+ "https://example.org/attr/one/value/a,https://example.org/attr/one/value/b"));
+ //message three has attributes and kas url override
+ MockFlowFile messageThree = runner.enqueue("message three".getBytes(), Map.of(ConvertToTDF.TDF_ATTRIBUTE,
+ "https://example.org/attr/one/value/c", ConvertToTDF.KAS_URL_ATTRIBUTE, "https://kas2"));
+ runner.run(1);
+ List<MockFlowFile> flowFileList =
+ runner.getFlowFilesForRelationship(ConvertFromTDF.REL_SUCCESS);
+ assertEquals(2, flowFileList.size(), "Two flowfiles for success relationship");
+ assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageTwo.getAttribute("filename")))
+ .filter(x -> x.getContent().equals("TDF:message two")).count());
+ assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageThree.getAttribute("filename")))
+ .filter(x -> x.getContent().equals("TDF:message three")).count());
+
+
+ flowFileList =
+ runner.getFlowFilesForRelationship(ConvertFromTDF.REL_FAILURE);
+ assertEquals(1, flowFileList.size(), "One flowfile for failure relationship");
+ assertEquals(1, flowFileList.stream().filter(x -> x.getAttribute("filename").equals(messageOne.getAttribute("filename")))
+ .filter(x -> x.getContent().equals("message one")).count());
+ }
+
+ public static class MockRunner extends ConvertToTDF {
+ SDK mockSDK;
+ TDF mockTDF;
+
+ @Override
+ SDK getTDFSDK(ProcessContext processContext) {
+ return mockSDK;
+ }
+
+ @Override
+ TDF getTDF() {
+ return mockTDF;
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/SimpleOpenTDFControllerServiceTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/SimpleOpenTDFControllerServiceTest.java
new file mode 100644
index 0000000..4a5c813
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/SimpleOpenTDFControllerServiceTest.java
@@ -0,0 +1,5 @@
+package io.opentdf.nifi;
+
+class SimpleOpenTDFControllerServiceTest {
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..d4ae7e7
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>io.opentdf.nifi</groupId>
+    <artifactId>nifi-pom</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+    <name>nifi-pom</name>
+    <packaging>pom</packaging>
+    <description>NiFi processors for OpenTDF</description>
+    <licenses>
+        <license>
+            <name>Clear BSD License</name>
+            <url>https://github.com/opentdf/nifi/blob/main/LICENSE</url>
+        </license>
+    </licenses>
+    <properties>
+        <nifi.version>1.23.1</nifi.version>
+        <junit.version>5.10.0</junit.version>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <jacoco.line.coverage>.7</jacoco.line.coverage>
+    </properties>
+    <modules>
+        <module>nifi-tdf-controller-services-api</module>
+        <module>nifi-tdf-controller-services-api-nar</module>
+        <module>nifi-tdf-processors</module>
+        <module>nifi-tdf-nar</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-api</artifactId>
+                <version>${nifi.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-utils</artifactId>
+                <version>${nifi.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-mock</artifactId>
+                <version>${nifi.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.opentdf.platform</groupId>
+                <artifactId>sdk</artifactId>
+                <version>0.1.0-SNAPSHOT</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.google.guava</groupId>
+                        <artifactId>guava</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-compress</artifactId>
+                <version>1.26.1</version>
+            </dependency>
+            <dependency>
+                <groupId>org.junit.jupiter</groupId>
+                <artifactId>junit-jupiter-engine</artifactId>
+                <version>${junit.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-core</artifactId>
+                <version>5.2.0</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-junit-jupiter</artifactId>
+                <version>5.2.0</version>
+                <scope>test</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+                <version>1.5.1</version>
+                <extensions>true</extensions>
+            </plugin>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>0.8.12</version>
+                <executions>
+                    <execution>
+                        <id>jacoco-prepare-agent</id>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>jacoco-prepare-agent-integration</id>
+                        <goals>
+                            <goal>prepare-agent-integration</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>jacoco-report</id>
+                        <goals>
+                            <goal>report</goal>
+                        </goals>
+                        <phase>test</phase>
+                    </execution>
+                    <execution>
+                        <id>check</id>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <rule>
+                                    <element>BUNDLE</element>
+                                    <limits>
+                                        <limit>
+                                            <counter>LINE</counter>
+                                            <value>COVEREDRATIO</value>
+                                            <minimum>${jacoco.line.coverage}</minimum>
+                                        </limit>
+                                    </limits>
+                                </rule>
+                            </rules>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.cyclonedx</groupId>
+                <artifactId>cyclonedx-maven-plugin</artifactId>
+                <version>2.8.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>makeAggregateBom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <repositories>
+        <repository>
+            <id>github</id>
+            <url>https://maven.pkg.github.com/opentdf/nifi</url>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+        </repository>
+        <repository>
+            <id>opentdf</id>
+            <url>https://maven.pkg.github.com/opentdf/java-sdk</url>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+        </repository>
+    </repositories>
+
+    <profiles>
+        <profile>
+            <id>ghcr</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <distributionManagement>
+                <repository>
+                    <id>github</id>
+                    <name>ghcr</name>
+                    <url>https://maven.pkg.github.com/opentdf/nifi</url>
+                </repository>
+                <snapshotRepository>
+                    <id>github</id>
+                    <name>ghcr</name>
+                    <url>https://maven.pkg.github.com/opentdf/nifi</url>
+                </snapshotRepository>
+            </distributionManagement>
+        </profile>
+    </profiles>
+</project>
diff --git a/release-please.json b/release-please.json
new file mode 100644
index 0000000..2ae06e7
--- /dev/null
+++ b/release-please.json
@@ -0,0 +1,11 @@
+{
+ "$schema": "https://raw.githubusercontent.com/googleapis/release-please/main/schemas/config.json",
+ "release-type": "java-yoshi-mono-repo",
+ "versioning": "always-bump-minor",
+ "separate-pull-requests": false,
+ "include-component-in-tag": false,
+ "group-pull-request-title-pattern": "chore(${branch}): release ${version}",
+ "packages": {
+ ".": {}
+ }
+}
diff --git a/settings.xml b/settings.xml
new file mode 100644
index 0000000..1cbc95d
--- /dev/null
+++ b/settings.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0">
+    <servers>
+        <server>
+            <id>github</id>
+            <username>${env.GITHUB_ACTOR}</username>
+            <password>${env.GITHUB_TOKEN}</password>
+        </server>
+        <server>
+            <id>opentdf</id>
+            <username>${env.GITHUB_ACTOR}</username>
+            <password>${env.GITHUB_TOKEN}</password>
+        </server>
+    </servers>
+</settings>
\ No newline at end of file
diff --git a/versions.txt b/versions.txt
new file mode 100644
index 0000000..cb4380c
--- /dev/null
+++ b/versions.txt
@@ -0,0 +1,4 @@
+# Format:
+# module:released-version:current-version
+
+nifi:0.0.0:0.1.0-SNAPSHOT
\ No newline at end of file