Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Better error message on parallel turtle parsing ... (#1807)" #1827

Merged
merged 1 commit into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/engine/GraphStoreProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include "engine/GraphStoreProtocol.h"

#include "parser/Tokenizer.h"
#include "util/http/beast.h"

// ____________________________________________________________________________
Expand Down
2 changes: 0 additions & 2 deletions src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
#include "index/IndexFormatVersion.h"
#include "index/VocabularyMerger.h"
#include "parser/ParallelParseBuffer.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "util/BatchedPipeline.h"
#include "util/CachingMemoryResource.h"
#include "util/HashMap.h"
Expand Down
3 changes: 3 additions & 0 deletions src/parser/RdfEscaping.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
#ifndef QLEVER_RDFESCAPING_H
#define QLEVER_RDFESCAPING_H

#include <unicode/ustream.h>

#include <sstream>
#include <string>

#include "global/TypedIndex.h"
#include "parser/NormalizedString.h"
#include "util/Exception.h"
#include "util/HashSet.h"
#include "util/StringUtils.h"

namespace RdfEscaping {
Expand Down
62 changes: 18 additions & 44 deletions src/parser/RdfParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
#include "global/Constants.h"
#include "parser/GeoPoint.h"
#include "parser/NormalizedString.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "parser/RdfEscaping.h"
#include "util/Conversions.h"
#include "util/DateYearDuration.h"
#include "util/OnDestructionDontThrowDuringStackUnwinding.h"
#include "util/TransparentFunctors.h"

using namespace std::chrono_literals;
// _______________________________________________________________
Expand All @@ -32,17 +31,7 @@ bool TurtleParser<T>::statement() {
// ______________________________________________________________
template <class T>
bool TurtleParser<T>::directive() {
bool successfulParse = prefixID() || base() || sparqlPrefix() || sparqlBase();
if (successfulParse && prefixAndBaseDisabled_) {
raise(
"@prefix or @base directives need to be at the beginning of the file "
"when using the parallel parser. Use '--parse-parallel false' if you "
"can't guarantee this. If the reason for this error is that the input "
"is a concatenation of Turtle files, each of which has the prefixes at "
"the beginning, you should feed the files to QLever separately instead "
"of concatenated");
}
return successfulParse;
return prefixID() || base() || sparqlPrefix() || sparqlBase();
}

// ________________________________________________________________
Expand Down Expand Up @@ -641,7 +630,7 @@ bool TurtleParser<T>::iri() {
// _____________________________________________________________________
template <class T>
bool TurtleParser<T>::prefixedName() {
if constexpr (T::UseRelaxedParsing) {
if constexpr (UseRelaxedParsing) {
if (!(pnameLnRelaxed() || pnameNS())) {
return false;
}
Expand Down Expand Up @@ -756,7 +745,7 @@ bool TurtleParser<T>::iriref() {
// In relaxed mode, that is all we check. Otherwise, we check if the IRI is
// standard-compliant. If not, we output a warning and try to parse it in a
// more relaxed way.
if constexpr (T::UseRelaxedParsing) {
if constexpr (UseRelaxedParsing) {
tok_.remove_prefix(endPos + 1);
lastParseResult_ = TripleComponent::Iri::fromIrirefConsiderBase(
view.substr(0, endPos + 1), baseForRelativeIri(), baseForAbsoluteIri());
Expand Down Expand Up @@ -959,20 +948,20 @@ bool RdfStreamParser<T>::getLineImpl(TurtleTriple* triple) {
// `parallelParser_` have been fully processed. After the last batch we will
// push another call to this lambda to the `parallelParser_` which will then
// finish the `tripleCollector_` as soon as all batches have been computed.
template <typename T>
void RdfParallelParser<T>::finishTripleCollectorIfLastBatch() {
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::finishTripleCollectorIfLastBatch() {
if (batchIdx_.fetch_add(1) == numBatchesTotal_) {
tripleCollector_.finish();
}
}

// __________________________________________________________________________________
template <typename T>
void RdfParallelParser<T>::parseBatch(size_t parsePosition, auto batch) {
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::parseBatch(size_t parsePosition,
auto batch) {
try {
RdfStringParser<T> parser{defaultGraphIri_};
RdfStringParser<Tokenizer_T> parser{defaultGraphIri_};
parser.prefixMap_ = this->prefixMap_;
parser.disablePrefixParsing();
parser.setPositionOffset(parsePosition);
parser.setInputStream(std::move(batch));
// TODO: raise an error message if prefix parsing fails.
Expand All @@ -983,15 +972,14 @@ void RdfParallelParser<T>::parseBatch(size_t parsePosition, auto batch) {
});
finishTripleCollectorIfLastBatch();
} catch (std::exception& e) {
errorMessages_.wlock()->emplace_back(parsePosition, e.what());
tripleCollector_.pushException(std::current_exception());
parallelParser_.finish();
}
};

// _______________________________________________________________________
template <typename T>
void RdfParallelParser<T>::feedBatchesToParser(
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::feedBatchesToParser(
auto remainingBatchFromInitialization) {
bool first = true;
size_t parsePosition = 0;
Expand Down Expand Up @@ -1031,15 +1019,14 @@ void RdfParallelParser<T>::feedBatchesToParser(
}
}
} catch (std::exception& e) {
errorMessages_.wlock()->emplace_back(parsePosition, e.what());
tripleCollector_.pushException(std::current_exception());
}
};

// _______________________________________________________________________
template <typename T>
void RdfParallelParser<T>::initialize(const string& filename,
ad_utility::MemorySize bufferSize) {
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::initialize(
const string& filename, ad_utility::MemorySize bufferSize) {
fileBuffer_ = std::make_unique<ParallelBufferWithEndRegex>(
bufferSize.getBytes(), "\\.[\\t ]*([\\r\\n]+)");
ParallelBuffer::BufferType remainingBatchFromInitialization;
Expand All @@ -1048,7 +1035,7 @@ void RdfParallelParser<T>::initialize(const string& filename,
LOG(WARN) << "Empty input to the TURTLE parser, is this what you intended?"
<< std::endl;
} else {
RdfStringParser<T> declarationParser{};
RdfStringParser<Tokenizer_T> declarationParser{};
declarationParser.setInputStream(std::move(batch.value()));
while (declarationParser.parseDirectiveManually()) {
}
Expand All @@ -1075,20 +1062,7 @@ bool RdfParallelParser<T>::getLineImpl(TurtleTriple* triple) {
// contains no triples. (Theoretically this might happen, and it is safer this
// way)
while (triples_.empty()) {
auto optionalTripleTask = [&]() {
try {
return tripleCollector_.pop();
} catch (const std::exception&) {
// In case of multiple errors in parallel batches, we always report the
// first error.
parallelParser_.waitUntilFinished();
auto errors = std::move(*errorMessages_.wlock());
const auto& firstError =
ql::ranges::min_element(errors, {}, ad_utility::first);
AD_CORRECTNESS_CHECK(firstError != errors.end());
throw std::runtime_error{firstError->second};
}
}();
auto optionalTripleTask = tripleCollector_.pop();
if (!optionalTripleTask) {
// Everything has been parsed
return false;
Expand Down
57 changes: 27 additions & 30 deletions src/parser/RdfParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,35 @@

#pragma once

#include <absl/strings/str_cat.h>
#include <gtest/gtest_prod.h>
#include <sys/mman.h>

#include <codecvt>
#include <exception>
#include <future>
#include <locale>
#include <stdexcept>
#include <string_view>

#include "absl/strings/str_cat.h"
#include "global/Constants.h"
#include "global/SpecialIds.h"
#include "index/ConstantsIndexBuilding.h"
#include "index/InputFileSpecification.h"
#include "parser/ParallelBuffer.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "parser/TripleComponent.h"
#include "parser/TurtleTokenId.h"
#include "parser/data/BlankNode.h"
#include "util/Exception.h"
#include "util/File.h"
#include "util/HashMap.h"
#include "util/Log.h"
#include "util/ParseException.h"
#include "util/TaskQueue.h"
#include "util/ThreadSafeQueue.h"

using std::string;

enum class TurtleParserIntegerOverflowBehavior {
Error,
OverflowingToDouble,
Expand Down Expand Up @@ -120,6 +126,10 @@ class TurtleParser : public RdfParserBase {
public:
using ParseException = ::ParseException;

// The CTRE Tokenizer implies relaxed parsing.
static constexpr bool UseRelaxedParsing =
std::is_same_v<Tokenizer_T, TokenizerCtre>;

// Get the result of the single rule that was parsed most recently. Used for
// testing.
const TripleComponent& getLastParseResult() const { return lastParseResult_; }
Expand Down Expand Up @@ -194,10 +204,10 @@ class TurtleParser : public RdfParserBase {

// Getters for the two base prefixes. Without a BASE declaration, these will
// both return the empty IRI.
const TripleComponent::Iri& baseForRelativeIri() const {
const TripleComponent::Iri& baseForRelativeIri() {
return prefixMap_.at(baseForRelativeIriKey_);
}
const TripleComponent::Iri& baseForAbsoluteIri() const {
const TripleComponent::Iri& baseForAbsoluteIri() {
return prefixMap_.at(baseForAbsoluteIriKey_);
}

Expand All @@ -216,8 +226,6 @@ class TurtleParser : public RdfParserBase {
static inline std::atomic<size_t> numParsers_ = 0;
size_t blankNodePrefix_ = numParsers_.fetch_add(1);

bool prefixAndBaseDisabled_ = false;

public:
TurtleParser() = default;
explicit TurtleParser(TripleComponent defaultGraphIri)
Expand Down Expand Up @@ -392,7 +400,7 @@ class TurtleParser : public RdfParserBase {
}

// create a new, unused, unique blank node string
std::string createAnonNode() {
string createAnonNode() {
return BlankNode{true,
absl::StrCat(blankNodePrefix_, "_", numBlankNodes_++)}
.toSparql();
Expand Down Expand Up @@ -471,7 +479,9 @@ CPP_template(typename Parser)(
return positionOffset_ + tmpToParse_.size() - this->tok_.data().size();
}

void initialize(const std::string&, ad_utility::MemorySize) {
void initialize(const string& filename, ad_utility::MemorySize bufferSize) {
(void)filename;
(void)bufferSize;
throw std::runtime_error(
"RdfStringParser doesn't support calls to initialize. Only use "
"parseUtf8String() for unit tests\n");
Expand Down Expand Up @@ -524,7 +534,7 @@ CPP_template(typename Parser)(
// Testing interface for reusing a parser.
// Only specifies the tokenizer's input stream.
// Does not alter the tokenizer's state.
void setInputStream(const std::string& toParse) {
void setInputStream(const string& toParse) {
tmpToParse_.clear();
tmpToParse_.reserve(toParse.size());
tmpToParse_.insert(tmpToParse_.end(), toParse.begin(), toParse.end());
Expand All @@ -545,9 +555,6 @@ CPP_template(typename Parser)(
// as expected
size_t getPosition() const { return this->tok_.begin() - tmpToParse_.data(); }

// Disable prefix parsing for turtle parsers during parallel parsing.
void disablePrefixParsing() { this->prefixAndBaseDisabled_ = true; }

FRIEND_TEST(RdfParserTest, prefixedName);
FRIEND_TEST(RdfParserTest, prefixID);
FRIEND_TEST(RdfParserTest, stringParse);
Expand Down Expand Up @@ -583,7 +590,7 @@ class RdfStreamParser : public Parser {
// Default construction needed for tests
RdfStreamParser() = default;
explicit RdfStreamParser(
const std::string& filename,
const string& filename,
ad_utility::MemorySize bufferSize = DEFAULT_PARSER_BUFFER_SIZE,
TripleComponent defaultGraphIri =
qlever::specialIds().at(DEFAULT_GRAPH_IRI))
Expand All @@ -595,8 +602,7 @@ class RdfStreamParser : public Parser {

bool getLineImpl(TurtleTriple* triple) override;

void initialize(const std::string& filename,
ad_utility::MemorySize bufferSize);
void initialize(const string& filename, ad_utility::MemorySize bufferSize);

size_t getParsePosition() const override {
return numBytesBeforeCurrentBatch_ + (tok_.data().data() - byteVec_.data());
Expand Down Expand Up @@ -638,15 +644,15 @@ class RdfStreamParser : public Parser {
template <typename Parser>
class RdfParallelParser : public Parser {
public:
using Triple = std::array<std::string, 3>;
using Triple = std::array<string, 3>;
// Default construction needed for tests
RdfParallelParser() = default;

// If the `sleepTimeForTesting` is set, then after the initialization the
// parser will sleep for the specified time before parsing each batch s.t.
// certain corner cases can be tested.
explicit RdfParallelParser(
const std::string& filename,
const string& filename,
ad_utility::MemorySize bufferSize = DEFAULT_PARSER_BUFFER_SIZE,
std::chrono::milliseconds sleepTimeForTesting =
std::chrono::milliseconds{0})
Expand All @@ -659,8 +665,7 @@ class RdfParallelParser : public Parser {
}

// Construct a parser from a file and a given default graph iri.
RdfParallelParser(const std::string& filename,
ad_utility::MemorySize bufferSize,
RdfParallelParser(const string& filename, ad_utility::MemorySize bufferSize,
const TripleComponent& defaultGraphIri)
: Parser{defaultGraphIri}, defaultGraphIri_{defaultGraphIri} {
initialize(filename, bufferSize);
Expand All @@ -678,8 +683,7 @@ class RdfParallelParser : public Parser {
parallelParser_.resetTimers();
}

void initialize(const std::string& filename,
ad_utility::MemorySize bufferSize);
void initialize(const string& filename, ad_utility::MemorySize bufferSize);

size_t getParsePosition() const override {
// TODO: can we really define this position here?
Expand Down Expand Up @@ -716,12 +720,6 @@ class RdfParallelParser : public Parser {
QUEUE_SIZE_BEFORE_PARALLEL_PARSING, NUM_PARALLEL_PARSER_THREADS,
"parallel parser"};
std::future<void> parseFuture_;

// Collect error messages in case of multiple failures. The `size_t` is the
// start position of the corresponding batch, used to order the errors in case
// the batches are finished out of order.
ad_utility::Synchronized<std::vector<std::pair<size_t, std::string>>>
errorMessages_;
// The parallel parsers need to know when the last batch has been parsed, s.t.
// the parser threads can be destroyed. The following two members are needed
// for keeping track of this condition.
Expand Down Expand Up @@ -781,8 +779,7 @@ class RdfMultifileParser : public RdfParserBase {
// `parsingQueue_` is declared *after* the `finishedBatchQueue_`, s.t. when
// destroying the parser, the threads from the `parsingQueue_` are all joined
// before the `finishedBatchQueue_` (which they are using!) is destroyed.
ad_utility::TaskQueue<false> parsingQueue_{QUEUE_SIZE_BEFORE_PARALLEL_PARSING,
NUM_PARALLEL_PARSER_THREADS};
ad_utility::TaskQueue<false> parsingQueue_{10, NUM_PARALLEL_PARSER_THREADS};

// The number of parsers that have started, but not yet finished. This is
// needed to detect the complete parsing.
Expand Down
7 changes: 4 additions & 3 deletions src/parser/Tokenizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
#include <gtest/gtest_prod.h>
#include <re2/re2.h>

#include <regex>

#include "parser/TurtleTokenId.h"
#include "util/Exception.h"
#include "util/Log.h"

using re2::RE2;
Expand Down Expand Up @@ -237,7 +240,7 @@ struct SkipWhitespaceAndCommentsMixin {
auto v = self().view();
if (v.starts_with('#')) {
auto pos = v.find('\n');
if (pos == std::string::npos) {
if (pos == string::npos) {
// TODO<joka921>: This should rather yield an error.
LOG(INFO) << "Warning, unfinished comment found while parsing"
<< std::endl;
Expand Down Expand Up @@ -270,8 +273,6 @@ class Tokenizer : public SkipWhitespaceAndCommentsMixin<Tokenizer> {
Tokenizer(std::string_view input)
: _tokens(), _data(input.data(), input.size()) {}

static constexpr bool UseRelaxedParsing = false;

// if a prefix of the input stream matches the regex argument,
// return true and that prefix and move the input stream forward
// by the length of the match. If no match is found,
Expand Down
Loading
Loading