Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Apr 9, 2024
1 parent f450417 commit bacd667
Showing 1 changed file with 170 additions and 178 deletions.
348 changes: 170 additions & 178 deletions velox/functions/sparksql/Hash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include "velox/common/base/BitUtil.h"
#include "velox/expression/DecodedArgs.h"
#include "velox/functions/lib/RowsTranslationUtil.h"
#include "velox/vector/FlatVector.h"

namespace facebook::velox::functions::sparksql {
Expand Down Expand Up @@ -107,198 +106,187 @@ template <
TypeKind kind,
typename SeedType = typename HashTraits<HashClass>::SeedType,
typename ReturnType = typename HashTraits<HashClass>::ReturnType>
ReturnType hashPrimitive(
const DecodedVector& decoded,
vector_size_t index,
SeedType seed) {
return hashOne<HashClass>(
decoded.valueAt<typename TypeTraits<kind>::NativeType>(index), seed);
}
class PrimitiveVectorHasher;

template <
typename HashClass,
typename SeedType = typename HashTraits<HashClass>::SeedType,
typename ReturnType = typename HashTraits<HashClass>::ReturnType>
ReturnType hashElements(
std::vector<DecodedVector>& decodedVectors,
folly::Range<const vector_size_t*> range,
SeedType seed) {
std::vector<DecodedVector> nestedDecodedVectors;
for (auto& decoded : decodedVectors) {
const auto type = decoded.base()->type();
if (type->isArray()) {
auto base = decoded.base()->as<ArrayVector>();
SelectivityVector nestedRows(base->elements()->size());
nestedDecodedVectors.resize(1);
nestedDecodedVectors[0].decode(*base->elements(), nestedRows);
} else if (type->isMap()) {
auto base = decoded.base()->as<MapVector>();
SelectivityVector nestedRows(base->mapKeys()->size());
nestedDecodedVectors.resize(2);
nestedDecodedVectors[0].decode(*base->mapKeys(), nestedRows);
nestedDecodedVectors[1].decode(*base->mapValues(), nestedRows);
} else if (type->isRow()) {
auto base = decoded.base()->as<RowVector>();
auto indices = decoded.indices();
SelectivityVector nestedRows(base->size());
nestedDecodedVectors.resize(type->size());
for (size_t j = 0; j < type->size(); ++j) {
nestedDecodedVectors[j].decode(*base->childAt(j), nestedRows);
}
}
}
class ArrayVectorHasher;

ReturnType result = seed;
for (auto i : range) {
for (auto& decoded : decodedVectors) {
if (!decoded.isNullAt(i)) {
const auto type = decoded.base()->type();
if (type->isPrimitiveType()) {
result = VELOX_DYNAMIC_SCALAR_TEMPLATE_TYPE_DISPATCH(
hashPrimitive, HashClass, type->kind(), decoded, i, result);
} else if (type->isArray() || type->isMap()) {
auto base = decoded.base()->as<ArrayVectorBase>();
auto indices = decoded.indices();
auto size = base->sizeAt(indices[i]);
auto offset = base->offsetAt(indices[i]);
std::vector<vector_size_t> nestedRange(size);
std::iota(nestedRange.begin(), nestedRange.end(), offset);
result = hashElements<HashClass>(
nestedDecodedVectors, nestedRange, result);
} else if (type->isRow()) {
auto base = decoded.base()->as<RowVector>();
auto indices = decoded.indices();
std::vector<vector_size_t> nestedRange;
nestedRange.push_back(i);
result = hashElements<HashClass>(
nestedDecodedVectors, nestedRange, result);
} else {
VELOX_UNREACHABLE();
}
}
}
}
return result;
}
// Forward declaration of the MAP hasher, which is defined later in this file.
// createVectorHasher() names this class before that definition appears. The
// default template arguments are supplied here; the definition does not
// repeat them.
template <
typename HashClass,
typename SeedType = typename HashTraits<HashClass>::SeedType,
typename ReturnType = typename HashTraits<HashClass>::ReturnType>
class MapVectorHasher;

// Forward declaration of the ROW hasher, which is defined later in this file.
// createVectorHasher() names this class before that definition appears. The
// default template arguments are supplied here; the definition does not
// repeat them.
template <
typename HashClass,
typename SeedType = typename HashTraits<HashClass>::SeedType,
typename ReturnType = typename HashTraits<HashClass>::ReturnType>
class RowVectorHasher;

// Abstract interface for hashing one position of a decoded vector.
//
// Concrete hashers for primitive, ARRAY, MAP and ROW inputs derive from this
// class and are handed out as std::shared_ptr<VectorHasher<HashClass>> by
// createVectorHasher(). SeedType and ReturnType default to the types
// published by HashTraits<HashClass>.
template <
    typename HashClass,
    typename SeedType = typename HashTraits<HashClass>::SeedType,
    typename ReturnType = typename HashTraits<HashClass>::ReturnType>
class VectorHasher {
 public:
  // Instances are owned and destroyed through base-class pointers, so the
  // destructor must be virtual for derived members (decoded vectors, nested
  // hashers) to be released correctly.
  virtual ~VectorHasher() = default;

  // Computes the hash value of the input vector at 'index', chaining from
  // 'seed'. The implementations in this file return 'seed' unchanged when the
  // entry at 'index' is null.
  virtual ReturnType hashAt(vector_size_t index, SeedType seed) = 0;
};

// Derived from InterpretedHashFunction.hash:
// https://github.com/apache/spark/blob/382b66e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L532
template <typename HashClass, TypeKind kind>
void hashPrimitiveVector(
const SelectivityVector& rows,
DecodedVector& decoded,
const BufferPtr& result) {
auto rawResult =
result->asMutable<typename HashTraits<HashClass>::ReturnType>();
rows.applyToSelected([&](int row) {
rawResult[row] = hashOne<HashClass>(
decoded.valueAt<typename TypeTraits<kind>::NativeType>(row),
rawResult[row]);
});
std::shared_ptr<VectorHasher<HashClass>> createPrimitiveVectorHasher(
DecodedVector& decoded) {
return std::make_shared<PrimitiveVectorHasher<HashClass, kind>>(decoded);
}

template <typename HashClass>
void hashArrayVector(
const SelectivityVector& rows,
DecodedVector& decoded,
const BufferPtr& result) {
auto base = decoded.base()->as<ArrayVector>();
auto indices = decoded.indices();

SelectivityVector nestedRows(base->elements()->size());
std::vector<DecodedVector> decodedVectors;
decodedVectors.resize(1);
decodedVectors[0].decode(*base->elements(), nestedRows);

auto rawSizes = base->rawSizes();
auto rawOffsets = base->rawOffsets();
auto rawResult =
result->asMutable<typename HashTraits<HashClass>::ReturnType>();

rows.applyToSelected([&](int row) {
auto size = rawSizes[indices[row]];
auto offset = rawOffsets[indices[row]];
std::vector<vector_size_t> range(size);
std::iota(range.begin(), range.end(), offset);
rawResult[row] =
hashElements<HashClass>(decodedVectors, range, rawResult[row]);
});
// Builds the hasher matching the type of the base vector wrapped by
// 'decoded': a kind-specific primitive hasher, or the ARRAY / MAP / ROW
// hasher. Any other type is treated as unreachable.
std::shared_ptr<VectorHasher<HashClass>> createVectorHasher(
    DecodedVector& decoded) {
  auto type = decoded.base()->type();

  if (type->isPrimitiveType()) {
    return VELOX_DYNAMIC_SCALAR_TEMPLATE_TYPE_DISPATCH(
        createPrimitiveVectorHasher, HashClass, type->kind(), decoded);
  }
  if (type->isArray()) {
    return std::make_shared<ArrayVectorHasher<HashClass>>(decoded);
  }
  if (type->isMap()) {
    return std::make_shared<MapVectorHasher<HashClass>>(decoded);
  }
  if (type->isRow()) {
    return std::make_shared<RowVectorHasher<HashClass>>(decoded);
  }

  VELOX_UNREACHABLE();
}

template <typename HashClass>
void hashMapVector(
const SelectivityVector& rows,
DecodedVector& decoded,
const BufferPtr& result) {
auto base = decoded.base()->as<MapVector>();
auto indices = decoded.indices();

SelectivityVector nestedRows(base->mapKeys()->size());
std::vector<DecodedVector> decodedVectors;
decodedVectors.resize(2);
decodedVectors[0].decode(*base->mapKeys(), nestedRows);
decodedVectors[1].decode(*base->mapValues(), nestedRows);

auto rawSizes = base->rawSizes();
auto rawOffsets = base->rawOffsets();
auto rawResult =
result->asMutable<typename HashTraits<HashClass>::ReturnType>();

rows.applyToSelected([&](int row) {
auto size = rawSizes[indices[row]];
auto offset = rawOffsets[indices[row]];
std::vector<vector_size_t> range(size);
std::iota(range.begin(), range.end(), offset);
rawResult[row] =
hashElements<HashClass>(decodedVectors, range, rawResult[row]);
});
}
template <
typename HashClass,
TypeKind kind,
typename SeedType,
typename ReturnType>
class PrimitiveVectorHasher
: public VectorHasher<HashClass, SeedType, ReturnType> {
public:
PrimitiveVectorHasher(DecodedVector& decoded) : decoded_(decoded) {}

template <typename HashClass>
void hashRowVector(
const SelectivityVector& rows,
DecodedVector& decoded,
const BufferPtr& result) {
auto base = decoded.base()->as<RowVector>();
auto indices = decoded.indices();

SelectivityVector nestedRows(base->size());
std::vector<DecodedVector> decodedVectors;
decodedVectors.resize(base->type()->size());
for (size_t i = 0; i < base->type()->size(); ++i) {
decodedVectors[i].decode(*base->childAt(i), nestedRows);
}

auto rawResult =
result->asMutable<typename HashTraits<HashClass>::ReturnType>();

rows.applyToSelected([&](int row) {
std::vector<vector_size_t> range;
range.push_back(row);
rawResult[row] =
hashElements<HashClass>(decodedVectors, range, rawResult[row]);
});
}
// Hashes the primitive value at 'index', chaining from 'seed'. A null entry
// leaves the running hash untouched, so 'seed' is returned as is.
ReturnType hashAt(vector_size_t index, SeedType seed) override {
  using NativeType = typename TypeTraits<kind>::NativeType;

  if (!decoded_.isNullAt(index)) {
    return hashOne<HashClass>(decoded_.valueAt<NativeType>(index), seed);
  }
  return seed;
}

template <typename HashClass>
void hash(
const SelectivityVector& rows,
const TypePtr& type,
DecodedVector& decoded,
const BufferPtr& result) {
if (type->isPrimitiveType()) {
VELOX_DYNAMIC_SCALAR_TEMPLATE_TYPE_DISPATCH(
hashPrimitiveVector, HashClass, type->kind(), rows, decoded, result);
} else if (type->isArray()) {
hashArrayVector<HashClass>(rows, decoded, result);
} else if (type->isMap()) {
hashMapVector<HashClass>(rows, decoded, result);
} else if (type->isRow()) {
hashRowVector<HashClass>(rows, decoded, result);
} else {
VELOX_UNREACHABLE();
private:
const DecodedVector& decoded_;
};

template <typename HashClass, typename SeedType, typename ReturnType>
class ArrayVectorHasher : public VectorHasher<HashClass> {
public:
ArrayVectorHasher(DecodedVector& decoded) {
base_ = decoded.base()->as<ArrayVector>();
indices_ = decoded.indices();

SelectivityVector rows(base_->elements()->size());
decodedElements_.decode(*base_->elements(), rows);
elementHasher_ = createVectorHasher<HashClass>(decodedElements_);
}
}

// Hashes the array at 'index' by folding every element, in order, into the
// running hash that starts at 'seed'. A null array returns 'seed' unchanged.
ReturnType hashAt(vector_size_t index, SeedType seed) override {
  // Translate the outer row into a row of the base ArrayVector.
  const auto baseRow = indices_[index];
  if (base_->isNullAt(baseRow)) {
    return seed;
  }

  const auto count = base_->sizeAt(baseRow);
  const auto first = base_->offsetAt(baseRow);

  ReturnType hash = seed;
  for (auto pos = first; pos < first + count; ++pos) {
    hash = elementHasher_->hashAt(pos, hash);
  }
  return hash;
}

private:
const ArrayVector* base_;
const int32_t* indices_;
DecodedVector decodedElements_;
std::shared_ptr<VectorHasher<HashClass>> elementHasher_;
};

// Hashes rows of a MAP vector. Each entry contributes its key hash followed
// by its value hash, chained in entry order starting from the supplied seed.
template <typename HashClass, typename SeedType, typename ReturnType>
class MapVectorHasher : public VectorHasher<HashClass, SeedType, ReturnType> {
 public:
  MapVectorHasher(DecodedVector& decoded)
      : base_(decoded.base()->as<MapVector>()), indices_(decoded.indices()) {
    // Keys and values are decoded once over every entry of the base map;
    // hashAt() later addresses them by absolute entry position.
    SelectivityVector entryRows(base_->mapKeys()->size());
    decodedKeys_.decode(*base_->mapKeys(), entryRows);
    decodedValues_.decode(*base_->mapValues(), entryRows);

    keyHasher_ = createVectorHasher<HashClass>(decodedKeys_);
    valueHasher_ = createVectorHasher<HashClass>(decodedValues_);
  }

  // Returns the hash of the map at 'index', chained from 'seed'. A null map
  // returns 'seed' unchanged.
  ReturnType hashAt(vector_size_t index, SeedType seed) override {
    // Translate the outer row into a row of the base MapVector.
    const auto baseRow = indices_[index];
    if (base_->isNullAt(baseRow)) {
      return seed;
    }

    const auto count = base_->sizeAt(baseRow);
    const auto first = base_->offsetAt(baseRow);

    ReturnType hash = seed;
    for (auto pos = first; pos < first + count; ++pos) {
      hash = keyHasher_->hashAt(pos, hash);
      hash = valueHasher_->hashAt(pos, hash);
    }
    return hash;
  }

 private:
  const MapVector* base_;
  const int32_t* indices_;
  DecodedVector decodedKeys_;
  DecodedVector decodedValues_;
  std::shared_ptr<VectorHasher<HashClass>> keyHasher_;
  std::shared_ptr<VectorHasher<HashClass>> valueHasher_;
};

// Hashes rows of a ROW vector by chaining the hashes of all child fields, in
// field order, starting from the supplied seed.
template <typename HashClass, typename SeedType, typename ReturnType>
class RowVectorHasher : public VectorHasher<HashClass, SeedType, ReturnType> {
 public:
  RowVectorHasher(DecodedVector& decoded) {
    base_ = decoded.base()->as<RowVector>();
    indices_ = decoded.indices();

    // Children are decoded over the rows of the base RowVector, so the child
    // hashers must be addressed with base row numbers (see hashAt()).
    SelectivityVector rows(base_->size());
    decodedChildren_.resize(base_->childrenSize());
    hashers_.resize(decodedChildren_.size());
    // Both vectors are sized up front and never resized afterwards, so the
    // DecodedVector handed to each child hasher stays at a stable address.
    for (size_t i = 0; i < decodedChildren_.size(); ++i) {
      decodedChildren_[i].decode(*base_->childAt(i), rows);
      hashers_[i] = createVectorHasher<HashClass>(decodedChildren_[i]);
    }
  }

  // Returns the hash of the row at 'index', chained from 'seed'. A null row
  // returns 'seed' unchanged.
  ReturnType hashAt(vector_size_t index, SeedType seed) override {
    // 'index' is a row of the (possibly dictionary/constant wrapped) input;
    // the children live in the base vector's row space, so translate once and
    // use the base row for both the null check and the child lookups.
    const auto baseRow = indices_[index];
    if (base_->isNullAt(baseRow)) {
      return seed;
    }

    ReturnType result = seed;
    for (const auto& hasher : hashers_) {
      result = hasher->hashAt(baseRow, result);
    }
    return result;
  }

 private:
  const RowVector* base_{nullptr};
  const int32_t* indices_{nullptr};
  std::vector<DecodedVector> decodedChildren_;
  std::vector<std::shared_ptr<VectorHasher<HashClass>>> hashers_;
};

// ReturnType can be either int32_t or int64_t.
// HashClass provides the hashing functions, such as hashInt32.
Expand Down Expand Up @@ -330,7 +318,11 @@ void applyWithType(
decoded->nulls(), rows.begin(), rows.end());
selected = selectedMinusNulls.get();
}
hash<HashClass>(*selected, args[i]->type(), *decoded, result.values());

auto hasher = createVectorHasher<HashClass>(*decoded);
selected->applyToSelected([&](int row) {
result.set(row, hasher->hashAt(row, result.valueAt(row)));
});
}
}

Expand Down

0 comments on commit bacd667

Please sign in to comment.