Skip to content

Commit

Permalink
Better error message on parallel turtle parsing of scattered base and…
Browse files Browse the repository at this point in the history
… prefix declarations (#1807)

The parallel Turtle parser requires that all `PREFIX` and `BASE` declarations stand in a single block at the beginning of the file. 
Until now, prefix declarations after this initial block led to an exception about the prefix being undeclared, which was highly misleading. With this PR, this case triggers a proper exception stating that `PREFIX` and `BASE` declarations are illegal in the middle of a Turtle file when using the parallel parser, together with guidance on how to mitigate the issue: either refactor the input or deactivate the parallel parser.

Fixes #1794 by providing a better message for the described error.
  • Loading branch information
RobinTF authored Feb 18, 2025
1 parent f13fafa commit 8678731
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 55 deletions.
1 change: 1 addition & 0 deletions src/engine/GraphStoreProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "engine/GraphStoreProtocol.h"

#include "parser/Tokenizer.h"
#include "util/http/beast.h"

// ____________________________________________________________________________
Expand Down
2 changes: 2 additions & 0 deletions src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "index/IndexFormatVersion.h"
#include "index/VocabularyMerger.h"
#include "parser/ParallelParseBuffer.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "util/BatchedPipeline.h"
#include "util/CachingMemoryResource.h"
#include "util/HashMap.h"
Expand Down
3 changes: 0 additions & 3 deletions src/parser/RdfEscaping.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
#ifndef QLEVER_RDFESCAPING_H
#define QLEVER_RDFESCAPING_H

#include <unicode/ustream.h>

#include <sstream>
#include <string>

#include "global/TypedIndex.h"
#include "parser/NormalizedString.h"
#include "util/Exception.h"
#include "util/HashSet.h"
#include "util/StringUtils.h"

namespace RdfEscaping {
Expand Down
62 changes: 44 additions & 18 deletions src/parser/RdfParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
#include "global/Constants.h"
#include "parser/GeoPoint.h"
#include "parser/NormalizedString.h"
#include "parser/RdfEscaping.h"
#include "util/Conversions.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "util/DateYearDuration.h"
#include "util/OnDestructionDontThrowDuringStackUnwinding.h"
#include "util/TransparentFunctors.h"

using namespace std::chrono_literals;
// _______________________________________________________________
Expand All @@ -31,7 +32,17 @@ bool TurtleParser<T>::statement() {
// ______________________________________________________________
// Try to parse a single directive by attempting `prefixID()`, `base()`,
// `sparqlPrefix()`, and `sparqlBase()` in this order (short-circuiting on the
// first success). Returns whether one of them succeeded.
// NOTE(review): this hunk is rendered without +/- markers; the bare `return`
// line directly below appears to be the pre-change line that is replaced by
// the lines following it — confirm against the actual file.
template <class T>
bool TurtleParser<T>::directive() {
return prefixID() || base() || sparqlPrefix() || sparqlBase();
bool successfulParse = prefixID() || base() || sparqlPrefix() || sparqlBase();
// A directive that was parsed successfully while `prefixAndBaseDisabled_` is
// set is reported via `raise(...)`, with guidance on how to avoid the error.
if (successfulParse && prefixAndBaseDisabled_) {
raise(
"@prefix or @base directives need to be at the beginning of the file "
"when using the parallel parser. Use '--parse-parallel false' if you "
"can't guarantee this. If the reason for this error is that the input "
"is a concatenation of Turtle files, each of which has the prefixes at "
"the beginning, you should feed the files to QLever separately instead "
"of concatenated");
}
return successfulParse;
}

// ________________________________________________________________
Expand Down Expand Up @@ -630,7 +641,7 @@ bool TurtleParser<T>::iri() {
// _____________________________________________________________________
template <class T>
bool TurtleParser<T>::prefixedName() {
if constexpr (UseRelaxedParsing) {
if constexpr (T::UseRelaxedParsing) {
if (!(pnameLnRelaxed() || pnameNS())) {
return false;
}
Expand Down Expand Up @@ -745,7 +756,7 @@ bool TurtleParser<T>::iriref() {
// In relaxed mode, that is all we check. Otherwise, we check if the IRI is
// standard-compliant. If not, we output a warning and try to parse it in a
// more relaxed way.
if constexpr (UseRelaxedParsing) {
if constexpr (T::UseRelaxedParsing) {
tok_.remove_prefix(endPos + 1);
lastParseResult_ = TripleComponent::Iri::fromIrirefConsiderBase(
view.substr(0, endPos + 1), baseForRelativeIri(), baseForAbsoluteIri());
Expand Down Expand Up @@ -948,20 +959,20 @@ bool RdfStreamParser<T>::getLineImpl(TurtleTriple* triple) {
// `parallelParser_` have been fully processed. After the last batch we will
// push another call to this lambda to the `parallelParser_` which will then
// finish the `tripleCollector_` as soon as all batches have been computed.
// NOTE(review): the two signature variants below (`Tokenizer_T` vs. `T`)
// appear to be the pre- and post-change lines of a diff rendered without +/-
// markers — confirm against the actual file.
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::finishTripleCollectorIfLastBatch() {
template <typename T>
void RdfParallelParser<T>::finishTripleCollectorIfLastBatch() {
// Increment the batch counter and finish the `tripleCollector_` only in the
// call for which the counter compares equal to `numBatchesTotal_`.
// Presumably `batchIdx_` is a `std::atomic`, so `fetch_add` yields the value
// from before the increment — TODO confirm at the member's declaration.
if (batchIdx_.fetch_add(1) == numBatchesTotal_) {
tripleCollector_.finish();
}
}

// __________________________________________________________________________________
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::parseBatch(size_t parsePosition,
auto batch) {
template <typename T>
void RdfParallelParser<T>::parseBatch(size_t parsePosition, auto batch) {
try {
RdfStringParser<Tokenizer_T> parser{defaultGraphIri_};
RdfStringParser<T> parser{defaultGraphIri_};
parser.prefixMap_ = this->prefixMap_;
parser.disablePrefixParsing();
parser.setPositionOffset(parsePosition);
parser.setInputStream(std::move(batch));
// TODO: raise error message if a prefix parsing fails;
Expand All @@ -972,14 +983,15 @@ void RdfParallelParser<Tokenizer_T>::parseBatch(size_t parsePosition,
});
finishTripleCollectorIfLastBatch();
} catch (std::exception& e) {
errorMessages_.wlock()->emplace_back(parsePosition, e.what());
tripleCollector_.pushException(std::current_exception());
parallelParser_.finish();
}
};

// _______________________________________________________________________
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::feedBatchesToParser(
template <typename T>
void RdfParallelParser<T>::feedBatchesToParser(
auto remainingBatchFromInitialization) {
bool first = true;
size_t parsePosition = 0;
Expand Down Expand Up @@ -1019,14 +1031,15 @@ void RdfParallelParser<Tokenizer_T>::feedBatchesToParser(
}
}
} catch (std::exception& e) {
errorMessages_.wlock()->emplace_back(parsePosition, e.what());
tripleCollector_.pushException(std::current_exception());
}
};

// _______________________________________________________________________
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::initialize(
const string& filename, ad_utility::MemorySize bufferSize) {
template <typename T>
void RdfParallelParser<T>::initialize(const string& filename,
ad_utility::MemorySize bufferSize) {
fileBuffer_ = std::make_unique<ParallelBufferWithEndRegex>(
bufferSize.getBytes(), "\\.[\\t ]*([\\r\\n]+)");
ParallelBuffer::BufferType remainingBatchFromInitialization;
Expand All @@ -1035,7 +1048,7 @@ void RdfParallelParser<Tokenizer_T>::initialize(
LOG(WARN) << "Empty input to the TURTLE parser, is this what you intended?"
<< std::endl;
} else {
RdfStringParser<Tokenizer_T> declarationParser{};
RdfStringParser<T> declarationParser{};
declarationParser.setInputStream(std::move(batch.value()));
while (declarationParser.parseDirectiveManually()) {
}
Expand All @@ -1062,7 +1075,20 @@ bool RdfParallelParser<T>::getLineImpl(TurtleTriple* triple) {
// contains no triples. (Theoretically this might happen, and it is safer this
// way)
while (triples_.empty()) {
auto optionalTripleTask = tripleCollector_.pop();
auto optionalTripleTask = [&]() {
try {
return tripleCollector_.pop();
} catch (const std::exception&) {
// In case of multiple errors in parallel batches, we always report the
// first error.
parallelParser_.waitUntilFinished();
auto errors = std::move(*errorMessages_.wlock());
const auto& firstError =
ql::ranges::min_element(errors, {}, ad_utility::first);
AD_CORRECTNESS_CHECK(firstError != errors.end());
throw std::runtime_error{firstError->second};
}
}();
if (!optionalTripleTask) {
// Everything has been parsed
return false;
Expand Down
57 changes: 30 additions & 27 deletions src/parser/RdfParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,29 @@

#pragma once

#include <absl/strings/str_cat.h>
#include <gtest/gtest_prod.h>
#include <sys/mman.h>

#include <codecvt>
#include <exception>
#include <future>
#include <locale>
#include <stdexcept>
#include <string_view>

#include "absl/strings/str_cat.h"
#include "global/Constants.h"
#include "global/SpecialIds.h"
#include "index/ConstantsIndexBuilding.h"
#include "index/InputFileSpecification.h"
#include "parser/ParallelBuffer.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "parser/TripleComponent.h"
#include "parser/TurtleTokenId.h"
#include "parser/data/BlankNode.h"
#include "util/Exception.h"
#include "util/File.h"
#include "util/HashMap.h"
#include "util/Log.h"
#include "util/ParseException.h"
#include "util/TaskQueue.h"
#include "util/ThreadSafeQueue.h"

using std::string;

enum class TurtleParserIntegerOverflowBehavior {
Error,
OverflowingToDouble,
Expand Down Expand Up @@ -126,10 +120,6 @@ class TurtleParser : public RdfParserBase {
public:
using ParseException = ::ParseException;

// The CTRE Tokenizer implies relaxed parsing.
static constexpr bool UseRelaxedParsing =
std::is_same_v<Tokenizer_T, TokenizerCtre>;

// Get the result of the single rule that was parsed most recently. Used for
// testing.
const TripleComponent& getLastParseResult() const { return lastParseResult_; }
Expand Down Expand Up @@ -204,10 +194,10 @@ class TurtleParser : public RdfParserBase {

// Getters for the two base prefixes. Without BASE declaration, these will
// both return the empty IRI.
const TripleComponent::Iri& baseForRelativeIri() {
const TripleComponent::Iri& baseForRelativeIri() const {
return prefixMap_.at(baseForRelativeIriKey_);
}
const TripleComponent::Iri& baseForAbsoluteIri() {
const TripleComponent::Iri& baseForAbsoluteIri() const {
return prefixMap_.at(baseForAbsoluteIriKey_);
}

Expand All @@ -226,6 +216,8 @@ class TurtleParser : public RdfParserBase {
static inline std::atomic<size_t> numParsers_ = 0;
size_t blankNodePrefix_ = numParsers_.fetch_add(1);

// If set to true, `directive()` raises an error whenever it successfully
// parses a prefix or base directive. Defaults to false (directives allowed).
bool prefixAndBaseDisabled_ = false;

public:
TurtleParser() = default;
explicit TurtleParser(TripleComponent defaultGraphIri)
Expand Down Expand Up @@ -400,7 +392,7 @@ class TurtleParser : public RdfParserBase {
}

// create a new, unused, unique blank node string
string createAnonNode() {
std::string createAnonNode() {
return BlankNode{true,
absl::StrCat(blankNodePrefix_, "_", numBlankNodes_++)}
.toSparql();
Expand Down Expand Up @@ -479,9 +471,7 @@ CPP_template(typename Parser)(
return positionOffset_ + tmpToParse_.size() - this->tok_.data().size();
}

void initialize(const string& filename, ad_utility::MemorySize bufferSize) {
(void)filename;
(void)bufferSize;
void initialize(const std::string&, ad_utility::MemorySize) {
throw std::runtime_error(
"RdfStringParser doesn't support calls to initialize. Only use "
"parseUtf8String() for unit tests\n");
Expand Down Expand Up @@ -534,7 +524,7 @@ CPP_template(typename Parser)(
// Testing interface for reusing a parser:
// only sets the tokenizer's input stream and
// does not alter the tokenizer's state.
void setInputStream(const string& toParse) {
void setInputStream(const std::string& toParse) {
tmpToParse_.clear();
tmpToParse_.reserve(toParse.size());
tmpToParse_.insert(tmpToParse_.end(), toParse.begin(), toParse.end());
Expand All @@ -555,6 +545,9 @@ CPP_template(typename Parser)(
// as expected
size_t getPosition() const { return this->tok_.begin() - tmpToParse_.data(); }

// Disable prefix parsing for turtle parsers during parallel parsing. This sets
// `prefixAndBaseDisabled_`, so that from now on `directive()` raises an error
// as soon as it successfully parses a prefix or base directive. There is no
// corresponding call visible here that re-enables directives.
void disablePrefixParsing() { this->prefixAndBaseDisabled_ = true; }

FRIEND_TEST(RdfParserTest, prefixedName);
FRIEND_TEST(RdfParserTest, prefixID);
FRIEND_TEST(RdfParserTest, stringParse);
Expand Down Expand Up @@ -590,7 +583,7 @@ class RdfStreamParser : public Parser {
// Default construction needed for tests
RdfStreamParser() = default;
explicit RdfStreamParser(
const string& filename,
const std::string& filename,
ad_utility::MemorySize bufferSize = DEFAULT_PARSER_BUFFER_SIZE,
TripleComponent defaultGraphIri =
qlever::specialIds().at(DEFAULT_GRAPH_IRI))
Expand All @@ -602,7 +595,8 @@ class RdfStreamParser : public Parser {

bool getLineImpl(TurtleTriple* triple) override;

void initialize(const string& filename, ad_utility::MemorySize bufferSize);
void initialize(const std::string& filename,
ad_utility::MemorySize bufferSize);

size_t getParsePosition() const override {
return numBytesBeforeCurrentBatch_ + (tok_.data().data() - byteVec_.data());
Expand Down Expand Up @@ -644,15 +638,15 @@ class RdfStreamParser : public Parser {
template <typename Parser>
class RdfParallelParser : public Parser {
public:
using Triple = std::array<string, 3>;
using Triple = std::array<std::string, 3>;
// Default construction needed for tests
RdfParallelParser() = default;

// If the `sleepTimeForTesting` is set, then after the initialization the
// parser will sleep for the specified time before parsing each batch s.t.
// certain corner cases can be tested.
explicit RdfParallelParser(
const string& filename,
const std::string& filename,
ad_utility::MemorySize bufferSize = DEFAULT_PARSER_BUFFER_SIZE,
std::chrono::milliseconds sleepTimeForTesting =
std::chrono::milliseconds{0})
Expand All @@ -665,7 +659,8 @@ class RdfParallelParser : public Parser {
}

// Construct a parser from a file and a given default graph iri.
RdfParallelParser(const string& filename, ad_utility::MemorySize bufferSize,
RdfParallelParser(const std::string& filename,
ad_utility::MemorySize bufferSize,
const TripleComponent& defaultGraphIri)
: Parser{defaultGraphIri}, defaultGraphIri_{defaultGraphIri} {
initialize(filename, bufferSize);
Expand All @@ -683,7 +678,8 @@ class RdfParallelParser : public Parser {
parallelParser_.resetTimers();
}

void initialize(const string& filename, ad_utility::MemorySize bufferSize);
void initialize(const std::string& filename,
ad_utility::MemorySize bufferSize);

size_t getParsePosition() const override {
// TODO: can we really define this position here?
Expand Down Expand Up @@ -720,6 +716,12 @@ class RdfParallelParser : public Parser {
QUEUE_SIZE_BEFORE_PARALLEL_PARSING, NUM_PARALLEL_PARSER_THREADS,
"parallel parser"};
std::future<void> parseFuture_;

// Collect error messages in case of multiple failures. The `size_t` is the
// start position of the corresponding batch, used to order the errors in case
// the batches are finished out of order.
ad_utility::Synchronized<std::vector<std::pair<size_t, std::string>>>
errorMessages_;
// The parallel parsers need to know when the last batch has been parsed, s.t.
// the parser threads can be destroyed. The following two members are needed
// for keeping track of this condition.
Expand Down Expand Up @@ -779,7 +781,8 @@ class RdfMultifileParser : public RdfParserBase {
// `parsingQueue_` is declared *after* the `finishedBatchQueue_`, s.t. when
// destroying the parser, the threads from the `parsingQueue_` are all joined
// before the `finishedBatchQueue_` (which they are using!) is destroyed.
ad_utility::TaskQueue<false> parsingQueue_{10, NUM_PARALLEL_PARSER_THREADS};
ad_utility::TaskQueue<false> parsingQueue_{QUEUE_SIZE_BEFORE_PARALLEL_PARSING,
NUM_PARALLEL_PARSER_THREADS};

// The number of parsers that have started, but not yet finished. This is
// needed to detect the complete parsing.
Expand Down
7 changes: 3 additions & 4 deletions src/parser/Tokenizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
#include <gtest/gtest_prod.h>
#include <re2/re2.h>

#include <regex>

#include "parser/TurtleTokenId.h"
#include "util/Exception.h"
#include "util/Log.h"

using re2::RE2;
Expand Down Expand Up @@ -240,7 +237,7 @@ struct SkipWhitespaceAndCommentsMixin {
auto v = self().view();
if (v.starts_with('#')) {
auto pos = v.find('\n');
if (pos == string::npos) {
if (pos == std::string::npos) {
// TODO<joka921>: This should rather yield an error.
LOG(INFO) << "Warning, unfinished comment found while parsing"
<< std::endl;
Expand Down Expand Up @@ -273,6 +270,8 @@ class Tokenizer : public SkipWhitespaceAndCommentsMixin<Tokenizer> {
Tokenizer(std::string_view input)
: _tokens(), _data(input.data(), input.size()) {}

// Compile-time switch read by the parser as `T::UseRelaxedParsing` inside
// `if constexpr`. With `false`, the parser takes the non-relaxed branches
// (e.g. in `prefixedName()` and `iriref()`).
static constexpr bool UseRelaxedParsing = false;

// if a prefix of the input stream matches the regex argument,
// return true and that prefix and move the input stream forward
// by the length of the match. If no match is found,
Expand Down
Loading

0 comments on commit 8678731

Please sign in to comment.