diff --git a/CMakeLists.txt b/CMakeLists.txt index f9540aeb7..ae3441094 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,6 +80,8 @@ set(HEADERS globals.h src/frontend/core/executor/impl/PageRankExecutor.h src/util/dbinterface/DBInterface.h src/query/processor/cypher/util/Const.h + src/nativestore/MetaPropertyLink.h + src/nativestore/MetaPropertyEdgeLink.h ) set(SOURCES src/backend/JasmineGraphBackend.cpp @@ -156,6 +158,9 @@ set(SOURCES src/backend/JasmineGraphBackend.cpp src/frontend/ui/JasmineGraphFrontEndUI.h src/frontend/core/common/JasmineGraphFrontendCommon.cpp src/frontend/core/common/JasmineGraphFrontendCommon.h + src/nativestore/MetaPropertyLink.cpp + src/nativestore/MetaPropertyEdgeLink.cpp + ) if (CMAKE_BUILD_TYPE STREQUAL "DEBUG") diff --git a/src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp b/src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp index 938f1ef66..2ec4231e9 100644 --- a/src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp +++ b/src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp @@ -19,6 +19,7 @@ limitations under the License. #include "../../nativestore/RelationBlock.h" #include "../../util/logger/Logger.h" #include "../../util/Utils.h" +#include "../../nativestore/MetaPropertyLink.h" Logger incremental_localstore_logger; @@ -67,8 +68,9 @@ void JasmineGraphIncrementalLocalStore::addEdgeFromString(std::string edgeString if (!newRelation) { return; } - char value[PropertyLink::MAX_VALUE_SIZE] = {}; + char value[PropertyLink::MAX_VALUE_SIZE] = {}; + char meta[MetaPropertyLink::MAX_VALUE_SIZE] = {}; if (edgeJson.contains("properties")) { auto edgeProperties = edgeJson["properties"]; for (auto it = edgeProperties.begin(); it != edgeProperties.end(); it++) { @@ -80,7 +82,6 @@ void JasmineGraphIncrementalLocalStore::addEdgeFromString(std::string edgeString } } } - if (sourceJson.contains("properties")) { auto sourceProps = sourceJson["properties"]; for (auto it = sourceProps.begin(); it != sourceProps.end(); it++) { @@ -88,6 +89,10 @@ void JasmineGraphIncrementalLocalStore::addEdgeFromString(std::string edgeString newRelation->getSource()->addProperty(std::string(it.key()), &value[0]); } } + std::string sourcePid = std::to_string(destinationJson["pid"].get()); + strcpy(meta, sourcePid.c_str()); + newRelation->getSource()->addMetaProperty(MetaPropertyLink::PARTITION_ID, &meta[0]); + if (destinationJson.contains("properties")) { auto destProps = destinationJson["properties"]; for (auto it = destProps.begin(); it != destProps.end(); it++) { @@ -95,7 +100,9 @@ void JasmineGraphIncrementalLocalStore::addEdgeFromString(std::string edgeString newRelation->getDestination()->addProperty(std::string(it.key()), &value[0]); } } - + std::string destPid = std::to_string(destinationJson["pid"].get()); + strcpy(meta, destPid.c_str()); + newRelation->getDestination()->addMetaProperty(MetaPropertyLink::PARTITION_ID, &meta[0]); incremental_localstore_logger.debug("Edge (" + sId + ", " + dId + ") Added successfully!"); } catch (const std::exception&) { // TODO tmkasun: Handle multiple types of exceptions incremental_localstore_logger.log( diff --git a/src/nativestore/MetaPropertyEdgeLink.cpp b/src/nativestore/MetaPropertyEdgeLink.cpp new file mode 100644 index 000000000..d2444688e --- /dev/null +++ b/src/nativestore/MetaPropertyEdgeLink.cpp @@ -0,0 +1,176 @@ +/** +Copyright 2025 JasmineGraph Team +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +**/ + +#include +#include +#include +#include "MetaPropertyEdgeLink.h" +#include "../util/logger/Logger.h" + +Logger property_edge_link_logger; +thread_local unsigned int MetaPropertyEdgeLink::nextPropertyIndex = 1; +thread_local std::fstream* MetaPropertyEdgeLink::metaEdgePropertiesDB = nullptr; +pthread_mutex_t lockMetaPropertyEdgeLink; +pthread_mutex_t lockCreateMetaPropertyEdgeLink; +pthread_mutex_t lockInsertMetaPropertyEdgeLink; +pthread_mutex_t lockGetMetaPropertyEdgeLink; + +MetaPropertyEdgeLink::MetaPropertyEdgeLink(unsigned int propertyBlockAddress) : blockAddress(propertyBlockAddress) { + pthread_mutex_lock(&lockMetaPropertyEdgeLink); + if (propertyBlockAddress > 0) { + this->metaEdgePropertiesDB->seekg(propertyBlockAddress); + char rawName[MetaPropertyEdgeLink::MAX_NAME_SIZE] = {0}; + if (!this->metaEdgePropertiesDB->read(reinterpret_cast(&rawName), + MetaPropertyEdgeLink::MAX_NAME_SIZE)) { + property_edge_link_logger.error("Error while reading property name from block " + + std::to_string(blockAddress)); + } + property_edge_link_logger.debug( + "Current file descriptor cursor position = " + + std::to_string(this->metaEdgePropertiesDB->tellg()) + + " when reading = " + std::to_string(blockAddress)); + if (!this->metaEdgePropertiesDB->read(reinterpret_cast(&this->value), + MetaPropertyEdgeLink::MAX_VALUE_SIZE)) { + property_edge_link_logger.error("Error while reading property value from block " + + std::to_string(blockAddress)); + } + + if (!this->metaEdgePropertiesDB->read(reinterpret_cast(&(this->nextPropAddress)), + sizeof(unsigned int))) { + property_edge_link_logger.error("Error while reading property next address from block " + + std::to_string(blockAddress)); + } + + this->name = std::string(rawName); + } + pthread_mutex_unlock(&lockMetaPropertyEdgeLink); +}; + +MetaPropertyEdgeLink::MetaPropertyEdgeLink(unsigned int blockAddress, std::string name, + char* rvalue, unsigned int nextAddress) + : blockAddress(blockAddress), nextPropAddress(nextAddress), name(name) { + memcpy(this->value, rvalue, MetaPropertyEdgeLink::MAX_VALUE_SIZE); +}; + +unsigned int MetaPropertyEdgeLink::insert(std::string name, char* value) { + char dataName[MetaPropertyEdgeLink::MAX_NAME_SIZE] = {0}; + char dataValue[MetaPropertyEdgeLink::MAX_VALUE_SIZE] = {0}; + std::strcpy(dataName, name.c_str()); + std::memcpy(dataValue, value, MetaPropertyEdgeLink::MAX_VALUE_SIZE); + unsigned int nextAddress = 0; + if (this->name == name) { + pthread_mutex_lock(&lockInsertMetaPropertyEdgeLink); + this->metaEdgePropertiesDB->seekp(this->blockAddress + MetaPropertyEdgeLink::MAX_NAME_SIZE); + this->metaEdgePropertiesDB->write(reinterpret_cast(dataValue), MetaPropertyEdgeLink::MAX_VALUE_SIZE); + this->metaEdgePropertiesDB->flush(); + pthread_mutex_unlock(&lockInsertMetaPropertyEdgeLink); + property_edge_link_logger.debug("Updating already existing property key = " + std::string(name)); + return this->blockAddress; + } else if (this->nextPropAddress) { // Traverse to the edge/end of the link list + std::unique_ptr pel(new MetaPropertyEdgeLink(this->nextPropAddress)); + return pel->insert(name, value); + } else { // No next link means end of the link, Now add the new link + pthread_mutex_lock(&lockInsertMetaPropertyEdgeLink); + unsigned int newAddress = MetaPropertyEdgeLink::nextPropertyIndex * + MetaPropertyEdgeLink::META_PROPERTY_BLOCK_SIZE; + this->metaEdgePropertiesDB->seekp(newAddress); + this->metaEdgePropertiesDB->write(dataName, MetaPropertyEdgeLink::MAX_NAME_SIZE); + this->metaEdgePropertiesDB->write(reinterpret_cast(dataValue), MetaPropertyEdgeLink::MAX_VALUE_SIZE); + if (!this->metaEdgePropertiesDB->write(reinterpret_cast(&nextAddress), sizeof(nextAddress))) { + property_edge_link_logger.error("Error while inserting a property " + name + + " into block address " + std::to_string(newAddress)); + return -1; + } + + this->metaEdgePropertiesDB->flush(); + this->nextPropAddress = newAddress; + this->metaEdgePropertiesDB->seekp(this->blockAddress + MetaPropertyEdgeLink::MAX_NAME_SIZE + + MetaPropertyEdgeLink::MAX_VALUE_SIZE); // seek to current property next address + if (!this->metaEdgePropertiesDB->write(reinterpret_cast(&newAddress), sizeof(newAddress))) { + property_edge_link_logger.error("Error while updating property next address for " + name + + " into block address " + std::to_string(this->blockAddress)); + return -1; + } + + MetaPropertyEdgeLink::nextPropertyIndex++; // Increment the shared property index value + pthread_mutex_unlock(&lockInsertMetaPropertyEdgeLink); + return this->blockAddress; + } +} + +MetaPropertyEdgeLink* MetaPropertyEdgeLink::create(std::string name, char value[]) { + pthread_mutex_lock(&lockCreateMetaPropertyEdgeLink); + unsigned int nextAddress = 0; + char dataName[MetaPropertyEdgeLink::MAX_NAME_SIZE] = {0}; + strcpy(dataName, name.c_str()); + unsigned int newAddress = MetaPropertyEdgeLink::nextPropertyIndex * MetaPropertyEdgeLink::META_PROPERTY_BLOCK_SIZE; + MetaPropertyEdgeLink::metaEdgePropertiesDB->seekp(newAddress); + MetaPropertyEdgeLink::metaEdgePropertiesDB->write(dataName, MetaPropertyEdgeLink::MAX_NAME_SIZE); + MetaPropertyEdgeLink::metaEdgePropertiesDB->write(reinterpret_cast(value), + MetaPropertyEdgeLink::MAX_VALUE_SIZE); + if (!MetaPropertyEdgeLink::metaEdgePropertiesDB->write(reinterpret_cast(&nextAddress), + sizeof(nextAddress))) { + property_edge_link_logger.error("Error while inserting the property = " + name + + " into block a new address = " + std::to_string(newAddress)); + return nullptr; + } + MetaPropertyEdgeLink::metaEdgePropertiesDB->flush(); + MetaPropertyEdgeLink::nextPropertyIndex++; // Increment the shared property index value + pthread_mutex_unlock(&lockCreateMetaPropertyEdgeLink); + return new MetaPropertyEdgeLink(newAddress, name, value, nextAddress); +} + +MetaPropertyEdgeLink* MetaPropertyEdgeLink::next() { + if (this->nextPropAddress) { + return new MetaPropertyEdgeLink(this->nextPropAddress); + } + return nullptr; +} + +MetaPropertyEdgeLink* MetaPropertyEdgeLink::get(unsigned int propertyBlockAddress) { + MetaPropertyEdgeLink* pl = nullptr; + + pthread_mutex_lock(&lockGetMetaPropertyEdgeLink); + if (propertyBlockAddress > 0) { + char propertyName[MetaPropertyEdgeLink::MAX_NAME_SIZE] = {0}; + char propertyValue[MetaPropertyEdgeLink::MAX_VALUE_SIZE] = {0}; + unsigned int nextAddress; + MetaPropertyEdgeLink::metaEdgePropertiesDB->seekg(propertyBlockAddress); + + if (!MetaPropertyEdgeLink::metaEdgePropertiesDB->read(reinterpret_cast(&propertyName), + MetaPropertyEdgeLink::MAX_NAME_SIZE)) { + property_edge_link_logger.error("Error while reading edge property name from block = " + + std::to_string(propertyBlockAddress)); + } + if (!MetaPropertyEdgeLink::metaEdgePropertiesDB->read(reinterpret_cast(&propertyValue), + MetaPropertyEdgeLink::MAX_VALUE_SIZE)) { + property_edge_link_logger.error("Error while reading edge property value from block = " + + std::to_string(propertyBlockAddress)); + } + + if (!MetaPropertyEdgeLink::metaEdgePropertiesDB->read(reinterpret_cast(&(nextAddress)), + sizeof(unsigned int))) { + property_edge_link_logger.error("Error while reading edge property next address from block = " + + std::to_string(propertyBlockAddress)); + } + property_edge_link_logger.debug("Property head propertyBlockAddress = " + + std::to_string(propertyBlockAddress)); + property_edge_link_logger.debug("Property head property name = " + std::string(propertyName)); + property_edge_link_logger.debug("Property head nextAddress = " + std::to_string(nextAddress)); + + pl = new MetaPropertyEdgeLink(propertyBlockAddress, std::string(propertyName), + propertyValue, nextAddress); + } + pthread_mutex_unlock(&lockGetMetaPropertyEdgeLink); + return pl; +} diff --git a/src/nativestore/MetaPropertyEdgeLink.h b/src/nativestore/MetaPropertyEdgeLink.h new file mode 100644 index 000000000..aaf322d26 --- /dev/null +++ b/src/nativestore/MetaPropertyEdgeLink.h @@ -0,0 +1,42 @@ +/** +Copyright 2025 JasmineGraph Team +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +**/ + +#ifndef JASMINEGRAPH_METAPROPERTYEDGELINK_H +#define JASMINEGRAPH_METAPROPERTYEDGELINK_H +#include +#include +#include +#include + +class MetaPropertyEdgeLink { + public: + static const unsigned long MAX_NAME_SIZE = 12; // Size of a property name in bytes + static const unsigned long MAX_VALUE_SIZE = 180; // Size of a property value in bytes + static thread_local unsigned int nextPropertyIndex; // Next available property block index + static const unsigned long META_PROPERTY_BLOCK_SIZE = MAX_NAME_SIZE + MAX_VALUE_SIZE + sizeof(unsigned int); + std::string name; + char value[MetaPropertyEdgeLink::MAX_VALUE_SIZE] = {0}; + unsigned int blockAddress; + unsigned int nextPropAddress; + static std::string DB_PATH; + static thread_local std::fstream* metaEdgePropertiesDB; + MetaPropertyEdgeLink(unsigned int); + MetaPropertyEdgeLink(unsigned int, std::string, char*, unsigned int); + static MetaPropertyEdgeLink* get(unsigned int); + static MetaPropertyEdgeLink* create(std::string, char[]); + unsigned int insert(std::string, char[]); + MetaPropertyEdgeLink* next(); +}; + + +#endif // JASMINEGRAPH_METAPROPERTYEDGELINK_H diff --git a/src/nativestore/MetaPropertyLink.cpp b/src/nativestore/MetaPropertyLink.cpp new file mode 100644 index 000000000..6eab5d995 --- /dev/null +++ b/src/nativestore/MetaPropertyLink.cpp @@ -0,0 +1,167 @@ +/** +Copyright 2025 JasmineGraph Team +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +**/ + +#include "MetaPropertyLink.h" +#include "../util/logger/Logger.h" + +Logger meta_property_link_logger; +thread_local unsigned int MetaPropertyLink::nextPropertyIndex = 1; +thread_local std::fstream* MetaPropertyLink::metaPropertiesDB = NULL; +pthread_mutex_t lockMetaPropertyLink; +pthread_mutex_t lockCreateMetaPropertyLink; +pthread_mutex_t lockInsertMetaPropertyLink; +pthread_mutex_t lockGetMetaPropertyLink; + +MetaPropertyLink::MetaPropertyLink(unsigned int propertyBlockAddress) : blockAddress(propertyBlockAddress) { + pthread_mutex_lock(&lockMetaPropertyLink); + if (propertyBlockAddress > 0) { + MetaPropertyLink::metaPropertiesDB->seekg(propertyBlockAddress); + char rawName[MetaPropertyLink::MAX_NAME_SIZE] = {0}; + + if (!this->metaPropertiesDB->read(reinterpret_cast(&rawName), MetaPropertyLink::MAX_NAME_SIZE)) { + meta_property_link_logger.error("Error while reading node property name from block " + + std::to_string(blockAddress)); + } + meta_property_link_logger.debug( + "Current file descriptor curser position = " + std::to_string(this->metaPropertiesDB->tellg()) + + " when reading = " + std::to_string(blockAddress)); + if (!this->metaPropertiesDB->read(reinterpret_cast(&this->value), MetaPropertyLink::MAX_VALUE_SIZE)) { + meta_property_link_logger.error("Error while reading node property value from block " + + std::to_string(blockAddress)); + } + + if (!this->metaPropertiesDB->read(reinterpret_cast(&(this->nextPropAddress)), sizeof(unsigned int))) { + meta_property_link_logger.error("Error while reading node property next address from block " + + std::to_string(blockAddress)); + } + + this->name = std::string(rawName); + } + pthread_mutex_unlock(&lockMetaPropertyLink); +}; + +MetaPropertyLink::MetaPropertyLink(unsigned int blockAddress, std::string name, + const char* rvalue, unsigned int nextAddress) + : blockAddress(blockAddress), nextPropAddress(nextAddress), name(name) { + memcpy(this->value, rvalue, MetaPropertyLink::MAX_VALUE_SIZE); +}; + +unsigned int MetaPropertyLink::insert(std::string name, const char* value) { + char dataName[MetaPropertyLink::MAX_NAME_SIZE] = {0}; + char dataValue[MetaPropertyLink::MAX_VALUE_SIZE] = {0}; + std::strcpy(dataName, name.c_str()); + std::memcpy(dataValue, value, + MetaPropertyLink::MAX_VALUE_SIZE); + + unsigned int nextAddress = 0; + + if (this->name == name) { + pthread_mutex_lock(&lockInsertMetaPropertyLink); + this->metaPropertiesDB->seekp(this->blockAddress + MetaPropertyLink::MAX_NAME_SIZE); + this->metaPropertiesDB->write(reinterpret_cast(dataValue), MetaPropertyLink::MAX_VALUE_SIZE); + this->metaPropertiesDB->flush(); + pthread_mutex_unlock(&lockInsertMetaPropertyLink); + meta_property_link_logger.debug("Updating already existing property key = " + std::string(name)); + return this->blockAddress; + } else if (this->nextPropAddress) { // Traverse to the edge/end of the link list + return this->next()->insert(name, value); + } else { + pthread_mutex_lock(&lockInsertMetaPropertyLink); + unsigned int newAddress = MetaPropertyLink::nextPropertyIndex * MetaPropertyLink::META_PROPERTY_BLOCK_SIZE; + this->metaPropertiesDB->seekp(newAddress); + this->metaPropertiesDB->write(dataName, MetaPropertyLink::MAX_NAME_SIZE); + this->metaPropertiesDB->write(reinterpret_cast(dataValue), MetaPropertyLink::MAX_VALUE_SIZE); + if (!this->metaPropertiesDB->write(reinterpret_cast(&nextAddress), sizeof(nextAddress))) { + meta_property_link_logger.error("Error while inserting a property " + name + " into block address " + + std::to_string(newAddress)); + return -1; + } + + this->metaPropertiesDB->flush(); + + this->nextPropAddress = newAddress; + this->metaPropertiesDB->seekp(this->blockAddress + MetaPropertyLink::MAX_NAME_SIZE + + MetaPropertyLink::MAX_VALUE_SIZE); // seek to current property next address + if (!this->metaPropertiesDB->write(reinterpret_cast(&newAddress), sizeof(newAddress))) { + meta_property_link_logger.error("Error while updating property next address for " + name + + " into block address " + std::to_string(this->blockAddress)); + return -1; + } + + MetaPropertyLink::nextPropertyIndex++; // Increment the shared property index value + pthread_mutex_unlock(&lockInsertMetaPropertyLink); + return this->blockAddress; + } +} + +MetaPropertyLink* MetaPropertyLink::create(std::string name, const char* value) { + pthread_mutex_lock(&lockCreateMetaPropertyLink); + unsigned int nextAddress = 0; + char dataName[MetaPropertyLink::MAX_NAME_SIZE] = {0}; + strcpy(dataName, name.c_str()); + unsigned int newAddress = MetaPropertyLink::nextPropertyIndex * MetaPropertyLink::META_PROPERTY_BLOCK_SIZE; + MetaPropertyLink::metaPropertiesDB->seekp(newAddress); + MetaPropertyLink::metaPropertiesDB->write(dataName, MetaPropertyLink::MAX_NAME_SIZE); + MetaPropertyLink::metaPropertiesDB->write(reinterpret_cast(value), MetaPropertyLink::MAX_VALUE_SIZE); + if (!MetaPropertyLink::metaPropertiesDB->write(reinterpret_cast(&nextAddress), sizeof(nextAddress))) { + meta_property_link_logger.error("Error while inserting the property = " + name + + " into block a new address = " + std::to_string(newAddress)); + return NULL; + } + MetaPropertyLink::metaPropertiesDB->flush(); + MetaPropertyLink::nextPropertyIndex++; // Increment the shared property index value + pthread_mutex_unlock(&lockCreateMetaPropertyLink); + return new MetaPropertyLink(newAddress, name, value, nextAddress); +} + +MetaPropertyLink* MetaPropertyLink::next() { + if (this->nextPropAddress) { + return new MetaPropertyLink(this->nextPropAddress); + } + return nullptr; +} + +MetaPropertyLink* MetaPropertyLink::get(unsigned int propertyBlockAddress) { + MetaPropertyLink* pl = nullptr; + + pthread_mutex_lock(&lockGetMetaPropertyLink); + if (propertyBlockAddress > 0) { + char propertyName[MetaPropertyLink::MAX_NAME_SIZE] = {0}; + char propertyValue[MetaPropertyLink::MAX_VALUE_SIZE] = {0}; + unsigned int nextAddress; + MetaPropertyLink::metaPropertiesDB->seekg(propertyBlockAddress); + + + if (!MetaPropertyLink::metaPropertiesDB->read(reinterpret_cast(&propertyName), + MetaPropertyLink::MAX_NAME_SIZE)) { + meta_property_link_logger.error("Error while reading node property name from block = " + + std::to_string(propertyBlockAddress)); + } + if (!MetaPropertyLink::metaPropertiesDB->read(reinterpret_cast(&propertyValue), + MetaPropertyLink::MAX_VALUE_SIZE)) { + meta_property_link_logger.error("Error while reading node property value from block = " + + std::to_string(propertyBlockAddress)); + } + + if (!MetaPropertyLink::metaPropertiesDB->read(reinterpret_cast(&(nextAddress)), + sizeof(unsigned int))) { + meta_property_link_logger.error("Error while reading node property next address from block = " + + std::to_string(propertyBlockAddress)); + } + + pl = new MetaPropertyLink(propertyBlockAddress, std::string(propertyName), + propertyValue, nextAddress); + } + pthread_mutex_unlock(&lockGetMetaPropertyLink); + return pl; +} diff --git a/src/nativestore/MetaPropertyLink.h b/src/nativestore/MetaPropertyLink.h new file mode 100644 index 000000000..4217d4f1d --- /dev/null +++ b/src/nativestore/MetaPropertyLink.h @@ -0,0 +1,46 @@ +/** +Copyright 2025 JasmineGraph Team +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +**/ + +#ifndef JASMINEGRAPH_METAPROPERTYLINK_H +#define JASMINEGRAPH_METAPROPERTYLINK_H +#include +#include +#include +#include + +class MetaPropertyLink { + public: + static const unsigned long MAX_NAME_SIZE = 12; // Size of a property name in bytes + static const unsigned long MAX_VALUE_SIZE = 180; // Size of a property value in bytes + static inline const std::string PARTITION_ID = "pid"; + static thread_local unsigned int nextPropertyIndex; // Next available property block index + static const unsigned long META_PROPERTY_BLOCK_SIZE = MAX_NAME_SIZE + MAX_VALUE_SIZE + sizeof(unsigned int); + + std::string name; + char value[MetaPropertyLink::MAX_VALUE_SIZE] = {0}; + unsigned int blockAddress; + unsigned int nextPropAddress; + + static thread_local std::string DB_PATH; + static thread_local std::fstream* metaPropertiesDB; + + MetaPropertyLink(unsigned int); + MetaPropertyLink(unsigned int, std::string, const char*, unsigned int); + static MetaPropertyLink* get(unsigned int); + static MetaPropertyLink* create(std::string, const char*); + + unsigned int insert(std::string, const char*); + MetaPropertyLink* next(); +}; + +#endif // JASMINEGRAPH_METAPROPERTYLINK_H diff --git a/src/nativestore/NodeBlock.cpp b/src/nativestore/NodeBlock.cpp index 5ef7578ff..802cd26e1 100644 --- a/src/nativestore/NodeBlock.cpp +++ b/src/nativestore/NodeBlock.cpp @@ -18,18 +18,20 @@ limitations under the License. #include "../util/logger/Logger.h" #include "RelationBlock.h" +#include "MetaPropertyLink.h" Logger node_block_logger; pthread_mutex_t lockSaveNode; pthread_mutex_t lockAddNodeProperty; NodeBlock::NodeBlock(std::string id, unsigned int nodeId, unsigned int address, unsigned int propRef, - unsigned int edgeRef, unsigned int centralEdgeRef, unsigned char edgeRefPID, const char* _label, - bool usage) + unsigned int metaPropRef, unsigned int edgeRef, unsigned int centralEdgeRef, + unsigned char edgeRefPID, const char* _label, bool usage) : id(id), nodeId(nodeId), addr(address), propRef(propRef), + metaPropRef(metaPropRef), edgeRef(edgeRef), centralEdgeRef(centralEdgeRef), edgeRefPID(edgeRefPID), @@ -61,6 +63,7 @@ void NodeBlock::save() { NodeBlock::nodesDB->write(reinterpret_cast(&(this->centralEdgeRef)), sizeof(this->centralEdgeRef)); // 4 NodeBlock::nodesDB->put(this->edgeRefPID); // 1 NodeBlock::nodesDB->write(reinterpret_cast(&(this->propRef)), sizeof(this->propRef)); // 4 + NodeBlock::nodesDB->write(reinterpret_cast(&(this->metaPropRef)), sizeof(this->metaPropRef)); // 4 NodeBlock::nodesDB->write(this->label, sizeof(this->label)); // 6 NodeBlock::nodesDB->flush(); // Sync the file with in-memory stream // pthread_mutex_unlock(&lockSaveNode); @@ -92,6 +95,26 @@ void NodeBlock::addProperty(std::string name, const char* value) { } } +void NodeBlock::addMetaProperty(std::string name, const char* value) { + if (this->metaPropRef == 0) { + MetaPropertyLink* newLink = MetaPropertyLink::create(name, value); + + if (newLink) { + this->metaPropRef = newLink->blockAddress; + + NodeBlock::nodesDB->seekp(this->addr +sizeof(this->usage) +sizeof(this->nodeId) + sizeof(this->edgeRef) + + sizeof(this->centralEdgeRef) + sizeof(this->edgeRefPID)+ sizeof(this->propRef)); + NodeBlock::nodesDB->write(reinterpret_cast(&(this->metaPropRef)), sizeof(this->metaPropRef)); + NodeBlock::nodesDB->flush(); + } else { + node_block_logger.error("Error occurred while adding a new property link to " + + std::to_string(this->addr) + " node block"); + } + } else { + this->metaPropRef = this->getMetaPropertyHead()->insert(name, value); + } +} + bool NodeBlock::updateLocalRelation(RelationBlock* newRelation, bool relocateHead) { unsigned int edgeReferenceAddress = newRelation->addr; unsigned int thisAddress = this->addr; @@ -385,6 +408,7 @@ NodeBlock* NodeBlock::get(unsigned int blockAddress) { unsigned int centralEdgeRef; unsigned char edgeRefPID; unsigned int propRef; + unsigned int metaPropRef; char usageBlock; char label[NodeBlock::LABEL_SIZE]; std::string id; @@ -412,6 +436,11 @@ NodeBlock* NodeBlock::get(unsigned int blockAddress) { node_block_logger.error("Error while reading prop reference data from block " + std::to_string(blockAddress)); } + if (!NodeBlock::nodesDB->read(reinterpret_cast(&metaPropRef), sizeof(unsigned int))) { + node_block_logger.error("Error while reading prop reference data from block " + + std::to_string(blockAddress)); + } + if (!NodeBlock::nodesDB->read(&label[0], NodeBlock::LABEL_SIZE)) { node_block_logger.error("Error while reading label data from block " + std::to_string(blockAddress)); } @@ -424,7 +453,8 @@ NodeBlock* NodeBlock::get(unsigned int blockAddress) { id = std::string(label); } nodeBlockPointer = - new NodeBlock(id, nodeId, blockAddress, propRef, edgeRef, centralEdgeRef, edgeRefPID, label, usage); + new NodeBlock(id, nodeId, blockAddress, propRef, metaPropRef, edgeRef, + centralEdgeRef, edgeRefPID, label, usage); if (nodeBlockPointer->id.length() == 0) { // if label not found in node block look in the properties std::map props = nodeBlockPointer->getAllProperties(); if (props["label"]) { @@ -442,4 +472,6 @@ NodeBlock* NodeBlock::get(unsigned int blockAddress) { } PropertyLink* NodeBlock::getPropertyHead() { return PropertyLink::get(this->propRef); } +MetaPropertyLink* NodeBlock::getMetaPropertyHead() { return MetaPropertyLink::get(this->metaPropRef); } + thread_local std::fstream* NodeBlock::nodesDB = NULL; diff --git a/src/nativestore/NodeBlock.h b/src/nativestore/NodeBlock.h index 104566851..0c6daa4d5 100644 --- a/src/nativestore/NodeBlock.h +++ b/src/nativestore/NodeBlock.h @@ -18,6 +18,7 @@ limitations under the License. #include #include "PropertyLink.h" +#include "MetaPropertyLink.h" class RelationBlock; // Forward declaration @@ -29,7 +30,7 @@ class NodeBlock { bool isDirected = false; public: - static const unsigned long BLOCK_SIZE = 24; // Size of a node block in bytes + static const unsigned long BLOCK_SIZE = 28; // Size of a node block in bytes static const unsigned int LABEL_SIZE = 6; // Size of a node label in bytes unsigned int addr = 0; std::string id = ""; // Node ID for this block ie: citation paper ID, Facebook accout ID, Twitter account ID etc @@ -40,6 +41,8 @@ class NodeBlock { unsigned int centralEdgeRef = 0; // edges cut database block address for edge cut relations unsigned char edgeRefPID = 0; // Partition ID of the edge reference unsigned int propRef = 0; // Properties DB block address for node properties + unsigned int metaPropRef = 0; // Meta properties DB block address for node meta properties + char label[LABEL_SIZE] = { 0}; // Initialize with null chars label === ID if length(id) < 6 else ID will be stored as a Node's property @@ -57,8 +60,9 @@ class NodeBlock { this->usage = true; }; - NodeBlock(std::string id, unsigned int nodeId, unsigned int address, unsigned int propRef, unsigned int edgeRef, - unsigned int centralEdgeRef, unsigned char edgeRefPID, const char *_label, bool usage); + NodeBlock(std::string id, unsigned int nodeId, unsigned int address, unsigned int propRef, unsigned int metaPropRef, + unsigned int edgeRef, unsigned int centralEdgeRef, unsigned char edgeRefPID, const char *_label, + bool usage); void save(); std::string getLabel(); @@ -68,8 +72,10 @@ class NodeBlock { static NodeBlock *get(unsigned int); void addProperty(std::string, const char *); + void addMetaProperty(std::string, const char *); std::map getProperty(std::string); PropertyLink *getPropertyHead(); + MetaPropertyLink *getMetaPropertyHead(); std::map getAllProperties(); bool updateLocalRelation(RelationBlock *, bool relocateHead = true); diff --git a/src/nativestore/NodeManager.cpp b/src/nativestore/NodeManager.cpp index ad404e525..b957a57a9 100644 --- a/src/nativestore/NodeManager.cpp +++ b/src/nativestore/NodeManager.cpp @@ -23,6 +23,7 @@ limitations under the License. #include "NodeBlock.h" // To setup node DB #include "PropertyEdgeLink.h" #include "PropertyLink.h" +#include "MetaPropertyLink.h" #include "RelationBlock.h" #include "iostream" #include @@ -42,6 +43,7 @@ NodeManager::NodeManager(GraphConfig gConfig) { std::string nodesDBPath = dbPrefix + "_nodes.db"; indexDBPath = dbPrefix + "_nodes.index.db"; std::string propertiesDBPath = dbPrefix + "_properties.db"; + std::string metaPropertiesDBPath = dbPrefix + "_meta_properties.db"; std::string edgePropertiesDBPath = dbPrefix + "_edge_properties.db"; std::string relationsDBPath = dbPrefix + "_relations.db"; std::string centralRelationsDBPath = dbPrefix + "_central_relations.db"; @@ -74,6 +76,7 @@ NodeManager::NodeManager(GraphConfig gConfig) { NodeBlock::nodesDB = Utils::openFile(nodesDBPath, openMode); PropertyLink::propertiesDB = Utils::openFile(propertiesDBPath, openMode); + MetaPropertyLink::metaPropertiesDB = Utils::openFile(metaPropertiesDBPath, openMode); PropertyEdgeLink::edgePropertiesDB = Utils::openFile(edgePropertiesDBPath, openMode); RelationBlock::relationsDB = utils.openFile(relationsDBPath, openMode); RelationBlock::centralRelationsDB = Utils::openFile(centralRelationsDBPath, openMode); @@ -126,6 +129,14 @@ NodeManager::NodeManager(GraphConfig gConfig) { node_manager_logger.error("Error getting file size for: " + propertiesDBPath); } + if (stat(metaPropertiesDBPath.c_str(), &stat_buf) == 0) { + MetaPropertyLink::nextPropertyIndex = (stat_buf.st_size / MetaPropertyLink::META_PROPERTY_BLOCK_SIZE) == 0 ? 1 : + (stat_buf.st_size / MetaPropertyLink::META_PROPERTY_BLOCK_SIZE); + + } else { + node_manager_logger.error("Error getting file size for: " + metaPropertiesDBPath); + } + if (stat(edgePropertiesDBPath.c_str(), &stat_buf) == 0) { PropertyEdgeLink::nextPropertyIndex = (stat_buf.st_size / PropertyEdgeLink::PROPERTY_BLOCK_SIZE) == 0 ? 1 : (stat_buf.st_size / PropertyEdgeLink::PROPERTY_BLOCK_SIZE); @@ -347,6 +358,7 @@ NodeBlock *NodeManager::get(std::string nodeId) { unsigned int centralEdgeRef; unsigned char edgeRefPID; unsigned int propRef; + unsigned int metaPropRef; char usageBlock; char label[NodeBlock::LABEL_SIZE]; @@ -373,6 +385,10 @@ NodeBlock *NodeManager::get(std::string nodeId) { node_manager_logger.error("Error while reading prop reference data from block " + std::to_string(blockAddress)); } + if (!NodeBlock::nodesDB->read(reinterpret_cast(&metaPropRef), sizeof(unsigned int))) { + node_manager_logger.error("Error while reading prop reference data from block " + std::to_string(blockAddress)); + } + if (!NodeBlock::nodesDB->read(&label[0], NodeBlock::LABEL_SIZE)) { node_manager_logger.error("Error while reading label data from block " + std::to_string(blockAddress)); } @@ -381,8 +397,8 @@ NodeBlock *NodeManager::get(std::string nodeId) { node_manager_logger.debug("Length of label = " + std::to_string(strlen(label))); node_manager_logger.debug("DEBUG: raw edgeRef from DB (disk) " + std::to_string(edgeRef)); - nodeBlockPointer = - new NodeBlock(nodeId, vertexId, blockAddress, propRef, edgeRef, centralEdgeRef, edgeRefPID, label, usage); + nodeBlockPointer = new NodeBlock(nodeId, vertexId, blockAddress, propRef, metaPropRef, edgeRef, + centralEdgeRef, edgeRefPID, label, usage); node_manager_logger.debug("DEBUG: nodeBlockPointer after creating the object edgeRef " + std::to_string(nodeBlockPointer->edgeRef));