Skip to content

Commit

Permalink
Merge branch 'miyurud:master' into fix/partition_issue
Browse files Browse the repository at this point in the history
  • Loading branch information
thamindumk authored Feb 24, 2025
2 parents 7c9e042 + 2b91cde commit 7f9f119
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ set(HEADERS globals.h
src/frontend/core/executor/impl/PageRankExecutor.h
src/util/dbinterface/DBInterface.h
src/query/processor/cypher/util/Const.h
src/nativestore/MetaPropertyLink.h
src/nativestore/MetaPropertyEdgeLink.h
)

set(SOURCES src/backend/JasmineGraphBackend.cpp
Expand Down Expand Up @@ -156,6 +158,9 @@ set(SOURCES src/backend/JasmineGraphBackend.cpp
src/frontend/ui/JasmineGraphFrontEndUI.h
src/frontend/core/common/JasmineGraphFrontendCommon.cpp
src/frontend/core/common/JasmineGraphFrontendCommon.h
src/nativestore/MetaPropertyLink.cpp
src/nativestore/MetaPropertyEdgeLink.cpp

)

if (CMAKE_BUILD_TYPE STREQUAL "DEBUG")
Expand Down
13 changes: 10 additions & 3 deletions src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include "../../nativestore/RelationBlock.h"
#include "../../util/logger/Logger.h"
#include "../../util/Utils.h"
#include "../../nativestore/MetaPropertyLink.h"

Logger incremental_localstore_logger;

Expand Down Expand Up @@ -67,8 +68,9 @@ void JasmineGraphIncrementalLocalStore::addEdgeFromString(std::string edgeString
if (!newRelation) {
return;
}
char value[PropertyLink::MAX_VALUE_SIZE] = {};

char value[PropertyLink::MAX_VALUE_SIZE] = {};
char meta[MetaPropertyLink::MAX_VALUE_SIZE] = {};
if (edgeJson.contains("properties")) {
auto edgeProperties = edgeJson["properties"];
for (auto it = edgeProperties.begin(); it != edgeProperties.end(); it++) {
Expand All @@ -80,22 +82,27 @@ void JasmineGraphIncrementalLocalStore::addEdgeFromString(std::string edgeString
}
}
}

if (sourceJson.contains("properties")) {
auto sourceProps = sourceJson["properties"];
for (auto it = sourceProps.begin(); it != sourceProps.end(); it++) {
strcpy(value, it.value().get<std::string>().c_str());
newRelation->getSource()->addProperty(std::string(it.key()), &value[0]);
}
}
std::string sourcePid = std::to_string(destinationJson["pid"].get<int>());
strcpy(meta, sourcePid.c_str());
newRelation->getSource()->addMetaProperty(MetaPropertyLink::PARTITION_ID, &meta[0]);

if (destinationJson.contains("properties")) {
auto destProps = destinationJson["properties"];
for (auto it = destProps.begin(); it != destProps.end(); it++) {
strcpy(value, it.value().get<std::string>().c_str());
newRelation->getDestination()->addProperty(std::string(it.key()), &value[0]);
}
}

std::string destPid = std::to_string(destinationJson["pid"].get<int>());
strcpy(meta, destPid.c_str());
newRelation->getDestination()->addMetaProperty(MetaPropertyLink::PARTITION_ID, &meta[0]);
incremental_localstore_logger.debug("Edge (" + sId + ", " + dId + ") Added successfully!");
} catch (const std::exception&) { // TODO tmkasun: Handle multiple types of exceptions
incremental_localstore_logger.log(
Expand Down
176 changes: 176 additions & 0 deletions src/nativestore/MetaPropertyEdgeLink.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/**
Copyright 2025 JasmineGraph Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**/

#include <sstream>
#include <vector>
#include <memory>
#include "MetaPropertyEdgeLink.h"
#include "../util/logger/Logger.h"

Logger property_edge_link_logger;
thread_local unsigned int MetaPropertyEdgeLink::nextPropertyIndex = 1;
thread_local std::fstream* MetaPropertyEdgeLink::metaEdgePropertiesDB = nullptr;
pthread_mutex_t lockMetaPropertyEdgeLink;
pthread_mutex_t lockCreateMetaPropertyEdgeLink;
pthread_mutex_t lockInsertMetaPropertyEdgeLink;
pthread_mutex_t lockGetMetaPropertyEdgeLink;

MetaPropertyEdgeLink::MetaPropertyEdgeLink(unsigned int propertyBlockAddress) : blockAddress(propertyBlockAddress) {
pthread_mutex_lock(&lockMetaPropertyEdgeLink);
if (propertyBlockAddress > 0) {
this->metaEdgePropertiesDB->seekg(propertyBlockAddress);
char rawName[MetaPropertyEdgeLink::MAX_NAME_SIZE] = {0};
if (!this->metaEdgePropertiesDB->read(reinterpret_cast<char*>(&rawName),
MetaPropertyEdgeLink::MAX_NAME_SIZE)) {
property_edge_link_logger.error("Error while reading property name from block " +
std::to_string(blockAddress));
}
property_edge_link_logger.debug(
"Current file descriptor cursor position = " +
std::to_string(this->metaEdgePropertiesDB->tellg()) +
" when reading = " + std::to_string(blockAddress));
if (!this->metaEdgePropertiesDB->read(reinterpret_cast<char*>(&this->value),
MetaPropertyEdgeLink::MAX_VALUE_SIZE)) {
property_edge_link_logger.error("Error while reading property value from block " +
std::to_string(blockAddress));
}

if (!this->metaEdgePropertiesDB->read(reinterpret_cast<char*>(&(this->nextPropAddress)),
sizeof(unsigned int))) {
property_edge_link_logger.error("Error while reading property next address from block " +
std::to_string(blockAddress));
}

this->name = std::string(rawName);
}
pthread_mutex_unlock(&lockMetaPropertyEdgeLink);
};

MetaPropertyEdgeLink::MetaPropertyEdgeLink(unsigned int blockAddress, std::string name,
char* rvalue, unsigned int nextAddress)
: blockAddress(blockAddress), nextPropAddress(nextAddress), name(name) {
memcpy(this->value, rvalue, MetaPropertyEdgeLink::MAX_VALUE_SIZE);
};

unsigned int MetaPropertyEdgeLink::insert(std::string name, char* value) {
char dataName[MetaPropertyEdgeLink::MAX_NAME_SIZE] = {0};
char dataValue[MetaPropertyEdgeLink::MAX_VALUE_SIZE] = {0};
std::strcpy(dataName, name.c_str());
std::memcpy(dataValue, value, MetaPropertyEdgeLink::MAX_VALUE_SIZE);
unsigned int nextAddress = 0;
if (this->name == name) {
pthread_mutex_lock(&lockInsertMetaPropertyEdgeLink);
this->metaEdgePropertiesDB->seekp(this->blockAddress + MetaPropertyEdgeLink::MAX_NAME_SIZE);
this->metaEdgePropertiesDB->write(reinterpret_cast<char*>(dataValue), MetaPropertyEdgeLink::MAX_VALUE_SIZE);
this->metaEdgePropertiesDB->flush();
pthread_mutex_unlock(&lockInsertMetaPropertyEdgeLink);
property_edge_link_logger.debug("Updating already existing property key = " + std::string(name));
return this->blockAddress;
} else if (this->nextPropAddress) { // Traverse to the edge/end of the link list
std::unique_ptr<MetaPropertyEdgeLink> pel(new MetaPropertyEdgeLink(this->nextPropAddress));
return pel->insert(name, value);
} else { // No next link means end of the link, Now add the new link
pthread_mutex_lock(&lockInsertMetaPropertyEdgeLink);
unsigned int newAddress = MetaPropertyEdgeLink::nextPropertyIndex *
MetaPropertyEdgeLink::META_PROPERTY_BLOCK_SIZE;
this->metaEdgePropertiesDB->seekp(newAddress);
this->metaEdgePropertiesDB->write(dataName, MetaPropertyEdgeLink::MAX_NAME_SIZE);
this->metaEdgePropertiesDB->write(reinterpret_cast<char*>(dataValue), MetaPropertyEdgeLink::MAX_VALUE_SIZE);
if (!this->metaEdgePropertiesDB->write(reinterpret_cast<char*>(&nextAddress), sizeof(nextAddress))) {
property_edge_link_logger.error("Error while inserting a property " + name
+ " into block address " + std::to_string(newAddress));
return -1;
}

this->metaEdgePropertiesDB->flush();
this->nextPropAddress = newAddress;
this->metaEdgePropertiesDB->seekp(this->blockAddress + MetaPropertyEdgeLink::MAX_NAME_SIZE +
MetaPropertyEdgeLink::MAX_VALUE_SIZE); // seek to current property next address
if (!this->metaEdgePropertiesDB->write(reinterpret_cast<char*>(&newAddress), sizeof(newAddress))) {
property_edge_link_logger.error("Error while updating property next address for " + name +
" into block address " + std::to_string(this->blockAddress));
return -1;
}

MetaPropertyEdgeLink::nextPropertyIndex++; // Increment the shared property index value
pthread_mutex_unlock(&lockInsertMetaPropertyEdgeLink);
return this->blockAddress;
}
}

MetaPropertyEdgeLink* MetaPropertyEdgeLink::create(std::string name, char value[]) {
pthread_mutex_lock(&lockCreateMetaPropertyEdgeLink);
unsigned int nextAddress = 0;
char dataName[MetaPropertyEdgeLink::MAX_NAME_SIZE] = {0};
strcpy(dataName, name.c_str());
unsigned int newAddress = MetaPropertyEdgeLink::nextPropertyIndex * MetaPropertyEdgeLink::META_PROPERTY_BLOCK_SIZE;
MetaPropertyEdgeLink::metaEdgePropertiesDB->seekp(newAddress);
MetaPropertyEdgeLink::metaEdgePropertiesDB->write(dataName, MetaPropertyEdgeLink::MAX_NAME_SIZE);
MetaPropertyEdgeLink::metaEdgePropertiesDB->write(reinterpret_cast<char*>(value),
MetaPropertyEdgeLink::MAX_VALUE_SIZE);
if (!MetaPropertyEdgeLink::metaEdgePropertiesDB->write(reinterpret_cast<char*>(&nextAddress),
sizeof(nextAddress))) {
property_edge_link_logger.error("Error while inserting the property = " + name +
" into block a new address = " + std::to_string(newAddress));
return nullptr;
}
MetaPropertyEdgeLink::metaEdgePropertiesDB->flush();
MetaPropertyEdgeLink::nextPropertyIndex++; // Increment the shared property index value
pthread_mutex_unlock(&lockCreateMetaPropertyEdgeLink);
return new MetaPropertyEdgeLink(newAddress, name, value, nextAddress);
}

MetaPropertyEdgeLink* MetaPropertyEdgeLink::next() {
if (this->nextPropAddress) {
return new MetaPropertyEdgeLink(this->nextPropAddress);
}
return nullptr;
}

MetaPropertyEdgeLink* MetaPropertyEdgeLink::get(unsigned int propertyBlockAddress) {
MetaPropertyEdgeLink* pl = nullptr;

pthread_mutex_lock(&lockGetMetaPropertyEdgeLink);
if (propertyBlockAddress > 0) {
char propertyName[MetaPropertyEdgeLink::MAX_NAME_SIZE] = {0};
char propertyValue[MetaPropertyEdgeLink::MAX_VALUE_SIZE] = {0};
unsigned int nextAddress;
MetaPropertyEdgeLink::metaEdgePropertiesDB->seekg(propertyBlockAddress);

if (!MetaPropertyEdgeLink::metaEdgePropertiesDB->read(reinterpret_cast<char*>(&propertyName),
MetaPropertyEdgeLink::MAX_NAME_SIZE)) {
property_edge_link_logger.error("Error while reading edge property name from block = " +
std::to_string(propertyBlockAddress));
}
if (!MetaPropertyEdgeLink::metaEdgePropertiesDB->read(reinterpret_cast<char*>(&propertyValue),
MetaPropertyEdgeLink::MAX_VALUE_SIZE)) {
property_edge_link_logger.error("Error while reading edge property value from block = " +
std::to_string(propertyBlockAddress));
}

if (!MetaPropertyEdgeLink::metaEdgePropertiesDB->read(reinterpret_cast<char*>(&(nextAddress)),
sizeof(unsigned int))) {
property_edge_link_logger.error("Error while reading edge property next address from block = " +
std::to_string(propertyBlockAddress));
}
property_edge_link_logger.debug("Property head propertyBlockAddress = " +
std::to_string(propertyBlockAddress));
property_edge_link_logger.debug("Property head property name = " + std::string(propertyName));
property_edge_link_logger.debug("Property head nextAddress = " + std::to_string(nextAddress));

pl = new MetaPropertyEdgeLink(propertyBlockAddress, std::string(propertyName),
propertyValue, nextAddress);
}
pthread_mutex_unlock(&lockGetMetaPropertyEdgeLink);
return pl;
}
42 changes: 42 additions & 0 deletions src/nativestore/MetaPropertyEdgeLink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
Copyright 2025 JasmineGraph Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**/

#ifndef JASMINEGRAPH_METAPROPERTYEDGELINK_H
#define JASMINEGRAPH_METAPROPERTYEDGELINK_H
#include <cstring>
#include <fstream>
#include <set>
#include <string>

class MetaPropertyEdgeLink {
public:
static const unsigned long MAX_NAME_SIZE = 12; // Size of a property name in bytes
static const unsigned long MAX_VALUE_SIZE = 180; // Size of a property value in bytes
static thread_local unsigned int nextPropertyIndex; // Next available property block index
static const unsigned long META_PROPERTY_BLOCK_SIZE = MAX_NAME_SIZE + MAX_VALUE_SIZE + sizeof(unsigned int);
std::string name;
char value[MetaPropertyEdgeLink::MAX_VALUE_SIZE] = {0};
unsigned int blockAddress;
unsigned int nextPropAddress;
static std::string DB_PATH;
static thread_local std::fstream* metaEdgePropertiesDB;
MetaPropertyEdgeLink(unsigned int);
MetaPropertyEdgeLink(unsigned int, std::string, char*, unsigned int);
static MetaPropertyEdgeLink* get(unsigned int);
static MetaPropertyEdgeLink* create(std::string, char[]);
unsigned int insert(std::string, char[]);
MetaPropertyEdgeLink* next();
};


#endif // JASMINEGRAPH_METAPROPERTYEDGELINK_H
Loading

0 comments on commit 7f9f119

Please sign in to comment.