Skip to content

Commit cae3365

Browse files
Update vendored DuckDB sources to a42abde
1 parent a42abde commit cae3365

File tree

11 files changed

+124
-64
lines changed

11 files changed

+124
-64
lines changed

src/duckdb/src/catalog/catalog_entry/table_catalog_entry.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,8 @@ DataTable &TableCatalogEntry::GetStorage() {
226226
}
227227
// LCOV_EXCL_STOP
228228

229-
static void BindExtraColumns(TableCatalogEntry &table, LogicalGet &get, LogicalProjection &proj, LogicalUpdate &update,
230-
physical_index_set_t &bound_columns) {
229+
void LogicalUpdate::BindExtraColumns(TableCatalogEntry &table, LogicalGet &get, LogicalProjection &proj,
230+
LogicalUpdate &update, physical_index_set_t &bound_columns) {
231231
if (bound_columns.size() <= 1) {
232232
return;
233233
}
@@ -276,15 +276,15 @@ void TableCatalogEntry::BindUpdateConstraints(Binder &binder, LogicalGet &get, L
276276
if (constraint->type == ConstraintType::CHECK) {
277277
auto &check = constraint->Cast<BoundCheckConstraint>();
278278
// check constraint! check if we need to add any extra columns to the UPDATE clause
279-
BindExtraColumns(*this, get, proj, update, check.bound_columns);
279+
LogicalUpdate::BindExtraColumns(*this, get, proj, update, check.bound_columns);
280280
}
281281
}
282282
if (update.return_chunk) {
283283
physical_index_set_t all_columns;
284284
for (auto &column : GetColumns().Physical()) {
285285
all_columns.insert(column.Physical());
286286
}
287-
BindExtraColumns(*this, get, proj, update, all_columns);
287+
LogicalUpdate::BindExtraColumns(*this, get, proj, update, all_columns);
288288
}
289289
// for index updates we always turn any update into an insert and a delete
290290
// we thus need all the columns to be available, hence we check if the update touches any index columns
@@ -317,7 +317,7 @@ void TableCatalogEntry::BindUpdateConstraints(Binder &binder, LogicalGet &get, L
317317
for (auto &column : GetColumns().Physical()) {
318318
all_columns.insert(column.Physical());
319319
}
320-
BindExtraColumns(*this, get, proj, update, all_columns);
320+
LogicalUpdate::BindExtraColumns(*this, get, proj, update, all_columns);
321321
}
322322
}
323323

src/duckdb/src/execution/operator/csv_scanner/scanner/column_count_scanner.cpp

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22

33
namespace duckdb {
44

5-
ColumnCountResult::ColumnCountResult(CSVStates &states, CSVStateMachine &state_machine, idx_t result_size)
6-
: ScannerResult(states, state_machine, result_size) {
5+
ColumnCountResult::ColumnCountResult(CSVStates &states, CSVStateMachine &state_machine, idx_t result_size,
6+
CSVErrorHandler &error_handler)
7+
: ScannerResult(states, state_machine, result_size), error_handler(error_handler) {
78
column_counts.resize(result_size);
89
}
910

@@ -42,6 +43,17 @@ bool ColumnCountResult::AddRow(ColumnCountResult &result, idx_t buffer_pos) {
4243
const LinePosition cur_position(result.cur_buffer_idx, buffer_pos + 1, result.current_buffer_size);
4344
if (cur_position - result.last_position > result.state_machine.options.maximum_line_size.GetValue() &&
4445
buffer_pos != NumericLimits<idx_t>::Maximum()) {
46+
LinesPerBoundary lines_per_batch;
47+
FullLinePosition current_line_position;
48+
current_line_position.begin = result.last_position;
49+
current_line_position.end = cur_position;
50+
bool mock = false;
51+
string csv_row = current_line_position.ReconstructCurrentLine(mock, result.buffer_handles, true);
52+
auto error = CSVError::LineSizeError(
53+
result.state_machine.options, lines_per_batch, csv_row,
54+
result.last_position.GetGlobalPosition(result.state_machine.options.buffer_size_option.GetValue(), false),
55+
result.state_machine.options.file_path);
56+
result.error_handler.Error(error);
4557
result.error = true;
4658
}
4759
result.InternalAddRow();
@@ -100,14 +112,15 @@ void ColumnCountResult::QuotedNewLine(ColumnCountResult &result) {
100112

101113
ColumnCountScanner::ColumnCountScanner(shared_ptr<CSVBufferManager> buffer_manager,
102114
const shared_ptr<CSVStateMachine> &state_machine,
103-
shared_ptr<CSVErrorHandler> error_handler, idx_t result_size_p,
115+
shared_ptr<CSVErrorHandler> error_handler_p, idx_t result_size_p,
104116
CSVIterator iterator)
105-
: BaseScanner(std::move(buffer_manager), state_machine, std::move(error_handler), true, nullptr, iterator),
106-
result(states, *state_machine, result_size_p), column_count(1), result_size(result_size_p) {
117+
: BaseScanner(std::move(buffer_manager), state_machine, std::move(error_handler_p), true, nullptr, iterator),
118+
result(states, *state_machine, result_size_p, *error_handler), column_count(1), result_size(result_size_p) {
107119
sniffing = true;
108120
idx_t actual_size = 0;
109121
if (cur_buffer_handle) {
110122
actual_size = cur_buffer_handle->actual_size;
123+
result.buffer_handles[0] = cur_buffer_handle;
111124
}
112125
result.last_position = {iterator.pos.buffer_idx, iterator.pos.buffer_pos, actual_size};
113126
result.current_buffer_size = actual_size;
@@ -176,15 +189,32 @@ void ColumnCountScanner::FinalizeChunkProcess() {
176189
}
177190
return;
178191
} else {
192+
result.buffer_handles[iterator.pos.buffer_idx] = cur_buffer_handle;
179193
result.cur_buffer_idx = iterator.pos.buffer_idx;
180194
result.current_buffer_size = cur_buffer_handle->actual_size;
181195
// Do a quick check that the line is still sane
182196
const LinePosition cur_position(result.cur_buffer_idx, 0, result.current_buffer_size);
197+
LinesPerBoundary lines_per_batch;
183198
if (cur_position - result.last_position > result.state_machine.options.maximum_line_size.GetValue()) {
199+
FullLinePosition current_line_position;
200+
current_line_position.begin = result.last_position;
201+
current_line_position.end = cur_position;
202+
bool mock = false;
203+
string csv_row = current_line_position.ReconstructCurrentLine(mock, result.buffer_handles, true);
204+
auto error =
205+
CSVError::LineSizeError(result.state_machine.options, lines_per_batch, csv_row,
206+
result.last_position.GetGlobalPosition(
207+
result.state_machine.options.buffer_size_option.GetValue(), false),
208+
result.state_machine.options.file_path);
209+
error_handler->Error(error);
184210
result.error = true;
185211
return;
186212
}
187213
}
214+
if (result.buffer_handles.size() > 2) {
215+
// pop lowest value
216+
result.buffer_handles.erase(result.buffer_handles.begin());
217+
}
188218
iterator.pos.buffer_pos = 0;
189219
buffer_handle_ptr = cur_buffer_handle->Ptr();
190220
}

src/duckdb/src/execution/operator/csv_scanner/scanner/string_value_scanner.cpp

Lines changed: 5 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ bool StringValueResult::UnsetComment(StringValueResult &result, idx_t buffer_pos
212212
return done;
213213
}
214214

215-
static void SanitizeError(string &value) {
215+
void FullLinePosition::SanitizeError(string &value) {
216216
std::vector<char> char_array(value.begin(), value.end());
217217
char_array.push_back('\0'); // Null-terminate the character array
218218
Utf8Proc::MakeValid(&char_array[0], char_array.size());
@@ -460,7 +460,7 @@ void StringValueResult::AddValueToVector(const char *value_ptr, const idx_t size
460460
error << "Could not convert string \"" << std::string(value_ptr, size) << "\" to \'"
461461
<< LogicalTypeIdToString(parse_types[chunk_col_id].type_id) << "\'";
462462
auto error_string = error.str();
463-
SanitizeError(error_string);
463+
FullLinePosition::SanitizeError(error_string);
464464

465465
current_errors.ModifyErrorMessageOfLastError(error_string);
466466
}
@@ -547,7 +547,7 @@ void StringValueResult::AddPossiblyEscapedValue(StringValueResult &result, const
547547
error << "Could not convert string \"" << std::string(value_ptr, length) << "\" to \'"
548548
<< LogicalTypeIdToString(result.parse_types[result.chunk_col_id].type_id) << "\'";
549549
auto error_string = error.str();
550-
SanitizeError(error_string);
550+
FullLinePosition::SanitizeError(error_string);
551551
result.current_errors.ModifyErrorMessageOfLastError(error_string);
552552
}
553553
result.cur_col_id++;
@@ -769,46 +769,6 @@ void StringValueResult::NullPaddingQuotedNewlineCheck() const {
769769
}
770770
}
771771

772-
//! Reconstructs the current line to be used in error messages
773-
string FullLinePosition::ReconstructCurrentLine(bool &first_char_nl,
774-
unordered_map<idx_t, shared_ptr<CSVBufferHandle>> &buffer_handles,
775-
bool reconstruct_line) const {
776-
if (!reconstruct_line || begin == end) {
777-
return {};
778-
}
779-
string result;
780-
if (end.buffer_idx == begin.buffer_idx) {
781-
if (buffer_handles.find(end.buffer_idx) == buffer_handles.end()) {
782-
throw InternalException("CSV Buffer is not available to reconstruct CSV Line, please open an issue with "
783-
"your query and dataset.");
784-
}
785-
auto buffer = buffer_handles[begin.buffer_idx]->Ptr();
786-
first_char_nl = buffer[begin.buffer_pos] == '\n' || buffer[begin.buffer_pos] == '\r';
787-
for (idx_t i = begin.buffer_pos + first_char_nl; i < end.buffer_pos; i++) {
788-
result += buffer[i];
789-
}
790-
} else {
791-
if (buffer_handles.find(begin.buffer_idx) == buffer_handles.end() ||
792-
buffer_handles.find(end.buffer_idx) == buffer_handles.end()) {
793-
throw InternalException("CSV Buffer is not available to reconstruct CSV Line, please open an issue with "
794-
"your query and dataset.");
795-
}
796-
auto first_buffer = buffer_handles[begin.buffer_idx]->Ptr();
797-
auto first_buffer_size = buffer_handles[begin.buffer_idx]->actual_size;
798-
auto second_buffer = buffer_handles[end.buffer_idx]->Ptr();
799-
first_char_nl = first_buffer[begin.buffer_pos] == '\n' || first_buffer[begin.buffer_pos] == '\r';
800-
for (idx_t i = begin.buffer_pos + first_char_nl; i < first_buffer_size; i++) {
801-
result += first_buffer[i];
802-
}
803-
for (idx_t i = 0; i < end.buffer_pos; i++) {
804-
result += second_buffer[i];
805-
}
806-
}
807-
// sanitize borked line
808-
SanitizeError(result);
809-
return result;
810-
}
811-
812772
bool StringValueResult::AddRowInternal() {
813773
LinePosition current_line_start = {iterator.pos.buffer_idx, iterator.pos.buffer_pos, buffer_size};
814774
idx_t current_line_size = current_line_start - current_line_position.end;
@@ -1108,7 +1068,7 @@ void StringValueScanner::Flush(DataChunk &insert_chunk) {
11081068
error << "Could not convert string \"" << parse_vector.GetValue(line_error) << "\" to \'"
11091069
<< type.ToString() << "\'";
11101070
string error_msg = error.str();
1111-
SanitizeError(error_msg);
1071+
FullLinePosition::SanitizeError(error_msg);
11121072
idx_t row_byte_pos = 0;
11131073
if (!(result.line_positions_per_row[line_error].begin ==
11141074
result.line_positions_per_row[line_error].end)) {
@@ -1143,7 +1103,7 @@ void StringValueScanner::Flush(DataChunk &insert_chunk) {
11431103
error << "Could not convert string \"" << parse_vector.GetValue(line_error) << "\" to \'"
11441104
<< LogicalTypeIdToString(type.id()) << "\'";
11451105
string error_msg = error.str();
1146-
SanitizeError(error_msg);
1106+
FullLinePosition::SanitizeError(error_msg);
11471107
auto csv_error = CSVError::CastError(
11481108
state_machine->options, names[i], error_msg, i, borked_line, lines_per_batch,
11491109
result.line_positions_per_row[line_error].begin.GetGlobalPosition(result.result_size,

src/duckdb/src/execution/operator/csv_scanner/sniffer/dialect_detection.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,15 @@ void CSVSniffer::AnalyzeDialectCandidate(unique_ptr<ColumnCountScanner> scanner,
238238
idx_t dirty_notes = 0;
239239
idx_t dirty_notes_minus_comments = 0;
240240
if (sniffed_column_counts.error) {
241+
if (!scanner->error_handler->HasError(MAXIMUM_LINE_SIZE)) {
242+
all_fail_max_line_size = false;
243+
} else {
244+
line_error = scanner->error_handler->GetFirstError(MAXIMUM_LINE_SIZE);
245+
}
241246
// This candidate has an error (i.e., over maximum line size or never unquoting quoted values)
242247
return;
243248
}
249+
all_fail_max_line_size = false;
244250
idx_t consistent_rows = 0;
245251
idx_t num_cols = sniffed_column_counts.result_position == 0 ? 1 : sniffed_column_counts[0].number_of_columns;
246252
const bool ignore_errors = options.ignore_errors.GetValue();
@@ -608,7 +614,12 @@ void CSVSniffer::DetectDialect() {
608614

609615
// if no dialect candidate was found, we throw an exception
610616
if (candidates.empty()) {
611-
auto error = CSVError::SniffingError(options, dialect_candidates.Print(), max_columns_found_error, set_columns);
617+
CSVError error;
618+
if (all_fail_max_line_size) {
619+
error = line_error;
620+
} else {
621+
error = CSVError::SniffingError(options, dialect_candidates.Print(), max_columns_found_error, set_columns);
622+
}
612623
error_handler->Error(error, true);
613624
}
614625
}

src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ bool CSVErrorHandler::HasError(const CSVErrorType error_type) {
132132
return false;
133133
}
134134

135+
//! Returns a copy of the first stored error whose type equals error_type.
//! Throws InternalException when no stored error has that type.
CSVError CSVErrorHandler::GetFirstError(CSVErrorType error_type) {
136+
lock_guard<mutex> parallel_lock(main_mutex); // held for the whole scan over `errors`
137+
for (const auto &er : errors) {
138+
if (er.type == error_type) {
139+
return er; // returned by value, so the caller gets its own copy
140+
}
141+
}
142+
throw InternalException("CSVErrorHandler::GetFirstError was called without having an appropriate error type");
143+
}
144+
135145
idx_t CSVErrorHandler::GetSize() {
136146
lock_guard<mutex> parallel_lock(main_mutex);
137147
return errors.size();
@@ -273,6 +283,10 @@ CSVError::CSVError(string error_message_p, CSVErrorType type_p, idx_t column_idx
273283
if (reader_options.ignore_errors.GetValue()) {
274284
RemoveNewLine(error_message);
275285
}
286+
// Cap the CSV row at 10,000 bytes for performance reasons.
287+
if (csv_row.size() > 10000) {
288+
csv_row.erase(csv_row.begin() + 10000, csv_row.end());
289+
}
276290
error << error_message << '\n';
277291
error << fixes << '\n';
278292
error << reader_options.ToString(current_path);

src/duckdb/src/function/table/version/pragma_version.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#ifndef DUCKDB_PATCH_VERSION
2-
#define DUCKDB_PATCH_VERSION "0-dev2339"
2+
#define DUCKDB_PATCH_VERSION "0-dev2352"
33
#endif
44
#ifndef DUCKDB_MINOR_VERSION
55
#define DUCKDB_MINOR_VERSION 3
@@ -8,10 +8,10 @@
88
#define DUCKDB_MAJOR_VERSION 1
99
#endif
1010
#ifndef DUCKDB_VERSION
11-
#define DUCKDB_VERSION "v1.3.0-dev2339"
11+
#define DUCKDB_VERSION "v1.3.0-dev2352"
1212
#endif
1313
#ifndef DUCKDB_SOURCE_ID
14-
#define DUCKDB_SOURCE_ID "e1dcf13ed5"
14+
#define DUCKDB_SOURCE_ID "a8d5c0dcb4"
1515
#endif
1616
#include "duckdb/function/table/system_functions.hpp"
1717
#include "duckdb/main/database.hpp"

src/duckdb/src/include/duckdb/execution/operator/csv_scanner/column_count_scanner.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ struct ColumnCount {
3131

3232
class ColumnCountResult : public ScannerResult {
3333
public:
34-
ColumnCountResult(CSVStates &states, CSVStateMachine &state_machine, idx_t result_size);
34+
ColumnCountResult(CSVStates &states, CSVStateMachine &state_machine, idx_t result_size,
35+
CSVErrorHandler &error_handler);
3536
inline ColumnCount &operator[](size_t index) {
3637
return column_counts[index];
3738
}
@@ -46,6 +47,8 @@ class ColumnCountResult : public ScannerResult {
4647
idx_t current_buffer_size = 0;
4748
//! How many rows fit a given column count
4849
map<idx_t, idx_t> rows_per_column_count;
50+
CSVErrorHandler &error_handler;
51+
map<idx_t, shared_ptr<CSVBufferHandle>> buffer_handles;
4952
//! Adds a Value to the result
5053
static inline void AddValue(ColumnCountResult &result, idx_t buffer_pos);
5154
//! Adds a Row to the result

src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_error.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class CSVErrorHandler {
134134
//! Returns true if there are any errors
135135
bool AnyErrors();
136136
bool HasError(CSVErrorType error_type);
137+
CSVError GetFirstError(CSVErrorType error_type);
137138
idx_t GetMaxLineLength();
138139

139140
void DontPrintErrorLine();

src/duckdb/src/include/duckdb/execution/operator/csv_scanner/sniffer/csv_sniffer.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ class CSVSniffer {
9797
bool EmptyOrOnlyHeader() const;
9898

9999
private:
100+
//! If all our candidates failed due to lines being bigger than the max line size.
101+
bool all_fail_max_line_size = true;
102+
CSVError line_error;
100103
//! CSV State Machine Cache
101104
CSVStateMachineCache &state_machine_cache;
102105
//! Highest number of columns found

src/duckdb/src/include/duckdb/execution/operator/csv_scanner/string_value_scanner.hpp

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,43 @@ class FullLinePosition {
3333
FullLinePosition() {};
3434
LinePosition begin;
3535
LinePosition end;
36-
36+
static void SanitizeError(string &value);
3737
//! Reconstructs the current line to be used in error messages
38-
string ReconstructCurrentLine(bool &first_char_nl,
39-
unordered_map<idx_t, shared_ptr<CSVBufferHandle>> &buffer_handles,
40-
bool reconstruct_line) const;
38+
//! Rebuilds the text between `begin` and `end` out of the buffers held in `buffer_handles`.
//! T is any container that offers find()/end()/operator[] keyed by buffer index and whose mapped
//! values are pointer-like objects exposing Ptr() and actual_size.
//! Returns an empty string when `reconstruct_line` is false, when begin == end, or when a required
//! buffer is not found in `buffer_handles`; in those cases `first_char_nl` is left unmodified.
//! Otherwise `first_char_nl` is set to whether the byte at `begin` is '\n' or '\r'.
template <class T>
39+
string ReconstructCurrentLine(bool &first_char_nl, T &buffer_handles, bool reconstruct_line) const {
40+
if (!reconstruct_line || begin == end) {
41+
return {};
42+
}
43+
string result;
44+
if (end.buffer_idx == begin.buffer_idx) { // line starts and ends in the same buffer
45+
if (buffer_handles.find(end.buffer_idx) == buffer_handles.end()) {
46+
return {}; // buffer is not available: give up on reconstruction
47+
}
48+
auto buffer = buffer_handles[begin.buffer_idx]->Ptr();
49+
first_char_nl = buffer[begin.buffer_pos] == '\n' || buffer[begin.buffer_pos] == '\r';
50+
for (idx_t i = begin.buffer_pos + first_char_nl; i < end.buffer_pos; i++) { // skips the first byte if it is a newline
51+
result += buffer[i];
52+
}
53+
} else { // begin and end are in different buffers; only these two buffers are read
54+
if (buffer_handles.find(begin.buffer_idx) == buffer_handles.end() ||
55+
buffer_handles.find(end.buffer_idx) == buffer_handles.end()) {
56+
return {}; // at least one of the two buffers is not available
57+
}
58+
auto first_buffer = buffer_handles[begin.buffer_idx]->Ptr();
59+
auto first_buffer_size = buffer_handles[begin.buffer_idx]->actual_size;
60+
auto second_buffer = buffer_handles[end.buffer_idx]->Ptr();
61+
first_char_nl = first_buffer[begin.buffer_pos] == '\n' || first_buffer[begin.buffer_pos] == '\r';
62+
for (idx_t i = begin.buffer_pos + first_char_nl; i < first_buffer_size; i++) { // tail of the begin buffer
63+
result += first_buffer[i];
64+
}
65+
for (idx_t i = 0; i < end.buffer_pos; i++) { // head of the end buffer, up to (not including) end.buffer_pos
66+
result += second_buffer[i];
67+
}
68+
}
69+
// sanitize borked line
70+
SanitizeError(result);
71+
return result;
72+
}
4173
};
4274

4375
class StringValueResult;

0 commit comments

Comments
 (0)