13
13
SchedulerRequest = namedtuple ("Scheduler_request" ,
14
14
["sched_cycle" , "timeout_cycle" , "min_fidelity" , "purpose_id" , "create_id" , "num_pairs" ,
15
15
"priority" , "store" , "atomic" , "measure_directly" , "master_request" ])
16
- SchedulerRequest .__new__ .__defaults__ = (0 ,) * 7 + (True , False , False , True )
16
+ SchedulerRequest .__new__ .__defaults__ = (0 ,) * 5 + ( 1 ,) + ( 0 ,) + (True , False , False , True )
17
17
18
18
WFQSchedulerRequest = namedtuple ("WFQ_Scheduler_request" ,
19
- ["sched_cycle" , "timeout_cycle" , "min_fidelity" , "purpose_id" , "create_id" , "num_pairs" ,
20
- "priority" , "init_virtual_finish" , "est_cycles_per_pair" , "store" , "atomic" , "measure_directly" , "master_request" ])
21
- WFQSchedulerRequest .__new__ .__defaults__ = (0 ,) * 9 + (True , False , False , True )
19
+ ["sched_cycle" , "timeout_cycle" , "min_fidelity" , "purpose_id" , "create_id" ,
20
+ "num_pairs" ,
21
+ "priority" , "init_virtual_finish" , "est_cycles_per_pair" , "store" , "atomic" ,
22
+ "measure_directly" , "master_request" ])
23
+ WFQSchedulerRequest .__new__ .__defaults__ = (0 ,) * 5 + (1 ,) + (0 ,) * 3 + (True , False , False , True )
22
24
23
25
SchedulerGen = namedtuple ("Scheduler_gen" , ["flag" , "aid" , "comm_q" , "storage_q" , "param" ])
24
26
SchedulerGen .__new__ .__defaults__ = (False ,) + (None ,) * 4
@@ -48,11 +50,10 @@ def next(self):
48
50
49
51
50
52
class StrictPriorityRequestScheduler (Scheduler ):
51
-
52
53
# The scheduler request named tuple to use
53
54
_scheduler_request_named_tuple = SchedulerRequest
54
55
55
- def __init__ (self , distQueue , qmm , feu ):
56
+ def __init__ (self , distQueue , qmm , feu = None ):
56
57
"""
57
58
Stub for a scheduler to decide how we assign and consume elements of the queue.
58
59
This scheduler puts requests in queues depending on only their specified priority
@@ -199,11 +200,13 @@ def _get_scheduler_request(cls, egp_request, create_id, sched_cycle, timeout_cyc
199
200
:return: :obj:`~qlinklayer.scheduler.SchedulerRequest`
200
201
"""
201
202
scheduler_request = cls ._scheduler_request_named_tuple (sched_cycle = sched_cycle , timeout_cycle = timeout_cycle ,
202
- min_fidelity = egp_request .min_fidelity , purpose_id = egp_request .purpose_id ,
203
- create_id = create_id , num_pairs = egp_request .num_pairs ,
204
- priority = egp_request .priority , store = egp_request .store ,
205
- atomic = egp_request .atomic , measure_directly = egp_request .measure_directly ,
206
- master_request = master_request )
203
+ min_fidelity = egp_request .min_fidelity ,
204
+ purpose_id = egp_request .purpose_id ,
205
+ create_id = create_id , num_pairs = egp_request .num_pairs ,
206
+ priority = egp_request .priority , store = egp_request .store ,
207
+ atomic = egp_request .atomic ,
208
+ measure_directly = egp_request .measure_directly ,
209
+ master_request = master_request )
207
210
208
211
return scheduler_request
209
212
@@ -793,11 +796,10 @@ def _reset_outstanding_req_data(self):
793
796
794
797
795
798
class WFQRequestScheduler (StrictPriorityRequestScheduler ):
796
-
797
799
# The scheduler request named tuple to use
798
800
_scheduler_request_named_tuple = WFQSchedulerRequest
799
801
800
- def __init__ (self , distQueue , qmm , feu , weights = None ):
802
+ def __init__ (self , distQueue , qmm , feu = None , weights = None ):
801
803
"""
802
804
Stub for a scheduler to decide how we assign and consume elements of the queue.
803
805
This weighted fair queue (WFQ) scheduler implements a weighted fair queue where queue i
@@ -851,6 +853,17 @@ def _compare_mhp_cycle(self, cycle1, cycle2):
851
853
else :
852
854
return 1
853
855
856
+ def _get_largest_mhp_cycle (self ):
857
+ """
858
+ Return the MHP which is considered largest, which comparison of MHP cycles is defined
859
+ in self._compare_mhp_cycle
860
+ :return: int
861
+ """
862
+ # Compute opposite of current
863
+ opposite = self .mhp_cycle_number - int (self .max_mhp_cycle_number / 2 )
864
+
865
+ return (opposite - 1 ) % self .max_mhp_cycle_number
866
+
854
867
def _get_relative_weights (self , weights ):
855
868
if weights is None :
856
869
return [1 ] * len (self .distQueue .queueList )
@@ -863,7 +876,8 @@ def _get_relative_weights(self, weights):
863
876
if weight < 0 :
864
877
raise ValueError ("All weights need to be non-zero" )
865
878
except TypeError :
866
- raise ValueError ("Weights need to be comparable, got TypeError when comparing weight={} to 0" .format (weight ))
879
+ raise ValueError (
880
+ "Weights need to be comparable, got TypeError when comparing weight={} to 0" .format (weight ))
867
881
868
882
# Compute relative weights
869
883
total_sum = sum (weights )
@@ -883,16 +897,18 @@ def select_queue(self):
883
897
# Virt start is the virtual start of the service (see https://en.wikipedia.org/wiki/Fair_queuing#Pseudo_code)
884
898
# Look through the head of each queue and pick the one with the smallest virtual_finish
885
899
# unless a queue has infinite (0) weight.
886
- min_virtual_finish = float ( 'inf' )
900
+ min_virtual_finish = self . _get_largest_mhp_cycle ( )
887
901
queue_item_to_use = None
888
- for local_queue in self .distQueue .queueList :
902
+
903
+ # Go in reverse such that if two items have the same virt finish, take the one with the lowest qid
904
+ for local_queue in reversed (self .distQueue .queueList ):
889
905
queue_item = local_queue .peek ()
890
906
if queue_item is not None and queue_item .ready :
891
- if queue_item .request .virtual_finish is None :
907
+ if queue_item .request .init_virtual_finish is None :
892
908
# This queue has infinite weight so just take this queue_item directly
893
909
queue_item_to_use = queue_item
894
910
break
895
- if self ._compare_mhp_cycle (queue_item .virtual_finish , min_virtual_finish ) == - 1 :
911
+ if self ._compare_mhp_cycle (queue_item .virtual_finish , min_virtual_finish ) < 1 :
896
912
min_virtual_finish = queue_item .virtual_finish
897
913
queue_item_to_use = queue_item
898
914
if queue_item_to_use is None :
@@ -912,7 +928,8 @@ def set_virtual_finish(self, scheduler_request, qid):
912
928
# If queue has weight zero (seen as infinite) we put virtual finish to None
913
929
if self .relative_weights [qid ] == 0 :
914
930
init_virt_finish = None
915
- wfq_scheduler_request = WFQSchedulerRequest (** scheduler_request ._asdict (), init_virtual_finish = init_virt_finish )
931
+ wfq_scheduler_request = WFQSchedulerRequest (** scheduler_request ._asdict (),
932
+ init_virtual_finish = init_virt_finish )
916
933
else :
917
934
virt_start = max (self .mhp_cycle_number , self .last_virt_finish [qid ])
918
935
est_nr_cycles_per_pair = self ._estimate_nr_of_cycles_per_pair (scheduler_request )
@@ -924,11 +941,14 @@ def set_virtual_finish(self, scheduler_request, qid):
924
941
self .last_virt_finish [qid ] = virt_start + virt_duration / self .relative_weights [qid ]
925
942
else :
926
943
virt_duration = est_nr_cycles_per_pair
927
- self .last_virt_finish [qid ] = virt_start + virt_duration * scheduler_request .num_pairs / self .relative_weights [qid ]
944
+ self .last_virt_finish [qid ] = (
945
+ virt_start + virt_duration * scheduler_request .num_pairs / self .relative_weights [qid ])
928
946
929
947
# Compute initial virt finish (if non-atomic this will be updated per success)
930
948
init_virt_finish = virt_start + virt_duration / self .relative_weights [qid ]
931
- wfq_scheduler_request = WFQSchedulerRequest (** scheduler_request ._asdict (), init_virtual_finish = init_virt_finish , est_cycles_per_pair = est_nr_cycles_per_pair )
949
+ wfq_scheduler_request = WFQSchedulerRequest (** scheduler_request ._asdict (),
950
+ init_virtual_finish = init_virt_finish ,
951
+ est_cycles_per_pair = est_nr_cycles_per_pair )
932
952
933
953
return wfq_scheduler_request
934
954
0 commit comments