diff --git a/cli/dcnios-cli.py b/cli/dcnios-cli.py
index 6db1d3ed..5ea44946 100644
--- a/cli/dcnios-cli.py
+++ b/cli/dcnios-cli.py
@@ -16,12 +16,22 @@
import yaml
from yaml.loader import SafeLoader
import json
-from NifiManagement import *
from oscar_python.client import Client
-import sys
+from sources.auxiliaryFunctions import *
+from sources.NifiManagement import *
+from sources.aws import *
+import boto3
import os
import argparse
+folder="template/"
+dcachefile=folder+"dcache.json"
+oscarfile=folder+"InvokeOSCAR.json"
+sqsfile=folder+"SQS_recive.json"
+kafkafile=folder+"Kafka.json"
+types=["dCache","OSCAR","S3","SQS","generic","Kafka"]
+typesSSL=["Kafka"]
+
def updateComponent(type):
if "components" in type:
print("Process group: "+type["name"])
@@ -37,9 +47,9 @@ def updateComponent(type):
def doType(type,function):
if type in data["nifi"]:
- for type in data["nifi"][type]:
- function(type["name"])
- print(str(function.__qualname__)+ " " + type["name"])
+ for singularoftype in data["nifi"][type]:
+ function(singularoftype["name"])
+ print(str(function.__qualname__)+ " " + singularoftype["name"])
def makeActionWithAllType(allType,function):
for type in allType:
@@ -49,12 +59,6 @@ def newProcessInfo(name):
print("New Process group: "+ str(name))
-folder="template/"
-dcachefile=folder+"dcache.json"
-oscarfile=folder+"InvokeOSCAR.json"
-types=["dcache","oscar","generic"]
-
-
parser = argparse.ArgumentParser(prog='ProgramName', description='What the program does', epilog='Text at the bottom of help')
parser.add_argument('option')
@@ -78,9 +82,10 @@ def newProcessInfo(name):
nifi_password=data["nifi"]["password"]
nifi=Nifi(nifi_endpoint,nifi_user,nifi_password)
if args.option == "apply":
- if "dcache" in data["nifi"]:
- for dcache in data["nifi"]["dcache"]:
- nifi.create(dcache["name"],dcachefile)
+ if "dCache" in data["nifi"]:
+ for dcache in data["nifi"]["dCache"]:
+ dcachecontent=prepareforAll(dcachefile)
+ nifi.create(dcache["name"], dcachecontent )
command="simple-client.py --state /state/"+dcache["statefile"]+" --endpoint "+ \
dcache["endpoint"]+" --user "+dcache["user"]+" --password "+ \
dcache["password"]+" "+ dcache["folder"]
@@ -88,9 +93,10 @@ def newProcessInfo(name):
newProcessInfo(dcache["name"])
updateComponent(dcache)
- if "oscar" in data["nifi"]:
- for oscar in data["nifi"]["oscar"]:
- nifi.create(oscar["name"],oscarfile)
+ if "OSCAR" in data["nifi"]:
+ for oscar in data["nifi"]["OSCAR"]:
+ oscarcontent=prepareforAll(oscarfile)
+ nifi.create(oscar["name"],oscarcontent)
nifi.changeVariable(oscar["name"],"endpoint", oscar["endpoint"])
nifi.changeVariable(oscar["name"],"service", oscar["service"])
if "user" in oscar and "password" in oscar:
@@ -104,17 +110,73 @@ def newProcessInfo(name):
updateComponent(oscar)
if "generic" in data["nifi"]:
for generic in data["nifi"]["generic"]:
- nifi.create(generic["name"],generic["file"])
+ genericcontent=prepareforAll(generic["file"])
+ nifi.create(generic["name"],genericcontent)
for variable in generic["variables"]:
nifi.changeVariable(generic["name"],variable,generic["variables"][variable])
newProcessInfo(generic["name"])
updateComponent(generic)
-
+ if "SQS" in data["nifi"]:
+ for sqs in data["nifi"]["SQS"]:
+ #Get credentials of AWS
+ getAWSCredentials(sqs)
+ #Create SQS
+ sqsDetails=createSQS(sqs)
+ #Prepare config
+ sqscontent=prepareforAll(sqsfile)
+ sqscontent=sqsPreparefile(sqscontent,sqs)
+ #Create object
+ nifi.create(sqs["name"],sqscontent)
+ nifi.changeVariable(sqs["name"],'queueurl',sqsDetails['QueueUrl'])
+ newProcessInfo(sqs["name"])
+ updateComponent(sqs)
+ if "S3" in data["nifi"]:
+ for s3 in data["nifi"]["S3"]:
+ #Get credentials of AWS
+ getAWSCredentials(s3)
+ #Create SQS
+ s3["queue_name"]= s3["AWS_S3_BUCKET"]+"_events"
+ sqsDetails=createSQS(s3)
+ #Create Notification from S3 event to SQS
+ s3NotificationSQS(s3)
+ #Prepare config
+ sqscontent=prepareforAll(sqsfile)
+ s3content=sqsPreparefile(sqscontent,s3)
+ #Create object
+ nifi.create(s3["name"],sqscontent)
+ nifi.changeVariable(s3["name"],'queueurl',sqsDetails['QueueUrl'])
+ newProcessInfo(s3["name"])
+ updateComponent(s3)
+ if "Kafka" in data["nifi"]:
+ for kafka in data["nifi"]["Kafka"]:
+ #Prepare config
+ kafkacontent=prepareforAll(kafkafile)
+ kafkacontent=kafkaPreparefile(kafkacontent,kafka)
+ #Set ssl context configuration
+ kafkacontent=ssl_context(kafkacontent,kafka["ssl_context"])
+ #Create object
+ nifi.create(kafka["name"],kafkacontent)
+ nifi.changeVariable(kafka["name"],"group_id", kafka["group_id"])
+ nifi.changeVariable(kafka["name"],"bootstrap_servers", kafka["bootstrap_servers"])
+ nifi.changeVariable(kafka["name"],"topic", kafka["topic"])
+ #enable SSL
+ nifi.enableSSL(kafka["name"])
+ newProcessInfo(kafka["name"])
+ updateComponent(kafka)
if "connection" in data["nifi"]:
for connection in data["nifi"]["connection"]:
nifi.makeConnection(connection["from"],connection["to"])
elif args.option == "delete":
+ makeActionWithAllType(typesSSL,nifi.disableSSL)
makeActionWithAllType(types,nifi.deleteProcess)
+ #Delete of SQS, not the notification
+ if "S3" in data["nifi"]:
+ for s3 in data["nifi"]["S3"]:
+ s3["queue_name"]= s3["AWS_S3_BUCKET"]+"_events"
+ deleteSQS(s3)
+ if "SQS" in data["nifi"]:
+ for sqs in data["nifi"]["S3"]:
+ deleteSQS(sqs)
elif args.option == "start":
makeActionWithAllType(types,nifi.startProcess)
elif args.option == "stop":
diff --git a/cli/NifiManagement.py b/cli/sources/NifiManagement.py
similarity index 82%
rename from cli/NifiManagement.py
rename to cli/sources/NifiManagement.py
index ee471793..0b2ca023 100755
--- a/cli/NifiManagement.py
+++ b/cli/sources/NifiManagement.py
@@ -97,11 +97,11 @@ def getProcessGroup(self, process_groupName):
return None
- def create(self,name,file):
+ def create(self,name,filecontent):
groupid=self.getProcessGroup(name)
if not groupid :
fields = {"groupName": name, "positionX": "-150","positionY": "-150","clientId" : "aaa",
- "file": (file, open(file).read(), "application/json"),}
+ "file": ("namefile", json.dumps(filecontent).encode('utf-8'), "application/json"),}
body, header = encode_multipart_formdata(fields)
response= self.callHttp(requests.post,"/nifi-api/process-groups/root/process-groups/upload",body,header)
@@ -155,4 +155,24 @@ def executionNode(self, pg, process, node):
def nifiVersion(self):
response= self.callHttp(requests.get,"/nifi-api/system-diagnostics",'')
- return response.json()["systemDiagnostics"]["aggregateSnapshot"]["versionInfo"]["niFiVersion"]
\ No newline at end of file
+ return response.json()["systemDiagnostics"]["aggregateSnapshot"]["versionInfo"]["niFiVersion"]
+
+ def enableSSL(self,name):
+ groupid=self.getProcessGroup(name)
+ response= self.callHttp(requests.get,"/nifi-api/flow/process-groups/"+groupid+"/controller-services",'')
+ for controller in response.json()["controllerServices"]:
+ if controller["parentGroupId"] == groupid:
+ ssl_context_id=controller["id"]
+ version=controller["revision"]["version"]
+ data='{"revision":{"clientId":"'+groupid+'","version":'+str(version)+'},"disconnectedNodeAcknowledged":false,"state":"ENABLED","uiOnly":true}'
+ response= self.callHttp(requests.put,"/nifi-api/controller-services/"+ ssl_context_id+"/run-status",data)
+
+ def disableSSL(self,name):
+ groupid=self.getProcessGroup(name)
+ response= self.callHttp(requests.get,"/nifi-api/flow/process-groups/"+groupid+"/controller-services",'')
+ for controller in response.json()["controllerServices"]:
+ if controller["parentGroupId"] == groupid:
+ ssl_context_id=controller["id"]
+ version=controller["revision"]["version"]
+ data='{"revision":{"clientId":"'+groupid+'","version":'+str(version)+'},"disconnectedNodeAcknowledged":false,"state":"DISABLED","uiOnly":true}'
+ response= self.callHttp(requests.put,"/nifi-api/controller-services/"+ ssl_context_id+"/run-status",data)
\ No newline at end of file
diff --git a/cli/sources/auxiliaryFunctions.py b/cli/sources/auxiliaryFunctions.py
new file mode 100644
index 00000000..e413eb06
--- /dev/null
+++ b/cli/sources/auxiliaryFunctions.py
@@ -0,0 +1,53 @@
+# dCNiOs
+# Copyright (C) 2023 - GRyCAP - Universitat Politecnica de Valencia
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Apache 2.0 Licence as published by
+# the Apache Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Apache 2.0 License for more details.
+#
+# You should have received a copy of the Apache 2.0 License
+# along with this program. If not, see .
+
+#!/usr/bin/env python3
+
+import json
+import os
+
+def addSensibleVariable(file,processorName,key,value):
+ for processor in file["flowContents"]["processors"]:
+ if processor["name"] == processorName:
+ processor["properties"][key]=value
+ return file
+
+
+def prepareforAll(fileName):
+ with open(fileName) as f:
+ filecontent = json.load(f)
+ filecontent["snapshotMetadata"]={}
+ filecontent["snapshotMetadata"]["bucketIdentifier"]=""
+ filecontent["snapshotMetadata"]["flowIdentifier"]=""
+ filecontent["snapshotMetadata"]["version"]=-1
+ return filecontent
+
+
+def kafkaPreparefile(filecontent,kafka):
+ filecontent=addSensibleVariable(filecontent,"ConsumeKafka_2_6","security.protocol",kafka["security_protocol"])
+ filecontent=addSensibleVariable(filecontent,"ConsumeKafka_2_6","sasl.mechanism",kafka["sasl_mechanism"])
+ filecontent=addSensibleVariable(filecontent,"ConsumeKafka_2_6","sasl.username",kafka["sasl_username"])
+ filecontent=addSensibleVariable(filecontent,"ConsumeKafka_2_6","sasl.password",kafka["sasl_password"])
+ if "separate_by_key" in kafka and kafka["separate_by_key"] == "true" :
+ filecontent=addSensibleVariable(filecontent,"ConsumeKafka_2_6","separate-by-key","true")
+ filecontent=addSensibleVariable(filecontent,"ConsumeKafka_2_6","message-demarcator",kafka["message_demarcator"])
+ else:
+ filecontent=addSensibleVariable(filecontent,"ConsumeKafka_2_6","separate-by-key","false")
+ return filecontent
+
+def ssl_context(filecontent, variables):
+ for var in variables:
+ filecontent["flowContents"]["controllerServices"][0]["properties"][var.replace("_", " ")]=variables[var]
+ return filecontent
diff --git a/cli/sources/aws.py b/cli/sources/aws.py
new file mode 100644
index 00000000..af3e146c
--- /dev/null
+++ b/cli/sources/aws.py
@@ -0,0 +1,79 @@
+# dCNiOs
+# Copyright (C) 2023 - GRyCAP - Universitat Politecnica de Valencia
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Apache 2.0 Licence as published by
+# the Apache Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Apache 2.0 License for more details.
+#
+# You should have received a copy of the Apache 2.0 License
+# along with this program. If not, see .
+
+#!/usr/bin/env python3
+
+import json
+import os
+import boto3
+from sources.auxiliaryFunctions import addSensibleVariable
+
+
+
+def getAWSCredentials(configuration):
+ if "AWS_ACCESS_KEY_ID" in os.environ and os.environ["AWS_ACCESS_KEY_ID"] != "" and \
+ "AWS_SECRET_ACCESS_KEY" in os.environ and os.environ["AWS_SECRET_ACCESS_KEY"] != "" :
+ print("AWS Credentials: Credentials from environment")
+ configuration["AWS_ACCESS_KEY_ID"] = os.environ["AWS_ACCESS_KEY_ID"]
+ configuration["AWS_SECRET_ACCESS_KEY"] = os.environ["AWS_SECRET_ACCESS_KEY"]
+ elif os.path.exists(os.environ["HOME"]+"/.aws/credentials"):
+ print("AWS Credentials: Credentials from credentials file")
+ session = boto3.Session(profile_name="default")
+ credentials = session.get_credentials()
+ configuration["AWS_ACCESS_KEY_ID"]= credentials.access_key
+ configuration["AWS_SECRET_ACCESS_KEY"] =credentials.secret_key
+ elif "AWS_ACCESS_KEY_ID" in configuration and "AWS_SECRET_ACCESS_KEY" in configuration:
+ print("AWS Credentials: Credentials from configuration file")
+
+def createSQS(configuration):
+ accountID=boto3.client('sts').get_caller_identity().get('Account')
+ sqsClient=boto3.client('sqs', region_name=configuration["AWS_DEFAULT_REGION"])
+ response = sqsClient.create_queue(QueueName=configuration["queue_name"], Attributes={
+ "SqsManagedSseEnabled":"false",
+ "Policy": '{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"__owner_statement","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::'+accountID+':root"},"Action":"SQS:*","Resource":"arn:aws:sqs:'+configuration["AWS_DEFAULT_REGION"]+':'+accountID+':'+configuration["queue_name"]+'"},{"Sid":"__sender_statement","Effect":"Allow","Principal":{"AWS":"*"},"Action":"SQS:SendMessage","Resource":"arn:aws:sqs:'+configuration["AWS_DEFAULT_REGION"]+':'+accountID+':'+configuration["queue_name"]+'"}]}'
+ })
+ return sqsClient.get_queue_url(QueueName=configuration["queue_name"])
+
+def sqsPreparefile(filecontent,configuration):
+ filecontent=addSensibleVariable(filecontent,"GetSQS","Access Key",configuration["AWS_ACCESS_KEY_ID"])
+ filecontent=addSensibleVariable(filecontent,"GetSQS","Secret Key",configuration["AWS_SECRET_ACCESS_KEY"])
+ filecontent=addSensibleVariable(filecontent,"GetSQS","Region",configuration["AWS_DEFAULT_REGION"])
+ return filecontent
+
+def deleteSQS(configuration):
+ sqsClient=boto3.client('sqs', region_name=configuration["AWS_DEFAULT_REGION"] )
+ queue = sqsClient.get_queue_url(QueueName=configuration["queue_name"])
+ response = sqsClient.delete_queue(QueueUrl=queue['QueueUrl'])
+
+
+def s3NotificationSQS(configuration):
+ owner=boto3.client('sts').get_caller_identity().get('Account')
+ arn= "arn:aws:sqs:"+configuration["AWS_DEFAULT_REGION"]+ ":" +owner+":" + configuration["queue_name"]
+ s3 = boto3.resource('s3')
+ bucket_notification = s3.BucketNotification(configuration["AWS_S3_BUCKET"])
+ response = bucket_notification.put(
+ NotificationConfiguration={
+ 'QueueConfigurations': [
+ {
+ 'Id': configuration["AWS_S3_BUCKET"]+"_event",
+ 'QueueArn': arn,
+ 'Events': [
+ 's3:ObjectCreated:*',
+ ],
+ },
+ ],
+ },
+ ExpectedBucketOwner=owner,
+ SkipDestinationValidation=False)
diff --git a/cli/template/Kafka.json b/cli/template/Kafka.json
new file mode 100644
index 00000000..4f4bed92
--- /dev/null
+++ b/cli/template/Kafka.json
@@ -0,0 +1,507 @@
+{
+ "flowContents": {
+ "identifier": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0",
+ "instanceIdentifier": "8bb068ef-a5fe-1674-0000-0000101002d6",
+ "name": "Kafka",
+ "comments": "",
+ "position": {
+ "x": -292.80182860376476,
+ "y": -279.00025521335857
+ },
+ "processGroups": [],
+ "remoteProcessGroups": [],
+ "processors": [
+ {
+ "identifier": "4c5bc270-3928-3b09-97f7-3b69e9bade54",
+ "instanceIdentifier": "e2f95b7f-0572-38b7-8613-37a44e5174b2",
+ "name": "ConsumeKafka_2_6",
+ "comments": "",
+ "position": {
+ "x": -280.0,
+ "y": 984.0
+ },
+ "type": "org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6",
+ "bundle": {
+ "group": "org.apache.nifi",
+ "artifact": "nifi-kafka-2-6-nar",
+ "version": "1.20.0"
+ },
+ "properties": {
+ "header-name-regex": null,
+ "Commit Offsets": "true",
+ "group.id": "${group_id}",
+ "bootstrap.servers": "${bootstrap_servers}",
+ "topic_type": "pattern",
+ "sasl.kerberos.principal": null,
+ "sasl.kerberos.service.name": null,
+ "kerberos-credentials-service": null,
+ "max-uncommit-offset-wait": "1 secs",
+ "sasl.mechanism": "PLAIN",
+ "honor-transactions": "true",
+ "kerberos-user-service": null,
+ "message-header-encoding": "UTF-8",
+ "message-demarcator": null,
+ "sasl.username": "",
+ "max.poll.records": "10000",
+ "aws.profile.name": null,
+ "security.protocol": "SASL_SSL",
+ "ssl.context.service": "3598a8e1-9ef0-3b7f-9c69-83466719eb43",
+ "sasl.token.auth": "false",
+ "sasl.kerberos.keytab": null,
+ "Communications Timeout": "60 secs",
+ "topic": "${topic}",
+ "separate-by-key": "false",
+ "key-attribute-encoding": "utf-8",
+ "auto.offset.reset": "latest"
+ },
+ "propertyDescriptors": {
+ "header-name-regex": {
+ "name": "header-name-regex",
+ "displayName": "Headers to Add as Attributes (Regex)",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Commit Offsets": {
+ "name": "Commit Offsets",
+ "displayName": "Commit Offsets",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "group.id": {
+ "name": "group.id",
+ "displayName": "Group ID",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "bootstrap.servers": {
+ "name": "bootstrap.servers",
+ "displayName": "Kafka Brokers",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "topic_type": {
+ "name": "topic_type",
+ "displayName": "Topic Name Format",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "sasl.kerberos.principal": {
+ "name": "sasl.kerberos.principal",
+ "displayName": "Kerberos Principal",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "sasl.kerberos.service.name": {
+ "name": "sasl.kerberos.service.name",
+ "displayName": "Kerberos Service Name",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "kerberos-credentials-service": {
+ "name": "kerberos-credentials-service",
+ "displayName": "Kerberos Credentials Service",
+ "identifiesControllerService": true,
+ "sensitive": false
+ },
+ "max-uncommit-offset-wait": {
+ "name": "max-uncommit-offset-wait",
+ "displayName": "Max Uncommitted Time",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "sasl.mechanism": {
+ "name": "sasl.mechanism",
+ "displayName": "SASL Mechanism",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "honor-transactions": {
+ "name": "honor-transactions",
+ "displayName": "Honor Transactions",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "kerberos-user-service": {
+ "name": "kerberos-user-service",
+ "displayName": "Kerberos User Service",
+ "identifiesControllerService": true,
+ "sensitive": false
+ },
+ "message-header-encoding": {
+ "name": "message-header-encoding",
+ "displayName": "Message Header Encoding",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "message-demarcator": {
+ "name": "message-demarcator",
+ "displayName": "Message Demarcator",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "sasl.username": {
+ "name": "sasl.username",
+ "displayName": "Username",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "max.poll.records": {
+ "name": "max.poll.records",
+ "displayName": "Max Poll Records",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "aws.profile.name": {
+ "name": "aws.profile.name",
+ "displayName": "AWS Profile Name",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "security.protocol": {
+ "name": "security.protocol",
+ "displayName": "Security Protocol",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "ssl.context.service": {
+ "name": "ssl.context.service",
+ "displayName": "SSL Context Service",
+ "identifiesControllerService": true,
+ "sensitive": false
+ },
+ "sasl.token.auth": {
+ "name": "sasl.token.auth",
+ "displayName": "Token Authentication",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "sasl.kerberos.keytab": {
+ "name": "sasl.kerberos.keytab",
+ "displayName": "Kerberos Keytab",
+ "identifiesControllerService": false,
+ "sensitive": false,
+ "resourceDefinition": {
+ "cardinality": "SINGLE",
+ "resourceTypes": [
+ "FILE"
+ ]
+ }
+ },
+ "Communications Timeout": {
+ "name": "Communications Timeout",
+ "displayName": "Communications Timeout",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "topic": {
+ "name": "topic",
+ "displayName": "Topic Name(s)",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "separate-by-key": {
+ "name": "separate-by-key",
+ "displayName": "Separate By Key",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "sasl.password": {
+ "name": "sasl.password",
+ "displayName": "Password",
+ "identifiesControllerService": false,
+ "sensitive": true
+ },
+ "key-attribute-encoding": {
+ "name": "key-attribute-encoding",
+ "displayName": "Key Attribute Encoding",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "auto.offset.reset": {
+ "name": "auto.offset.reset",
+ "displayName": "Offset Reset",
+ "identifiesControllerService": false,
+ "sensitive": false
+ }
+ },
+ "style": {},
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "executionNode": "ALL",
+ "penaltyDuration": "30 sec",
+ "yieldDuration": "1 sec",
+ "bulletinLevel": "WARN",
+ "runDurationMillis": 0,
+ "concurrentlySchedulableTaskCount": 1,
+ "autoTerminatedRelationships": [],
+ "scheduledState": "ENABLED",
+ "retryCount": 10,
+ "retriedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "maxBackoffPeriod": "10 mins",
+ "componentType": "PROCESSOR",
+ "groupIdentifier": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0"
+ },
+ {
+ "identifier": "2c6ce5f8-8b93-316f-aa37-64bdd6be5716",
+ "instanceIdentifier": "15fb5fb5-cd8b-394c-bb87-a5caaf0f4786",
+ "name": "Base64EncodeContent",
+ "comments": "",
+ "position": {
+ "x": -288.0,
+ "y": 1192.0
+ },
+ "type": "org.apache.nifi.processors.standard.Base64EncodeContent",
+ "bundle": {
+ "group": "org.apache.nifi",
+ "artifact": "nifi-standard-nar",
+ "version": "1.20.0"
+ },
+ "properties": {
+ "Mode": "Encode"
+ },
+ "propertyDescriptors": {
+ "Mode": {
+ "name": "Mode",
+ "displayName": "Mode",
+ "identifiesControllerService": false,
+ "sensitive": false
+ }
+ },
+ "style": {},
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "executionNode": "ALL",
+ "penaltyDuration": "30 sec",
+ "yieldDuration": "1 sec",
+ "bulletinLevel": "WARN",
+ "runDurationMillis": 0,
+ "concurrentlySchedulableTaskCount": 1,
+ "autoTerminatedRelationships": [
+ "failure"
+ ],
+ "scheduledState": "ENABLED",
+ "retryCount": 10,
+ "retriedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "maxBackoffPeriod": "10 mins",
+ "componentType": "PROCESSOR",
+ "groupIdentifier": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0"
+ }
+ ],
+ "inputPorts": [],
+ "outputPorts": [
+ {
+ "identifier": "860bd814-2d5b-3cc9-abfb-6d8e60eed975",
+ "instanceIdentifier": "8cb83eb7-b95a-3f5b-9d2e-bc837aee779c",
+ "name": "Output",
+ "position": {
+ "x": -387.3546332690239,
+ "y": 1514.6725787122195
+ },
+ "type": "OUTPUT_PORT",
+ "concurrentlySchedulableTaskCount": 1,
+ "scheduledState": "ENABLED",
+ "allowRemoteAccess": false,
+ "componentType": "OUTPUT_PORT",
+ "groupIdentifier": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0"
+ }
+ ],
+ "connections": [
+ {
+ "identifier": "64446fab-214b-3f65-88e7-57749ac9d6dc",
+ "instanceIdentifier": "ac9dcac1-bf45-3f7b-ae50-c7b1e059db97",
+ "name": "",
+ "source": {
+ "id": "4c5bc270-3928-3b09-97f7-3b69e9bade54",
+ "type": "PROCESSOR",
+ "groupId": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0",
+ "name": "ConsumeKafka_2_6",
+ "comments": "",
+ "instanceIdentifier": "e2f95b7f-0572-38b7-8613-37a44e5174b2"
+ },
+ "destination": {
+ "id": "2c6ce5f8-8b93-316f-aa37-64bdd6be5716",
+ "type": "PROCESSOR",
+ "groupId": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0",
+ "name": "Base64EncodeContent",
+ "comments": "",
+ "instanceIdentifier": "15fb5fb5-cd8b-394c-bb87-a5caaf0f4786"
+ },
+ "labelIndex": 1,
+ "zIndex": 0,
+ "selectedRelationships": [
+ "success"
+ ],
+ "backPressureObjectThreshold": 10000,
+ "backPressureDataSizeThreshold": "1 GB",
+ "flowFileExpiration": "0 sec",
+ "prioritizers": [],
+ "bends": [],
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "partitioningAttribute": "",
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "componentType": "CONNECTION",
+ "groupIdentifier": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0"
+ },
+ {
+ "identifier": "4944dff6-e57c-3ae6-9d74-0b1935c28aab",
+ "instanceIdentifier": "9ad932a7-546b-3fa0-a0fc-fdf7b3399bd9",
+ "name": "",
+ "source": {
+ "id": "2c6ce5f8-8b93-316f-aa37-64bdd6be5716",
+ "type": "PROCESSOR",
+ "groupId": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0",
+ "name": "Base64EncodeContent",
+ "comments": "",
+ "instanceIdentifier": "15fb5fb5-cd8b-394c-bb87-a5caaf0f4786"
+ },
+ "destination": {
+ "id": "860bd814-2d5b-3cc9-abfb-6d8e60eed975",
+ "type": "OUTPUT_PORT",
+ "groupId": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0",
+ "name": "Output",
+ "instanceIdentifier": "8cb83eb7-b95a-3f5b-9d2e-bc837aee779c"
+ },
+ "labelIndex": 1,
+ "zIndex": 0,
+ "selectedRelationships": [
+ "success"
+ ],
+ "backPressureObjectThreshold": 10000,
+ "backPressureDataSizeThreshold": "1 GB",
+ "flowFileExpiration": "0 sec",
+ "prioritizers": [],
+ "bends": [],
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "partitioningAttribute": "",
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "componentType": "CONNECTION",
+ "groupIdentifier": "d6db9e57-c6f7-3f22-89c8-500a9113d1d0"
+ }
+ ],
+ "labels": [],
+ "funnels": [],
+ "controllerServices": [
+ {
+ "identifier": "3598a8e1-9ef0-3b7f-9c69-83466719eb43",
+ "instanceIdentifier": "8bb04183-a5fe-1674-0000-00004c347b53",
+ "name": "StandardRestrictedSSLContextService",
+ "comments": "",
+ "type": "org.apache.nifi.ssl.StandardRestrictedSSLContextService",
+ "bundle": {
+ "group": "org.apache.nifi",
+ "artifact": "nifi-ssl-context-service-nar",
+ "version": "1.20.0"
+ },
+ "properties": {
+ "Truststore Type": "",
+ "SSL Protocol": "",
+ "Keystore Type": null,
+ "Truststore Filename": "",
+ "Keystore Filename": null
+ },
+ "propertyDescriptors": {
+ "Truststore Type": {
+ "name": "Truststore Type",
+ "displayName": "Truststore Type",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "SSL Protocol": {
+ "name": "SSL Protocol",
+ "displayName": "TLS Protocol",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Keystore Type": {
+ "name": "Keystore Type",
+ "displayName": "Keystore Type",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Truststore Filename": {
+ "name": "Truststore Filename",
+ "displayName": "Truststore Filename",
+ "identifiesControllerService": false,
+ "sensitive": false,
+ "resourceDefinition": {
+ "cardinality": "SINGLE",
+ "resourceTypes": [
+ "FILE"
+ ]
+ }
+ },
+ "Keystore Password": {
+ "name": "Keystore Password",
+ "displayName": "Keystore Password",
+ "identifiesControllerService": false,
+ "sensitive": true
+ },
+ "key-password": {
+ "name": "key-password",
+ "displayName": "Key Password",
+ "identifiesControllerService": false,
+ "sensitive": true
+ },
+ "Truststore Password": {
+ "name": "Truststore Password",
+ "displayName": "Truststore Password",
+ "identifiesControllerService": false,
+ "sensitive": true
+ },
+ "Keystore Filename": {
+ "name": "Keystore Filename",
+ "displayName": "Keystore Filename",
+ "identifiesControllerService": false,
+ "sensitive": false,
+ "resourceDefinition": {
+ "cardinality": "SINGLE",
+ "resourceTypes": [
+ "FILE"
+ ]
+ }
+ }
+ },
+ "controllerServiceApis": [
+ {
+ "type": "org.apache.nifi.ssl.SSLContextService",
+ "bundle": {
+ "group": "org.apache.nifi",
+ "artifact": "nifi-standard-services-api-nar",
+ "version": "1.20.0"
+ }
+ },
+ {
+ "type": "org.apache.nifi.ssl.RestrictedSSLContextService",
+ "bundle": {
+ "group": "org.apache.nifi",
+ "artifact": "nifi-standard-services-api-nar",
+ "version": "1.20.0"
+ }
+ }
+ ],
+ "scheduledState": "DISABLED",
+ "bulletinLevel": "WARN",
+ "componentType": "CONTROLLER_SERVICE",
+ "groupIdentifier": "8bb068ef-a5fe-1674-0000-0000101002d6"
+ }
+ ],
+ "variables": {
+ "group_id": "",
+ "bootstrap_servers":"",
+ "topic":""
+ },
+ "defaultFlowFileExpiration": "0 sec",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "componentType": "PROCESS_GROUP",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE"
+ },
+ "externalControllerServices": {},
+ "parameterContexts": {},
+ "flowEncodingVersion": "1.0",
+ "parameterProviders": {},
+ "latest": false
+}
\ No newline at end of file
diff --git a/cli/template/SQS_recive.json b/cli/template/SQS_recive.json
new file mode 100644
index 00000000..527ed64e
--- /dev/null
+++ b/cli/template/SQS_recive.json
@@ -0,0 +1,247 @@
+{
+ "flowContents": {
+ "identifier": "5ebf3a0d-1c2a-3de8-a82c-8432eda3fa6f",
+ "instanceIdentifier": "cdbdf443-018e-1000-ffff-ffff91c0f7c4",
+ "name": "SQS_recived",
+ "comments": "",
+ "position": {
+ "x": -520.0,
+ "y": 8.0
+ },
+ "processGroups": [],
+ "remoteProcessGroups": [],
+ "processors": [
+ {
+ "identifier": "3e72aaee-214f-35e2-a673-e165a24cbb16",
+ "instanceIdentifier": "bfa6a082-f997-3703-bfdd-3b4e22400b2c",
+ "name": "GetSQS",
+ "comments": "",
+ "position": {
+ "x": 824.0,
+ "y": 800.0
+ },
+ "type": "org.apache.nifi.processors.aws.sqs.GetSQS",
+ "bundle": {
+ "group": "org.apache.nifi",
+ "artifact": "nifi-aws-nar",
+ "version": "1.20.0"
+ },
+ "properties": {
+ "Proxy Host": null,
+ "Auto Delete Messages": "true",
+ "Endpoint Override URL": null,
+ "Queue URL": "${queueurl}",
+ "Receive Message Wait Time": "0 sec",
+ "AWS Credentials Provider service": null,
+ "Batch Size": "1",
+ "Visibility Timeout": "15 mins",
+ "Communications Timeout": "30 secs",
+ "Credentials File": null,
+ "Region": "us-east-1",
+ "proxy-user-name": null,
+ "Character Set": "UTF-8",
+ "Proxy Host Port": null
+ },
+ "propertyDescriptors": {
+ "Proxy Host": {
+ "name": "Proxy Host",
+ "displayName": "Proxy Host",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Auto Delete Messages": {
+ "name": "Auto Delete Messages",
+ "displayName": "Auto Delete Messages",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Access Key": {
+ "name": "Access Key",
+ "displayName": "Access Key ID",
+ "identifiesControllerService": false,
+ "sensitive": true
+ },
+ "Endpoint Override URL": {
+ "name": "Endpoint Override URL",
+ "displayName": "Endpoint Override URL",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Queue URL": {
+ "name": "Queue URL",
+ "displayName": "Queue URL",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Receive Message Wait Time": {
+ "name": "Receive Message Wait Time",
+ "displayName": "Receive Message Wait Time",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "AWS Credentials Provider service": {
+ "name": "AWS Credentials Provider service",
+ "displayName": "AWS Credentials Provider Service",
+ "identifiesControllerService": true,
+ "sensitive": false
+ },
+ "Batch Size": {
+ "name": "Batch Size",
+ "displayName": "Batch Size",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Visibility Timeout": {
+ "name": "Visibility Timeout",
+ "displayName": "Visibility Timeout",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "proxy-user-password": {
+ "name": "proxy-user-password",
+ "displayName": "Proxy Password",
+ "identifiesControllerService": false,
+ "sensitive": true
+ },
+ "Communications Timeout": {
+ "name": "Communications Timeout",
+ "displayName": "Communications Timeout",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Secret Key": {
+ "name": "Secret Key",
+ "displayName": "Secret Access Key",
+ "identifiesControllerService": false,
+ "sensitive": true
+ },
+ "Credentials File": {
+ "name": "Credentials File",
+ "displayName": "Credentials File",
+ "identifiesControllerService": false,
+ "sensitive": false,
+ "resourceDefinition": {
+ "cardinality": "SINGLE",
+ "resourceTypes": [
+ "FILE"
+ ]
+ }
+ },
+ "Region": {
+ "name": "Region",
+ "displayName": "Region",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "proxy-user-name": {
+ "name": "proxy-user-name",
+ "displayName": "Proxy Username",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Character Set": {
+ "name": "Character Set",
+ "displayName": "Character Set",
+ "identifiesControllerService": false,
+ "sensitive": false
+ },
+ "Proxy Host Port": {
+ "name": "Proxy Host Port",
+ "displayName": "Proxy Host Port",
+ "identifiesControllerService": false,
+ "sensitive": false
+ }
+ },
+ "style": {},
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "executionNode": "ALL",
+ "penaltyDuration": "30 sec",
+ "yieldDuration": "1 sec",
+ "bulletinLevel": "WARN",
+ "runDurationMillis": 0,
+ "concurrentlySchedulableTaskCount": 1,
+ "autoTerminatedRelationships": [],
+ "scheduledState": "ENABLED",
+ "retryCount": 10,
+ "retriedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "maxBackoffPeriod": "10 mins",
+ "componentType": "PROCESSOR",
+ "groupIdentifier": "5ebf3a0d-1c2a-3de8-a82c-8432eda3fa6f"
+ }
+ ],
+ "inputPorts": [],
+ "outputPorts": [
+ {
+ "identifier": "3df42ffa-d3f4-3362-ae9e-ae21a408bf0a",
+ "instanceIdentifier": "cb634233-cc76-34d8-b79e-ed7eb6318da4",
+ "name": "output",
+ "position": {
+ "x": 208.0,
+ "y": 840.0
+ },
+ "type": "OUTPUT_PORT",
+ "concurrentlySchedulableTaskCount": 1,
+ "scheduledState": "ENABLED",
+ "allowRemoteAccess": false,
+ "componentType": "OUTPUT_PORT",
+ "groupIdentifier": "5ebf3a0d-1c2a-3de8-a82c-8432eda3fa6f"
+ }
+ ],
+ "connections": [
+ {
+ "identifier": "2d2c5a51-2f26-3c1d-9434-d1ffd2c2f1bb",
+ "instanceIdentifier": "2128dcd4-2975-326c-b852-d950e7eb592d",
+ "name": "",
+ "source": {
+ "id": "3e72aaee-214f-35e2-a673-e165a24cbb16",
+ "type": "PROCESSOR",
+ "groupId": "5ebf3a0d-1c2a-3de8-a82c-8432eda3fa6f",
+ "name": "GetSQS",
+ "comments": "",
+ "instanceIdentifier": "bfa6a082-f997-3703-bfdd-3b4e22400b2c"
+ },
+ "destination": {
+ "id": "3df42ffa-d3f4-3362-ae9e-ae21a408bf0a",
+ "type": "OUTPUT_PORT",
+ "groupId": "5ebf3a0d-1c2a-3de8-a82c-8432eda3fa6f",
+ "name": "output",
+ "instanceIdentifier": "cb634233-cc76-34d8-b79e-ed7eb6318da4"
+ },
+ "labelIndex": 1,
+ "zIndex": 0,
+ "selectedRelationships": [
+ "success"
+ ],
+ "backPressureObjectThreshold": 10000,
+ "backPressureDataSizeThreshold": "1 GB",
+ "flowFileExpiration": "0 sec",
+ "prioritizers": [],
+ "bends": [],
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "partitioningAttribute": "",
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "componentType": "CONNECTION",
+ "groupIdentifier": "5ebf3a0d-1c2a-3de8-a82c-8432eda3fa6f"
+ }
+ ],
+ "labels": [],
+ "funnels": [],
+ "controllerServices": [],
+ "variables": {
+ "queueurl": "a"
+ },
+ "defaultFlowFileExpiration": "0 sec",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "componentType": "PROCESS_GROUP",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE"
+ },
+ "externalControllerServices": {},
+ "parameterContexts": {},
+ "flowEncodingVersion": "1.0",
+ "parameterProviders": {},
+ "latest": false
+}
\ No newline at end of file