# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
| 17 | + |
import email
import email.parser  # BytesFeedParser lives in the parser submodule
import json
import sys
import time
import urllib.request
from email import policy

import pyarrow as pa
| 25 | + |
# MIME content types expected in the parts of the multipart response.
JSON_FORMAT = "application/json"
TEXT_FORMAT = "text/plain"
ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"

# Wall-clock timers (seconds); the parsing functions below accumulate
# into the two counters via `global`.
start_time = time.time()  # script start, read at the bottom for total elapsed time
response_parsing_time = 0  # time to parse the multipart message
arrow_stream_parsing_time = 0  # time to parse the Arrow stream
| 33 | + |
| 34 | + |
def parse_multipart_message(response, boundary, buffer_size=8192):
    """
    Parse a multipart/mixed HTTP response into a list of Message objects.

    Parameters
    ----------
    response : binary stream supporting ``readinto`` (e.g. an HTTPResponse)
        The response body to parse.
    boundary : str
        Multipart boundary taken from the Content-Type header.
    buffer_size : int, optional
        Read-chunk size in bytes; values below 8192 are raised to 8192.

    Returns
    -------
    list of email.message.Message containing the parts of the multipart message.

    Raises
    ------
    ValueError
        If the parsed body is not a multipart message.
    """
    global response_parsing_time
    buffer_size = max(buffer_size, 8192)
    buffer = bytearray(buffer_size)

    # Synthesize the MIME header the feed parser needs: the HTTP body by
    # itself does not carry MIME-Version/Content-Type lines.
    header = f'MIME-Version: 1.0\r\nContent-Type: multipart/mixed; boundary="{boundary}"\r\n\r\n'
    feedparser = email.parser.BytesFeedParser(policy=policy.default)
    feedparser.feed(header.encode("utf-8"))
    while bytes_read := response.readinto(buffer):
        # perf_counter() is monotonic and high-resolution; time.time() can
        # jump (NTP adjustments) and make interval measurements negative.
        # Local timer renamed t0 so it no longer shadows the module-level
        # `start_time` used for total elapsed time.
        t0 = time.perf_counter()
        feedparser.feed(buffer[:bytes_read])
        response_parsing_time += time.perf_counter() - t0
    t0 = time.perf_counter()
    message = feedparser.close()
    response_parsing_time += time.perf_counter() - t0
    # A real raise (unlike assert) survives `python -O`.
    if not message.is_multipart():
        raise ValueError("response body is not a valid multipart message")
    return message.get_payload()
| 59 | + |
| 60 | + |
def process_json_part(message):
    """
    Print a short preview of a JSON message part and return the decoded data.

    Parameters
    ----------
    message : email.message.Message
        Message part whose content type must be ``application/json``.
        The preview assumes the decoded value is a JSON array.

    Returns
    -------
    The decoded JSON value, or None if the payload is not valid JSON.
    """
    # Literal mirrors the module-level JSON_FORMAT constant; a real raise
    # (unlike the original assert) survives `python -O`.
    if message.get_content_type() != "application/json":
        raise ValueError(f"expected application/json part, got {message.get_content_type()}")
    # BUG FIX: the original read the module-level loop variable `part`
    # instead of its `message` parameter, so it only worked by accident.
    payload = message.get_payload()
    print(f"-- {len(payload)} bytes of JSON data:")
    PREVIEW_SIZE = 5  # entries shown before eliding the rest (typo fixed)
    try:
        data = json.loads(payload)
    except json.JSONDecodeError as e:
        # BUG FIX: the original fell through to `return data` here, which
        # raised UnboundLocalError; report the error and return None.
        print(f"Error parsing JSON data: {e}\n", file=sys.stderr)
        return None
    print("[")
    for i in range(min(PREVIEW_SIZE, len(data))):
        print(f" {data[i]}")
    if len(data) > PREVIEW_SIZE:
        print(f" ...+{len(data) - PREVIEW_SIZE} entries...")
    print("]")
    return data
| 77 | + |
| 78 | + |
def process_arrow_stream_message(message):
    """
    Decode an Arrow IPC stream part and return its record batches.

    Parameters
    ----------
    message : email.message.Message
        Message part whose content type must be
        ``application/vnd.apache.arrow.stream``.

    Returns
    -------
    list of pyarrow.RecordBatch
        Every batch decoded from the stream.  The original discarded the
        batches and returned None even though the caller assigns the
        result (``batches = process_arrow_stream_message(part)``);
        returning them is backward-compatible and makes that assignment
        meaningful.
    """
    global arrow_stream_parsing_time
    # A real raise (unlike assert) survives `python -O`.
    if message.get_content_type() != ARROW_STREAM_FORMAT:
        raise ValueError(f"expected Arrow stream part, got {message.get_content_type()}")
    # BUG FIX: the original read the module-level loop variable `part`
    # instead of its `message` parameter.
    payload = message.get_payload(decode=True)
    print(f"-- {len(payload)} bytes of Arrow data:")
    batches = []
    # perf_counter() is monotonic; time.time() can jump and skew intervals.
    t0 = time.perf_counter()
    with pa.ipc.open_stream(payload) as reader:
        schema = reader.schema
        print(f"Schema: \n{schema}\n")
        # The stream reader is iterable; this replaces the manual
        # read_next_batch()/StopIteration loop.
        batches.extend(reader)
    # Accumulate (+=) so multiple Arrow parts add up, matching how
    # response_parsing_time is maintained.
    arrow_stream_parsing_time += time.perf_counter() - t0
    num_records = sum(batch.num_rows for batch in batches)
    print(f"Parsed {num_records} records in {len(batches)} batch(es)")
    return batches
| 99 | + |
| 100 | + |
def process_text_part(message):
    """
    Print the body of a plain-text message part.

    Parameters
    ----------
    message : email.message.Message
        Message part whose content type must be ``text/plain``.

    Raises
    ------
    ValueError
        If the part has a different content type.
    """
    # Literal mirrors the module-level TEXT_FORMAT constant; a real raise
    # (unlike the original assert) survives `python -O`.
    if message.get_content_type() != "text/plain":
        raise ValueError(f"expected text/plain part, got {message.get_content_type()}")
    # BUG FIX: the original read the module-level loop variable `part`
    # instead of its `message` parameter.
    payload = message.get_payload()
    print("-- Text Message:")
    print(payload, end="")
    print("-- End of Text Message --")
| 107 | + |
| 108 | + |
# Fetch and parse the multipart response.  The context manager closes the
# HTTP connection even on error; the original leaked the response object.
with urllib.request.urlopen("http://localhost:8008?include_footnotes") as response:
    content_type = response.headers.get_content_type()
    if content_type != "multipart/mixed":
        raise ValueError(f"Expected multipart/mixed Content-Type, got {content_type}")
    boundary = response.headers.get_boundary()
    if not boundary:
        raise ValueError("No multipart boundary found in Content-Type header")
    parts = parse_multipart_message(response, boundary, buffer_size=64 * 1024)

# Dispatch each part to its handler by content type.
batches = None
for part in parts:
    content_type = part.get_content_type()
    if content_type == JSON_FORMAT:
        process_json_part(part)
    elif content_type == ARROW_STREAM_FORMAT:
        batches = process_arrow_stream_message(part)
    elif content_type == TEXT_FORMAT:
        process_text_part(part)

end_time = time.time()
execution_time = end_time - start_time

# Guard the ratios against a zero interval (sub-resolution runs).
rel_response_parsing_time = response_parsing_time / execution_time if execution_time else 0.0
rel_arrow_stream_parsing_time = arrow_stream_parsing_time / execution_time if execution_time else 0.0
print(f"{execution_time:.3f} seconds elapsed")
# BUG FIX: the original summary lines printed "seconds" twice
# ("X seconds (Y%) seconds parsing ...").
print(
    f"{response_parsing_time:.3f} seconds "
    f"({rel_response_parsing_time * 100:.2f}%) "
    f"parsing multipart/mixed response"
)
print(
    f"{arrow_stream_parsing_time:.3f} seconds "
    f"({rel_arrow_stream_parsing_time * 100:.2f}%) "
    f"parsing Arrow stream"
)
0 commit comments