Skip to content

7 construction sequence generation #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 160 additions & 14 deletions csv2nq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
from os.path import exists
import datetime
import collections

# Local imports
from nq import nqwriter
Expand All @@ -23,25 +24,29 @@
# some features off after they have been added.

# Domain model expects triples to be expanded
HAS_POPULATION_MODEL = "feature#PopulationModel"
HAS_POPULATION_MODEL = "feature#PopulationModel"
# Strings appended to the average case to get min and max members of each population triplet
MIN_SUFFIX = "_Min"
MAX_SUFFIX = "_Max"

# Flags rather than naming conventions denote secondary and normal operational process threats
HAS_THREAT_TYPE_FLAGS = "feature#ThreatTypeFlags"
HAS_THREAT_TYPE_FLAGS = "feature#ThreatTypeFlags"

# Flags denote whether threats or control strategies should be used in current or future
# risk calculations.
HAS_RISK_TYPE_FLAGS = "feature#RiskTypeFlags"
HAS_RISK_TYPE_FLAGS = "feature#RiskTypeFlags"

# Threats can have mixed causes (both TWAS and MS), which means SSM doesn't need to raise the
# likelihood of each MS to at least the level equivalent to the TW level of its TWAS.
HAS_MIXED_THREAT_CAUSES = "feature#MixedThreatCauses"

# Asset and relationship types are flagged if they are only used for system model construction
# inference.
HAS_CONSTRUCTION_STATE = "feature#ConstructionStateFlags"
HAS_CONSTRUCTION_STATE = "feature#ConstructionStateFlags"

# Construction patterns form a partial sequence based on predecessor and successor relationships
# instead of having a specified priority (position) in a single construction sequence
HAS_CONSTRUCTION_DEPENDENCIES = "feature#ConstructionDependencies"

# There is no feature for CSGs having optional controls because as fas as SSM is concerned
# that is an optional feature.
Expand Down Expand Up @@ -70,18 +75,17 @@ def output_domain_model(nqw, unfiltered, heading):
comment_index = header.index("comment")
supported_index = header.index("supported")

population_support = False
for row in reader:
# Skip the first line which contains default values for csvformat
if DUMMY_URI in row: continue
# Extract the information we need from the subsequent row

# Check if the domain model specifies support for asset populations
if(row[uri_index] == HAS_POPULATION_MODEL):
population_support = True
if(not raw.expanded):
print("Domain model specifies population support, but this was suppressed by the csv2nq command line")
supported = row[supported_index].lower() == "true"

# Write out the line if the feature is supported
supported = row[supported_index].lower() == "true"
if(supported):
feature_list.append(row[uri_index])

Expand Down Expand Up @@ -978,13 +982,18 @@ def output_matching_patterns(nqw, heading, roles, assets, relationships, nodes,
# Output a spacer at the end of this section
nqw.write_comment("")

def output_construction_patterns(nqw, heading, roles, assets, relationships, nodes, links):
def output_construction_patterns(nqw, heading, roles, assets, relationships, nodes, links, cppredecessor, cpsequence):
# Output a heading for this section
nqw.write_comment("")
nqw.write_comment(heading)
nqw.write_comment("")

# Output the matching pattern
# Determine the source of construction sequence priorities
if(HAS_CONSTRUCTION_DEPENDENCIES in feature_list):
# Extract predecessor/successor relationships and create the CP partial sequence
create_construction_sequence(cppredecessor, cpsequence)

# Output the construction pattern
with open("ConstructionPattern.csv", newline="") as csvfile:
# Create the CSV reader object
reader = csv.reader(csvfile)
Expand All @@ -995,7 +1004,8 @@ def output_construction_patterns(nqw, heading, roles, assets, relationships, nod
label_index = header.index("label")
comment_index = header.index("comment")
hasMatchingPattern_index = header.index("hasMatchingPattern")
hasPriority_index = header.index("hasPriority")
if(HAS_CONSTRUCTION_DEPENDENCIES not in feature_list):
hasPriority_index = header.index("hasPriority")
iterate_index = header.index("iterate")
maxIterations_index = header.index("maxIterations")

Expand All @@ -1009,7 +1019,12 @@ def output_construction_patterns(nqw, heading, roles, assets, relationships, nod
label = nqw.encode_string(row[label_index])
comment = nqw.encode_string(row[comment_index])
hasMatchingPattern = nqw.encode_ssm_uri(row[hasMatchingPattern_index])
hasPriority = nqw.encode_integer(row[hasPriority_index])
if(HAS_CONSTRUCTION_DEPENDENCIES in feature_list):
# set the priority to the computed rank in the partial sequence
hasPriority = nqw.encode_integer(cpsequence[row[uri_index]])
else:
# set the priority to the hasPriority value from the CSV file
hasPriority = nqw.encode_integer(row[hasPriority_index])
iterate = nqw.encode_boolean(row[iterate_index].lower())
maxIterations = nqw.encode_integer(row[maxIterations_index])

Expand All @@ -1024,6 +1039,7 @@ def output_construction_patterns(nqw, heading, roles, assets, relationships, nod

# Output a spacer at the end of this resource
nqw.write_comment("")

except StopIteration:
pass

Expand Down Expand Up @@ -1125,6 +1141,103 @@ def output_construction_patterns(nqw, heading, roles, assets, relationships, nod
if row[hasInferredLink_index] not in links:
links[row[hasInferredLink_index]] = create_link(row[hasInferredLink_index], roles, relationships)

def create_construction_sequence(cppredecessor, cpsequence):

with open("ConstructionPattern.csv", newline="") as csvfile:
# Create the CSV reader object
reader = csv.reader(csvfile)

# Check that the table is as expected: if fields are missing this will raise an exception
header = next(reader)
uri_index = header.index("URI")

for row in reader:
# Skip the first line which contains default values for csvformat
if DUMMY_URI in row: continue

# Initialise the predecessors dictionary entry (an empty list) and sequence number (initially zero)
cppredecessor[row[uri_index]] = []
cpsequence[row[uri_index]] = 0

# Get the construction pattern predecessors and save them
with open("ConstructionPredecessor.csv", newline="") as csvfile:
# Create the CSV reader object
reader = csv.reader(csvfile)

# Check that the table is as expected: if fields are missing this will raise an exception
header = next(reader)
uri_index = header.index("URI")
predecessor_index = header.index("hasPredecessor")
fake_index = header.index("fake")

for row in reader:
# Skip the first line which contains default values for csvformat
if DUMMY_URI in row: continue

# Extract and save the information we need
uri = row[uri_index]
predecessor = row[predecessor_index]
fake = row[fake_index].lower()

# Add the predecessor to the set of predecessors for this pattern
if not fake == "true" and predecessor not in cppredecessor[uri]:
cppredecessor[uri].append(predecessor)

# Get the construction pattern successors and save them
with open("ConstructionSuccessor.csv", newline="") as csvfile:
# Create the CSV reader object
reader = csv.reader(csvfile)

# Check that the table is as expected: if fields are missing this will raise an exception
header = next(reader)
uri_index = header.index("URI")
successor_index = header.index("hasSuccessor")
fake_index = header.index("fake")

for row in reader:
# Skip the first line which contains default values for csvformat
if DUMMY_URI in row: continue

# Extract and save the information we need
uri = row[uri_index]
successor = row[successor_index]
fake = row[fake_index].lower()

# Add the URI to the set of predecessors of the successor
if not fake == "true" and uri not in cppredecessor[successor]:
cppredecessor[successor].append(uri)

# Use the data to find patterns and assign their priority
finished = False
i = 1
while not finished:
# Set finished flag until we find something still to do
finished = True

# Find CP with no remaining predecessors, and set their sequence number to a positive value
for uri in cpsequence:
if uri in cppredecessor:
if (cpsequence[uri] == 0) and (len(cppredecessor[uri])==0):
# Found something so the outer loop must continue
finished = False

# Mark this CP for removal
cpsequence[uri] = -1
else:
print("Error: CP {} has no list of predecessors".format(uri.replace("domain#CP-","")))

for uri in cpsequence:
if cpsequence[uri] < 0:
# Remove this CP from the list of predecessors of other CP
for cp in cppredecessor:
if uri in cppredecessor[cp]:
cppredecessor[cp].remove(uri)

# Set the rank of the removed CP
cpsequence[uri] = i

i = i + 1

#
# Control strategies, threats and threat categories and compliance sets: no triplets needed,
# but threats and CSGs do have some min/max properties.
Expand Down Expand Up @@ -2012,7 +2125,28 @@ def output_mapping_file(mapping_filename, ontology, domain_graph):
doc["icons"][uri] = icon
with open(mapping_filename, "w") as output:
output.write(json.dumps(doc, indent=4))


def log_sequence(log, header, cppredecessor, cpsequence):
od = collections.OrderedDict(sorted(cpsequence.items()))
log.write("{}\n".format(header))
for key in od.keys():
log.write("{}: {}".format(key.replace("domain#CP-",""), cpsequence[key]))
if key in cppredecessor:
values = cppredecessor[key]
i = len(values)
if i > 0:
log.write(", predecessors: ")
for uri in values:
i = i - 1
if(i==0):
log.write("{}\n".format(uri.replace("domain#CP-","")))
else:
log.write("{}, ".format(uri.replace("domain#CP-","")))
else:
log.write(", no predecessors\n")
else:
log.write(": found no predecessors list\n")
log.write("\n")

####################################################################################################################################################
#
Expand All @@ -2021,6 +2155,7 @@ def output_mapping_file(mapping_filename, ontology, domain_graph):

# Marshall command line arguments
parser = argparse.ArgumentParser(description="Convert from CSV files to NQ")
parser.add_argument("-l", "--log", dest="log", metavar="filename", help="Logfile for diagnostic output")
parser.add_argument("-i", "--input", dest="input", required=True, metavar="directory", help="Directory containing CSV files for input")
parser.add_argument("-o", "--output", dest="output", required=True, metavar="filename", help="Output NQ filename")
parser.add_argument("-m", "--mapping", dest="mapping", metavar="filename", help="Output JSON icon-mapping filename")
Expand Down Expand Up @@ -2053,6 +2188,14 @@ def output_mapping_file(mapping_filename, ontology, domain_graph):
else:
output_mapping = False

# Open the log file stream
if args["log"]:
log_mapping = True
log_filename = os.path.join(os.getcwd(), args["log"])
log = open(log_filename, mode="w")
else:
log_mapping = False

# Extract and enter input folder
csv_directory = args["input"]
os.chdir(csv_directory)
Expand All @@ -2068,6 +2211,9 @@ def output_mapping_file(mapping_filename, ontology, domain_graph):
twas = {} # will be filled with Trustworthiness Attribute URI
roles = {} # will be filled with Role URI

cppredecessor = {} # will be filled with lists of predecessor CPs
cpsequence = collections.OrderedDict() # will be filled with the calculated priority of each CP

twa_misbehaviour = {} # will be filled with the Misbehaviour per TWA, gleaned from the TWIS

control_sets = {} # will be filled with Control Set URI, gleaned from CSG controls
Expand Down Expand Up @@ -2112,7 +2258,7 @@ def output_mapping_file(mapping_filename, ontology, domain_graph):
# Output Patterns, saving nodes and links for later
output_root_patterns(nqw, "Root pattern definitions", roles, assets, relationships, nodes, role_links)
output_matching_patterns(nqw, "Matching pattern definitions", roles, assets, relationships, nodes, role_links)
output_construction_patterns(nqw, "Construction pattern definitions", roles, assets, relationships, nodes, role_links)
output_construction_patterns(nqw, "Construction pattern definitions", roles, assets, relationships, nodes, role_links, cppredecessor, cpsequence)

# Output Threat Categories, Threats and Control Strategies
output_threat_categories(nqw, "Threat category definitions")
Expand Down