Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graph debugger #97

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
8 changes: 5 additions & 3 deletions python/rtypes/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ def set(self, oid, dimname, dim_obj, value):
converted = convert(dim_obj.dim_type, value)
if oid in self.store_as_temp:
self.store_as_temp[oid][dimname] = converted

# Write to local state map.
self.object_table[oid][dimname] = convert(dim_obj.dim_type, value)
else:
# Write to local state map.
self.object_table[oid][dimname] = convert(dim_obj.dim_type, value)
return oid

def set_primarykey(self, oid, dimname, dim_obj, value):
Expand All @@ -55,6 +55,8 @@ def set_primarykey(self, oid, dimname, dim_obj, value):


def get(self, oid, dimname, dim_obj):
if oid in self.store_as_temp and dimname in self.store_as_temp[oid]:
return unconvert(self.store_as_temp[oid][dimname], dim_obj.dim_type)
if oid not in self.object_table and dimname not in self.object_table[oid]:
# Value has not been assigned.
raise AttributeError("{0} has not been assigned a value.".format(dimname))
Expand Down
11 changes: 7 additions & 4 deletions python/spacetime/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def all_types(self):
def __init__(
self, dataframe=None, server_port=0,
version_by=VersionBy.FULLSTATE, instrument=None, dump_graph=None,
connection_as=ConnectionStyle.TSocket):
connection_as=ConnectionStyle.TSocket, debug=False):
self.appname = "{0}_{1}".format(func.__name__, str(uuid4()))
self.producer = producer
self.getter_setter = getter_setter
Expand All @@ -59,6 +59,7 @@ def __init__(
self.connection_as = connection_as
super().__init__()
self.daemon = False
self.debug = debug

def run(self):
# Create the dataframe.
Expand Down Expand Up @@ -91,7 +92,8 @@ def _create_dataframe(self):
version_by=self.version_by,
connection_as=self.connection_as,
instrument=self.instrument,
dump_graph=self.dump_graph)
dump_graph=self.dump_graph,
debug=self.debug)
#print(self.appname, self.all_types, details, df.details)
return df
return App
Expand All @@ -117,10 +119,11 @@ def Application(
target, dataframe=None, server_port=0,
Types=list(), Producer=list(), GetterSetter=list(),
Getter=list(), Setter=list(), Deleter=list(),
version_by=VersionBy.FULLSTATE, instrument=None, dump_graph=None, connection_as=ConnectionStyle.TSocket):
version_by=VersionBy.FULLSTATE, instrument=None, dump_graph=None, connection_as=ConnectionStyle.TSocket
, debug=False):
app_cls = get_app(
target, set(Types), set(Producer), set(GetterSetter),
set(Getter), set(Setter), set(Deleter))
return app_cls(
dataframe=dataframe, server_port=server_port, version_by=version_by,
instrument=instrument, dump_graph=dump_graph, connection_as=connection_as)
instrument=instrument, dump_graph=dump_graph, connection_as=connection_as, debug=debug)
5 changes: 3 additions & 2 deletions python/spacetime/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ def __init__(
self, appname, types, details=None, server_port=0,
version_by=enums.VersionBy.FULLSTATE, separate_dag=False,
connection_as=enums.ConnectionStyle.TSocket,
instrument=None, dump_graph=None):
instrument=None, dump_graph=None, debug=False):
self.appname = appname
self.logger = utils.get_logger("%s_Dataframe" % appname)
self.version_by = version_by
self.instrument = instrument
self.debug = debug
if self.instrument:
self.instrument_done = False
self.instrument_record = Queue()
Expand Down Expand Up @@ -76,7 +77,7 @@ def __init__(
self.appname, types, version_by)
self.versioned_heap.start()
elif version_by == enums.VersionBy.FULLSTATE:
self.versioned_heap = FullStateVersionManager(self.appname, types, dump_graph, self.instrument_record)
self.versioned_heap = FullStateVersionManager(self.appname, types, dump_graph, self.instrument_record,debug=self.debug)
elif version_by == enums.VersionBy.TYPE:
self.versioned_heap = TypeVersionManager(self.appname, types, dump_graph)
elif version_by == enums.VersionBy.OBJECT_NOSTORE:
Expand Down
95 changes: 90 additions & 5 deletions python/spacetime/managers/version_graph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from multiprocessing import RLock
from multiprocessing import Process, RLock, Manager, Event
from threading import Thread
import json
from flask import Flask


class Node(object):
Expand Down Expand Up @@ -41,11 +44,22 @@ def __init__(self):
self.nodes = {"ROOT": self.tail}
self.edges = dict()

def convert_to_json(self):
graph_jsonified = {'nodes': [node.current for node in self.nodes.values()],
'head': self.head.current,
'tail': self.tail.current,
'edges': self.edges}
return json.dumps({'nodes': [node.current for node in self.nodes.values()],'head': self.head.current,
'tail': self.tail.current})

def __getitem__(self, key):
if isinstance(key, slice):
step_reverse = ((key.step is not None) and (key.step < 0))
if key.start:
start = self.nodes[key.start]
try:
start = self.nodes[key.start]
except KeyError:
return list()
elif step_reverse:
start = self.head
else:
Expand Down Expand Up @@ -105,7 +119,6 @@ def maintain_edges(self):
self.nodes[start].all_next.remove(end)
self.nodes[end].all_prev.remove(start)


def merge_node(self, node, merger_function):
del self.nodes[node.current]
old_change = self.edges[(node.prev_master, node.current)].payload
Expand All @@ -121,7 +134,8 @@ def merge_node(self, node, merger_function):
node.prev_master, node.next_master, new_payload)
else:
# Figure out how to avoid this computation.
assert self.edges[(node.prev_master, node.next_master)].payload == new_payload, (self.edges[(node.prev_master, node.next_master)].payload, new_payload)
assert self.edges[(node.prev_master, node.next_master)].payload == new_payload, (
self.edges[(node.prev_master, node.next_master)].payload, new_payload)
if self.nodes[node.prev_master].next_master == node.current:
self.nodes[node.prev_master].next_master = node.next_master
self.nodes[node.prev_master].all_next.add(node.next_master)
Expand Down Expand Up @@ -154,4 +168,75 @@ def maintain(self, state_to_ref, merger_function):
# First divergent.
self.maintain_nodes(state_to_ref, merger_function, False)
# The master line.
self.maintain_nodes(state_to_ref, merger_function, True)
self.maintain_nodes(state_to_ref, merger_function, True)


class VersionGraphProcess(Process):

def __init__(self):
self.graph = Graph()
super().__init__()
self.manager = Manager()
self.result_manager = Manager()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One manager should work. Why so many managers?

self.event_manager = Manager()
self.read_write_queue = self.manager.Queue()
self.request_list = []

def create_flask_app(self):
app = Flask(__name__)
#app.config["DEBUG"] = True

@app.route('/next', methods=['GET'])
def graph():
req = self.read_write_queue.get()
self.request_list.append(req[0])
if req[0] == "continue_chain":
from_version, to_version, package, e1 = req[1:]
self.process_continue_chain(from_version, to_version, package, e1)
elif req[0] == "maintain":
state_to_ref, merger_function, e2 = req[1:]
self.process_maintain(state_to_ref, merger_function, e2)
elif req[0] == "get_item":
key, result_queue = req[1], req[2]
self.process_get_item(key, result_queue)
return self.graph.convert_to_json()

@app.route('/displayQueue', methods=['GET'])
def display_request_queue():
request_string = str(self.request_list)
return request_string

return app
self.app = create_flask_app(self)
self.start()

def process_continue_chain(self, from_version, to_version, package, e1):
self.graph.continue_chain(from_version, to_version, package)
e1.set()

def process_maintain(self, state_to_ref, merger_function, e2):
self.graph.maintain(state_to_ref, merger_function)
e2.set()

def process_get_item(self, key, result_queue):
result_queue.put(list(self.graph.__getitem__(key)))

def run(self):
self.app.run()

def continue_chain(self, from_version, to_version, package):
e1 = self.event_manager.Event()
self.read_write_queue.put(("continue_chain", from_version, to_version, package, e1))
e1.wait()

def maintain(self, state_to_ref, merger_function):
e2 = self.event_manager.Event()
self.read_write_queue.put(("maintain", state_to_ref, merger_function, e2))
e2.wait()

def __getitem__(self, key):
result_queue = self.result_manager.Queue()
self.read_write_queue.put(("get_item", key,result_queue))
return result_queue.get()


36 changes: 25 additions & 11 deletions python/spacetime/managers/version_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from uuid import uuid4
from abc import ABCMeta, abstractmethod
from multiprocessing import Process, Queue
from spacetime.managers.version_graph import Graph
from spacetime.managers.version_graph import VersionGraphProcess,Graph
import spacetime.utils.utils as utils
from spacetime.utils.enums import Event, VersionBy
import time
from threading import Thread


class VersionManagerProcess(Process):

Expand Down Expand Up @@ -385,16 +387,24 @@ def dump(self, folder, graph):
count = len(os.listdir(folder))
open(os.path.join(folder, "heap{}.dot".format(count)), "w").write(gstr)


class FullStateVersionManager(VersionManager):
def __init__(self, appname, types, dump_graph, instrument_record):

def __init__(self, appname, types, dump_graph, instrument_record, debug=False):

self.types = types
self.type_map = {tp.__r_meta__.name: tp for tp in types}
self.version_graph = Graph()
self.debug = debug
self.state_to_app = dict()
self.app_to_state = dict()
self.logger = utils.get_logger("%s_FullStateVersionManager" % appname)
self.dump_graphs = dump_graph
self.instrument_record = instrument_record
self.version_graph_head = "ROOT"
if self.debug:
self.version_graph = VersionGraphProcess()
else:
self.version_graph = Graph()

def set_app_marker(self, appname, end_v):
self.state_to_app.setdefault(end_v, set()).add(appname)
Expand All @@ -404,10 +414,11 @@ def receive_data(self, appname, versions, package, from_external=True):
if start_v == end_v:
# The versions are the same, lets ignore.
return True
if start_v != self.version_graph.head.current:
if start_v != self.version_graph_head:
self.resolve_conflict(start_v, end_v, package, from_external)
else:
self.version_graph.continue_chain(start_v, end_v, package)
self.version_graph_head = end_v
self.maintain(appname, end_v)
return True

Expand All @@ -418,22 +429,24 @@ def retrieve_data(self, appname, version):

def retrieve_data_nomaintain(self, version):
merged = dict()
if version not in self.version_graph.nodes:
return merged, [version, version]

for delta in self.version_graph[version:]:
merged = utils.merge_state_delta(merged, delta)
return merged, [version, self.version_graph.head.current]
if not merged:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also need to make the equivalent check in Graph class in getitem to return an empty list if the start version does not exist.

return merged, [version, version]
return merged, [version, self.version_graph_head]

def data_sent_confirmed(self, app, version):
if version[0] != version[1]:
self.maintain(app, version[1])

def resolve_conflict(self, start_v, end_v, package, from_external):
new_v = self.version_graph.head.current
new_v = self.version_graph_head
change, _ = self.retrieve_data_nomaintain(start_v)
t_new_merge, t_conflict_merge = self.operational_transform(
start_v, change, package, from_external)
merge_v = str(uuid4())
self.version_graph_head = merge_v
self.version_graph.continue_chain(start_v, end_v, package)
self.version_graph.continue_chain(new_v, merge_v, t_new_merge)
self.version_graph.continue_chain(end_v, merge_v, t_conflict_merge)
Expand All @@ -457,9 +470,10 @@ def maintain(self, appname, end_v):
super().maintain(
self.state_to_app, self.app_to_state,
self.version_graph, appname, end_v, utils.merge_state_delta)
if self.instrument_record:
self.instrument_record.put(
("MEMORY", "{0}\t{1}\t{2}\n".format(time.time(), len(self.version_graph.nodes), len(self.version_graph.edges))))
#if self.instrument_record:
#self.instrument_record.put(
#("MEMORY", "{0}\t{1}\t{2}\n".format(time.time(), len(self.version_graph.nodes), len(self.version_graph.edges))))


class TypeVersionManager(VersionManager):
def __init__(self, appname, types, dump_graph):
Expand Down
4 changes: 3 additions & 1 deletion python/spacetime/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ def merge_object_delta(dtpname, old_change, new_change):
if (old_change["types"][dtpname] is Event.Delete
and new_change["types"][dtpname] is Event.New):
return deepcopy(new_change)
if new_change["types"][dtpname] is not Event.Modification:
if not (
new_change["types"][dtpname] is Event.Modification or (
new_change["types"][dtpname] is Event.New and old_change["types"][dtpname] is Event.New)):
raise RuntimeError(
"Not sure why the new change does not have modification.")
if old_change["types"][dtpname] is Event.Delete:
Expand Down
Loading