@@ -32,3 +32,42 @@ def reduce(self, records):
32
32
data = list (data_chunk .data )
33
33
assert len (data ) == 1
34
34
assert int (data [0 ]['sum' ]) == sum (range (0 , 10 ))
35
+
36
+
37
+ def test_simple_reporting_command_with_map ():
38
+ @searchcommands .Configuration ()
39
+ class MapAndReduceReportingCommand (searchcommands .ReportingCommand ):
40
+ def map (self , records ):
41
+ for record in records :
42
+ record ["value" ] = str (int (record ["value" ]) * 2 )
43
+ yield record
44
+
45
+ def reduce (self , records ):
46
+ total = 0
47
+ for record in records :
48
+ total += int (record ["value" ])
49
+ yield {"sum" : total }
50
+
51
+ cmd = MapAndReduceReportingCommand ()
52
+ ifile = io .BytesIO ()
53
+
54
+ input_data = [{"value" : str (i )} for i in range (5 )]
55
+
56
+ mapped_data = list (cmd .map (input_data ))
57
+
58
+ ifile .write (chunky .build_getinfo_chunk ())
59
+ ifile .write (chunky .build_data_chunk (mapped_data ))
60
+ ifile .seek (0 )
61
+
62
+ ofile = io .BytesIO ()
63
+ cmd ._process_protocol_v2 ([], ifile , ofile )
64
+
65
+ ofile .seek (0 )
66
+ chunk_stream = chunky .ChunkedDataStream (ofile )
67
+ chunk_stream .read_chunk ()
68
+ data_chunk = chunk_stream .read_chunk ()
69
+ assert data_chunk .meta ['finished' ] is True
70
+
71
+ result = list (data_chunk .data )
72
+ expected_sum = sum (i * 2 for i in range (5 ))
73
+ assert int (result [0 ]["sum" ]) == expected_sum
0 commit comments