From 9c3d43f7739c5f8d02b8517d19f2b2f6462415e7 Mon Sep 17 00:00:00 2001 From: Christian von Elm Date: Mon, 10 Feb 2025 14:52:09 +0100 Subject: [PATCH] fofoo --- CMakeLists.txt | 3 +- include/lo2s/cupti/events.hpp | 48 --- include/lo2s/dwarf_resolve.hpp | 8 +- include/lo2s/execution_scope.hpp | 2 +- include/lo2s/function_resolver.hpp | 65 ++- include/lo2s/instruction_resolver.hpp | 8 +- include/lo2s/measurement_scope.hpp | 41 +- include/lo2s/monitor/main_monitor.hpp | 2 + include/lo2s/monitor/ringbuf_monitor.hpp | 13 +- include/lo2s/monitor/socket_monitor.hpp | 3 +- include/lo2s/perf/calling_context_manager.hpp | 13 +- include/lo2s/perf/event_attr.hpp | 6 + include/lo2s/perf/event_reader.hpp | 4 - include/lo2s/perf/sample/writer.hpp | 2 +- include/lo2s/perf/types.hpp | 38 +- include/lo2s/process_info.hpp | 56 ++- include/lo2s/ringbuf.hpp | 386 ++++++++++++++---- include/lo2s/trace/reg_keys.hpp | 2 +- include/lo2s/trace/trace.hpp | 26 +- include/lo2s/util.hpp | 12 +- lib/otf2xx | 2 +- src/cupti/lib.cpp | 43 +- src/cupti/test.cpp | 120 ++---- src/dwarf_resolve.cpp | 15 +- src/monitor/cpu_set_monitor.cpp | 6 +- src/monitor/main_monitor.cpp | 43 +- src/monitor/process_monitor.cpp | 13 +- src/monitor/ringbuf_monitor.cpp | 61 ++- src/monitor/socket_monitor.cpp | 7 +- src/monitor/system_process_monitor.cpp | 2 +- src/perf/sample/writer.cpp | 12 +- src/process_info.cpp | 269 +++++++----- src/trace/trace.cpp | 170 ++++---- 33 files changed, 915 insertions(+), 586 deletions(-) delete mode 100644 include/lo2s/cupti/events.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 3437ec3a..18f67fb1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -98,6 +98,7 @@ add_subdirectory(lib/otf2xx) mark_as_advanced(OTF2XX_WITH_MPI OTF2_CONFIG OTF2_PRINT) # configure fmtlib submodule +set(BUILD_SHARED_LIBS ON) add_subdirectory(lib/fmt) # configure Nitro submodule @@ -349,7 +350,7 @@ if (USE_LIBAUDIT) endif() endif() -add_executable(rb_test src/cupti/test.cpp) +add_executable(rb_test src/cupti/test.cpp src/types.cpp) target_include_directories(rb_test PRIVATE include ${CMAKE_CURRENT_BINARY_DIR}/include) target_link_libraries(rb_test PRIVATE fmt::fmt Nitro::log diff --git a/include/lo2s/cupti/events.hpp b/include/lo2s/cupti/events.hpp deleted file mode 100644 index 060e73f3..00000000 --- a/include/lo2s/cupti/events.hpp +++ /dev/null @@ -1,48 +0,0 @@ -/* - * This file is part of the lo2s software. - * Linux OTF2 sampling - * - * Copyright (c) 2024, - * Technische Universitaet Dresden, Germany - * - * lo2s is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * lo2s is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with lo2s. If not, see . - */ - -#pragma once - -#include -#include - -namespace lo2s -{ -enum class EventType : uint64_t -{ - CUPTI_KERNEL = 1, -}; - -struct event_header -{ - EventType type; - uint64_t size; -}; - -struct event_kernel -{ - struct event_header header; - uint64_t start; - uint64_t end; - char name[1]; -}; - -} // namespace lo2s diff --git a/include/lo2s/dwarf_resolve.hpp b/include/lo2s/dwarf_resolve.hpp index cc56bdba..43f9ec69 100644 --- a/include/lo2s/dwarf_resolve.hpp +++ b/include/lo2s/dwarf_resolve.hpp @@ -20,11 +20,15 @@ class DwarfFunctionResolver : public FunctionResolver public: DwarfFunctionResolver(std::string name); - static FunctionResolver& cache(std::string name) + static std::shared_ptr cache(std::string name) { - return StringCache::instance()[name]; + return BinaryCache::instance()[name]; } + DwarfFunctionResolver(DwarfFunctionResolver&) = delete; + DwarfFunctionResolver& operator=(DwarfFunctionResolver&) = delete; + DwarfFunctionResolver(DwarfFunctionResolver&&) = delete; + DwarfFunctionResolver& operator=(DwarfFunctionResolver&&) = delete; ~DwarfFunctionResolver(); virtual LineInfo lookup_line_info(Address addr) override; diff --git a/include/lo2s/execution_scope.hpp b/include/lo2s/execution_scope.hpp index b68925c8..aa1f1403 100644 --- a/include/lo2s/execution_scope.hpp +++ b/include/lo2s/execution_scope.hpp @@ -72,7 +72,7 @@ class ExecutionScope case ExecutionScopeType::THREAD: return fmt::format("thread {}", id); case ExecutionScopeType::PROCESS: - return fmt::format("process {}"); + return fmt::format("process {}", id); case ExecutionScopeType::CPU: return fmt::format("cpu {}", id); default: diff --git a/include/lo2s/function_resolver.hpp b/include/lo2s/function_resolver.hpp index a5ee6e51..b4c0391a 100644 --- a/include/lo2s/function_resolver.hpp +++ b/include/lo2s/function_resolver.hpp @@ -36,9 +36,9 @@ class FunctionResolver { } - static FunctionResolver& cache(const std::string& name) + static std::shared_ptr cache(const std::string& name) { - return StringCache::instance()[name]; + return BinaryCache::instance()[name]; } virtual LineInfo lookup_line_info(Address) @@ -51,18 +51,71 @@ class FunctionResolver return name_; } + virtual void insert(std::map& functions [[maybe_unused]]) + { + throw std::runtime_error("Class does not support inserting elements"); + } + + virtual ~FunctionResolver() + { + } + protected: std::string name_; }; +class ManualFunctionResolver : public FunctionResolver +{ +public: + ManualFunctionResolver(const std::string& name) : FunctionResolver(name) + { + } + + static std::shared_ptr cache(const std::string& name) + { + return BinaryCache::instance()[name]; + } + + virtual LineInfo lookup_line_info(Address address) override + { + if (functions_.count(address)) + { + return LineInfo::for_function("", functions_.at(address).c_str(), 1, ""); + } + return LineInfo::for_unknown_function(); + } + + void insert(std::map& functions) override + { + for (auto& function : functions) + { + functions_.insert(function); + } + } + + std::string name() + { + return name_; + } + +protected: + std::map functions_; + std::string name_; +}; class Kallsyms : public FunctionResolver { public: Kallsyms() : FunctionResolver("[kernel]") { std::map entries; - std::ifstream ksyms_file("/proc/kallsyms"); + std::ifstream ksyms_file; + ksyms_file.exceptions(std::ifstream::badbit); + ksyms_file.open("/proc/kallsyms"); + if (!ksyms_file.good()) + { + return; + } std::regex ksym_regex("([0-9a-f]+) (?:t|T) ([^[:space:]]+)"); std::smatch ksym_match; @@ -107,9 +160,9 @@ class Kallsyms : public FunctionResolver } } - static Kallsyms& cache() + static std::shared_ptr cache() { - static Kallsyms k; + static std::shared_ptr k = std::make_shared(); return k; } @@ -125,6 +178,6 @@ class Kallsyms : public FunctionResolver private: std::map kallsyms_; - uint64_t start_; + uint64_t start_ = UINT64_MAX; }; } // namespace lo2s diff --git a/include/lo2s/instruction_resolver.hpp b/include/lo2s/instruction_resolver.hpp index 6633c6b4..bb3b80bd 100644 --- a/include/lo2s/instruction_resolver.hpp +++ b/include/lo2s/instruction_resolver.hpp @@ -48,6 +48,10 @@ class InstructionResolver { return ""; } + + virtual ~InstructionResolver() + { + } }; #ifdef HAVE_RADARE class RadareInstructionResolver : public InstructionResolver @@ -57,9 +61,9 @@ class RadareInstructionResolver : public InstructionResolver { } - static RadareInstructionResolver& cache(const std::string& name) + static std::shared_ptr cache(const std::string& name) { - return StringCache::instance()[name]; + return BinaryCache::instance()[name]; } virtual std::string lookup_instruction(Address ip) diff --git a/include/lo2s/measurement_scope.hpp b/include/lo2s/measurement_scope.hpp index 8e9378fa..5221a762 100644 --- a/include/lo2s/measurement_scope.hpp +++ b/include/lo2s/measurement_scope.hpp @@ -33,7 +33,7 @@ enum class MeasurementScopeType NEC_METRIC, BIO, SYSCALL, - CUDA, + RB, TRACEPOINT, UNKNOWN }; @@ -42,12 +42,14 @@ struct MeasurementScope { MeasurementScopeType type; ExecutionScope scope; + std::string rb_name = ""; MeasurementScope() : type(MeasurementScopeType::UNKNOWN), scope() { } - MeasurementScope(MeasurementScopeType type, ExecutionScope s) : type(type), scope(s) + MeasurementScope(MeasurementScopeType type, ExecutionScope s, std::string name = "") + : type(type), scope(s), rb_name(name) { } @@ -81,9 +83,9 @@ struct MeasurementScope return { MeasurementScopeType::SYSCALL, s }; } - static MeasurementScope cuda(ExecutionScope s) + static MeasurementScope rb(ExecutionScope s, std::string name) { - return { MeasurementScopeType::CUDA, s }; + return { MeasurementScopeType::RB, s, name }; } static MeasurementScope tracepoint(ExecutionScope s) @@ -93,19 +95,40 @@ struct MeasurementScope friend bool operator==(const MeasurementScope& lhs, const MeasurementScope& rhs) { + if (lhs.type == MeasurementScopeType::RB && rhs.type == MeasurementScopeType::RB) + { + return lhs.scope == rhs.scope && lhs.rb_name == rhs.rb_name; + } + return (lhs.scope == rhs.scope) && lhs.type == rhs.type; } + MeasurementScope from_ex_scope(ExecutionScope new_scope) + { + return MeasurementScope(type, new_scope, rb_name); + } + friend bool operator<(const MeasurementScope& lhs, const MeasurementScope& rhs) { - if (lhs.type != rhs.type) + if (lhs.type == MeasurementScopeType::RB && rhs.type == MeasurementScopeType::RB) { - return lhs.type < rhs.type; + if (lhs.scope == rhs.scope) + { + return lhs.rb_name < rhs.rb_name; + } + else + { + return lhs.scope < rhs.scope; + } } - else + if (lhs.type == rhs.type) { return lhs.scope < rhs.scope; } + else + { + return lhs.type < rhs.type; + } } std::string name() const @@ -123,8 +146,8 @@ struct MeasurementScope return fmt::format("block layer I/O events for {}", scope.name()); case MeasurementScopeType::SYSCALL: return fmt::format("syscall events for {}", scope.name()); - case lo2s::MeasurementScopeType::CUDA: - return fmt::format("cuda kernel events for {}", scope.name()); + case lo2s::MeasurementScopeType::RB: + return fmt::format("{} events for {}", rb_name, scope.name()); case MeasurementScopeType::TRACEPOINT: return fmt::format("tracepoint events for {}", scope.name()); default: diff --git a/include/lo2s/monitor/main_monitor.hpp b/include/lo2s/monitor/main_monitor.hpp index 32b81f8b..12999bac 100644 --- a/include/lo2s/monitor/main_monitor.hpp +++ b/include/lo2s/monitor/main_monitor.hpp @@ -64,6 +64,8 @@ class MainMonitor void insert_cached_events(const RawMemoryMapCache& cached_events, const RawCommCache& cached_comms); + void + insert_cached_cctx(std::map>& cached_cctx); ProcessMap& get_process_infos() { return process_infos_; diff --git a/include/lo2s/monitor/ringbuf_monitor.hpp b/include/lo2s/monitor/ringbuf_monitor.hpp index bf9d05f0..ceb6b255 100644 --- a/include/lo2s/monitor/ringbuf_monitor.hpp +++ b/include/lo2s/monitor/ringbuf_monitor.hpp @@ -33,7 +33,7 @@ namespace monitor class RingbufMonitor : public PollMonitor { public: - RingbufMonitor(trace::Trace& trace, int fd); + RingbufMonitor(trace::Trace& trace, MainMonitor& main_monitor, int fd); void initialize_thread() override; void finalize_thread() override; @@ -45,9 +45,16 @@ class RingbufMonitor : public PollMonitor } private: - trace::Trace& trace_; perf::time::Converter& time_converter_; - RingBufReader ringbuf_reader_; + RingbufReader ringbuf_reader_; + MeasurementScope scope_; + otf2::writer::local& rb_writer_; + std::optional last_cctx_ref_; + MainMonitor& main_monitor_ [[maybe_unused]]; + LocalCctxMap& local_cctx_map_; + std::map> functions_; + otf2::chrono::time_point last_; + bool entered_ = false; }; } // namespace monitor } // namespace lo2s diff --git a/include/lo2s/monitor/socket_monitor.hpp b/include/lo2s/monitor/socket_monitor.hpp index f5dacd9f..bf631b75 100644 --- a/include/lo2s/monitor/socket_monitor.hpp +++ b/include/lo2s/monitor/socket_monitor.hpp @@ -40,7 +40,7 @@ namespace monitor class SocketMonitor : public PollMonitor { public: - SocketMonitor(trace::Trace& trace); + SocketMonitor(trace::Trace& trace, MainMonitor& main_monitor); void initialize_thread() override; void finalize_thread() override; @@ -53,6 +53,7 @@ class SocketMonitor : public PollMonitor private: trace::Trace& trace_; + MainMonitor& main_monitor_; std::map monitors_; int socket = -1; }; diff --git a/include/lo2s/perf/calling_context_manager.hpp b/include/lo2s/perf/calling_context_manager.hpp index 268a0469..9ee7db8a 100644 --- a/include/lo2s/perf/calling_context_manager.hpp +++ b/include/lo2s/perf/calling_context_manager.hpp @@ -22,7 +22,7 @@ #include #include - +#include #include extern "C" @@ -46,7 +46,7 @@ struct LocalCctx class LocalCctxMap { public: - LocalCctxMap() + LocalCctxMap(MeasurementScope scope) : scope_(scope) { } @@ -79,7 +79,7 @@ class LocalCctxMap // information. // // Having these things in mind, look at this line and tell me, why it is still wrong: - auto children = &map[p]; + auto children = &map[scope_.from_ex_scope(p.as_scope())]; uint64_t ref = -1; for (uint64_t i = num_ips - 1;; i--) { @@ -112,7 +112,7 @@ class LocalCctxMap otf2::definition::calling_context::reference_type sample_ref(Process p, uint64_t ip) { - auto it = find_ip_child(ip, map[p]); + auto it = find_ip_child(ip, map[scope_.from_ex_scope(p.as_scope())]); return it->second.ref; } @@ -140,7 +140,7 @@ class LocalCctxMap return thread_cctxs_; } - const std::map>& get_functions() const + const std::map>& get_functions() const { return map; } @@ -170,8 +170,9 @@ class LocalCctxMap } private: - std::map> map; + std::map> map; std::map> thread_cctxs_; + MeasurementScope scope_; /* * Stores calling context information for each sample writer / monitoring thread. diff --git a/include/lo2s/perf/event_attr.hpp b/include/lo2s/perf/event_attr.hpp index b6bf2cca..82171c28 100644 --- a/include/lo2s/perf/event_attr.hpp +++ b/include/lo2s/perf/event_attr.hpp @@ -66,6 +66,7 @@ enum class EventFlag COMM, CONTEXT_SWITCH, MMAP, + MMAP2, EXCLUDE_KERNEL, TASK, ENABLE_ON_EXEC, @@ -193,6 +194,9 @@ class EventAttr case EventFlag::DISABLED: attr_.disabled = 1; break; + case EventFlag::MMAP2: + attr_.mmap2 = 1; + break; } } } @@ -217,6 +221,8 @@ class EventAttr return attr_.enable_on_exec == 1; case EventFlag::DISABLED: return attr_.disabled == 1; + case EventFlag::MMAP2: + return attr_.mmap2 == 1; default: return false; } diff --git a/include/lo2s/perf/event_reader.hpp b/include/lo2s/perf/event_reader.hpp index 587e8726..8d561e91 100644 --- a/include/lo2s/perf/event_reader.hpp +++ b/include/lo2s/perf/event_reader.hpp @@ -66,7 +66,6 @@ class EventReader using RecordUnknownType = perf_event_header; using RecordMmapType = lo2s::RecordMmapType; - using RecordMmap2Type = lo2s::RecordMmap2Type; using RecordCommType = lo2s::RecordCommType; struct RecordLostType @@ -176,9 +175,6 @@ class EventReader case PERF_RECORD_MMAP: stop = crtp_this->handle((const RecordMmapType*)event_header_p); break; - case PERF_RECORD_MMAP2: - stop = crtp_this->handle((const RecordMmap2Type*)event_header_p); - break; case PERF_RECORD_SWITCH: stop = crtp_this->handle((const RecordSwitchType*)event_header_p); break; diff --git a/include/lo2s/perf/sample/writer.hpp b/include/lo2s/perf/sample/writer.hpp index b589dbac..59759d08 100644 --- a/include/lo2s/perf/sample/writer.hpp +++ b/include/lo2s/perf/sample/writer.hpp @@ -63,7 +63,7 @@ class Writer : public Reader public: using Reader::handle; bool handle(const Reader::RecordSampleType* sample); - bool handle(const Reader::RecordMmap2Type* mmap_event); + bool handle(const Reader::RecordMmapType* mmap_event); bool handle(const Reader::RecordCommType* comm); bool handle(const Reader::RecordSwitchCpuWideType* context_switch); bool handle(const Reader::RecordSwitchType* context_switch); diff --git a/include/lo2s/perf/types.hpp b/include/lo2s/perf/types.hpp index 64468c99..471df6cf 100644 --- a/include/lo2s/perf/types.hpp +++ b/include/lo2s/perf/types.hpp @@ -43,6 +43,16 @@ class PerfEventCache PerfEventCache(const PerfEventCache&) = delete; PerfEventCache& operator=(const PerfEventCache&) = delete; + PerfEventCache(PerfEventCache&& other) + { + std::swap(data_, other.data_); + } + + PerfEventCache& operator=(PerfEventCache&& other) + { + std::swap(data_, other.data_); + return *this; + } PerfEventCache(const T* event) : data_(std::make_unique(event->header.size)) { memcpy(data_.get(), event, event->header.size); @@ -101,33 +111,7 @@ struct RecordCommType // struct sample_id id; }; -struct RecordMmap2Type -{ - // BAD things happen if you try this - RecordMmap2Type() = delete; - RecordMmap2Type(const RecordMmap2Type&) = delete; - RecordMmap2Type& operator=(const RecordMmap2Type&) = delete; - RecordMmap2Type(RecordMmap2Type&&) = delete; - RecordMmap2Type& operator=(RecordMmap2Type&&) = delete; - - struct perf_event_header header; - uint32_t pid; - uint32_t tid; - uint64_t addr; - uint64_t len; - uint64_t pgoff; - uint32_t maj; - uint32_t min; - uint64_t ino; - uint64_t ino_generation; - uint32_t prot; - uint32_t flags; - // Note ISO C++ forbids zero-size array, but this struct is exclusively used as pointer - char filename[1]; - // struct sample_id sample_id; -}; - -using RawMemoryMapCache = std::deque>; +using RawMemoryMapCache = std::deque>; using RawCommCache = std::deque>; } // namespace lo2s diff --git a/include/lo2s/process_info.hpp b/include/lo2s/process_info.hpp index cccae59a..903351a6 100644 --- a/include/lo2s/process_info.hpp +++ b/include/lo2s/process_info.hpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -48,34 +49,61 @@ class ProcessInfo return process_; } - void mmap(const RecordMmap2Type& entry) + void mmap(const RecordMmapType& entry) { mmap(entry.addr, entry.addr + entry.len, entry.pgoff, entry.filename); } void mmap(Address addr, Address end, Address pgoff, std::string filename); - LineInfo lookup_line_info(Address ip); - std::string lookup_instruction(Address ip) const; + void insert_functions(MeasurementScope scope, std::map functions); + + LineInfo lookup_line_info(MeasurementScope scope, Address ip); + std::string lookup_instruction(MeasurementScope scope, Address ip) const; private: + void emplace_fr(Address addr, Address end, Address pgoff, std::string filename); + void emplace_ir(Address addr, Address end, Address pgoff, std::string filename); struct Mapping { - Mapping(Address s, Address e, Address o, FunctionResolver& f, InstructionResolver& i) - : start(s), end(e), pgoff(o), fr(f), ir(i) + Mapping(Address s, Address e, Address o) : range(s, e), pgoff(o) + { + } + + Mapping(Address s) : range(s), pgoff(0) { } - Address start; - Address end; + Range range; Address pgoff; - FunctionResolver& fr; - InstructionResolver& ir; + + bool operator==(const Address& other) const + { + return other >= range.start && other < range.end; + } + + bool operator<(const Mapping& other) const + { + return range < other.range; + } + + bool operator<(const Address& other) const + { + return range < other; + } + + static Mapping max() + { + return Mapping(0, (uint64_t)-1, 0); + } }; const Process process_; mutable std::shared_mutex mutex_; - std::map map_; + std::map>> + function_resolvers_; + std::map>> + instruction_resolvers_; }; class ProcessMap @@ -85,13 +113,13 @@ class ProcessMap { } - bool has(Process p, uint64_t timepoint); + bool has(Process p); - ProcessInfo& get(Process p, uint64_t timepoint); + ProcessInfo& get(Process p); - ProcessInfo& insert(Process p, uint64_t timepoint, bool read_initial); + ProcessInfo& insert(Process p, bool read_initial); private: - std::map> infos_; + std::map infos_; }; } // namespace lo2s diff --git a/include/lo2s/ringbuf.hpp b/include/lo2s/ringbuf.hpp index 8fd859af..021d66ae 100644 --- a/include/lo2s/ringbuf.hpp +++ b/include/lo2s/ringbuf.hpp @@ -22,12 +22,14 @@ #pragma once #include +#include #include #include #include #include #include +#include #include #include @@ -35,7 +37,9 @@ extern "C" { #include #include +#include #include +#include #include } @@ -43,37 +47,87 @@ namespace lo2s { // To resolve possible ringbuf format incompatibilities -#define RINGBUF_VERSION 1 +// Increase everytime you: +// - change the ringbuf_header +// - add, delete or change events +#define RINGBUF_VERSION 2 +enum class LOCATION_TYPE : uint64_t +{ + CPU = 0, + PROCESS = 1 +}; struct ringbuf_header { uint64_t version; uint64_t size; std::atomic_uint64_t head; std::atomic_uint64_t tail; + // set by the writer side (CUPTI, etc.) + LOCATION_TYPE loc_type; + + union + { + pid_t pid; + int cpuid; + }; + + char measurement_name[255]; + // set by the reader size (lo2s) + uint64_t lo2s_ready; + clockid_t clockid; +}; + +enum class EventType : uint64_t +{ + CCTX_ENTER = 1, + CCTX_LEAVE = 2, + CCTX_SAMPLE = 3, + CCTX_DEF = 4 +}; + +struct event_header +{ + EventType type; + uint64_t size; +}; + +struct cctx_def +{ + struct event_header header; + uint64_t addr; + char function[1]; +}; + +struct cctx_enter +{ + struct event_header header; + uint64_t tp; + uint64_t addr; +}; + +struct cctx_leave +{ + struct event_header header; + uint64_t tp; + uint64_t addr; }; class ShmRingbuf { public: - ShmRingbuf(int fd, bool create, size_t pages) : fd_(fd) + ShmRingbuf(int fd) : fd_(fd) { size_t pagesize = sysconf(_SC_PAGESIZE); - size_t size; + auto header_map = SharedMemory(fd_, sizeof(struct ringbuf_header), 0); + size_t size = header_map.as()->size; - if (create) + uint64_t version = header_map.as()->version; + if (version != RINGBUF_VERSION) { - size = pagesize * pages; - if (ftruncate(fd_, size + sysconf(_SC_PAGESIZE)) == -1) - { - close(fd_); - throw std::system_error(errno, std::system_category()); - } - } - else - { - auto header_map = SharedMemory(fd_, sizeof(struct ringbuf_header), 0); - size = header_map.as()->size; + throw new std::runtime_error("Incompatible Ringbuffer Version" + + std::to_string(RINGBUF_VERSION) + + " (us) != " + std::to_string(version) + " (other side)!"); } // To handle events that wrap around the ringbuffer, map it twice into virtual memory @@ -96,48 +150,63 @@ class ShmRingbuf header_ = first_mapping_.as(); start_ = first_mapping_.as() + pagesize; + } - if (create) + std::byte* head(size_t ev_size) + { + if (header_->head >= header_->tail && + ev_size >= header_->tail - header_->head + header_->size) { - header_->version = RINGBUF_VERSION; - header_->size = size; - header_->tail.store(0); - header_->head.store(0); + return nullptr; } - else + if (header_->head < header_->tail && ev_size >= header_->tail - header_->head) { - if (header_->version != RINGBUF_VERSION) - { - throw new std::runtime_error("Incompatible RingBuffer Version " + - std::to_string(header_->version) + - " detected on other side!"); - } + return nullptr; } + + return start_ + header_->head.load(); } - uint64_t head() + std::byte* tail(uint64_t ev_size) { - return header_->head.load(); + if (!can_be_loaded(ev_size)) + { + return nullptr; + } + return start_ + header_->tail.load(); } - uint64_t tail() + void advance_head(size_t ev_size) { - return header_->tail.load(); + header_->head.store((header_->head.load() + ev_size) % header_->size); } - void head(uint64_t new_head) + void advance_tail(size_t ev_size) { - return header_->head.store(new_head); + // Calling pop() without trying to get() data from the ringbuffer first is an error + assert(can_be_loaded(ev_size)); + + header_->tail.store(header_->tail.load() + ev_size % header_->size); + } + + bool can_be_loaded(size_t ev_size) + { + if (header_->tail.load() <= header_->head.load()) + { + return header_->tail.load() + ev_size <= header_->head.load(); + } + + return header_->tail.load() + ev_size <= header_->head.load() + header_->size; } - void tail(uint64_t new_tail) + int fd() { - return header_->tail.store(new_tail); + return fd_; } - uint64_t size() + struct ringbuf_header* header() { - return header_->size; + return header_; } protected: @@ -149,11 +218,56 @@ class ShmRingbuf SharedMemory first_mapping_, second_mapping_; }; -class RingBufWriter : public ShmRingbuf +class RingbufWriter { public: - RingBufWriter(int fd, bool create, size_t pages = 0) : ShmRingbuf(fd, create, pages) + RingbufWriter(size_t pages, ExecutionScope scope, std::string measurement_name) { + int fd = memfd_create("foo", 0); + + if (fd == -1) + { + throw ::std::system_error(errno, std::system_category()); + } + + size_t pagesize = sysconf(_SC_PAGESIZE); + size_t size; + + size = pagesize * pages; + + if (ftruncate(fd, size + sysconf(_SC_PAGESIZE)) == -1) + { + close(fd); + throw std::system_error(errno, std::system_category()); + } + + auto header_map = SharedMemory(fd, sizeof(struct ringbuf_header), 0); + + struct ringbuf_header* first_header = header_map.as(); + memset((void*)first_header, 0, sizeof(struct ringbuf_header)); + first_header->version = RINGBUF_VERSION; + first_header->size = size; + first_header->tail.store(0); + first_header->head.store(0); + memcpy(first_header->measurement_name, measurement_name.c_str(), measurement_name.size()); + + if (scope.is_process()) + { + first_header->loc_type = LOCATION_TYPE::PROCESS; + first_header->pid = scope.as_process().as_pid_t(); + } + else if (scope.is_cpu()) + { + first_header->loc_type = LOCATION_TYPE::CPU; + first_header->cpuid = scope.as_cpu().as_int(); + } + else + { + throw std::runtime_error("Unallowed ExecutionScope type: " + scope.name()); + } + + rb_ = std::make_unique(fd); + write_fd(); } std::byte* reserve(size_t ev_size) @@ -166,50 +280,184 @@ class RingBufWriter : public ShmRingbuf // No other reservation can be active! assert(reserved_size_ == 0); - if (head() >= tail() && ev_size >= tail() - head() + size()) + reserved_size_ = ev_size; + return rb_->head(ev_size); + } + + const struct ringbuf_header* header() + { + return rb_->header(); + } + + bool ready() + { + return rb_->header()->lo2s_ready; + } + + void commit() + { + assert(reserved_size_ != 0); + rb_->advance_head(reserved_size_); + reserved_size_ = 0; + } + + bool cctx_def(uint64_t addr, std::string func) + { + struct cctx_def* ev = + reinterpret_cast(reserve(sizeof(struct cctx_def) + func.size())); + + memset(ev, 0, sizeof(*ev)); + if (ev == nullptr) { - return nullptr; + return false; } - if (head() < tail() && ev_size >= tail() - head()) + memcpy(ev->function, func.c_str(), func.size()); + ev->header.type = EventType::CCTX_DEF; + ev->header.size = sizeof(struct cctx_enter) + func.size(); + ev->addr = addr; + commit(); + return true; + } + + bool cctx_enter(uint64_t tp, uint64_t addr) + { + struct cctx_enter* ev = + reinterpret_cast(reserve(sizeof(struct cctx_enter))); + + if (ev == nullptr) { - return nullptr; + return false; } - reserved_size_ = ev_size; - return start_ + head(); + ev->header.type = EventType::CCTX_ENTER; + ev->header.size = sizeof(struct cctx_enter); + ev->tp = tp; + ev->addr = addr; + commit(); + return true; } - void commit() + bool cctx_leave(uint64_t tp, uint64_t addr) { - assert(reserved_size_ != 0); + struct cctx_leave* ev = + reinterpret_cast(reserve(sizeof(struct cctx_leave))); - head((head() + reserved_size_) % size()); - reserved_size_ = 0; + if (ev == nullptr) + { + return false; + } + + ev->header.type = EventType::CCTX_LEAVE; + ev->header.size = sizeof(struct cctx_leave); + ev->tp = tp; + ev->addr = addr; + commit(); + return true; + } + + bool cctx_sample(uint64_t tp, uint64_t addr) + { + struct cctx_enter* ev = + reinterpret_cast(reserve(sizeof(struct cctx_enter))); + + if (ev == nullptr) + { + return false; + } + + ev->header.type = EventType::CCTX_SAMPLE; + ev->addr = addr; + ev->tp = tp; + + commit(); + return true; } private: + void write_fd() + { + char* socket_path = getenv("LO2S_SOCKET"); + if (socket_path == nullptr) + { + throw std::runtime_error("LO2S_SOCKET is not set!"); + } + + int data_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0); + if (data_socket == -1) + { + throw std::system_error(errno, std::system_category()); + } + + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); + + int ret = connect(data_socket, (const struct sockaddr*)&addr, sizeof(addr)); + if (ret < 0) + { + throw std::system_error(errno, std::system_category()); + } + + struct msghdr msg; + struct iovec iov[1]; + + union + { + struct cmsghdr cm; + char control[CMSG_SPACE(sizeof(int))]; + } control_un; + + struct cmsghdr* cmptr; + msg.msg_control = control_un.control; + msg.msg_controllen = sizeof(control_un.control); + + cmptr = CMSG_FIRSTHDR(&msg); + cmptr->cmsg_len = CMSG_LEN(sizeof(int)); + cmptr->cmsg_level = SOL_SOCKET; + cmptr->cmsg_type = SCM_RIGHTS; + *((int*)CMSG_DATA(cmptr)) = rb_->fd(); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + + // We need to include atleast one byte of sentinel data to be able + // to discern between fd messages and EOF + char foo = 42; + iov[0].iov_base = &foo; + iov[0].iov_len = 1; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + sendmsg(data_socket, &msg, 0); + close(data_socket); + } size_t reserved_size_ = 0; + std::unique_ptr rb_; }; -class RingBufReader : public ShmRingbuf +class RingbufReader { public: - RingBufReader(int fd, bool create, size_t pages = 0) : ShmRingbuf(fd, create, pages) + RingbufReader(int fd, clockid_t clockid) : rb_(std::make_unique(fd)) { + rb_->header()->clockid = clockid; + rb_->header()->lo2s_ready = 1; } - std::byte* get(size_t size) + const struct ringbuf_header* header() { - if (size == 0) - { - return nullptr; - } + return rb_->header(); + } - if (!can_be_loaded(size)) + std::byte* get(size_t ev_size) + { + if (ev_size == 0) { return nullptr; } - return start_ + tail(); + + return rb_->tail(ev_size); } void pop(size_t ev_size) @@ -218,22 +466,22 @@ class RingBufReader : public ShmRingbuf { return; } - - // Calling pop() without trying to get() data from the ringbuffer first is an error - assert(can_be_loaded(ev_size)); - - tail((tail() + ev_size) % size()); + rb_->advance_tail(ev_size); } -private: - bool can_be_loaded(size_t ev_size) + ExecutionScope get_location() { - if (tail() <= head()) + switch (rb_->header()->loc_type) { - return tail() + ev_size <= head(); + case LOCATION_TYPE::CPU: + return ExecutionScope(Cpu(rb_->header()->cpuid)); + case LOCATION_TYPE::PROCESS: + return ExecutionScope(Process(rb_->header()->pid)); } - - return tail() + ev_size <= head() + size(); + throw ::std::runtime_error("Unknown location type!"); } + +private: + std::unique_ptr rb_; }; } // namespace lo2s diff --git a/include/lo2s/trace/reg_keys.hpp b/include/lo2s/trace/reg_keys.hpp index c4484c89..31b55c6a 100644 --- a/include/lo2s/trace/reg_keys.hpp +++ b/include/lo2s/trace/reg_keys.hpp @@ -235,7 +235,7 @@ template <> struct Holder { using type = otf2::lookup_definition_holder; + ByProcess, ByLineInfo, BySyscall>; }; template <> diff --git a/include/lo2s/trace/trace.hpp b/include/lo2s/trace/trace.hpp index 0aee5727..8d5d2d81 100644 --- a/include/lo2s/trace/trace.hpp +++ b/include/lo2s/trace/trace.hpp @@ -70,17 +70,14 @@ class Trace otf2::chrono::time_point record_from() const; otf2::chrono::time_point record_to() const; - void add_process(Process parent, Process process, const std::string& name = ""); + void emplace_process(Process parent, Process process, const std::string& name = ""); - void add_thread(Thread t, const std::string& name); - void add_threads(const std::unordered_map& thread_map); + void emplace_thread(Thread t, const std::string& name = ""); + void emplace_threads(const std::unordered_map& thread_map); void add_monitoring_thread(Thread t, const std::string& name, const std::string& group); - void update_process_name(Process p, const std::string& name); - void update_thread_name(Thread t, const std::string& name); - - LocalCctxMap& create_local_cctx_map(); + LocalCctxMap& create_local_cctx_map(MeasurementScope scope); otf2::definition::mapping_table merge_calling_contexts(const LocalCctxMap& local_cctxs, ProcessMap& infos); void merge_calling_contexts(ProcessMap& process_infos); @@ -88,7 +85,7 @@ class Trace otf2::definition::mapping_table merge_syscall_contexts(const std::set& used_syscalls); otf2::writer::local& sample_writer(const ExecutionScope& scope); - otf2::writer::local& cuda_writer(const Thread& thread); + otf2::writer::local& rb_writer(const MeasurementScope& scope); otf2::writer::local& metric_writer(const MeasurementScope& scope); otf2::writer::local& syscall_writer(const Cpu& cpu); otf2::writer::local& bio_writer(BlockDevice dev); @@ -296,12 +293,13 @@ class Trace * This is a helper needed to avoid constant re-locking when adding * multiple threads via #add_threads. **/ - void add_thread_exclusive(Thread thread, const std::string& name, - const std::lock_guard&); + void emplace_thread_exclusive(Thread thread, const std::string& name, + const std::lock_guard&); void merge_ips(const std::map& new_children, std::map& children, std::vector& mapping_table, - otf2::definition::calling_context& parent, ProcessMap& infos, Process p); + otf2::definition::calling_context& parent, ProcessInfo& info, + MeasurementScope scope); const otf2::definition::system_tree_node bio_parent_node(BlockDevice& device) { @@ -317,14 +315,14 @@ class Trace return bio_system_tree_node_; } + void update_thread(Thread t, const std::string& name); + void update_process(Process parent, Process process, const std::string& name); const otf2::definition::string& intern_syscall_str(int64_t syscall_nr); const otf2::definition::source_code_location& intern_scl(const LineInfo&); const otf2::definition::region& intern_region(const LineInfo&); - const otf2::definition::system_tree_node& intern_process_node(Process process); - const otf2::definition::string& intern(const std::string&); void add_lo2s_property(const std::string& name, const std::string& value); @@ -351,7 +349,7 @@ class Trace std::map thread_names_; std::map global_thread_cctxs_; - std::map> calling_context_tree_; + std::map> calling_context_tree_; otf2::definition::comm_locations_group& comm_locations_group_; otf2::definition::comm_locations_group& hardware_comm_locations_group_; diff --git a/include/lo2s/util.hpp b/include/lo2s/util.hpp index efd3fca3..3f1255b6 100644 --- a/include/lo2s/util.hpp +++ b/include/lo2s/util.hpp @@ -61,23 +61,23 @@ struct MallocDelete } // namespace memory template -class StringCache +class BinaryCache { public: - static StringCache& instance() + static BinaryCache& instance() { - static StringCache l; + static BinaryCache l; return l; } - T& operator[](const std::string& name) + std::shared_ptr operator[](const std::string& name) { std::lock_guard guard(mutex_); - return elements_.try_emplace(name, name).first->second; + return elements_.try_emplace(name, std::make_shared(name)).first->second; } private: - std::unordered_map elements_; + std::unordered_map> elements_; std::mutex mutex_; }; diff --git a/lib/otf2xx b/lib/otf2xx index 746e082c..7a7f3247 160000 --- a/lib/otf2xx +++ b/lib/otf2xx @@ -1 +1 @@ -Subproject commit 746e082cb8a6a640c4b63c7140cd9416c83979c0 +Subproject commit 7a7f3247643794c5f47283627e80da2a06565e7b diff --git a/src/cupti/lib.cpp b/src/cupti/lib.cpp index e7635af6..f7f16a1a 100644 --- a/src/cupti/lib.cpp +++ b/src/cupti/lib.cpp @@ -19,7 +19,6 @@ * along with lo2s. If not, see . */ -#include #include #include @@ -40,9 +39,11 @@ extern "C" // Allocate 8 MiB every time CUPTI asks for more event memory constexpr size_t CUPTI_BUFFER_SIZE = 8 * 1024 * 1024; -std::unique_ptr rb_writer = nullptr; +std::unique_ptr rb_writer = nullptr; CUpti_SubscriberHandle subscriber = nullptr; +std::map cctxs; +uint64_t last_cctx = 0; clockid_t clockid = CLOCK_MONOTONIC_RAW; static void atExitHandler(void) @@ -87,23 +88,15 @@ static void CUPTIAPI bufferCompleted(CUcontext ctx, uint32_t streamId, uint8_t* uint64_t name_len = strlen(kernel->name); - struct lo2s::cupti::event_kernel* ev = - reinterpret_cast( - rb_writer->reserve(sizeof(struct lo2s::cupti::event_kernel) + name_len)); - - if (ev == nullptr) + if (cctxs.count(kernel->name) == 0) { - ringbuf_full_dropped++; - continue; + auto foo = cctxs.emplace(kernel->name, last_cctx); + rb_writer->cctx_def(last_cctx, kernel->name); + last_cctx++; } - ev->header.type = lo2s::cupti::EventType::CUPTI_KERNEL; - ev->header.size = sizeof(struct lo2s::cupti::event_kernel) + name_len; - ev->start = kernel->start; - ev->end = kernel->end; - memcpy(ev->name, kernel->name, name_len + 1); - - rb_writer->commit(); + rb_writer->cctx_enter(kernel->start, cctxs.at(kernel->name)); + rb_writer->cctx_leave(kernel->end, cctxs.at(kernel->name)); break; } default: @@ -186,20 +179,14 @@ uint64_t timestampfunc() extern "C" int InitializeInjection(void) { - std::string rb_size_str; - int fd = memfd_create("foo", 0); - if (fd == -1) - { - return -1; - } - rb_writer = std::make_unique(fd, true, 16); - char* clockid_str = getenv("LO2S_CLOCKID"); + pid_t pid = getpid(); + rb_writer = std::make_unique(16, lo2s::ExecutionScope(lo2s::Process(pid)), + "cuda events"); - if (clockid_str != nullptr) + while (!rb_writer->ready()) { - clockid = std::stoi(clockid_str); - } - + }; + clockid = rb_writer->header()->clockid; // Register an atexit() handler for clean-up atexit(&atExitHandler); diff --git a/src/cupti/test.cpp b/src/cupti/test.cpp index f5a76123..7bf918fe 100644 --- a/src/cupti/test.cpp +++ b/src/cupti/test.cpp @@ -19,7 +19,6 @@ * along with lo2s. If not, see . */ -#include #include #include @@ -40,110 +39,41 @@ extern "C" const char* message = ""; -ssize_t write_fd(int conn_fd, int fd) -{ - struct msghdr msg; - struct iovec iov[1]; - - union - { - struct cmsghdr cm; - char control[CMSG_SPACE(sizeof(int))]; - } control_un; - - struct cmsghdr* cmptr; - msg.msg_control = control_un.control; - msg.msg_controllen = sizeof(control_un.control); - - cmptr = CMSG_FIRSTHDR(&msg); - cmptr->cmsg_len = CMSG_LEN(sizeof(int)); - cmptr->cmsg_level = SOL_SOCKET; - cmptr->cmsg_type = SCM_RIGHTS; - *((int*)CMSG_DATA(cmptr)) = fd; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - - char foo = 42; - iov[0].iov_base = &foo; - iov[0].iov_len = 1; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - return sendmsg(conn_fd, &msg, 0); -} - -std::unique_ptr rb_writer = nullptr; +std::unique_ptr rb_writer = nullptr; +std::unique_ptr rb_writer2 = nullptr; int main(void) { - char* socket_path = getenv("LO2S_SOCKET"); - std::cerr << socket_path << std::endl; - std::string rb_size_str; - int fd = memfd_create("foo", 0); - if (fd == -1) - { - return -1; - } - rb_writer = std::make_unique(fd, true, 16); - int data_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0); - if (data_socket == -1) - { - perror("socket"); - exit(EXIT_FAILURE); - } - - /* - * For portability clear the whole structure, since some - * implementations have additional (nonstandard) fields in - * the structure. - */ - - struct sockaddr_un addr; - memset(&addr, 0, sizeof(addr)); - - /* Connect socket to socket address. */ - - addr.sun_family = AF_UNIX; - strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); - - int ret = connect(data_socket, (const struct sockaddr*)&addr, sizeof(addr)); - if (ret < 0) - { - std::cout << "Could not connect!" << strerror(errno); - return -1; - } - - std::cout << "Connected!" << std::endl; - /* Close socket. */ - - write_fd(data_socket, fd); - + rb_writer = std::make_unique( + 16, lo2s::ExecutionScope(lo2s::Process(getpid())), "test_events"); + while (!rb_writer->ready()) + ; + rb_writer2 = std::make_unique( + 16, lo2s::ExecutionScope(lo2s::Process(getpid())), "test_events2"); + while (!rb_writer2->ready()) + ; + clockid_t clockid = rb_writer->header()->clockid; + + std::string first = "Hello!"; + std::string last = "World!"; + rb_writer->cctx_def(1, first); + rb_writer2->cctx_def(1, last); while (1) { struct timespec start, end; - clock_gettime(CLOCK_MONOTONIC_RAW, &start); + clock_gettime(clockid, &start); sleep(1); - clock_gettime(CLOCK_MONOTONIC_RAW, &end); - const char* str = "Hello World!"; - uint64_t name_len = strlen(str); - struct lo2s::event_kernel* ev = reinterpret_cast( - rb_writer->reserve(sizeof(struct lo2s::event_kernel) + name_len)); - if (ev == nullptr) - { - std::cerr << "Dropped event!"; - continue; - } - ev->header.type = lo2s::EventType::CUPTI_KERNEL; - ev->header.size = sizeof(struct lo2s::event_kernel) + name_len; + clock_gettime(clockid, &end); + + uint64_t start_nsec = start.tv_sec * 1000000000 + start.tv_nsec; + uint64_t end_nsec = end.tv_sec * 1000000000 + end.tv_nsec; - ev->start = start.tv_nsec; - ev->end = end.tv_nsec; - mempcpy(ev->name, str, name_len + 1); - std::cerr << "Event!" << std::endl; - rb_writer->commit(); + rb_writer->cctx_enter(start_nsec, 1); + rb_writer2->cctx_enter(start_nsec, 1); + rb_writer->cctx_leave(end_nsec, 1); + rb_writer2->cctx_leave(end_nsec, 1); } - close(data_socket); exit(EXIT_SUCCESS); diff --git a/src/dwarf_resolve.cpp b/src/dwarf_resolve.cpp index 5b922afd..3c1bf4c6 100644 --- a/src/dwarf_resolve.cpp +++ b/src/dwarf_resolve.cpp @@ -19,6 +19,10 @@ struct getfuncs_arg static int func_cb(Dwarf_Die* d, void* arg) { + uint64_t lowpc, highpc; + dwarf_lowpc(d, &lowpc); + dwarf_highpc(d, &highpc); + struct getfuncs_arg* args = (struct getfuncs_arg*)arg; if (dwarf_haspc(d, args->addr)) { @@ -52,6 +56,7 @@ DwarfFunctionResolver::DwarfFunctionResolver(std::string name) : FunctionResolve { cb.find_debuginfo = dwfl_dummy_find_debuginfo; } + cb.find_debuginfo = dwfl_build_id_find_debuginfo; cb.debuginfo_path = nullptr; cb.find_elf = dwfl_linux_proc_find_elf; @@ -77,8 +82,8 @@ DwarfFunctionResolver::DwarfFunctionResolver(std::string name) : FunctionResolve mod_ = dwfl_report_offline(dwfl_, name.c_str(), name.c_str(), -1); if (mod_ == nullptr) { - const char* errmsg = dwfl_errmsg(dwfl_errno()); - Log::error() << "Could not add " << name << " " << errmsg; + Log::error() << "Could not emplace binary: " << dwfl_errmsg(dwfl_errno()); + throw std::runtime_error(dwfl_errmsg(dwfl_errno())); } dwfl_report_end(dwfl_, NULL, NULL); } @@ -96,15 +101,18 @@ LineInfo DwarfFunctionResolver::lookup_line_info(Address addr) } const char* module_name = dwfl_module_info(mod_, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr); - if (config().dwarf != DwarfUsage::NONE) { Dwarf_Die* cudie = nullptr; Dwarf_Addr bias; while ((cudie = dwfl_module_nextcu(mod_, cudie, &bias)) != nullptr) { + Dwarf_Addr low, high; + dwarf_lowpc(cudie, &low); + dwarf_highpc(cudie, &high); if (dwarf_haspc(cudie, addr.value())) { + Dwarf_Line* line = dwarf_getsrc_die(cudie, addr.value()); int lineno; dwarf_lineno(line, &lineno); @@ -135,7 +143,6 @@ LineInfo DwarfFunctionResolver::lookup_line_info(Address addr) LineInfo::for_function(module_name, name, 1, module_name)) .first->second; } - return cache_.emplace(addr, LineInfo::for_binary(module_name)).first->second; } } // namespace lo2s diff --git a/src/monitor/cpu_set_monitor.cpp b/src/monitor/cpu_set_monitor.cpp index 306c295c..f9235e95 100644 --- a/src/monitor/cpu_set_monitor.cpp +++ b/src/monitor/cpu_set_monitor.cpp @@ -57,12 +57,12 @@ CpuSetMonitor::CpuSetMonitor() : MainMonitor() { pid = std::stol(pid_match[1]); - process_infos_.insert(Process(pid), 0, true); + process_infos_.insert(Process(pid), true); } } } - trace_.add_threads(get_comms_for_running_threads()); + trace_.emplace_threads(get_comms_for_running_threads()); try { @@ -137,7 +137,7 @@ void CpuSetMonitor::run() } } - trace_.add_threads(get_comms_for_running_threads()); + trace_.emplace_threads(get_comms_for_running_threads()); for (auto& monitor_elem : monitors_) { diff --git a/src/monitor/main_monitor.cpp b/src/monitor/main_monitor.cpp index 55c4604e..b6f59f2d 100644 --- a/src/monitor/main_monitor.cpp +++ b/src/monitor/main_monitor.cpp @@ -128,47 +128,48 @@ MainMonitor::MainMonitor() : trace_(), metrics_(trace_) } #endif - socket_monitor_ = std::make_unique(trace_); + socket_monitor_ = std::make_unique(trace_, *this); socket_monitor_->start(); } -static uint64_t mmap_get_time(const RecordMmap2Type* t) -{ - struct sample_id* id = - (struct sample_id*)((char*)t + t->header.size - sizeof(struct sample_id)); - return id->time; -} - -static uint64_t comm_get_time(const RecordCommType* t) -{ - struct sample_id* id = - (struct sample_id*)((char*)t + t->header.size - sizeof(struct sample_id)); - return id->time; -} - void MainMonitor::insert_cached_events(const RawMemoryMapCache& cached_mmaps, const RawCommCache& cached_execs) { for (auto& event : cached_execs) { - process_infos_.insert(Process(event.get()->pid), comm_get_time(event.get()), false); + Process p = Process(event.get()->pid); + Log::error() << p << ": " << event.get()->comm; + process_infos_.insert(Process(event.get()->pid), false); } for (auto& event : cached_mmaps) { - const uint64_t timestamp = mmap_get_time(event.get()); Process p = Process(event.get()->pid); - if (!process_infos_.has(p, timestamp)) + Log::error() << p << ": " << event.get()->filename; + if (!process_infos_.has(p)) { - process_infos_.insert(p, timestamp, false); + process_infos_.insert(p, false); } - - ProcessInfo& pinfo = process_infos_.get(p, timestamp); + ProcessInfo& pinfo = process_infos_.get(p); pinfo.mmap(*event.get()); } } +void MainMonitor::insert_cached_cctx( + std::map>& cached_cctx) +{ + for (auto& scope_map : cached_cctx) + { + if (!process_infos_.has(scope_map.first.scope.as_process())) + { + process_infos_.insert(scope_map.first.scope.as_process(), false); + } + ProcessInfo& pinfo = process_infos_.get(scope_map.first.scope.as_process()); + pinfo.insert_functions(scope_map.first, scope_map.second); + } +} + MainMonitor::~MainMonitor() { // Note: call stop() in reverse order than start() in constructor diff --git a/src/monitor/process_monitor.cpp b/src/monitor/process_monitor.cpp index 904a473b..435fd354 100644 --- a/src/monitor/process_monitor.cpp +++ b/src/monitor/process_monitor.cpp @@ -36,17 +36,20 @@ ProcessMonitor::ProcessMonitor() : MainMonitor() void ProcessMonitor::insert_process(Process parent, Process process, std::string proc_name, bool spawn) { - trace_.add_process(parent, process, proc_name); + trace_.emplace_process(parent, process, proc_name); insert_thread(process, process.as_thread(), proc_name, spawn); } void ProcessMonitor::insert_thread(Process process, Thread thread, std::string name, bool spawn) { - trace_.add_thread(thread, name); + trace_.emplace_thread(thread, name); if (config().sampling) { - process_infos_.insert(process, 0, !spawn); + if (!process_infos_.has(process)) + { + process_infos_.insert(process, !spawn); + } } ExecutionScope scope = ExecutionScope(thread); @@ -73,12 +76,12 @@ void ProcessMonitor::insert_thread(Process process, Thread thread, std::string n } } - trace_.update_thread_name(thread, name); + trace_.emplace_thread(thread, name); } void ProcessMonitor::update_process_name(Process process, const std::string& name) { - trace_.update_process_name(process, name); + trace_.emplace_process(trace::Trace::NO_PARENT_PROCESS, process, name); } void ProcessMonitor::exit_thread(Thread thread) diff --git a/src/monitor/ringbuf_monitor.cpp b/src/monitor/ringbuf_monitor.cpp index 8ef2984f..85d95c07 100644 --- a/src/monitor/ringbuf_monitor.cpp +++ b/src/monitor/ringbuf_monitor.cpp @@ -20,7 +20,6 @@ */ #include -#include #include #include #include @@ -39,10 +38,15 @@ namespace lo2s { namespace monitor { -RingbufMonitor::RingbufMonitor(trace::Trace& trace, int fd) -: PollMonitor(trace, "RingbufMonitor", std::chrono::nanoseconds(10000)), trace_(trace), - time_converter_(perf::time::Converter::instance()), ringbuf_reader_(fd, false, 0) +RingbufMonitor::RingbufMonitor(trace::Trace& trace, MainMonitor& main_monitor, int fd) +: PollMonitor(trace, "RingbufMonitor", std::chrono::nanoseconds(10000)), + time_converter_(perf::time::Converter::instance()), ringbuf_reader_(fd, config().clockid), + scope_(MeasurementScope::rb(ringbuf_reader_.get_location(), + ringbuf_reader_.header()->measurement_name)), + rb_writer_(trace.rb_writer(scope_)), main_monitor_(main_monitor), + local_cctx_map_(trace.create_local_cctx_map(scope_)) { + functions_.emplace(scope_, std::map()); } void RingbufMonitor::initialize_thread() @@ -51,29 +55,56 @@ void RingbufMonitor::initialize_thread() void RingbufMonitor::finalize_thread() { + rb_writer_.write_calling_context_leave( + last_, + local_cctx_map_.thread(scope_.scope.as_process(), scope_.scope.as_process().as_thread())); + main_monitor_.insert_cached_cctx(functions_); + local_cctx_map_.finalize(&rb_writer_); } void RingbufMonitor::monitor(int fd [[maybe_unused]]) { std::string executable_name_ = ""; - Process process_ = Process(1); struct event_header* header = nullptr; while ((header = reinterpret_cast( ringbuf_reader_.get(sizeof(struct event_header)))) != nullptr) { - if (header->type == EventType::CUPTI_KERNEL) + if (header->type == EventType::CCTX_ENTER) { - struct event_kernel* kernel = - reinterpret_cast(ringbuf_reader_.get(header->size)); + struct cctx_enter* kernel = + reinterpret_cast(ringbuf_reader_.get(header->size)); + auto tp = time_converter_(kernel->tp); + std::string kernel_name = ""; + if (!entered_) + { + rb_writer_.write_calling_context_enter( + tp, + local_cctx_map_.thread(scope_.scope.as_process(), + scope_.scope.as_process().as_thread()), + 2); + entered_ = true; + } - auto& writer = trace_.cuda_writer(Thread(process_.as_thread())); - - std::string kernel_name = kernel->name; - auto& cu_cctx = trace_.cuda_calling_context(executable_name_, kernel_name); - - writer.write_calling_context_enter(time_converter_(kernel->start), cu_cctx.ref(), 2); - writer.write_calling_context_leave(time_converter_(kernel->end), cu_cctx.ref()); + rb_writer_.write_calling_context_enter( + tp, local_cctx_map_.sample_ref(scope_.scope.as_process(), kernel->addr), 2); + last_ = tp; + } + else if (header->type == EventType::CCTX_LEAVE) + { + struct cctx_leave* kernel = + reinterpret_cast(ringbuf_reader_.get(header->size)); + auto tp = time_converter_(kernel->tp); + rb_writer_.write_calling_context_leave( + tp, local_cctx_map_.sample_ref(scope_.scope.as_process(), kernel->addr)); + last_ = tp; + } + else if (header->type == EventType::CCTX_DEF) + { + struct cctx_def* kernel = + reinterpret_cast(ringbuf_reader_.get(header->size)); + functions_.at(scope_).emplace(Address(kernel->addr), std::string(kernel->function)); + Log::error() << kernel->function << kernel->addr; } ringbuf_reader_.pop(header->size); diff --git a/src/monitor/socket_monitor.cpp b/src/monitor/socket_monitor.cpp index 4a8ae5bd..5dcfefcc 100644 --- a/src/monitor/socket_monitor.cpp +++ b/src/monitor/socket_monitor.cpp @@ -39,8 +39,9 @@ namespace lo2s { namespace monitor { -SocketMonitor::SocketMonitor(trace::Trace& trace) -: PollMonitor(trace, "SocketMonitor", std::chrono::nanoseconds(0)), trace_(trace) +SocketMonitor::SocketMonitor(trace::Trace& trace, MainMonitor& main_monitor) +: PollMonitor(trace, "SocketMonitor", std::chrono::nanoseconds(0)), trace_(trace), + main_monitor_(main_monitor) { socket = ::socket(AF_UNIX, SOCK_SEQPACKET, 0); if (socket == -1) @@ -150,7 +151,7 @@ void SocketMonitor::monitor(int fd) } auto res = monitors_.emplace(std::piecewise_construct, std::forward_as_tuple(foo_fd), - std::forward_as_tuple(trace_, foo_fd)); + std::forward_as_tuple(trace_, main_monitor_, foo_fd)); res.first->second.start(); close(data_socket); } diff --git a/src/monitor/system_process_monitor.cpp b/src/monitor/system_process_monitor.cpp index b87001c5..cc0f2462 100644 --- a/src/monitor/system_process_monitor.cpp +++ b/src/monitor/system_process_monitor.cpp @@ -38,7 +38,7 @@ void SystemProcessMonitor::insert_thread([[maybe_unused]] Process process, Threa { // in system monitoring, we only need to track the threads spawned from the process lo2s spawned // itself. Without this, these threads end up as "". Sad times. - trace_.add_thread(thread, name); + trace_.emplace_thread(thread, name); } void SystemProcessMonitor::update_process_name([[maybe_unused]] Process process, diff --git a/src/perf/sample/writer.cpp b/src/perf/sample/writer.cpp index 924384fe..b961269c 100644 --- a/src/perf/sample/writer.cpp +++ b/src/perf/sample/writer.cpp @@ -39,6 +39,7 @@ #include #include +#include extern "C" { @@ -59,7 +60,7 @@ Writer::Writer(ExecutionScope scope, monitor::MainMonitor& Monitor, trace::Trace cpuid_metric_instance_(trace.metric_instance(trace.cpuid_metric_class(), otf2_writer_.location(), otf2_writer_.location())), cpuid_metric_event_(otf2::chrono::genesis(), cpuid_metric_instance_), - local_cctx_map_(trace.create_local_cctx_map()), + local_cctx_map_(trace.create_local_cctx_map(MeasurementScope::sample(scope))), time_converter_(perf::time::Converter::instance()), first_time_point_(lo2s::time::now()), last_time_point_(first_time_point_) { @@ -98,7 +99,7 @@ bool Writer::handle(const Reader::RecordSampleType* sample) return false; } -bool Writer::handle(const Reader::RecordMmap2Type* mmap_event) +bool Writer::handle(const Reader::RecordMmapType* mmap_event) { // Since this is an mmap record (as opposed to mmap2), it will only be generated for executable if (!scope_.is_cpu() && scope_ != ExecutionScope(Thread(mmap_event->tid))) @@ -211,12 +212,13 @@ bool Writer::handle(const Reader::RecordCommType* comm) << " changed name to \"" << new_command << "\""; // update task name - trace_.update_thread_name(Thread(comm->tid), new_command); + trace_.emplace_thread(Thread(comm->tid), new_command); // only update name of process if the main thread changes its name if (comm->pid == comm->tid) { - trace_.update_process_name(Process(comm->pid), new_command); + trace_.emplace_process(trace::Trace::NO_PARENT_PROCESS, Process(comm->pid), + new_command); } } @@ -257,7 +259,7 @@ void Writer::end() trace_.process_comm(scope_.as_thread()), -1); } - trace_.add_threads(comms_); + trace_.emplace_threads(comms_); monitor_.insert_cached_events(cached_mmap_events_, cached_comm_events_); } diff --git a/src/process_info.cpp b/src/process_info.cpp index 400bfb50..b9fe0827 100644 --- a/src/process_info.cpp +++ b/src/process_info.cpp @@ -26,11 +26,24 @@ namespace lo2s ProcessInfo::ProcessInfo(Process p, bool read_initial) : process_(p) { - FunctionResolver& kall = Kallsyms::cache(); - InstructionResolver& kernel_ir = InstructionResolver::cache(); - map_.emplace( - std::piecewise_construct, std::forward_as_tuple(Kallsyms::cache().start(), (uint64_t)-1), - std::forward_as_tuple(Kallsyms::cache().start(), (uint64_t)-1, 0, kall, kernel_ir)); + try + { + + std::shared_ptr kall = Kallsyms::cache(); + + auto& fr_map = function_resolvers_ + .emplace(MeasurementScope::sample(p.as_scope()), + std::map>()) + .first->second; + + fr_map.emplace(std::piecewise_construct, + std::forward_as_tuple(Kallsyms::cache()->start(), (uint64_t)-1, 0), + std::forward_as_tuple(kall)); + } + catch (std::exception& e) + { + Log::debug() << "Could not read kallsyms: " << e.what(); + } if (!read_initial) { return; @@ -65,84 +78,116 @@ ProcessInfo::ProcessInfo(Process p, bool read_initial) : process_(p) } } -void ProcessInfo::mmap(Address addr, Address end, Address pgoff, std::string filename) +void ProcessInfo::emplace_fr(Address addr, Address end, Address pgoff, std::string filename) { - - std::lock_guard lock(mutex_); - - Log::debug() << "mmap: " << addr << "-" << end << " " << pgoff << ": " << filename; - - if (filename.empty() || std::string("//anon") == filename || - std::string("/dev/zero") == filename || std::string("/anon_hugepage") == filename || - nitro::lang::starts_with(filename, "/memfd") || - nitro::lang::starts_with(filename, "/SYSV") || nitro::lang::starts_with(filename, "/dev")) - { - Log::debug() << "mmap: skipping fr: " << filename << " (known non-library)"; - return; - } - - FunctionResolver* fr; - InstructionResolver* ir; - + Log::error() << "Emplacing fr: " << filename; + std::shared_ptr fr; try { if (nitro::lang::starts_with(filename, "[")) { - fr = &FunctionResolver::cache(filename); + fr = FunctionResolver::cache(filename); } else { - fr = &DwarfFunctionResolver::cache(filename); + fr = DwarfFunctionResolver::cache(filename); } } - catch (...) + catch (std::exception& e) { - fr = &FunctionResolver::cache(filename); + Log::error() << "Could not open DWarf for: " << filename; + fr = FunctionResolver::cache(filename); } + if (fr == nullptr) + { + return; + } + auto& fr_map = function_resolvers_ + .emplace(MeasurementScope::sample(process_.as_scope()), + std::map>()) + .first->second; + + auto fr_entry = fr_map.find(Mapping(addr)); + if (fr_entry != fr_map.end()) + { + Log::error() << "Moving Entry! " << fr_entry->second->name(); + // Overlapping with existing entry + auto fr_mapping = fr_entry->first; + auto fr = std::move(fr_entry->second); + fr_map.erase(fr_entry); + if (fr_mapping.range.start != addr) + { + // Truncate entry + assert(fr_mapping.range.start < addr); + fr_mapping.range.end = addr; + Log::debug() << "truncating map entry at " << fr_mapping.range.start << " to " + << fr_mapping.range.end; + [[maybe_unused]] auto r = fr_map.emplace(fr_mapping, std::move(fr)); + assert(r.second); + } + } + + try + { + auto foo = fr_map.emplace(std::piecewise_construct, std::forward_as_tuple(addr, end, pgoff), + std::forward_as_tuple(fr)); + Log::error() << foo.second; + } + catch (Range::Error& e) + { + // Very common, can't warn here + // TODO consider handling this somehow... + Log::debug() << "invalid address range in /proc/.../maps: " << e.what(); + } +} + +void ProcessInfo::emplace_ir(Address addr, Address end, Address pgoff, std::string filename) +{ + std::shared_ptr ir = nullptr; #ifdef HAVE_RADARE try { - ir = &RadareInstructionResolver::cache(filename); + ir = RadareInstructionResolver::cache(filename); } catch (...) { - ir = &InstructionResolver::cache(); } -#else - ir = &InstructionResolver::cache(); #endif - auto ex_it = map_.find(addr); - if (ex_it != map_.end()) + + if (ir == nullptr) + { + return; + } + + auto& ir_map = instruction_resolvers_ + .emplace(MeasurementScope::sample(process_.as_scope()), + std::map>()) + .first->second; + + auto ir_entry = ir_map.find(Mapping(addr)); + if (ir_entry != ir_map.end()) { // Overlapping with existing entry - auto ex_range = ex_it->first; - auto ex_elem = std::move(ex_it->second); - map_.erase(ex_it); - if (ex_range.start != addr) + auto ir_mapping = ir_entry->first; + auto ir = std::move(ir_entry->second); + ir_map.erase(ir_entry); + if (ir_mapping.range.start != addr) { // Truncate entry - assert(ex_range.start < addr); - ex_range.end = addr; - Log::debug() << "truncating map entry at " << ex_range.start << " to " << ex_range.end; - [[maybe_unused]] auto r = map_.emplace(ex_range, std::move(ex_elem)); + assert(ir_mapping.range.start < addr); + ir_mapping.range.end = addr; + Log::debug() << "truncating map entry at " << ir_mapping.range.start << " to " + << ir_mapping.range.end; + [[maybe_unused]] auto r = ir_map.emplace(ir_mapping, std::move(ir)); assert(r.second); } } try { - auto r = map_.emplace(std::piecewise_construct, std::forward_as_tuple(addr, end), - std::forward_as_tuple(addr, end, pgoff, *fr, *ir)); - if (!r.second) - { - // very common, so only debug - // TODO handle better - Log::debug() << "duplicate memory range from mmap event. new: " << addr << "-" << end - << "%" << pgoff << " " << filename << "\n" - << "OLD: " << r.first->second.start << "-" << r.first->second.end << "%" - << r.first->second.pgoff << " " << r.first->second.fr.name(); - } + ir_map.emplace(std::piecewise_construct, std::forward_as_tuple(addr, end, pgoff), + std::forward_as_tuple(ir)); } catch (Range::Error& e) { @@ -152,80 +197,102 @@ void ProcessInfo::mmap(Address addr, Address end, Address pgoff, std::string fil } } -LineInfo ProcessInfo::lookup_line_info(Address ip) +void ProcessInfo::mmap(Address addr, Address end, Address pgoff, std::string filename) { + + std::lock_guard lock(mutex_); + + Log::debug() << "mmap: " << addr << "-" << end << " " << pgoff << ": " << filename; + + if (filename.empty() || std::string("//anon") == filename || + std::string("/dev/zero") == filename || std::string("/anon_hugepage") == filename || + nitro::lang::starts_with(filename, "/memfd") || + nitro::lang::starts_with(filename, "/SYSV") || nitro::lang::starts_with(filename, "/dev")) + { + Log::debug() << "mmap: skipping fr: " << filename << " (known non-library)"; + return; + } + + emplace_fr(addr, end, pgoff, filename); + emplace_ir(addr, end, pgoff, filename); +} + +LineInfo ProcessInfo::lookup_line_info(MeasurementScope scope, Address ip) +{ + Log::error() << "looking up: " << scope.name(); try { std::shared_lock lock(mutex_); - return map_.at(ip).fr.lookup_line_info(ip - map_.at(ip).start + map_.at(ip).pgoff); + + auto& fr_map = function_resolvers_.at(scope); + for (auto& fr : fr_map) + { + if (ip >= fr.first.range.start && ip < fr.first.range.end) + { + return fr.second->lookup_line_info(ip - fr.first.range.start + fr.first.pgoff); + } + } } catch (std::exception& e) { - Log::error() << process_ << ": Could not find mapping for ip: " << ip << e.what(); - return LineInfo::for_unknown_function(); + Log::debug() << process_ << ": Could not find mapping for ip: " << ip << e.what(); } + return LineInfo::for_unknown_function(); } -std::string ProcessInfo::lookup_instruction(Address ip) const +std::string ProcessInfo::lookup_instruction(MeasurementScope scope, Address ip) const { - return map_.at(ip).ir.lookup_instruction(ip - map_.at(ip).start + map_.at(ip).pgoff); -} + auto& ir_map = instruction_resolvers_.at(scope); -bool ProcessMap::has(Process p, uint64_t timepoint) -{ - auto it = infos_.find(p); - if (it == infos_.end()) + auto ir = ir_map.find(Mapping(ip)); + if (ir != ir_map.end()) { - return false; + return ir->second->lookup_instruction(ip - ir->first.range.start + ir->first.pgoff); } - for (auto& elem : it->second) - { - if (timepoint >= elem.first) - { - return true; - } - } - return false; + return ""; } -ProcessInfo& ProcessMap::get(Process p, uint64_t timepoint) +void ProcessInfo::insert_functions(MeasurementScope scope, std::map functions) { - auto& infos = infos_.at(p); + auto fr_map = function_resolvers_.find(scope); + Log::error() << "EMPLACING: " << scope.name(); + if (fr_map == function_resolvers_.end()) + { + fr_map = function_resolvers_ + .emplace(scope, std::map>()) + .first; + } - auto it = std::find_if(infos.begin(), infos.end(), - [&timepoint](auto& arg) { return arg.first > timepoint; }); + auto manual_resolver = fr_map->second.find(Mapping::max()); - if (it == infos.begin()) + if (manual_resolver == fr_map->second.end()) { - if (timepoint >= it->first) - { - return it->second; - } - else - { - throw std::out_of_range("no matching timepoint"); - } - } - else - { - it--; - if (timepoint >= it->first) - { - return it->second; - } - else - { - throw std::out_of_range("no matching timepoint it--"); - } + manual_resolver = + fr_map->second.emplace(Mapping::max(), ManualFunctionResolver::cache(scope.name())) + .first; } - return infos_.at(p).at(timepoint); + manual_resolver->second->insert(functions); } -ProcessInfo& ProcessMap::insert(Process p, uint64_t timepoint, bool read_initial) +bool ProcessMap::has(Process p) { - auto it = infos_[p].emplace(std::piecewise_construct, std::forward_as_tuple(timepoint), - std::forward_as_tuple(p, read_initial)); + return infos_.count(p) != 0; +} + +ProcessInfo& ProcessMap::get(Process p) +{ + return infos_.at(p); +} + +ProcessInfo& ProcessMap::insert(Process p, bool read_initial) +{ + if (infos_.count(p)) + { + infos_.erase(p); + } + auto it = infos_.emplace(std::piecewise_construct, std::forward_as_tuple(p), + std::forward_as_tuple(p, read_initial)); return it.first->second; } } // namespace lo2s diff --git a/src/trace/trace.cpp b/src/trace/trace.cpp index a3384fe9..6fdabd02 100644 --- a/src/trace/trace.cpp +++ b/src/trace/trace.cpp @@ -286,38 +286,30 @@ Trace::~Trace() std::filesystem::create_symlink(trace_name_, symlink_path); } -const otf2::definition::system_tree_node& Trace::intern_process_node(Process process) -{ - if (registry_.has(ByProcess(process))) - { - return registry_.get(ByProcess(process)); - } - else - { - Log::warn() << "Could not find system tree node for " << process; - return system_tree_root_node_; - } -} - -void Trace::add_process(Process parent, Process process, const std::string& name) +void Trace::emplace_process(Process parent, Process process, const std::string& name) { std::lock_guard guard(mutex_); if (registry_.has(ByProcess(process))) { - update_process_name(process, name); + update_process(parent, process, name); return; } else { + auto parent_node = system_tree_root_node_; + + if (parent != NO_PARENT_PROCESS) + { + emplace_process(NO_PARENT_PROCESS, parent, ""); + parent_node = registry_.get(ByProcess(parent)); + } + thread_names_.emplace(std::piecewise_construct, std::forward_as_tuple(process.as_thread()), std::forward_as_tuple(name)); const auto& iname = intern(name); - const auto& parent_node = - (parent == NO_PARENT_PROCESS) ? system_tree_root_node_ : intern_process_node(parent); - const auto& ret = registry_.create( ByProcess(process), iname, intern("process"), parent_node); @@ -334,19 +326,28 @@ void Trace::add_process(Process parent, Process process, const std::string& name } } -void Trace::update_process_name(Process process, const std::string& name) +void Trace::update_process(Process parent, Process process, const std::string& name) { std::lock_guard guard(mutex_); - const auto& iname = intern(name); try { - registry_.get(ByProcess(process)).name(iname); - registry_.get(ByExecutionScope(process.as_scope())) - .name(iname); - registry_.get(ByProcess(process)).name(iname); - registry_.get(ByProcess(process)).name(iname); + auto tree_node = registry_.get(ByProcess(process)); + if (name != "") + { + const auto& iname = intern(name); + tree_node.name(iname); + registry_.get(ByExecutionScope(process.as_scope())) + .name(iname); + registry_.get(ByProcess(process)).name(iname); + registry_.get(ByProcess(process)).name(iname); - update_thread_name(process.as_thread(), name); + update_thread(process.as_thread(), name); + } + if (parent != NO_PARENT_PROCESS && tree_node.parent() == system_tree_root_node_) + { + emplace_process(NO_PARENT_PROCESS, parent); + tree_node.parent(registry_.get(ByProcess(parent))); + } } catch (const std::out_of_range&) { @@ -354,7 +355,7 @@ void Trace::update_process_name(Process process, const std::string& name) } } -void Trace::update_thread_name(Thread thread, const std::string& name) +void Trace::update_thread(Thread thread, const std::string& name) { // TODO we call this function in a hot-loop, locking doesn't sound like a good idea std::lock_guard guard(mutex_); @@ -408,18 +409,13 @@ otf2::writer::local& Trace::sample_writer(const ExecutionScope& writer_scope) return archive_(location(writer_scope)); } -otf2::writer::local& Trace::cuda_writer(const Thread& thread) +otf2::writer::local& Trace::rb_writer(const MeasurementScope& scope) { - MeasurementScope scope = MeasurementScope::cuda(thread.as_scope()); - - const auto& cuda_location_group = registry_.emplace( - ByMeasurementScope(scope), intern(scope.name()), - otf2::common::location_group_type::accelerator, system_tree_root_node_); - + emplace_process(NO_PARENT_PROCESS, scope.scope.as_process(), ""); const auto& intern_location = registry_.emplace( - ByMeasurementScope(scope), intern(scope.name()), cuda_location_group, + ByMeasurementScope(scope), intern(scope.name()), + registry_.get(ByExecutionScope(scope.scope)), otf2::definition::location::location_type::accelerator_stream); - return archive_(intern_location); } @@ -611,7 +607,8 @@ otf2::definition::metric_class& Trace::metric_class() void Trace::merge_ips(const std::map& new_children, std::map& children, std::vector& mapping_table, - otf2::definition::calling_context& parent, ProcessMap& infos, Process process) + otf2::definition::calling_context& parent, ProcessInfo& info, + MeasurementScope scope) { for (const auto& elem : new_children) { @@ -620,15 +617,7 @@ void Trace::merge_ips(const std::map& new_children, auto& local_children = elem.second.children; LineInfo line_info = LineInfo::for_unknown_function(); - if (infos.has(process, UINT64_MAX)) - { - line_info = infos.get(process, UINT64_MAX).lookup_line_info(ip); - } - else - { - infos.insert(process, 0, false); - line_info = infos.get(process, UINT64_MAX).lookup_line_info(ip); - } + line_info = info.lookup_line_info(scope, ip); Log::trace() << "resolved " << ip << ": " << line_info; auto cctx_it = children.find(ip); @@ -639,11 +628,11 @@ void Trace::merge_ips(const std::map& new_children, auto r = children.emplace(ip, new_cctx); cctx_it = r.first; - if (config().disassemble && infos.has(process, UINT64_MAX) == 1) + if (config().disassemble) { try { - auto instruction = infos.get(process, UINT64_MAX).lookup_instruction(ip); + auto instruction = info.lookup_instruction(scope, ip); Log::trace() << "mapped " << ip << " to " << instruction; registry_.create( @@ -659,10 +648,9 @@ void Trace::merge_ips(const std::map& new_children, auto& cctx = cctx_it->second.cctx; mapping_table.at(local_ref) = cctx.ref(); - merge_ips(local_children, cctx_it->second.children, mapping_table, cctx, infos, process); + merge_ips(local_children, cctx_it->second.children, mapping_table, cctx, info, scope); } } - otf2::definition::mapping_table Trace::merge_calling_contexts(const LocalCctxMap& local_cctxs, ProcessMap& infos) { @@ -673,10 +661,10 @@ otf2::definition::mapping_table Trace::merge_calling_contexts(const LocalCctxMap std::vector mappings(local_cctxs.num_cctx()); #endif - // Merge local thread tree into global thread tree for (auto& process_map : local_cctxs.get_threads()) { Process process = process_map.first; + emplace_process(NO_PARENT_PROCESS, process); for (auto& local_thread_cctx : process_map.second) { @@ -689,31 +677,7 @@ otf2::definition::mapping_table Trace::merge_calling_contexts(const LocalCctxMap if (global_thread_cctx == global_thread_cctxs_.end()) { - if (thread != Thread(0)) - { - - if (auto thread_name = thread_names_.find(thread); - thread_name != thread_names_.end()) - { - add_thread(thread, thread_name->second); - } - else - { - if (auto process_name = thread_names_.find(process.as_thread()); - process_name != thread_names_.end()) - { - add_thread(thread, process_name->second); - } - else - { - add_thread(thread, ""); - } - } - } - else - { - add_thread(thread, ""); - } + emplace_thread(thread); } const auto& foo = global_thread_cctxs_.at(thread).cctx.ref(); mappings.at(local_ref.ref) = foo; @@ -722,11 +686,18 @@ otf2::definition::mapping_table Trace::merge_calling_contexts(const LocalCctxMap for (auto& local_process_map : local_cctxs.get_functions()) { - auto& parent = registry_.get( - ByThread(local_process_map.first.as_thread())); + + Process p = local_process_map.first.scope.as_process(); + + auto& parent = registry_.get(ByThread(p.as_thread())); auto ret = calling_context_tree_.emplace(local_process_map.first, std::map()); - merge_ips(local_process_map.second, ret.first->second, mappings, parent, infos, + if (!infos.has(p)) + { + infos.insert(p, false); + } + ProcessInfo& info = infos.get(p); + merge_ips(local_process_map.second, ret.first->second, mappings, parent, info, local_process_map.first); } @@ -771,18 +742,39 @@ Trace::merge_syscall_contexts(const std::set& used_syscalls) mappings); } -void Trace::add_thread_exclusive(Thread thread, const std::string& name, - const std::lock_guard&) +void Trace::emplace_thread_exclusive(Thread thread, const std::string& name, + const std::lock_guard&) { if (registry_.has(ByThread(thread))) { - update_thread_name(thread, name); + update_thread(thread, name); return; } + std::string thread_name = ""; + if (name == "") + { + auto thread_it = thread_names_.find(thread); + if (thread_it != thread_names_.end()) + { + thread_name = thread_it->second; + } + else + { + Process p = groups_.get_parent(thread.as_scope()).as_process(); + auto process_name = thread_names_.find(p.as_thread()); + if (process_name != thread_names_.end()) + { + thread_name = process_name->second; + } + } + } + else + { + thread_name = name; + } thread_names_.emplace(std::piecewise_construct, std::forward_as_tuple(thread), std::forward_as_tuple(name)); - auto& iname = intern(fmt::format("{} ({})", name, thread.as_pid_t())); auto& thread_region = registry_.emplace( @@ -796,11 +788,11 @@ void Trace::add_thread_exclusive(Thread thread, const std::string& name, std::forward_as_tuple(thread_cctx)); } -void Trace::add_thread(Thread thread, const std::string& name) +void Trace::emplace_thread(Thread thread, const std::string& name) { // Lock this to avoid conflict on regions_thread_ with add_monitoring_thread std::lock_guard guard(mutex_); - add_thread_exclusive(thread, name, guard); + emplace_thread_exclusive(thread, name, guard); } void Trace::add_monitoring_thread(Thread thread, const std::string& name, const std::string& group) @@ -828,7 +820,7 @@ void Trace::add_monitoring_thread(Thread thread, const std::string& name, const } } -void Trace::add_threads(const std::unordered_map& thread_map) +void Trace::emplace_threads(const std::unordered_map& thread_map) { Log::debug() << "Adding " << thread_map.size() << " monitored thread(s) to the trace"; @@ -836,7 +828,7 @@ void Trace::add_threads(const std::unordered_map& thread_ma std::lock_guard guard(mutex_); for (const auto& elem : thread_map) { - add_thread_exclusive(elem.first, elem.second, guard); + emplace_thread_exclusive(elem.first, elem.second, guard); } } @@ -879,13 +871,13 @@ const otf2::definition::string& Trace::intern(const std::string& name) return registry_.emplace(ByString(name), name); } -LocalCctxMap& Trace::create_local_cctx_map() +LocalCctxMap& Trace::create_local_cctx_map(MeasurementScope scope) { std::lock_guard guard(local_cctx_maps_mutex_); assert(!local_cctx_maps_finalized_); - return local_cctx_maps_.emplace_back(); + return local_cctx_maps_.emplace_back(scope); } void Trace::merge_calling_contexts(ProcessMap& process_infos)