-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathhdfs-lzo-text-scanner.h
204 lines (164 loc) · 6.45 KB
/
hdfs-lzo-text-scanner.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
// Copyright (c) 2012 Cloudera, Inc. All rights reserved.
#ifndef IMPALA_LZO_TEXT_SCANNER_H
#define IMPALA_LZO_TEXT_SCANNER_H
#include "lzo-header.h"
#include <boost/thread/locks.hpp>
#include "common/version.h"
#include "exec/hdfs-text-scanner.h"
#include "runtime/string-buffer.h"
// This provides support for reading files compressed with lzop.
// The file consists of a header followed by compressed blocks, each preceded
// by its uncompressed and compressed block sizes.
//
// The following is a pseudo-BNF grammar for LZOFile. Comments are prefixed
// with dashes:
//
// lzofile ::=
// <file-header>
// <compressed-block>+
//
// compressed-block ::=
// <uncompressed-size>
// <compressed-size>
// <uncompressed-checksums>
// <compressed-checksums>
// <compressed-data>
//
// file-header ::= -- most of this information is not used.
// <magic>
// <version>
// <lib-version>
// [<version-needed>] -- present for all modern files.
// <method>
// <flags>
// <mode>
// <mtime>
// <file-name>
// <header-checksum>
// <extra-field> -- presence indicated in flags, not currently used.
//
// <compressed-checksums> ::=
// [adler-checksum | crc-checksum]
//
// <uncompressed-checksums> ::=
// [adler-checksum | crc-checksum]
//
// <file-name> ::=
// <length> -- one byte
// <name>
//
namespace impala {
class ScannerContext;
class HdfsLzoTextScanner;
// HdfsLzoTextScanner (declared below) is the HdfsScanner implementation that reads
// LZOP-formatted text files. The format of the data, after decompression, is the same
// as HdfsText files. Records can span compressed blocks.
//
// An optional, but highly recommended, index file may exist in the same directory.
// This file is generated by running: com.hadoop.compression.lzo.DistributedLzoIndexer.
// The file contains the offsets to the start of each compressed block.
// This is used to find the beginning of a split and to skip over a bad block and
// find the next block.
// If there is no index file then the file is non-splittable. A single scan range
// will be issued for the whole file and no error recovery is done.
// Returns the IMPALA_BUILD_VERSION string this library was compiled against.
// Impala uses it, after loading this library with dlopen(), to verify that the
// library was built for the expected Impala version.
// NOTE(review): this is a non-inline definition in a header. If more than one
// translation unit of the library includes this header, linking would fail with a
// duplicate symbol. Leaving only a declaration here and moving the body to the .cc
// file would avoid that. Do not simply mark it `inline`: the symbol might then not be
// emitted, and dlsym() could not find it.
extern "C" const char* GetImpalaBuildVersion() {
  return IMPALA_BUILD_VERSION;
}
// C-linkage entry points that let Impala reach HdfsLzoTextScanner after loading
// this library with dlopen(). Both are thin wrappers around the scanner class.
//
// Factory wrapper around the HdfsLzoTextScanner constructor. The returned scanner
// belongs to the caller, who is expected to delete it.
//   scan_node -- the scan node that is creating this scanner.
//   state     -- the runtime state for this scanner.
extern "C" HdfsLzoTextScanner* GetLzoTextScanner(HdfsScanNode* scan_node,
                                                 RuntimeState* state);

// Wrapper around HdfsLzoTextScanner::IssueInitialRanges.
//   scan_node -- the scan node for this scan.
//   files     -- the files that are to be scanned.
// NOTE(review): a C-linkage function that returns the C++ type Status and takes a
// std::vector is only usable from C++ callers (e.g. through dlsym()). Clang may flag
// it with -Wreturn-type-c-linkage.
extern "C" Status IssueInitialRanges(HdfsScanNode* scan_node,
                                     const std::vector<HdfsFileDesc*>& files);
// Scanner for lzop-compressed text files. Decompressed blocks are supplied to the
// inherited HdfsTextScanner logic through the FillByteBuffer() override below.
class HdfsLzoTextScanner : public HdfsTextScanner {
public:
// Constructs a scanner for the given scan node and runtime state.
HdfsLzoTextScanner(HdfsScanNode* scan_node, RuntimeState* state);
virtual ~HdfsLzoTextScanner();
// Implementation of the HdfsScanner interface not inherited from HdfsTextScanner.
virtual Status Close();
// Reads the header of the file, locates the index file and then fires off the
// rest of the scan ranges.
virtual Status ProcessSplit();
// Issues the initial scan ranges for all lzo-text files. This reads the file
// headers; the rest of the file data is issued later, when each split is
// processed (see ProcessSplit()).
//   scan_node -- scan node for this scan.
//   files     -- files that are to be scanned.
static Status IssueInitialRanges(
HdfsScanNode* scan_node, const std::vector<HdfsFileDesc*>& files);
private:
// Checksum algorithm that may accompany a compressed block (see the
// <compressed-checksums> / <uncompressed-checksums> productions in the grammar
// at the top of this file).
enum LzoChecksum {
CHECK_NONE,
CHECK_CRC32,
CHECK_ADLER
};
// Suffix for index files (declared here, defined out of line).
const static std::string INDEX_SUFFIX;
// Block size in bytes used by LZOP (256 KiB). Compressed blocks are never larger
// than this.
const static int MAX_BLOCK_COMPRESSED_SIZE = (256 * 1024);
// Fixed-size portion of the header, in bytes. The header can additionally contain
// up to 255 bytes of file name.
const static int MIN_HEADER_SIZE = 32;
// An overestimate of how big the header could be: besides the fixed part it
// contains a path name and an optional section.
const static int HEADER_SIZE = 300;
// Header information, shared by all scanners on this file.
struct LzoFileHeader {
// Checksum types for the block data (see LzoChecksum).
// NOTE(review): presumably input = compressed data, output = decompressed data;
// confirm against ReadHeader().
LzoChecksum input_checksum_type_;
LzoChecksum output_checksum_type_;
// Presumably the size in bytes of the parsed file header -- confirm in ReadHeader().
uint32_t header_size_;
// Offsets to compressed blocks; set up by ReadIndexFile().
std::vector<int64_t> offsets;
};
// Pointer to shared header information.
// NOTE(review): ownership/lifetime is not visible in this header. Because the
// header is shared by all scanners on the file, it is presumably not owned by
// this scanner -- confirm.
LzoFileHeader* header_;
// Fills the byte buffer by reading and decompressing blocks.
// NOTE(review): default arguments on virtual functions are bound by the static
// type of the call expression; confirm this default matches the one declared on
// HdfsTextScanner::FillByteBuffer.
virtual Status FillByteBuffer(bool* eosr, int num_bytes = 0);
// Reads the header data and validates the header.
Status ReadHeader();
// Checksums data. Presumably computes the `type` checksum over `length` bytes at
// `buffer` and compares it with `expected_checksum`, with `source` labelling the
// data in error messages -- confirm in the implementation.
// NOTE(review): CRC32 and Adler-32 values are 32-bit unsigned; passing them as
// `int` relies on implicit conversion.
Status Checksum(LzoChecksum type,
const std::string& source, int expected_checksum, uint8_t* buffer, int length);
// Reads the index file and sets up header_->offsets.
Status ReadIndexFile();
// Adjusts context_ to the first block at or after the current context offset.
Status FindFirstBlock();
// Issues the full file ranges after reading the headers.
Status IssueFileRanges(const char* filename);
// Reads a data block.
// Sets: byte_buffer_ptr_, byte_buffer_read_size_ and eos_read_.
// Data will be in a MemPool-allocated buffer, or in the disk I/O context memory
// if the data was not compressed.
Status ReadAndDecompressData();
// Reads compressed data and recovers from errors.
Status ReadData();
// Pool for allocating block_buffer_.
boost::scoped_ptr<MemPool> block_buffer_pool_;
// Buffer holding decompressed data.
uint8_t* block_buffer_;
// Allocated length of block_buffer_.
int32_t block_buffer_len_;
// Next byte to be returned from the buffer holding decompressed data blocks.
uint8_t* block_buffer_ptr_;
// Bytes remaining in block_buffer_.
int bytes_remaining_;
// True if we have read the compressed block past the end of the scan range.
// If set, eos is returned to the caller even if there are bytes left in the buffer.
bool past_eosr_;
// True if the end of the scan has been read.
bool eos_read_;
// True if this scanner is only parsing the file header.
bool only_parsing_header_;
// Set when the scanner object is constructed; currently always true.
// HDFS already checksums the blocks sent from disk to the client, so this check
// would be redundant.
bool disable_checksum_;
// Time spent decompressing.
RuntimeProfile::Counter* decompress_timer_;
};
}
#endif