Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graph debugger #97

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
58 changes: 54 additions & 4 deletions python/spacetime/managers/version_graph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from multiprocessing import RLock
from multiprocessing import Process, RLock
from threading import Thread
from queue import Queue, Empty


class Node(object):
Expand Down Expand Up @@ -105,7 +107,6 @@ def maintain_edges(self):
self.nodes[start].all_next.remove(end)
self.nodes[end].all_prev.remove(start)


def merge_node(self, node, merger_function):
del self.nodes[node.current]
old_change = self.edges[(node.prev_master, node.current)].payload
Expand All @@ -121,7 +122,8 @@ def merge_node(self, node, merger_function):
node.prev_master, node.next_master, new_payload)
else:
# Figure out how to avoid this computation.
assert self.edges[(node.prev_master, node.next_master)].payload == new_payload, (self.edges[(node.prev_master, node.next_master)].payload, new_payload)
assert self.edges[(node.prev_master, node.next_master)].payload == new_payload, (
self.edges[(node.prev_master, node.next_master)].payload, new_payload)
if self.nodes[node.prev_master].next_master == node.current:
self.nodes[node.prev_master].next_master = node.next_master
self.nodes[node.prev_master].all_next.add(node.next_master)
Expand Down Expand Up @@ -154,4 +156,52 @@ def maintain(self, state_to_ref, merger_function):
# First divergent.
self.maintain_nodes(state_to_ref, merger_function, False)
# The master line.
self.maintain_nodes(state_to_ref, merger_function, True)
self.maintain_nodes(state_to_ref, merger_function, True)


class GraphActor(Thread):

def __init__(self):
self.graph = Graph()
super().__init__()
self.read_write_queue = Queue(maxsize=0)
self.daemon = True
self.output = None

def process_continue_chain(self, from_version, to_version, package):
self.graph.continue_chain(from_version, to_version, package)

def process_maintain(self, state_to_ref, merger_function):
self.graph.maintain(state_to_ref, merger_function)

def process_get_item(self, key):
self.output = None
self.output = self.graph.__getitem__(key)

def run(self):
while True:
try:
req = self.read_write_queue.get()
if req[0] == 'continue_chain':
from_version, to_version, package = req[1:]
self.process_continue_chain(from_version, to_version, package)
elif req[0] == 'maintain':
state_to_ref, merger_function = req[1:]
self.process_maintain(state_to_ref, merger_function)
elif req[0] == 'get_item':
key = req[1]
self.process_get_item(key)
except Empty:
pass

def continue_chain(self, from_version, to_version, package):
self.read_write_queue.put(('continue_chain', from_version, to_version, package))

def maintain(self, state_to_ref, merger_function):
self.read_write_queue.put(('maintain', state_to_ref, merger_function))

def __getitem__(self, key):
self.read_write_queue.put(('get_item', key))
while self.output is None:
continue
return self.output
39 changes: 29 additions & 10 deletions python/spacetime/managers/version_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from uuid import uuid4
from abc import ABCMeta, abstractmethod
from multiprocessing import Process, Queue
from spacetime.managers.version_graph import Graph
from spacetime.managers.version_graph import GraphActor
import spacetime.utils.utils as utils
from spacetime.utils.enums import Event, VersionBy
import time



class VersionManagerProcess(Process):

def __init__(self, appname, types, version_by):
Expand Down Expand Up @@ -389,12 +391,15 @@ class FullStateVersionManager(VersionManager):
def __init__(self, appname, types, dump_graph, instrument_record):
self.types = types
self.type_map = {tp.__r_meta__.name: tp for tp in types}
self.version_graph = Graph()
self.version_graph = GraphActor()
self.state_to_app = dict()
self.app_to_state = dict()
self.logger = utils.get_logger("%s_FullStateVersionManager" % appname)
self.dump_graphs = dump_graph
self.instrument_record = instrument_record
self.version_graph_head= 'ROOT'
self.version_graph.start()


def set_app_marker(self, appname, end_v):
self.state_to_app.setdefault(end_v, set()).add(appname)
Expand All @@ -404,36 +409,50 @@ def receive_data(self, appname, versions, package, from_external=True):
if start_v == end_v:
# The versions are the same, lets ignore.
return True
if start_v != self.version_graph.head.current:
if start_v!= self.version_graph_head:
#if start_v != self.version_graph.head.current:
self.resolve_conflict(start_v, end_v, package, from_external)
else:
self.version_graph.continue_chain(start_v, end_v, package)
self.version_graph_head=end_v
self.maintain(appname, end_v)
return True

def retrieve_data(self, appname, version):

data, version_change = self.retrieve_data_nomaintain(version)
self.set_app_marker(appname, version_change[1])
return data, version_change

#def retrieve_data_nomaintain(self, version):
#merged = dict()
#if version not in self.version_graph.nodes:
#return merged, [version, version]
#for delta in self.version_graph[version:]:
#merged = utils.merge_state_delta(merged, delta)
#return merged, [version, self.version_graph.head.current]

def retrieve_data_nomaintain(self, version):
merged = dict()
if version not in self.version_graph.nodes:
return merged, [version, version]

for delta in self.version_graph[version:]:
merged = utils.merge_state_delta(merged, delta)
return merged, [version, self.version_graph.head.current]
if not merged:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also need to make the equivalent check in Graph class in getitem to return an empty list if the start version does not exist.

return merged, [version, version]
return merged, [version, self.version_graph_head]

def data_sent_confirmed(self, app, version):
if version[0] != version[1]:
self.maintain(app, version[1])

def resolve_conflict(self, start_v, end_v, package, from_external):
new_v = self.version_graph.head.current
#new_v = self.version_graph.head.current
new_v= self.version_graph_head
change, _ = self.retrieve_data_nomaintain(start_v)
t_new_merge, t_conflict_merge = self.operational_transform(
start_v, change, package, from_external)
merge_v = str(uuid4())
self.version_graph_head=merge_v
self.version_graph.continue_chain(start_v, end_v, package)
self.version_graph.continue_chain(new_v, merge_v, t_new_merge)
self.version_graph.continue_chain(end_v, merge_v, t_conflict_merge)
Expand All @@ -457,9 +476,9 @@ def maintain(self, appname, end_v):
super().maintain(
self.state_to_app, self.app_to_state,
self.version_graph, appname, end_v, utils.merge_state_delta)
if self.instrument_record:
self.instrument_record.put(
("MEMORY", "{0}\t{1}\t{2}\n".format(time.time(), len(self.version_graph.nodes), len(self.version_graph.edges))))
#if self.instrument_record:
#self.instrument_record.put(
#("MEMORY", "{0}\t{1}\t{2}\n".format(time.time(), len(self.version_graph.nodes), len(self.version_graph.edges))))

class TypeVersionManager(VersionManager):
def __init__(self, appname, types, dump_graph):
Expand Down
96 changes: 96 additions & 0 deletions python/wordCounter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python3
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this file to SampleApps folder.

# -*- coding: utf-8 -*-
"""
Created on Sat Jan 26 10:43:27 2019

@author: prithadawn
"""

from rtypes import pcc_set, primarykey, dimension, merge
from spacetime import Application
import random, re, argparse
import time


@pcc_set
class word_class(object):
# word_id= primarykey(int)
word_desc = primarykey(str)
word_count = dimension(int)

def __init__(self, word_desc, word_count):

# self.word_id=random.randint(0,100000)
self.word_desc = word_desc
self.word_count = word_count

def __str__(self):
return (self.word_desc + ' count: ' + str(self.word_count))

@merge
def merge_func(original, mine, theirs):

if original is None:
theirs.word_count = mine.word_count + theirs.word_count

else:

theirs.word_count = mine.word_count + theirs.word_count - original.word_count

return theirs


def divide_chunks(l, n):
# looping till length of l
for i in range(0, len(l), n):
yield l[i:i + n]


def get_word_desc(word_obj):
return word_obj.word_desc


def mapper(df, word_list):
current_word = 0
current_count = 0
print(word_list)
for w in word_list:
word = df.read_one(word_class, w)
if word is None:
df.add_one(word_class, word_class(w, 1))
else:
word.word_count += 1
df.commit()
df.push()


def reducer(df):
with open('testFile.txt') as f:
word_list = re.sub(r'[^\w\s]', '', f.read()).split()
# x = list(divide_chunks(word_list, n))
word_list1 = word_list[:len(word_list) // 2]
word_list2 = word_list[len(word_list) // 2:]

mapper_app1 = Application(mapper, Types=[word_class], dataframe=df)
mapper_app2 = Application(mapper, Types=[word_class], dataframe=df)
mapper_app1.start_async(word_list1)
mapper_app2.start_async(word_list2)
mapper_app1.join()
mapper_app2.join()
time.sleep(1)

df.checkout()
for word_obj in df.read_all(word_class):
print(word_obj)


def main():
# parser = argparse.ArgumentParser()
# parser.add_argument("filename")
# args = parser.parse_args()
app = Application(reducer, Types=[word_class])
app.start()
app.start()


main()