diff --git a/include/cgimap/backend/apidb/transaction_manager.hpp b/include/cgimap/backend/apidb/transaction_manager.hpp index c5d4c24b..ebafb3e1 100644 --- a/include/cgimap/backend/apidb/transaction_manager.hpp +++ b/include/cgimap/backend/apidb/transaction_manager.hpp @@ -19,6 +19,57 @@ #include +#if PQXX_VERSION_MAJOR >= 7 + +class Stream_Wrapper +{ +public: + Stream_Wrapper(pqxx::transaction_base &txn, std::string_view table, + std::string_view columns) : + + m_stream(pqxx::stream_to::raw_table(txn, table, columns)), m_table(table), m_start( + std::chrono::steady_clock::now()) + { + } + + ~Stream_Wrapper() + { + if (m_stream) + m_stream.complete(); + } + + template< typename ... Ts > void write_values(Ts const &...fields) + { + m_stream.write_values(std::forward(fields)...); + row_count++; + } + + void complete() + { + m_stream.complete(); + log_stats(); + } + + int row_count = 0; + +private: + + void log_stats() + { + const auto end = std::chrono::steady_clock::now(); + const auto elapsed = std::chrono::duration_cast < std::chrono::milliseconds > (end - m_start); + + logger::message(fmt::format( + "Executed COPY statement for table {} in {:d} ms, inserted {:d} rows", + m_table, elapsed.count(), row_count)); + } + + pqxx::stream_to m_stream; + const std::string_view m_table; + const std::chrono::steady_clock::time_point m_start; + +}; +#endif class Transaction_Owner_Base { @@ -102,6 +153,12 @@ class Transaction_Manager { return res; } +#if PQXX_VERSION_MAJOR >= 7 + Stream_Wrapper to_stream(std::string_view table, std::string_view columns) { + return Stream_Wrapper(m_txn, table, columns); + } +#endif + private: pqxx::transaction_base & m_txn; std::set& m_prep_stmt; diff --git a/src/backend/apidb/changeset_upload/node_updater.cpp b/src/backend/apidb/changeset_upload/node_updater.cpp index a923168e..d994573d 100644 --- a/src/backend/apidb/changeset_upload/node_updater.cpp +++ b/src/backend/apidb/changeset_upload/node_updater.cpp @@ -675,6 +675,8 @@ std::vector ApiDB_Node_Updater::insert_new_current_node_tags( if (nodes.empty()) return {}; +#if PQXX_VERSION_MAJOR < 7 + m.prepare("insert_new_current_node_tags", R"( @@ -712,6 +714,23 @@ std::vector ApiDB_Node_Updater::insert_new_current_node_tags( if (r.affected_rows() != total_tags) throw http::server_error("Could not create new current node tags"); +#else + + std::vector ids; + + auto stream = m.to_stream("current_node_tags", "node_id, k, v"); + + for (const auto &node : nodes) { + for (const auto &tag : node.tags) { + stream.write_values(node.id, tag.first, tag.second); + ids.emplace_back(node.id); + } + } + + stream.complete(); + +#endif + // prepare list of node ids with tags std::sort(ids.begin(), ids.end()); ids.erase(std::unique(ids.begin(), ids.end()), ids.end()); diff --git a/src/backend/apidb/changeset_upload/relation_updater.cpp b/src/backend/apidb/changeset_upload/relation_updater.cpp index 3fcd5885..3bda0293 100644 --- a/src/backend/apidb/changeset_upload/relation_updater.cpp +++ b/src/backend/apidb/changeset_upload/relation_updater.cpp @@ -1296,6 +1296,8 @@ std::vector ApiDB_Relation_Updater::insert_new_current_relation_t if (relations.empty()) return {}; +#if PQXX_VERSION_MAJOR < 7 + m.prepare("insert_new_current_relation_tags", R"( @@ -1333,10 +1335,29 @@ std::vector ApiDB_Relation_Updater::insert_new_current_relation_t if (r.affected_rows() != total_tags) throw http::server_error("Could not create new current relation tags"); + +#else + + std::vector ids; + + auto stream = m.to_stream("current_relation_tags", "relation_id, k, v"); + + for (const auto &relation : relations) { + for (const auto &tag : relation.tags) { + stream.write_values(relation.id, tag.first, tag.second); + ids.emplace_back(relation.id); + } + } + + stream.complete(); + +#endif + // prepare list of relation ids with tags std::sort(ids.begin(), ids.end()); ids.erase(std::unique(ids.begin(), ids.end()), ids.end()); return ids; + } void ApiDB_Relation_Updater::insert_new_current_relation_members( @@ -1345,6 +1366,8 @@ void ApiDB_Relation_Updater::insert_new_current_relation_members( if (relations.empty()) return; +#if PQXX_VERSION_MAJOR < 7 + m.prepare("insert_new_current_relation_members", R"( @@ -1378,6 +1401,19 @@ void ApiDB_Relation_Updater::insert_new_current_relation_members( pqxx::result r = m.exec_prepared("insert_new_current_relation_members", ids, membertypes, memberids, memberroles, sequenceids); +#else + + auto stream = m.to_stream("current_relation_members", "relation_id, member_type, member_id, member_role, sequence_id"); + + for (const auto &relation : relations) { + for (const auto &member : relation.members) { + stream.write_values(relation.id, member.member_type, member.member_id, member.member_role, member.sequence_id); + } + } + + stream.complete(); + +#endif } void ApiDB_Relation_Updater::save_current_relations_to_history( diff --git a/src/backend/apidb/changeset_upload/way_updater.cpp b/src/backend/apidb/changeset_upload/way_updater.cpp index 04122d6f..d4da25fa 100644 --- a/src/backend/apidb/changeset_upload/way_updater.cpp +++ b/src/backend/apidb/changeset_upload/way_updater.cpp @@ -693,8 +693,10 @@ void ApiDB_Way_Updater::update_current_ways(const std::vector &ways, std::vector ApiDB_Way_Updater::insert_new_current_way_tags( const std::vector &ways) { - if (ways.empty()) - return {}; + if (ways.empty()) + return {}; + +#if PQXX_VERSION_MAJOR < 7 m.prepare("insert_new_current_way_tags", @@ -733,6 +735,23 @@ std::vector ApiDB_Way_Updater::insert_new_current_way_tags( if (r.affected_rows() != total_tags) throw http::server_error("Could not create new current way tags"); +#else + + std::vector ids; + + auto stream = m.to_stream("current_way_tags", "way_id, k, v"); + + for (const auto &way : ways) { + for (const auto &tag : way.tags) { + stream.write_values(way.id, tag.first, tag.second); + ids.emplace_back(way.id); + } + } + + stream.complete(); + +#endif + // prepare list of way ids with tags std::sort(ids.begin(), ids.end()); ids.erase(std::unique(ids.begin(), ids.end()), ids.end()); @@ -745,6 +764,8 @@ void ApiDB_Way_Updater::insert_new_current_way_nodes( if (ways.empty()) return; +#if PQXX_VERSION_MAJOR < 7 + m.prepare("insert_new_current_way_nodes", R"( @@ -772,6 +793,19 @@ void ApiDB_Way_Updater::insert_new_current_way_nodes( pqxx::result r = m.exec_prepared("insert_new_current_way_nodes", ids, nodeids, sequenceids); +#else + + auto stream = m.to_stream("current_way_nodes", "way_id, node_id, sequence_id"); + + for (const auto &way : ways) { + for (const auto &wn : way.way_nodes) { + stream.write_values(way.id, wn.node_id, wn.sequence_id); + } + } + + stream.complete(); + +#endif } void ApiDB_Way_Updater::save_current_ways_to_history(