|
7 | 7 | Huffman-coded content where we already know the Huffman table.
|
8 | 8 | """
|
9 | 9 | from .compat import to_byte, decode_hex
|
10 |
| -from .exceptions import HPACKDecodingError |
11 |
| - |
12 |
| - |
13 |
| -def _pad_binary(bin_str, req_len=8): |
14 |
| - """ |
15 |
| - Given a binary string (returned by bin()), pad it to a full byte length. |
16 |
| - """ |
17 |
| - bin_str = bin_str[2:] # Strip the 0b prefix |
18 |
| - return max(0, req_len - len(bin_str)) * '0' + bin_str |
19 |
| - |
20 |
| - |
21 |
| -def _hex_to_bin_str(hex_string): |
22 |
| - """ |
23 |
| - Given a Python bytestring, returns a string representing those bytes in |
24 |
| - unicode form. |
25 |
| - """ |
26 |
| - unpadded_bin_string_list = (bin(to_byte(c)) for c in hex_string) |
27 |
| - padded_bin_string_list = map(_pad_binary, unpadded_bin_string_list) |
28 |
| - bitwise_message = "".join(padded_bin_string_list) |
29 |
| - return bitwise_message |
30 |
| - |
31 |
| - |
32 |
| -class HuffmanDecoder(object): |
33 |
| - """ |
34 |
| - Decodes a Huffman-coded bytestream according to the Huffman table laid out |
35 |
| - in the HPACK specification. |
36 |
| - """ |
37 |
| - class _Node(object): |
38 |
| - def __init__(self, data): |
39 |
| - self.data = data |
40 |
| - self.mapping = {} |
41 |
| - |
42 |
| - def __init__(self, huffman_code_list, huffman_code_list_lengths): |
43 |
| - self.root = self._Node(None) |
44 |
| - for index, (huffman_code, code_length) in enumerate(zip(huffman_code_list, huffman_code_list_lengths)): |
45 |
| - self._insert(huffman_code, code_length, index) |
46 |
| - |
47 |
| - def _insert(self, hex_number, hex_length, letter): |
48 |
| - """ |
49 |
| - Inserts a Huffman code point into the tree. |
50 |
| - """ |
51 |
| - hex_number = _pad_binary(bin(hex_number), hex_length) |
52 |
| - cur_node = self.root |
53 |
| - for digit in hex_number: |
54 |
| - if digit not in cur_node.mapping: |
55 |
| - cur_node.mapping[digit] = self._Node(None) |
56 |
| - cur_node = cur_node.mapping[digit] |
57 |
| - cur_node.data = letter |
58 |
| - |
59 |
| - def decode(self, encoded_string): |
60 |
| - """ |
61 |
| - Decode the given Huffman coded string. |
62 |
| - """ |
63 |
| - number = _hex_to_bin_str(encoded_string) |
64 |
| - cur_node = self.root |
65 |
| - decoded_message = bytearray() |
66 |
| - |
67 |
| - try: |
68 |
| - for digit in number: |
69 |
| - cur_node = cur_node.mapping[digit] |
70 |
| - if cur_node.data is not None: |
71 |
| - # If we get EOS, everything else is padding. |
72 |
| - if cur_node.data == 256: |
73 |
| - break |
74 |
| - |
75 |
| - decoded_message.append(cur_node.data) |
76 |
| - cur_node = self.root |
77 |
| - except KeyError: |
78 |
| - # We have a Huffman-coded string that doesn't match our trie. This |
79 |
| - # is pretty bad: raise a useful exception. |
80 |
| - raise HPACKDecodingError("Invalid Huffman-coded string received.") |
81 |
| - return bytes(decoded_message) |
82 | 10 |
|
83 | 11 |
|
84 | 12 | class HuffmanEncoder(object):
|
|
0 commit comments