-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate_behavioural_objects.py
120 lines (102 loc) · 4.18 KB
/
create_behavioural_objects.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from ocfl_interfaces.fedora.behavioural_objects import BehaviouralObjects
from pprint import pprint
from random import shuffle
import time, datetime
import json
import sys
from threading import Thread
# --- Run configuration -------------------------------------------------------
maxThreads = int(sys.argv[1])  # number of uploader threads, from the command line
maxJobs = 3000                 # nominal total number of upload jobs
outFile = f"myResults_J{maxJobs}-T{maxThreads}.csv"         # per-job timing rows
outResultsFile = f"myResults_J{maxJobs}-T{maxThreads}.log"  # raw JSON results
print(f"Using {maxThreads} threads")

# Per-upload-type tables, all keyed by the entries of `types`:
description = {}   # human-readable label for the job
callFunction = {}  # source string eval()'d by the worker to perform the upload
numberOfJobs = {}  # base run count for this type (multiplied by scaleFactor)

types = ["Metadata", "Binary", "LargeBinary", "CplxBinary", "VeryLargeBinary"]

numberOfJobs["Metadata"] = 487
description["Metadata"] = "Creating metadata object"
callFunction["Metadata"] = "b.create_metadata_object()"

numberOfJobs["Binary"] = 500
description["Binary"] = "Creating binary file object"
callFunction["Binary"] = "b.create_binary_file_objects()"

numberOfJobs["LargeBinary"] = 10
description["LargeBinary"] = "Creating large binary file object"
callFunction["LargeBinary"] = "b.create_large_binary_file_objects()"

numberOfJobs["CplxBinary"] = 2
description["CplxBinary"] = "Creating complex binary file object"
callFunction["CplxBinary"] = "b.create_complex_binary_file_objects()"

numberOfJobs["VeryLargeBinary"] = 1
description["VeryLargeBinary"] = "Creating very large binary file object"
callFunction["VeryLargeBinary"] = "b.create_very_large_binary_file_objects()"

# Scale the base counts so the total roughly matches maxJobs.
# NOTE(review): base counts sum to ~1000, so dividing by 1000 above the
# 500-job threshold gives the expected multiplier; below it the /500
# divisor doubles the density — behavior kept exactly as before.
scaleFactor = round(maxJobs / 1000.0) if maxJobs > 500 else round(maxJobs / 500.0)

# Build the flat work list: one (label, call-string) entry per scheduled run,
# shuffled so the types are interleaved across the worker threads.
jobsToRun = []
for jobType in types:  # renamed from `type`, which shadowed the builtin
    jobsToRun.extend([(description[jobType], callFunction[jobType])] * (numberOfJobs[jobType] * scaleFactor))
shuffle(jobsToRun)

# NOTE(review): this truncation to 20 overrides maxJobs — it looks like a
# debugging leftover (the commented intent was jobsToRun[:maxJobs]); kept
# as-is to preserve current behavior. TODO confirm before a real run.
jobsToRun = jobsToRun[:20]

# Write the CSV header for the per-job timing rows appended by the workers.
with open(outFile, "w") as file_csv:
    file_csv.write("threadID, work, nSteps, finalStatus, stepStatusOR, start_time, end_time, time_diff, location\n")

# ToDo : Extract time for each of the nSteps
def runTheUpload(thread):
    """Worker loop: drain jobs from the shared `jobsToRun` list, run each
    upload, and append one CSV timing row plus one JSON log line per job.

    Args:
        thread: identifier string for this worker, written into each CSV row.
    """
    while True:
        # Pop atomically and treat an empty list as "done".  Checking
        # len() before pop() (as before) is racy when several workers
        # drain the same list: another thread can empty it in between,
        # raising an unhandled IndexError.  EAFP pop is safe under the GIL.
        try:
            (myStr, myFun) = jobsToRun.pop()
        except IndexError:
            print("Nothing more to run ...")
            return

        start_time = time.time()
        print(myStr, myFun)
        # NOTE(review): callFunction holds source strings, so eval() is used
        # for dispatch.  The strings are hard-coded in this script, not user
        # input, but storing bound methods instead of strings would be safer.
        results = eval(myFun)
        end_time = time.time()

        # False as soon as any upload step reported a failed status.
        myOkay = all(att["status"] for att in results.get("msg", []))

        nResults = len(results.get("msg", []))
        rStatus = results["status"]

        # Default rLocation so the CSV row is always well-formed: previously
        # this variable was only assigned inside the `if`, raising NameError
        # whenever the result carried fewer than two messages.
        rLocation = ""
        if nResults > 1:
            rLocation = results["msg"][1].get("location", "")

        tdiff = end_time - start_time
        wStr = f"{thread}, {myStr}, {nResults}, {rStatus}, {myOkay}, {start_time}, {end_time}, {tdiff}, {rLocation}\n"
        with open(outFile, "a") as file_csv:
            file_csv.write(wStr)

        with open(outResultsFile, "a") as f:
            for msg in results.get('msg', []):
                # Bodies are presumably bytes; decode for JSON serialization,
                # blanking the body if it is missing, non-bytes, or undecodable
                # (was a bare except that swallowed everything).
                try:
                    msg['body'] = msg['body'].decode()
                except (AttributeError, KeyError, UnicodeDecodeError):
                    msg['body'] = ''
            f.write(f"{json.dumps(results)}\n")
b = BehaviouralObjects()

# Spawn one worker per requested thread, each running the shared job queue.
thList = []
for i in range(maxThreads):
    # args must be a TUPLE: the original `args=(str(i))` was just a string,
    # which Thread unpacks character-by-character — it worked by accident
    # for single-digit ids and raised TypeError for i >= 10.
    thread = Thread(target=runTheUpload, args=(str(i),))
    thList.append(thread)

for thread in thList:
    thread.start()  # Start the transfer
for thread in thList:
    thread.join()   # Wait until the threads finish before going forward