+ * This descriptor specifies that the property is required and identifies
+ * a controller service of type {@link OpenTDFControllerService}. The controller service
+ * provides the necessary configuration for the OpenTDF platform.
+ */
public static final PropertyDescriptor OPENTDF_CONFIG_SERVICE = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("OpenTDF Config Service")
.description("Controller Service providing OpenTDF Platform Configuration")
@@ -51,30 +69,54 @@ public abstract class AbstractTDFProcessor extends AbstractProcessor {
.identifiesControllerService(OpenTDFControllerService.class)
.build();
+ /**
+ * Defines a successful relationship for the NiFi processor. This relationship is used to route flow files
+ * that have been successfully processed. Flow files sent to this relationship indicate that the processor
+ * completed its intended action without errors.
+ *
+ * This relationship is commonly used as an output route for data that has passed all validation, transformation,
+ * and processing steps.
+ */
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("")
.build();
+ /**
+ * Relationship representing a failure in processing flow files.
+ *
+ * This relationship should be used to route flow files that could not
+ * be processed successfully by the processor. The reasons for failure
+ * can vary widely and may include issues like invalid data, processing
+ * errors, or configuration issues.
+ */
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("")
.build();
/**
- * Get a property value by evaluating attribute expressions if present.
+ * Evaluates the provided PropertyValue if expression language is present,
+ * otherwise returns the original PropertyValue.
*
- * @param propertyValue
- * @return
+ * @param propertyValue The PropertyValue to evaluate or return.
+ * @return The evaluated PropertyValue if expression language is present,
+ * otherwise the original PropertyValue.
*/
PropertyValue getPropertyValue(PropertyValue propertyValue) {
return propertyValue.isExpressionLanguagePresent() ? propertyValue.evaluateAttributeExpressions() : propertyValue;
}
- Optional getPropertyValue(PropertyDescriptor propertyDescriptor, ProcessContext processContext) {
+ /**
+ * Retrieves the value of the specified property from the given process context.
+ *
+ * @param processContext The context from which to retrieve the property value.
+ * @return An Optional containing the PropertyValue if it is set, or an empty Optional otherwise.
+ */
+ Optional getPropertyValue(ProcessContext processContext) {
PropertyValue propertyValue = null;
- if(processContext.getProperty(propertyDescriptor).isSet()){
- propertyValue = getPropertyValue(processContext.getProperty(propertyDescriptor));
+ if(processContext.getProperty(ConvertToZTDF.SIGN_ASSERTIONS).isSet()){
+ propertyValue = getPropertyValue(processContext.getProperty(ConvertToZTDF.SIGN_ASSERTIONS));
}
return Optional.ofNullable(propertyValue);
}
@@ -82,10 +124,10 @@ Optional getPropertyValue(PropertyDescriptor propertyDescriptor,
private SDK sdk;
/**
- * Create a new TDF SDK using the OpenTDFController Service as a source of configuration
+ * Retrieves an instance of the TDF SDK, initializing it if it is not already created.
*
- * @param processContext
- * @return
+ * @param processContext the NiFi ProcessContext providing necessary configuration and controller services.
+ * @return an instance of the initialized SDK.
*/
SDK getTDFSDK(ProcessContext processContext) {
if (sdk == null) {
@@ -159,17 +201,31 @@ public void onTrigger(ProcessContext processContext, ProcessSession processSessi
*/
abstract void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List flowFiles) throws ProcessException;
+ /**
+ * Creates and returns a new instance of TDF.
+ *
+ * @return A new instance of TDF.
+ */
TDF getTDF() {
return new TDF();
}
+ /**
+ * Creates and returns a new instance of NanoTDF.
+ *
+ * @return A new instance of NanoTDF.
+ */
NanoTDF getNanoTDF(){
return new NanoTDF();
}
+ /**
+ * Retrieves the list of property descriptors that are supported by this processor.
+ *
+ * @return A list containing the supported property descriptors.
+ */
@Override
public List getSupportedPropertyDescriptors() {
- return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
+ return List.of(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE);
}
-
}
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
index a0c81ca..df96bc0 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
@@ -34,37 +34,52 @@ public List getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE, KAS_URL));
}
- /**{
- * Get the kas urls from a flowfile attribute or if none present fallback to processor configuration KAS URL;
- * format is a comma separated list
- * @param flowFile
- * @param processContext
- * @return
- * @throws Exception
+ /**
+ * Retrieves a list of KAS (Key Access Service) URLs either from the flow file attributes or from the process context.
+ * If the KAS URL is not provided through the flow file attribute and is not set in the process context, an exception is thrown.
+ *
+ * @param flowFile the flow file from which KAS URL attributes are retrieved.
+ * @param processContext the process context to get the default KAS URL if not available in the flow file.
+ * @return a list of KAS URLs.
+ * @throws Exception if no KAS URL is provided via the flow file or the default in the process context, or if the KAS URLs provided are empty.
*/
- List getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception{
+ List getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception {
String kasUrlAttribute = flowFile.getAttribute(KAS_URL_ATTRIBUTE);
- //check kas url
+ // Check kas url
if (!processContext.getProperty(KAS_URL).isSet() && kasUrlAttribute == null) {
throw new Exception("no " + KAS_URL_ATTRIBUTE + " flowfile attribute and no default KAS URL configured");
}
String kasUrlValues = kasUrlAttribute != null ? kasUrlAttribute : getPropertyValue(processContext.getProperty(KAS_URL)).getValue();
- List kasUrls = Arrays.stream(kasUrlValues.split(",")).filter(x->!x.isEmpty()).collect(Collectors.toList());
- if (kasUrlValues.isEmpty()){
+ List kasUrls = Arrays.stream(kasUrlValues.split(","))
+ .filter(x -> !x.isEmpty())
+ .toList(); // Use Stream.toList() for an unmodifiable list
+ if (kasUrlValues.isEmpty()) {
throw new Exception("no KAS Urls provided");
}
return kasUrls;
}
+ /**
+ * Converts a list of KAS (Key Access Service) URLs into a list of Config.KASInfo objects.
+ *
+ * @param kasUrls a list of strings representing the KAS URLs
+ * @return a list of Config.KASInfo objects with each object's URL field set to the corresponding string from the input list
+ */
List getKASInfoFromKASURLs(List kasUrls){
- return kasUrls.stream().map(x->{ var ki = new Config.KASInfo(); ki.URL=x; return ki;}).collect(Collectors.toList());
+ return kasUrls.stream().map(x -> {
+ var ki = new Config.KASInfo();
+ ki.URL = x;
+ return ki;
+ }).toList();
}
/**
- * Get data attributes on a FlowFile from attribute value
- * @param flowFile
- * @return
- * @throws Exception
+ * Extracts and returns a set of data attributes from the given FlowFile's attribute specified by TDF_ATTRIBUTE.
+ * The attributes are split by commas and filtered to remove empty strings.
+ *
+ * @param flowFile the FlowFile from which to retrieve the data attributes.
+ * @return a set of data attributes extracted from the given FlowFile.
+ * @throws Exception if no data attributes are provided via the TDF_ATTRIBUTE FlowFile attribute.
*/
Set getDataAttributes(FlowFile flowFile) throws Exception{
Set dataAttributes = Arrays.stream((flowFile.getAttribute(TDF_ATTRIBUTE) == null ? "" :
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java
index 63f4d6a..54800d1 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java
@@ -12,10 +12,27 @@
import java.nio.ByteBuffer;
import java.util.List;
+/**
+ * A processor for decrypting NanoTDF flow file content using the OpenTDF framework.
+ *
+ * This processor reads encrypted NanoTDF data from incoming flow files and decrypts
+ * it using the associated SDK. The decrypted content is then written back into the
+ * flow file and routed to the success relationship. If decryption fails, the flow file
+ * is routed to the failure relationship.
+ */
@CapabilityDescription("Decrypts NanoTDF flow file content")
@Tags({"NanoTDF", "OpenTDF", "Decrypt", "Data Centric Security"})
public class ConvertFromNanoTDF extends AbstractTDFProcessor {
+ /**
+ * Processes the provided list of flow files by decrypting their content using the NanoTDF protocol.
+ * If decryption succeeds, the flow file is routed to the success relationship; otherwise, it is routed to the failure relationship.
+ *
+ * @param processContext the NiFi ProcessContext which provides configuration and controller services
+ * @param processSession the ProcessSession which provides mechanisms for reading, writing, transferring, and penalizing flow files
+ * @param flowFiles the list of FlowFile objects to be processed
+ * @throws ProcessException if any error occurs during the processing of flow files
+ */
@Override
public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List flowFiles) throws ProcessException {
SDK sdk = getTDFSDK(processContext);
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
index 66a7051..580d0da 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java
@@ -16,12 +16,46 @@
import java.util.ArrayList;
import java.util.List;
-
+/**
+ * Converts and decrypts ZTDF (Zero Trust Data Format) flow file content.
+ * This class takes encrypted ZTDF content and decrypts it,
+ * transferring the decrypted data to a specified success relationship.
+ * If an error occurs during decryption, it transfers the flow file to a failure relationship.
+ *
+ * This processor uses TDF (Trusted Data Format) SDK for decryption and
+ * requires configuration of assertion verification keys to verify
+ * the integrity and authenticity of the encrypted data.
+ *
+ * It provides the primary method `processFlowFiles` which reads the encrypted
+ * content from incoming flow files, decrypts it, and writes the decrypted
+ * content back to the flow files.
+ *
+ * The method `processFlowFiles` performs the following steps:
+ * 1. Retrieves the TDF SDK instance.
+ * 2. Reads the entire encrypted content of each flow file into an in-memory byte channel.
+ * 3. Uses TDF Reader to load and decrypt the content.
+ * 4. Writes the decrypted content back into the flow file and transfers it to the success relationship.
+ * 5. If any error occurs during the decryption process, logs the error and transfers the flow file to the failure relationship.
+ */
@CapabilityDescription("Decrypts ZTDF flow file content")
@Tags({"ZTDF", "Zero Trust Data Format", "OpenTDF", "Decrypt", "Data Centric Security"})
public class ConvertFromZTDF extends AbstractTDFProcessor {
+ /**
+ * Processes a list of flow files by decrypting their content using the TDF (Trusted Data Format) SDK.
+ * For each flow file in the provided list, the following steps are executed:
+ * 1. Reads the entire encrypted content of the flow file into an in-memory byte channel.
+ * 2. Uses a TDF Reader to load and decrypt the content using the SDK.
+ * 3. Writes the decrypted content back to the flow file.
+ * 4. Transfers the successfully decrypted flow files to the success relationship.
+ * 5. In case of an error during decryption, logs the error and transfers the flow file to the failure relationship.
+ *
+ * @param processContext the NiFi ProcessContext providing configuration and controller services.
+ * @param processSession the NiFi ProcessSession used to read, write, and transfer flow files.
+ * @param flowFiles a list of flow files to be decrypted.
+ * @throws ProcessException if an error occurs during the decryption process.
+ */
@Override
void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List flowFiles) throws ProcessException {
SDK sdk = getTDFSDK(processContext);
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java
index 8ce0e55..118066b 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java
@@ -20,6 +20,25 @@
import java.util.List;
import java.util.Set;
+/**
+ * Processor for converting the content of a FlowFile into a NanoTDF (Trusted Data Format).
+ *
+ * This class extends AbstractToProcessor and handles the encryption of FlowFile content into NanoTDF,
+ * applying specified KAS (Key Access Service) URLs and data attributes as defined in the flow file attributes
+ * or processor properties.
+ *
+ * Relationships:
+ * - REL_SUCCESS: When the conversion to NanoTDF is successful.
+ * - REL_FAILURE: When the conversion to NanoTDF fails.
+ * - REL_FLOWFILE_EXCEEDS_NANO_SIZE: When the content size exceeds the maximum allowed size for NanoTDF.
+ *
+ * Property Descriptors:
+ * - Inherited from AbstractToProcessor (e.g., KAS URL, SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, etc.)
+ *
+ * Reads Attributes:
+ * - kas_url: The Key Access Server (KAS) URL used for TDF creation. Overrides the default KAS URL property.
+ * - tdf_attribute: A comma-separated list of data attributes added to the created TDF Data Policy.
+ */
@CapabilityDescription("Transforms flow file content into a NanoTDF")
@Tags({"NanoTDF", "OpenTDF", "Encrypt", "Data Centric Security"})
@ReadsAttributes(value = {
@@ -30,19 +49,43 @@
})
public class ConvertToNanoTDF extends AbstractToProcessor {
+ /**
+ * Defines a relationship indicating that NanoTDF creation has failed
+ * due to the content size exceeding the maximum allowed NanoTDF size threshold.
+ */
public static final Relationship REL_FLOWFILE_EXCEEDS_NANO_SIZE = new Relationship.Builder()
.name("exceeds_size_limit")
.description("NanoTDF creation failed due to the content size exceeding the max NanoTDF size threshold")
.build();
+ /**
+ * Represents the maximum allowable size for processing a flow file in nano TDF conversion.
+ * Value is set to 16 MB.
+ */
static final long MAX_SIZE = 16777218;
+ /**
+ * Retrieves all the relationships defined in the ConvertToNanoTDF processor.
+ *
+ * @return a Set of Relationship objects representing the different relationships for the processor.
+ */
@Override
public Set getRelationships() {
return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_FLOWFILE_EXCEEDS_NANO_SIZE));
}
+ /**
+ * Processes a list of FlowFiles to convert them to NanoTDF format.
+ * If a FlowFile's size exceeds the maximum allowed size, it is routed to a specific relationship.
+ * Otherwise, it attempts to convert the FlowFile's content and transfer it to a success relationship.
+ * In case of an error during processing, the FlowFile is routed to a failure relationship.
+ *
+ * @param processContext the NiFi ProcessContext providing necessary configuration and controller services.
+ * @param processSession the NiFi ProcessSession representing a transaction context for the processing of FlowFiles.
+ * @param flowFiles a list of FlowFiles to be processed.
+ * @throws ProcessException if an error occurs during the processing of the FlowFiles.
+ */
@Override
public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List flowFiles) throws ProcessException {
SDK sdk = getTDFSDK(processContext);
@@ -50,6 +93,8 @@ public void processFlowFiles(ProcessContext processContext, ProcessSession proce
try {
var kasInfoList = getKASInfoFromKASURLs(getKasUrl(flowFile, processContext));
Set dataAttributes = getDataAttributes(flowFile);
+ // Config.newNanoTDFConfig is correctly handling the varargs
+ @SuppressWarnings("unchecked")
Config.NanoTDFConfig config = Config.newNanoTDFConfig(
Config.withNanoKasInformation(kasInfoList.toArray(new Config.KASInfo[0])),
Config.witDataAttributes(dataAttributes.toArray(new String[0]))
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
index 81088e4..c6e9ddb 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
@@ -20,12 +20,15 @@
import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
-import java.security.spec.InvalidKeySpecException;
import java.util.*;
import java.util.function.Consumer;
+/**
+ * The ConvertToZTDF class transforms flow file content into a ZTDF (Zero Trust Data Format).
+ * It builds assertions from flow file attributes and configures TDF options based on these assertions.
+ * This class supports property descriptors and signing of assertions.
+ */
@CapabilityDescription("Transforms flow file content into a ZTDF")
@Tags({"ZTDF", "OpenTDF", "Zero Trust Data Format", "Encrypt", "Data Centric Security"})
@ReadsAttributes(value = {
@@ -49,6 +52,17 @@
})
public class ConvertToZTDF extends AbstractToProcessor {
+ /**
+ * Property descriptor for the "Sign Assertions" feature in the ConvertToZTDF processor. This property allows specifying whether
+ * the assertions should be signed or not. It is not a required property and defaults to "false".
+ *
+ * - Name: Sign Assertions
+ * - Description: sign assertions
+ * - Required: false
+ * - Default Value: false
+ * - Allowable Values: true, false
+ * - Expression Language Supported: {@link ExpressionLanguageScope#VARIABLE_REGISTRY}
+ */
public static final PropertyDescriptor SIGN_ASSERTIONS = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("Sign Assertions")
.description("sign assertions")
@@ -58,6 +72,15 @@ public class ConvertToZTDF extends AbstractToProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ /**
+ * Property descriptor for the "Private Key Controller Service".
+ *
+ * This descriptor defines an optional Private Key Service which is required
+ * for assertion signing. The property is compulsory and identifies the
+ * PrivateKeyService class as the controller service. It is dependent on
+ * the SIGN_ASSERTIONS property being set to "true" and supports expression
+ * language in the variable registry scope.
+ */
public static final PropertyDescriptor PRIVATE_KEY_CONTROLLER_SERVICE = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("Private Key Controller Service")
.description("Optional Private Key Service; this is need for assertion signing")
@@ -68,12 +91,23 @@ public class ConvertToZTDF extends AbstractToProcessor {
.build();
+ /**
+ * Retrieves the PrivateKeyService from the given process context if it is set.
+ *
+ * @param processContext the NiFi ProcessContext providing necessary configuration and controller services.
+ * @return an instance of PrivateKeyService if it is set in the process context, otherwise null.
+ */
PrivateKeyService getPrivateKeyService(ProcessContext processContext) {
return processContext.getProperty(PRIVATE_KEY_CONTROLLER_SERVICE).isSet() ?
processContext.getProperty(PRIVATE_KEY_CONTROLLER_SERVICE)
.asControllerService(PrivateKeyService.class) : null;
}
+ /**
+ * Retrieves a list of supported property descriptors for this processor.
+ *
+ * @return an unmodifiable list of PropertyDescriptor objects representing the supported properties.
+ */
@Override
public List getSupportedPropertyDescriptors() {
List propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
@@ -91,18 +125,25 @@ public List getSupportedPropertyDescriptors() {
Map assertionAppliesToStateMap = Map.of("encrypted", AssertionConfig.AppliesToState.Encrypted,
"unencrypted", AssertionConfig.AppliesToState.Unencrypted);
/**
- * Build an assertion config from the flow file attribute
- * @return
- * @throws Exception throw exception when assertion is not valid
+ * Builds an {@link AssertionConfig} instance from the given NiFi {@link ProcessContext} and {@link FlowFile},
+ * using the provided flowFile attribute name to retrieve relevant data. This method deserializes an assertion
+ * JSON string from the flowFile attribute, populates the {@link AssertionConfig}, and performs necessary validations.
+ *
+ * @param processContext the NiFi ProcessContext providing necessary configuration and controller services
+ * @param flowFile the NiFi FlowFile containing the assertion JSON string in its attributes
+ * @param flowFileAttributeName the name of the attribute in the flowFile which contains the assertion JSON string
+ * @return an {@link AssertionConfig} instance populated with values from the assertion JSON string in the flowFile attribute
+ * @throws Exception if any essential assertion information is missing or invalid
*/
AssertionConfig buildAssertion(ProcessContext processContext, FlowFile flowFile, String flowFileAttributeName) throws Exception{
String assertionJson = flowFile.getAttribute(flowFileAttributeName);
Map,?> assertionMap = gson.fromJson(assertionJson, Map.class);
AssertionConfig assertionConfig = new AssertionConfig();
assertionConfig.id = assertionMap.containsKey("id") ? (String)assertionMap.get("id") : null;
- assertionConfig.type = assertionMap.containsKey("type") ? assertionTypeMap.get(assertionMap.get("type")) : null;
- assertionConfig.scope =assertionMap.containsKey("scope") ? assertionScopeMap.get(assertionMap.get("scope")) : null;
- assertionConfig.appliesToState = assertionMap.containsKey("appliesToState") ? assertionAppliesToStateMap.get(assertionMap.get("appliesToState")): null;
+
+ populateFieldFromMap(assertionMap, "type", assertionTypeMap, value -> assertionConfig.type = (AssertionConfig.Type) value);
+ populateFieldFromMap(assertionMap, "scope", assertionScopeMap, value -> assertionConfig.scope = (AssertionConfig.Scope) value);
+ populateFieldFromMap(assertionMap, "appliesToState", assertionAppliesToStateMap, value -> assertionConfig.appliesToState = (AssertionConfig.AppliesToState) value);
assertionConfig.statement = new AssertionConfig.Statement();
Map,?> statementMap = (Map,?>)assertionMap.get("statement");
@@ -129,6 +170,22 @@ AssertionConfig buildAssertion(ProcessContext processContext, FlowFile flowFile,
return assertionConfig;
}
+ private void populateFieldFromMap(Map, ?> sourceMap, String key, Map, ?> destinationMap, Consumer