Skip to content

Commit bd32360

Browse files
authored
Merge pull request #2372 from p172913/master
Changes made in wacth.py to print Empty newlines that are skipped when watching pod logs.
2 parents d80165d + d451d2f commit bd32360

File tree

2 files changed

+117
-21
lines changed

2 files changed

+117
-21
lines changed

kubernetes/base/watch/watch.py

+29-19
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ def iter_resp_lines(resp):
7878
buffer = buffer[next_newline+1:]
7979
if line:
8080
yield line
81+
else:
82+
yield '' # Only print one empty line
8183
next_newline = buffer.find(b'\n')
8284

8385

@@ -107,24 +109,29 @@ def get_watch_argument_name(self, func):
107109
return 'watch'
108110

109111
def unmarshal_event(self, data, return_type):
110-
js = json.loads(data)
111-
js['raw_object'] = js['object']
112-
# BOOKMARK event is treated the same as ERROR for a quick fix of
113-
# decoding exception
114-
# TODO: make use of the resource_version in BOOKMARK event for more
115-
# efficient WATCH
116-
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
117-
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
118-
js['object'] = self._api_client.deserialize(obj, return_type)
119-
if hasattr(js['object'], 'metadata'):
120-
self.resource_version = js['object'].metadata.resource_version
121-
# For custom objects that we don't have model defined, json
122-
# deserialization results in dictionary
123-
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
124-
and 'resourceVersion' in js['object']['metadata']):
125-
self.resource_version = js['object']['metadata'][
126-
'resourceVersion']
127-
return js
112+
if not data or data.isspace():
113+
return None
114+
try:
115+
js = json.loads(data)
116+
js['raw_object'] = js['object']
117+
# BOOKMARK event is treated the same as ERROR for a quick fix of
118+
# decoding exception
119+
# TODO: make use of the resource_version in BOOKMARK event for more
120+
# efficient WATCH
121+
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
122+
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
123+
js['object'] = self._api_client.deserialize(obj, return_type)
124+
if hasattr(js['object'], 'metadata'):
125+
self.resource_version = js['object'].metadata.resource_version
126+
# For custom objects that we don't have model defined, json
127+
# deserialization results in dictionary
128+
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
129+
and 'resourceVersion' in js['object']['metadata']):
130+
self.resource_version = js['object']['metadata'][
131+
'resourceVersion']
132+
return js
133+
except json.JSONDecodeError:
134+
return None
128135

129136
def stream(self, func, *args, **kwargs):
130137
"""Watch an API resource and stream the result back via a generator.
@@ -198,7 +205,10 @@ def stream(self, func, *args, **kwargs):
198205
retry_after_410 = False
199206
yield event
200207
else:
201-
yield line
208+
if line:
209+
yield line # Normal non-empty line
210+
else:
211+
yield '' # Only yield one empty line
202212
if self._stop:
203213
break
204214
finally:

kubernetes/base/watch/watch_test.py

+88-2
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414

1515
import unittest
1616

17+
import os
18+
19+
import time
20+
1721
from unittest.mock import Mock, call
1822

19-
from kubernetes import client
23+
from kubernetes import client,config
2024

2125
from .watch import Watch
2226

27+
from kubernetes.client import ApiException
28+
2329

2430
class WatchTests(unittest.TestCase):
2531
def setUp(self):
@@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self):
99105
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
100106
# the only way to do so. Without that, the stream will re-read the test data forever.
101107
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
108+
# Here added a statement for exception for empty lines.
109+
if e is None:
110+
continue
102111
count += 1
103112
self.assertEqual("test%d" % count, e['object'].metadata.name)
104113
self.assertEqual(3, count)
@@ -488,7 +497,84 @@ def test_watch_with_error_event_and_timeout_param(self):
488497
amt=None, decode_content=False)
489498
fake_resp.close.assert_called_once()
490499
fake_resp.release_conn.assert_called_once()
491-
500+
501+
@classmethod
502+
def setUpClass(cls):
503+
cls.api = Mock()
504+
cls.namespace = "default"
505+
506+
def test_pod_log_empty_lines(self):
507+
pod_name = "demo-bug"
508+
509+
try:
510+
self.api.create_namespaced_pod = Mock()
511+
self.api.read_namespaced_pod = Mock()
512+
self.api.delete_namespaced_pod = Mock()
513+
self.api.read_namespaced_pod_log = Mock()
514+
515+
#pod creating step
516+
self.api.create_namespaced_pod.return_value = None
517+
518+
#Checking pod status
519+
mock_pod = Mock()
520+
mock_pod.status.phase = "Running"
521+
self.api.read_namespaced_pod.return_value = mock_pod
522+
523+
# Printing at pod output
524+
self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"])
525+
526+
# Wait for the pod to reach 'Running'
527+
timeout = 60
528+
start_time = time.time()
529+
while time.time() - start_time < timeout:
530+
pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace)
531+
if pod.status.phase == "Running":
532+
break
533+
time.sleep(2)
534+
else:
535+
self.fail("Pod did not reach 'Running' state within timeout")
536+
537+
# Reading and streaming logs using Watch (mocked)
538+
w = Watch()
539+
log_output = []
540+
#Mock logs used for this test
541+
w.stream = Mock(return_value=[
542+
"Hello from Docker",
543+
"",
544+
"",
545+
"\n\n",
546+
"Another log line",
547+
"",
548+
"\n",
549+
"Final log"
550+
])
551+
for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True):
552+
log_output.append(event)
553+
print(event)
554+
555+
# Print outputs
556+
print(f"Captured logs: {log_output}")
557+
# self.assertTrue(any("Hello from Docker" in line for line in log_output))
558+
# self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs")
559+
expected_log = [
560+
"Hello from Docker",
561+
"",
562+
"",
563+
"\n\n",
564+
"Another log line",
565+
"",
566+
"\n",
567+
"Final log"
568+
]
569+
570+
self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs")
571+
572+
except ApiException as e:
573+
self.fail(f"Kubernetes API exception: {e}")
574+
finally:
575+
#checking pod is calling for delete
576+
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
577+
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)
492578

493579
if __name__ == '__main__':
494580
unittest.main()

0 commit comments

Comments
 (0)