@@ -830,18 +830,16 @@ def cfbot_ingest(message):
830
830
# not it doesn't contain the newest patches that the CFBot knows about.
831
831
return
832
832
833
- # We don't simply fetch this from the patch.cfbot_branch, because we want
834
- # to get it as select_for_update. That way we can use this row as a lock to
835
- # ensure only one cfbot HTTP request can change a patch its Branch and
836
- # Tasks at the same time. (see also transaction.atomic at the top of this
837
- # function)
838
- old_branch = CfbotBranch .objects .select_for_update ().get (pk = patch_id )
839
- if old_branch and old_branch .branch_id != branch_id and old_branch .created .replace (tzinfo = timezone .utc ) > created :
840
- # This is a message for an old branch, ignore it.
841
- return
842
-
843
833
# Every message should have a branch_status, which we will INSERT
844
834
# or UPDATE. We do this first, because cfbot_task refers to it.
835
+ # Due to the way messages are sent/queued by cfbot it's possible that it
836
+ # sends the messages out-of-order. To handle this we we only update in two
837
+ # cases:
838
+ # 1. The created time of the branch is newer than the one in our database:
839
+ # This is a newer branch
840
+ # 2. If it's the same branch that we already have, but the modified time is
841
+ # newer: This is a status update for the current branch that we received
842
+ # in-order.
845
843
cursor .execute ("""INSERT INTO commitfest_cfbotbranch (patch_id, branch_id,
846
844
branch_name, commit_id,
847
845
apply_url, status,
@@ -855,7 +853,9 @@ def cfbot_ingest(message):
855
853
commit_id = EXCLUDED.commit_id,
856
854
apply_url = EXCLUDED.apply_url,
857
855
created = EXCLUDED.created
858
- WHERE commitfest_cfbotbranch.modified < EXCLUDED.modified
856
+ WHERE commitfest_cfbotbranch.created < EXCLUDED.created
857
+ OR (commitfest_cfbotbranch.branch_id = EXCLUDED.branch_id
858
+ AND commitfest_cfbotbranch.modified < EXCLUDED.modified)
859
859
""" ,
860
860
(
861
861
patch_id ,
@@ -868,6 +868,14 @@ def cfbot_ingest(message):
868
868
branch_status ["modified" ])
869
869
)
870
870
871
+ # Now we check what we have in our database. If that contains a different
872
+ # branch_id than what we just tried to insert, then apparently this is a
873
+ # status update for an old branch and we don't care about any of the
874
+ # contents of this message.
875
+ branch_in_db = CfbotBranch .objects .get (pk = patch_id )
876
+ if branch_in_db .branch_id != branch_id :
877
+ return
878
+
871
879
# Most messages have a task_status. It might be missing in rare cases, like
872
880
# when cfbot decides that a whole branch has timed out. We INSERT or
873
881
# UPDATE.
@@ -892,6 +900,14 @@ def cfbot_ingest(message):
892
900
task_status ["modified" ])
893
901
)
894
902
903
+ # Remove any old tasks that are not related to this branch. These should
904
+ # only be left over when we just updated the branch_id. Knowing if we just
905
+ # updated the branch_id was is not trivial though, because INSERT ON
906
+ # CONFLICT does not allow us to easily return the old value of the row.
907
+ # So instead we always delete all tasks that are not related to this
908
+ # branch. This is fine, because doing so is very cheap in the no-op case
909
+ # because we have an index on patch_id and there's only a handful of tasks
910
+ # per patch.
895
911
cursor .execute ("DELETE FROM commitfest_cfbottask WHERE patch_id=%s AND branch_id != %s" , (patch_id , branch_id ))
896
912
897
913
0 commit comments