Skip to content

Commit 8fd3894

Browse files
Ertan OnurErtan Onur
Ertan Onur
authored and
Ertan Onur
committed
Mutual Exclusion RicartAgrawala and Raymond added Barker and Peterson removed
1 parent b15c604 commit 8fd3894

File tree

3 files changed

+363
-0
lines changed

3 files changed

+363
-0
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
#!/usr/bin/env python
2+
"""
3+
Implementation of the Raymond's Algorithm for mutual exclusion.
4+
"""
5+
6+
__author__ = "Berker Acır"
7+
__contact__ = "[email protected]"
8+
__copyright__ = "Copyright 2021, WINSLAB"
9+
__credits__ = ["Berker Acır"]
10+
__date__ = "2021/05/18"
11+
__deprecated__ = False
12+
__email__ = "[email protected]"
13+
__license__ = "GPLv3"
14+
__maintainer__ = "developer"
15+
__status__ = "Production"
16+
__version__ = "0.0.1"
17+
18+
from enum import Enum
19+
from time import sleep
20+
import networkx as nx
21+
22+
from ...Experimentation.Topology import Topology
23+
from ...GenericModel import GenericModel, GenericMessageHeader, GenericMessagePayload, GenericMessage
24+
from ...Generics import *
25+
26+
class RaymondEventTypes(Enum):
27+
TOKEN = "TOKEN"
28+
REQUEST = "REQUEST"
29+
PRIVILEGE = "PRIVILEGE"
30+
31+
32+
class RaymondMessageTypes(Enum):
33+
TOKEN = "TOKEN"
34+
REQUEST = "REQUEST"
35+
36+
37+
class RaymondMessageHeader(GenericMessageHeader):
38+
39+
def __init__(self, messageType, messageFrom, messageTo, nextHop=float('inf'), interfaceID=float('inf'),
40+
sequenceID=-1):
41+
super().__init__(messageType, messageFrom, messageTo, nextHop, interfaceID, sequenceID)
42+
43+
44+
class RaymondMessagePayload(GenericMessagePayload):
45+
46+
def __init__(self, nodeID):
47+
self.nodeID = nodeID
48+
49+
50+
class MutualExclusionRaymondComponent(GenericModel):
51+
privilegeSleepAmount = 1
52+
53+
def __init__(self, componentname, componentinstancenumber, context=None, configurationparameters=None, num_worker_threads=1, topology=None):
54+
super().__init__(componentname, componentinstancenumber, context, configurationparameters, num_worker_threads, topology)
55+
56+
self.eventhandlers[RaymondEventTypes.TOKEN] = self.token_received
57+
self.eventhandlers[RaymondEventTypes.REQUEST] = self.request_received
58+
self.eventhandlers[RaymondEventTypes.PRIVILEGE] = self.on_privilege
59+
60+
self.neighborNodeIDs = set()
61+
self.parentNodeID = None
62+
self.queue = list()
63+
64+
self.isRoot = False
65+
self.havePendingRequest = False
66+
self.isPrivileged = False
67+
68+
self.privilegeCount = 0
69+
self.sentRequestCount = 0
70+
self.sentTokenCount = 0
71+
self.receivedRequestCount = 0
72+
self.receivedTokenCount = 0
73+
self.forwardedMessageCount = 0
74+
75+
def on_init(self, eventobj: Event):
76+
mstG = nx.minimum_spanning_tree(self.topology.G)
77+
self.neighborNodeIDs = set(mstG.neighbors(self.componentinstancenumber))
78+
if self.componentinstancenumber == 0:
79+
self.isRoot = True
80+
else:
81+
self.parentNodeID = nx.shortest_path(mstG, self.componentinstancenumber, 0)[1]
82+
83+
def on_message_from_bottom(self, eventobj: Event):
84+
message = eventobj.eventcontent
85+
header = message.header
86+
messageType = header.messagetype
87+
messageTo = header.messageto
88+
89+
if messageTo == self.componentinstancenumber:
90+
if messageType == RaymondMessageTypes.REQUEST:
91+
eventobj.event = RaymondEventTypes.REQUEST
92+
self.send_self(eventobj)
93+
elif messageType == RaymondMessageTypes.TOKEN:
94+
eventobj.event = RaymondEventTypes.TOKEN
95+
self.send_self(eventobj)
96+
97+
def put(self, nodeID=None):
98+
if nodeID is None:
99+
nodeID = self.componentinstancenumber
100+
101+
if not self.queue:
102+
headChanged = True
103+
else:
104+
headChanged = False
105+
self.queue.append(nodeID)
106+
if nodeID == self.componentinstancenumber:
107+
self.havePendingRequest = True
108+
109+
if headChanged:
110+
if self.isRoot:
111+
if nodeID == self.componentinstancenumber:
112+
self.send_self(Event(self, RaymondEventTypes.PRIVILEGE, None))
113+
else:
114+
self.send_token(nodeID)
115+
else:
116+
self.send_request()
117+
118+
def pop(self):
119+
nodeID = self.queue.pop(0)
120+
if nodeID == self.componentinstancenumber and nodeID not in self.queue:
121+
self.havePendingRequest = False
122+
123+
if self.queue:
124+
head = self.queue[0]
125+
if self.isRoot:
126+
if head == self.componentinstancenumber:
127+
self.send_self(Event(self, RaymondEventTypes.PRIVILEGE, None))
128+
else:
129+
self.send_token(head)
130+
else:
131+
self.send_request()
132+
133+
def on_privilege(self, eventobj: Event):
134+
self.isPrivileged = True
135+
136+
self.privilegeCount += 1
137+
sleep(self.privilegeSleepAmount)
138+
139+
self.isPrivileged = False
140+
self.pop()
141+
142+
def token_received(self, eventobj: Event):
143+
self.receivedTokenCount += 1
144+
self.isRoot = True
145+
self.parentNodeID = None
146+
147+
head = self.queue[0]
148+
if head == self.componentinstancenumber:
149+
self.send_self(Event(self, RaymondEventTypes.PRIVILEGE, None))
150+
else:
151+
self.send_token(head)
152+
153+
def request_received(self, eventobj: Event):
154+
self.receivedRequestCount += 1
155+
receivedRequestNodeID = eventobj.eventcontent.payload.nodeID
156+
self.put(receivedRequestNodeID)
157+
158+
def send_token(self, nodeID):
159+
self.sentTokenCount += 1
160+
self.isRoot = False
161+
self.parentNodeID = nodeID
162+
163+
nextHop = self.parentNodeID
164+
interfaceID = f"{self.componentinstancenumber}-{nextHop}"
165+
header = RaymondMessageHeader(RaymondMessageTypes.TOKEN, self.componentinstancenumber, self.parentNodeID,
166+
nextHop, interfaceID)
167+
payload = RaymondMessagePayload(self.componentinstancenumber)
168+
message = GenericMessage(header, payload)
169+
self.send_down(Event(self, EventTypes.MFRT, message))
170+
171+
self.pop()
172+
173+
def send_request(self):
174+
self.sentRequestCount += 1
175+
nextHop = self.parentNodeID
176+
interfaceID = f"{self.componentinstancenumber}-{nextHop}"
177+
header = RaymondMessageHeader(RaymondMessageTypes.REQUEST, self.componentinstancenumber, self.parentNodeID,
178+
nextHop, interfaceID)
179+
payload = RaymondMessagePayload(self.componentinstancenumber)
180+
message = GenericMessage(header, payload)
181+
self.send_down(Event(self, EventTypes.MFRT, message))
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
#!/usr/bin/env python
2+
"""
3+
Implementation of the Ricart-Agrawala Algorithm for mutual exclusion.
4+
"""
5+
6+
__author__ = "Berker Acır"
7+
__contact__ = "[email protected]"
8+
__copyright__ = "Copyright 2021, WINSLAB"
9+
__credits__ = ["Berker Acır"]
10+
__date__ = "2021/04/24"
11+
__deprecated__ = False
12+
__email__ = "[email protected]"
13+
__license__ = "GPLv3"
14+
__maintainer__ = "developer"
15+
__status__ = "Production"
16+
__version__ = "0.0.1"
17+
18+
from enum import Enum
19+
from time import sleep
20+
21+
from ...Experimentation.Topology import Topology
22+
from ...GenericModel import GenericModel, GenericMessageHeader, GenericMessagePayload, GenericMessage
23+
from ...Generics import *
24+
25+
class RicartAgrawalaEventTypes(Enum):
26+
REQUEST = "REQUEST"
27+
REPLY = "REPLY"
28+
PRIVILEGE = "PRIVILEGE"
29+
30+
31+
class RicartAgrawalaMessageTypes(Enum):
32+
REQUEST = "REQUEST"
33+
REPLY = "REPLY"
34+
35+
36+
class RicartAgrawalaMessageHeader(GenericMessageHeader):
37+
38+
def __init__(self, messageType, messageFrom, messageTo, nextHop=float('inf'), interfaceID=float('inf'),
39+
sequenceID=-1):
40+
super().__init__(messageType, messageFrom, messageTo, nextHop, interfaceID, sequenceID)
41+
42+
43+
class RicartAgrawalaMessagePayload(GenericMessagePayload):
44+
45+
def __init__(self, clock, nodeID):
46+
self.clock = clock
47+
self.nodeID = nodeID
48+
49+
50+
class MutualExclusionAgrawalaComponent(GenericModel):
51+
privilegeSleepAmount = 1
52+
53+
def __init__(self, componentname, componentinstancenumber, context=None, configurationparameters=None, num_worker_threads=1, topology=None):
54+
super().__init__(componentname, componentinstancenumber, context, configurationparameters, num_worker_threads, topology)
55+
56+
self.eventhandlers[RicartAgrawalaEventTypes.REQUEST] = self.request_received
57+
self.eventhandlers[RicartAgrawalaEventTypes.REPLY] = self.reply_received
58+
self.eventhandlers[RicartAgrawalaEventTypes.PRIVILEGE] = self.on_privilege
59+
60+
self.clock = 0
61+
self.havePendingRequest = False
62+
self.pendingRequestClock = None
63+
self.isPrivileged = False
64+
65+
self.deferredRequests = list()
66+
self.receivedReplies = set()
67+
self.otherNodeIDs = set()
68+
69+
self.privilegeCount = 0
70+
self.sentRequestCount = 0
71+
self.sentReplyCount = 0
72+
self.receivedRequestCount = 0
73+
self.receivedReplyCount = 0
74+
self.forwardedMessageCount = 0
75+
76+
def on_init(self, eventobj: Event):
77+
self.otherNodeIDs = set(self.topology.nodes.keys())
78+
self.otherNodeIDs.remove(self.componentinstancenumber)
79+
80+
def on_message_from_bottom(self, eventobj: Event):
81+
message = eventobj.eventcontent
82+
header = message.header
83+
messageType = header.messagetype
84+
messageTo = header.messageto
85+
86+
if messageTo == self.componentinstancenumber:
87+
if messageType == RicartAgrawalaMessageTypes.REQUEST:
88+
eventobj.event = RicartAgrawalaEventTypes.REQUEST
89+
self.send_self(eventobj)
90+
elif messageType == RicartAgrawalaMessageTypes.REPLY:
91+
eventobj.event = RicartAgrawalaEventTypes.REPLY
92+
self.send_self(eventobj)
93+
else:
94+
nextHop = Topology().get_next_hop(self.componentinstancenumber, messageTo)
95+
interfaceID = f"{self.componentinstancenumber}-{nextHop}"
96+
97+
if nextHop != inf and nextHop != self.componentinstancenumber:
98+
self.forwardedMessageCount += 1
99+
header.nexthop = nextHop
100+
header.interfaceid = interfaceID
101+
self.send_down(Event(self, EventTypes.MFRT, message))
102+
103+
def request_received(self, eventobj: Event):
104+
self.receivedRequestCount += 1
105+
receivedRequestClock = eventobj.eventcontent.payload.clock
106+
receivedRequestNodeID = eventobj.eventcontent.payload.nodeID
107+
108+
if not self.isPrivileged: # Not privileged
109+
if self.havePendingRequest: # Have pending request
110+
if receivedRequestClock < self.pendingRequestClock:
111+
isMessageDeferred = False
112+
elif receivedRequestClock == self.pendingRequestClock and receivedRequestNodeID < self.componentinstancenumber:
113+
isMessageDeferred = False
114+
else:
115+
isMessageDeferred = True
116+
else: # Does not have pending request
117+
isMessageDeferred = False
118+
else: # Privileged
119+
isMessageDeferred = True
120+
121+
if isMessageDeferred:
122+
self.deferredRequests.append(eventobj)
123+
else:
124+
if self.clock <= receivedRequestClock:
125+
self.clock = receivedRequestClock + 1
126+
self.send_reply(receivedRequestNodeID)
127+
128+
def reply_received(self, eventobj: Event):
129+
self.receivedReplyCount += 1
130+
replyFrom = eventobj.eventcontent.payload.messagepayload
131+
self.receivedReplies.add(replyFrom)
132+
133+
if len(self.receivedReplies) == len(self.otherNodeIDs):
134+
self.send_self(Event(self, RicartAgrawalaEventTypes.PRIVILEGE, None))
135+
elif len(self.receivedReplies) > len(self.otherNodeIDs):
136+
raise RuntimeError("Received reply message count exceeded expected limit!")
137+
138+
def on_privilege(self, eventobj: Event):
139+
self.isPrivileged = True
140+
self.havePendingRequest = False
141+
self.receivedReplies.clear()
142+
143+
self.privilegeCount += 1
144+
sleep(self.privilegeSleepAmount)
145+
146+
self.isPrivileged = False
147+
self.send_replies_to_deferred_requests()
148+
149+
def send_request(self):
150+
self.sentRequestCount += 1
151+
self.havePendingRequest = True
152+
self.pendingRequestClock = self.clock
153+
self.clock += 1
154+
155+
for nodeID in self.otherNodeIDs:
156+
nextHop = self.topology.get_next_hop(self.componentinstancenumber, nodeID)
157+
interfaceID = f"{self.componentinstancenumber}-{nextHop}"
158+
header = RicartAgrawalaMessageHeader(RicartAgrawalaMessageTypes.REQUEST, self.componentinstancenumber,
159+
nodeID, nextHop, interfaceID)
160+
payload = RicartAgrawalaMessagePayload(self.pendingRequestClock, self.componentinstancenumber)
161+
message = GenericMessage(header, payload)
162+
self.send_down(Event(self, EventTypes.MFRT, message))
163+
164+
def send_reply(self, nodeID):
165+
self.sentReplyCount += 1
166+
nextHop = self.topology.get_next_hop(self.componentinstancenumber, nodeID)
167+
interfaceID = f"{self.componentinstancenumber}-{nextHop}"
168+
header = RicartAgrawalaMessageHeader(RicartAgrawalaMessageTypes.REPLY, self.componentinstancenumber, nodeID,
169+
nextHop, interfaceID)
170+
payload = GenericMessagePayload(self.componentinstancenumber)
171+
message = GenericMessage(header, payload)
172+
self.send_down(Event(self, EventTypes.MFRT, message))
173+
174+
def send_replies_to_deferred_requests(self):
175+
while self.deferredRequests:
176+
event = self.deferredRequests.pop(0)
177+
deferredRequestNodeID = event.eventcontent.payload.nodeID
178+
deferredRequestClock = event.eventcontent.payload.clock
179+
180+
if self.clock <= deferredRequestClock:
181+
self.clock = deferredRequestClock + 1
182+
self.send_reply(deferredRequestNodeID)

adhoccomputing/DistributedAlgorithms/MutualExclusion/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)