Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding NanoTDF support #5

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 38 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,65 @@ Integration of the [OpenTDF Platform](https://github.com/opentdf/platform) into

Components:
* "Zero Trust Data Format" (ZTDF) Processors:
* [ConvertToZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java): A NiFi processor that converts FlowFile content to TDF format. Does not currently support assertions
* [ConvertFromZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java): A NiFi processor that converts TDF formatted FlowFile content to it's plaintext representation
* [ConvertToZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java): A NiFi processor that converts FlowFile content to ZTDF format. Does not currently support assertions
* [ConvertFromZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java): A NiFi processor that converts ZTDF formatted FlowFile content to its plaintext representation
* NanoTDF Processors ([See NanoTDF Specification](https://github.com/opentdf/spec/tree/main/schema/nanotdf#readme)):
* [ConvertToNanoTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java): A NiFi processor that converts FlowFile content to NanoTDF format. Does not currently support assertions
* [ConvertFromNanoTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java): A NiFi processor that converts NanoTDF formatted FlowFile content to its plaintext representation

* Controller Services:
* [OpenTDFControllerService](./nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/OpenTDFControllerService.java): A NiFi controller service providing OpenTDF Platform Configuration


#### FlowChart: Generic Plaintext to ZTDF Nifi Flow
#### FlowChart: Generic ZTDF Nifi Flows

```mermaid
---
title: Generic Plaintext to ZTDF NiFi Flow
title: Generic ZTDF NiFi Flows
---
flowchart TD
a[FlowFile: \nPlaintext content]
a[Nifi Processor]
b["`**UpdateAttribute**`" Add data policy attributes to FlowFile]
c["`**ConvertToZTDF**`"]
d["Process ZTDF"]
e["Handle Error"]
a -- success --> b
b -- success --> c
f[Nifi Processor]
g["`**ConvertFromZTDF**`"]
h[Process Plaintext]
i[Handle Error]
a -- success (content = PlainText) --> b
b -- success (content = PlainText) --> c
c -- success (content = ZTDF) --> d
c -- failure --> e
f -- success (content = ZTDF) --> g
g -- success (content = PlainText) --> h
g -- failure --> i
```

#### FlowChart: Generic ZTDF to Plaintext Nifi Flow
#### FlowChart: Generic NanoTDF NiFi Flows
```mermaid
---
title: Generic ZTDF to Plaintext Nifi Flow
title: Generic NanoTDF NiFi Flows
---
flowchart TD
a[FlowFile: \nZTDF content]
b["`**ConvertFromZTDF**`"]
c["Process ZTDF"]
d["Handle Error"]
a -- success --> b
b -- success (content = plaintext) --> c
b -- failure --> d
a[Nifi Processor]
b["`**UpdateAttribute**`" Add data policy attributes to FlowFile]
c["`**ConvertToNanoTDF**`"]
d["Process NanoTDF"]
e["Handle Error"]
e2["Handle Max Size Error"]
f[Nifi Processor]
g["`**ConvertFromNanoTDF**`"]
h[Process Plaintext]
i[Handle Error]
a -- success (content = Plaintext) --> b
b -- success (content = Plaintext) --> c
c -- success (content = NanoTDF) --> d
c -- failure --> e
c -- exceeds_size_limit --> e2
f -- success (content = NanoTDF) --> g
g -- success (content = Plaintext) --> h
g -- failure --> i
```

# Quick Start - Docker Compose
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.opentdf.nifi;

import io.opentdf.platform.sdk.NanoTDF;
import io.opentdf.platform.sdk.SDK;
import io.opentdf.platform.sdk.SDKBuilder;
import io.opentdf.platform.sdk.TDF;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
Expand All @@ -11,6 +13,7 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
Expand All @@ -19,9 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.*;

/**
* Common helper processor
Expand Down Expand Up @@ -133,4 +134,35 @@ byte[] readEntireFlowFile(FlowFile flowFile, ProcessSession processSession) {
processSession.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
return buffer;
}

@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
    // Pull up to FLOWFILE_PULL_SIZE FlowFiles per trigger and hand the batch
    // to the subclass; do nothing when the session has no work.
    final int pullSize = processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger();
    final List<FlowFile> batch = processSession.get(pullSize);
    if (batch.isEmpty()) {
        return;
    }
    processFlowFiles(processContext, processSession, batch);
}

/**
 * Process the batch of FlowFiles pulled from the session by {@code onTrigger}
 * (at most {@code FLOWFILE_PULL_SIZE} files per invocation).
 *
 * @param processContext NiFi process context
 * @param processSession NiFi process session
 * @param flowFiles list of FlowFiles from the process session, up to the pull size limit
 * @throws ProcessException on processing failure
 */
abstract void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException;

/**
 * Factory method for a new {@link TDF} instance.
 * NOTE(review): kept as an overridable package-private method — presumably a
 * seam so tests can substitute a stub; confirm before making it static/final.
 */
TDF getTDF() {
    return new TDF();
}

/**
 * Factory method for a new {@link NanoTDF} instance.
 * NOTE(review): overridable package-private method — presumably a test seam,
 * mirroring {@code getTDF()}; confirm before making it static/final.
 */
NanoTDF getNanoTDF(){
    return new NanoTDF();
}

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    // Immutable list of the properties shared by all TDF processors.
    return List.of(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.opentdf.nifi;

import io.opentdf.platform.sdk.Config;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Common utilities for a processor converting content to one of the TDF formats
*/
public abstract class AbstractToProcessor extends AbstractTDFProcessor{
static final String KAS_URL_ATTRIBUTE = "kas_url";
static final String TDF_ATTRIBUTE = "tdf_attribute";

public static final PropertyDescriptor KAS_URL = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("KAS URL")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.description("The KAS Url to use for encryption; this is a default if the kas_url attribute is not present in the flow file")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE, KAS_URL));
}

/**{
* Get the kas urls from a flowfile attribute or if none present fallback to processor configuration KAS URL;
* format is a comma separated list
* @param flowFile
* @param processContext
* @return
* @throws Exception
*/
List<String> getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception{
String kasUrlAttribute = flowFile.getAttribute(KAS_URL_ATTRIBUTE);
//check kas url
if (!processContext.getProperty(KAS_URL).isSet() && kasUrlAttribute == null) {
throw new Exception("no " + KAS_URL_ATTRIBUTE + " flowfile attribute and no default KAS URL configured");
}
String kasUrlValues = kasUrlAttribute != null ? kasUrlAttribute : getPropertyValue(processContext.getProperty(KAS_URL)).getValue();
List<String> kasUrls = Arrays.stream(kasUrlValues.split(",")).filter(x->!x.isEmpty()).collect(Collectors.toList());
if (kasUrlValues.isEmpty()){
throw new Exception("no KAS Urls provided");
}
return kasUrls;
}

List<Config.KASInfo> getKASInfoFromKASURLs(List<String> kasUrls){
return kasUrls.stream().map(x->{ var ki = new Config.KASInfo(); ki.URL=x; return ki;}).collect(Collectors.toList());
}

/**
* Get data attributes on a FlowFile from attribute value
* @param flowFile
* @return
* @throws Exception
*/
Set<String> getDataAttributes(FlowFile flowFile) throws Exception{
Set<String> dataAttributes = Arrays.stream((flowFile.getAttribute(TDF_ATTRIBUTE) == null ? "" :
flowFile.getAttribute(TDF_ATTRIBUTE)).split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());
if (dataAttributes.isEmpty()) {
throw new Exception("no data attributes provided via " + TDF_ATTRIBUTE + " flowfile attribute");
}
return dataAttributes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.opentdf.nifi;

import io.opentdf.platform.sdk.SDK;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

@CapabilityDescription("Decrypts NanoTDF flow file content")
@Tags({"NanoTDF", "OpenTDF", "Decrypt", "Data Centric Security"})
public class ConvertFromNanoTDF extends AbstractTDFProcessor {

    /**
     * Decrypt each pulled FlowFile's NanoTDF content in place, routing
     * successfully decrypted FlowFiles to {@code REL_SUCCESS} and failures
     * (with the original content intact) to {@code REL_FAILURE}.
     */
    @Override
    public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
        SDK sdk = getTDFSDK(processContext);
        for (FlowFile flowFile : flowFiles) {
            try {
                // Buffer the whole NanoTDF payload; the SDK reads from a ByteBuffer.
                byte[] cipherBytes = readEntireFlowFile(flowFile, processSession);
                FlowFile decryptedFlowFile = processSession.write(flowFile, outputStream -> {
                    try {
                        getNanoTDF().readNanoTDF(ByteBuffer.wrap(cipherBytes), outputStream, sdk.getServices().kas());
                    } catch (Exception e) {
                        // Wrap as IOException so the write callback contract is honored.
                        getLogger().error("error decrypting NanoTDF", e);
                        throw new IOException(e);
                    }
                });
                processSession.transfer(decryptedFlowFile, REL_SUCCESS);
            } catch (Exception e) {
                getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
                processSession.transfer(flowFile, REL_FAILURE);
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
Expand All @@ -23,38 +24,26 @@
public class ConvertFromZTDF extends AbstractTDFProcessor {

@Override
// Immutable list of the properties supported by this processor.
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
}


@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
List<FlowFile> flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger());
if (!flowFiles.isEmpty()) {
SDK sdk = getTDFSDK(processContext);
for (FlowFile flowFile : flowFiles) {
try {
try (SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readEntireFlowFile(flowFile, processSession))) {
FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
try {
getTDF().loadTDF(seekableByteChannel, outputStream, sdk.getServices().kas());
} catch (Exception e) {
getLogger().error("error decrypting ZTDF", e);
throw new IOException(e);
}
});
processSession.transfer(updatedFlowFile, REL_SUCCESS);
}
} catch (Exception e) {
getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
processSession.transfer(flowFile, REL_FAILURE);
/**
 * Decrypt each pulled FlowFile's ZTDF content in place; decrypted FlowFiles go
 * to {@code REL_SUCCESS}, failures (content unchanged) to {@code REL_FAILURE}.
 */
void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
    SDK sdk = getTDFSDK(processContext);
    for (FlowFile flowFile : flowFiles) {
        // Buffer the whole TDF in memory: loadTDF requires a seekable channel.
        try (SeekableByteChannel channel = new SeekableInMemoryByteChannel(readEntireFlowFile(flowFile, processSession))) {
            FlowFile decrypted = processSession.write(flowFile, outputStream -> {
                try {
                    TDF.Reader reader = getTDF().loadTDF(channel, sdk.getServices().kas());
                    reader.readPayload(outputStream);
                } catch (Exception e) {
                    // Wrap as IOException to satisfy the write-callback contract.
                    getLogger().error("error decrypting ZTDF", e);
                    throw new IOException(e);
                }
            });
            processSession.transfer(decrypted, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
            processSession.transfer(flowFile, REL_FAILURE);
        }
    }
}

// Factory for a new TDF instance; overridable — presumably a test seam (TODO confirm).
TDF getTDF() {
    return new TDF();
}
}
Loading