Skip to content

Commit

Permalink
Add a new test function 'test_CmisManagerTask_need_lp_mode_for_dpdein…
Browse files Browse the repository at this point in the history
…it_set_power_expired()' in test_xcvrd.py to enhance the test code coverage.
  • Loading branch information
CharlieChenEC committed Feb 14, 2025
1 parent 29776ce commit 778ea5e
Showing 1 changed file with 50 additions and 1 deletion.
51 changes: 50 additions & 1 deletion sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper
(2, 0x11, 'Credo', False),
(1, 0x1, 'Molex', False)
])
def test_CmisManagerTask_is_need_low_power_first(self, appl, host_assign, vendor, expected):
def test_CmisManagerTask_need_lp_mode_for_dpdeinit(self, appl, host_assign, vendor, expected):
port_mapping = PortMapping()
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
Expand All @@ -1756,6 +1756,55 @@ def test_CmisManagerTask_is_need_low_power_first(self, appl, host_assign, vendor
mock_xcvr_api.get_host_lane_assignment_option = MagicMock(return_value=host_assign)
assert task.need_lp_mode_for_dpdeinit(mock_xcvr_api, appl) == expected

@patch('xcvrd.xcvrd.XcvrTableHelper.get_status_tbl')
@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.xcvrd.is_fast_reboot_enabled', MagicMock(return_value=(False)))
@patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock()))
@patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD'))
@patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock())
@patch('xcvrd.xcvrd.is_cmis_api', MagicMock(return_value=True))
def test_CmisManagerTask_need_lp_mode_for_dpdeinit_set_power_expired(self, mock_chassis, mock_get_status_tbl):
# set low power mode expired
mock_get_status_tbl = Table("STATE_DB", TRANSCEIVER_STATUS_TABLE)
mock_xcvr_api = MagicMock()
mock_xcvr_api.is_flat_memory = MagicMock(return_value=False)
mock_xcvr_api.is_coherent_module = MagicMock(return_value=False)
mock_xcvr_api.get_tx_config_power = MagicMock(return_value=0)
mock_xcvr_api.get_laser_config_freq = MagicMock(return_value=0)
mock_xcvr_api.get_module_type_abbreviation = MagicMock(return_value='QSFP-DD')

mock_sfp = MagicMock()
mock_sfp.get_presence = MagicMock(return_value=True)
mock_sfp.get_xcvr_api = MagicMock(return_value=mock_xcvr_api)
mock_chassis.get_all_sfps = MagicMock(return_value=[mock_sfp])
mock_chassis.get_sfp = MagicMock(return_value=mock_sfp)

port_mapping = PortMapping()
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
task.xcvr_table_helper.get_status_tbl.return_value = mock_get_status_tbl

port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET)
task.on_port_update_event(port_change_event)
assert task.isPortConfigDone
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET,
{'speed':'400000', 'lanes':'1,2,3,4,5,6,7,8'})
task.on_port_update_event(port_change_event)
assert len(task.port_dict) == 1
task.port_mapping.logical_port_list = MagicMock()
task.get_host_tx_status = MagicMock(return_value='true')
task.get_port_admin_status = MagicMock(return_value='up')
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.update_port_transceiver_status_table_sw_cmis_state("Ethernet0", CMIS_STATE_DP_DEINIT)
mock_xcvr_api.get_module_state = MagicMock(return_value='ModulePwrDn')
task.need_lp_mode_for_dpdeinit = MagicMock(return_value=True)
task.port_dict['Ethernet0']['host_lanes_mask'] = 0xff
task.port_dict['Ethernet0']['appl'] = 1
task.port_dict['Ethernet0']['cmis_expired'] = datetime.datetime.now() - datetime.timedelta(100)
task.task_worker()
assert get_cmis_state_from_state_db('Ethernet0', task.xcvr_table_helper.get_status_tbl(task.port_mapping.get_asic_id_for_logical_port('Ethernet0'))) == CMIS_STATE_INSERTED

@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.xcvrd.is_fast_reboot_enabled', MagicMock(return_value=(False)))
@patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock()))
Expand Down

0 comments on commit 778ea5e

Please sign in to comment.