@@ -488,3 +488,94 @@ def test_submit_pod(self, mock_podmonitor, mock_get_namespace, mock_client):
488
488
# This is to inspect `with PodMonitor() as monitor`:
489
489
self .assertTrue (mock_podmonitor .return_value .__enter__ .return_value .add .called )
490
490
491
+
492
+ def setup_mock_watch (self , mock_watch , event_objects = []):
493
+ mock_stream = Mock ()
494
+ mock_stop = Mock ()
495
+ stream_events = []
496
+ for event_object in event_objects :
497
+ stream_events .append ({'object' : event_object })
498
+ mock_stream .return_value = stream_events
499
+ mock_watch .Watch .return_value .stream = mock_stream
500
+ mock_watch .Watch .return_value .stop = mock_stop
501
+
502
+ def make_mock_pod (self , name ):
503
+ mock_metadata = Mock ()
504
+ # Cannot mock name attribute without a propertymock
505
+ name_property = PropertyMock (return_value = name )
506
+ type(mock_metadata ).name = name_property
507
+ mock_pod = create_autospec (V1Pod , metadata = mock_metadata )
508
+ return mock_pod
509
+
510
+
511
+ @patch ('calrissian.dask.watch' , autospec = True )
512
+ def test_wait_calls_watch_pod_with_pod_name_field_selector (self , mock_watch , mock_get_namespace , mock_client ):
513
+ mock_pod = self .make_mock_pod ('test123' )
514
+ mock_pod .status .container_statuses [0 ].state = Mock (running = None , waiting = None , terminated = Mock (exit_code = 0 ))
515
+ self .setup_mock_watch (mock_watch , [mock_pod ])
516
+ kc = KubernetesDaskClient ()
517
+ kc ._set_pod (mock_pod )
518
+ kc .wait_for_completion (cm_name = 'dask-cm-random' )
519
+ mock_stream = mock_watch .Watch .return_value .stream
520
+ self .assertEqual (mock_stream .call_args , call (kc .core_api_instance .list_namespaced_pod , kc .namespace ,
521
+ field_selector = 'metadata.name=test123' ))
522
+
523
+ @patch ('calrissian.dask.watch' , autospec = True )
524
+ def test_wait_calls_watch_pod_with_imcomplete_status (self , mock_watch , mock_get_namespace , mock_client ):
525
+ self .setup_mock_watch (mock_watch )
526
+ mock_pod = self .make_mock_pod ('test123' )
527
+ kc = KubernetesDaskClient ()
528
+ kc ._set_pod (mock_pod )
529
+ # Assert IncompleteStatusException is raised
530
+ with self .assertRaises (IncompleteStatusException ):
531
+ kc .wait_for_completion (cm_name = 'dask-cm-random' )
532
+
533
+ @patch ('calrissian.dask.watch' , autospec = True )
534
+ def test_wait_skips_pod_when_containers_status_is_none (self , mock_watch , mock_get_namespace , mock_client ):
535
+
536
+ mock_pod = self .make_mock_pod ('test123' )
537
+ mock_pod .status .container_statuses = None
538
+
539
+ self .setup_mock_watch (mock_watch , [mock_pod ])
540
+ kc = KubernetesDaskClient ()
541
+ kc ._set_pod (Mock ())
542
+ with self .assertRaises (IncompleteStatusException ):
543
+ kc .wait_for_completion (cm_name = 'dask-cm-random' )
544
+ self .assertFalse (mock_watch .Watch .return_value .stop .called )
545
+ self .assertFalse (mock_client .CoreV1Api .return_value .delete_namespaced_pod .called )
546
+ self .assertIsNotNone (kc .pod )
547
+
548
+
549
+ @patch ('calrissian.dask.watch' , autospec = True )
550
+ def test_wait_skips_pod_when_state_is_waiting (self , mock_watch , mock_get_namespace , mock_client ):
551
+ mock_pod = create_autospec (V1Pod )
552
+ mock_pod .status .container_statuses [0 ].state = Mock (running = None , waiting = True , terminated = None )
553
+ self .setup_mock_watch (mock_watch , [mock_pod ])
554
+ kc = KubernetesDaskClient ()
555
+ kc ._set_pod (Mock ())
556
+ with self .assertRaises (IncompleteStatusException ):
557
+ kc .wait_for_completion (cm_name = 'dask-cm-random' )
558
+ self .assertFalse (mock_watch .Watch .return_value .stop .called )
559
+ self .assertFalse (mock_client .CoreV1Api .return_value .delete_namespaced_pod .called )
560
+ self .assertIsNotNone (kc .pod )
561
+
562
+
563
+ @patch ('calrissian.dask.watch' , autospec = True )
564
+ @patch ('calrissian.dask.DaskPodMonitor' )
565
+ @patch ('calrissian.k8s.KubernetesClient._extract_cpu_memory_requests' )
566
+ def test_wait_finishes_when_pod_state_is_terminated (self , mock_cpu_memory ,
567
+ mock_podmonitor , mock_watch , mock_get_namespace ,
568
+ mock_client ):
569
+ mock_pod = create_autospec (V1Pod )
570
+ mock_pod .status .container_statuses [0 ].state = Mock (running = None , waiting = None , terminated = Mock (exit_code = 123 ))
571
+ mock_cpu_memory .return_value = ('1' , '1Mi' )
572
+ self .setup_mock_watch (mock_watch , [mock_pod ])
573
+ kc = KubernetesDaskClient ()
574
+ kc ._set_pod (Mock ())
575
+ completion_result = kc .wait_for_completion (cm_name = 'dask-cm-random' )
576
+ self .assertEqual (completion_result .exit_code , 123 )
577
+ self .assertTrue (mock_watch .Watch .return_value .stop .called )
578
+ self .assertTrue (mock_client .CoreV1Api .return_value .delete_namespaced_pod .called )
579
+ self .assertIsNone (kc .pod )
580
+ # This is to inspect `with PodMonitor() as monitor`:
581
+ self .assertTrue (mock_podmonitor .return_value .__enter__ .return_value .remove .called )
0 commit comments