From af225ea58fa3591d2a853ce8619eafd6eb9c219d Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Fri, 31 Jan 2025 20:38:30 -0800 Subject: [PATCH] add server to client remote logging --- src/clientconn.cpp | 21 ++++++++------------- src/pvaproto.h | 27 +++++++++++++++++++++++++++ src/pvxs/source.h | 6 +++--- src/pvxs/srvcommon.h | 10 +++++++++- src/server.cpp | 1 + src/serverconn.cpp | 17 +++++++++++++++++ src/serverconn.h | 23 +++++++++++++++++++++++ src/serverget.cpp | 10 ++++++++++ src/serverintrospect.cpp | 17 +++++++++++++++++ src/servermon.cpp | 10 ++++++++++ 10 files changed, 125 insertions(+), 17 deletions(-) diff --git a/src/clientconn.cpp b/src/clientconn.cpp index b1130b344..114692b81 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -459,23 +459,18 @@ void Connection::handle_MESSAGE() if(!M.good()) throw std::runtime_error(SB()<second.handle.lock(); - - Level lvl; - switch(mtype) { - case 0: lvl = Level::Info; break; - case 1: lvl = Level::Warn; break; - case 2: lvl = Level::Err; break; - default: lvl = Level::Crit; break; + if(it!=opByIOID.end()) { + if(auto op = it->second.handle.lock()) { + chan = op->chan->name.c_str(); + } } log_printf(remote, lvl, "%s : %s\n", - op && op->chan ? op->chan->name.c_str() : "", msg.c_str()); + chan, msg.c_str()); } void Connection::tickEcho() diff --git a/src/pvaproto.h b/src/pvaproto.h index 81491eacc..d31292e22 100644 --- a/src/pvaproto.h +++ b/src/pvaproto.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "utilpvt.h" namespace pvxs {namespace impl { @@ -697,6 +698,32 @@ void from_wire(Buf& buf, Header& H) } } +// mapping between CMD_MESSAGE mtype and logging Level + +inline +Level mtype2level(uint8_t mtype) +{ + switch(mtype) { + case 0: return Level::Info; + case 1: return Level::Warn; + case 2: return Level::Err; + default: return Level::Crit; + } +} + +inline +uint8_t level2mtype(Level lvl) +{ + switch(lvl) { + case Level::Debug: + case Level::Info: return 0; + case Level::Warn: return 1; + case Level::Err: return 2; + case Level::Crit: return 3; + } + return 3; +} + }} // namespace pvxs::impl #endif // PVAPROTO_H diff --git a/src/pvxs/source.h b/src/pvxs/source.h index 3421f2644..509c49144 100644 --- a/src/pvxs/source.h +++ b/src/pvxs/source.h @@ -20,7 +20,7 @@ struct ServerConn; namespace server { //! Handle when an operation is being setup -struct PVXS_API ConnectOp : public OpBase { +struct PVXS_API ConnectOp : public OpBase, public RemoteLogger { protected: Value _pvRequest; public: @@ -72,7 +72,7 @@ struct MonitorStat { }; //! Handle for active subscription -struct PVXS_API MonitorControlOp : public OpBase { +struct PVXS_API MonitorControlOp : public OpBase, public RemoteLogger { MonitorControlOp(const std::string& name, const std::shared_ptr& cred, op_t op) :OpBase(name, cred, op) @@ -134,7 +134,7 @@ struct PVXS_API MonitorControlOp : public OpBase { }; //! Handle for subscription which is being setup -struct PVXS_API MonitorSetupOp : public OpBase { +struct PVXS_API MonitorSetupOp : public OpBase, public RemoteLogger { protected: Value _pvRequest; public: diff --git a/src/pvxs/srvcommon.h b/src/pvxs/srvcommon.h index 8ab9e4fdb..8e270bc10 100644 --- a/src/pvxs/srvcommon.h +++ b/src/pvxs/srvcommon.h @@ -20,6 +20,7 @@ #include namespace pvxs { +enum struct Level; namespace server { /** Credentials presented by a client. @@ -91,9 +92,16 @@ struct PVXS_API OpBase { OpBase& operator=(const OpBase&) = delete; virtual ~OpBase() =0; }; +struct PVXS_API RemoteLogger { + virtual ~RemoteLogger() =0; + + //! Request log message to peer + //! @since UNRELEASED + virtual void logRemote(Level lvl, const std::string& msg) =0; +}; //! Handle when an operation is being executed -struct PVXS_API ExecOp : public OpBase { +struct PVXS_API ExecOp : public OpBase, public RemoteLogger { //! Issue a reply without data. (eg. to complete a PUT) virtual void reply() =0; //! Issue a reply with data. For a GET or RPC (or PUT/Get) diff --git a/src/server.cpp b/src/server.cpp index 0a45dc92c..fac9e7eb8 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -839,6 +839,7 @@ void Source::show(std::ostream& strm) } OpBase::~OpBase() {} +RemoteLogger::~RemoteLogger() {} ChannelControl::~ChannelControl() {} diff --git a/src/serverconn.cpp b/src/serverconn.cpp index 865be6e7a..2b301617e 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -140,6 +140,23 @@ const std::shared_ptr& ServerConn::lookupSID(uint32_t sid) return it->second; } +void ServerConn::logRemote(uint32_t ioid, Level lvl, const std::string& msg) +{ + // TODO: respect TX throttle + if(!connection()) + return; + { + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + EvOutBuf R(sendBE, txBody.get()); + to_wire(R, ioid); + to_wire(R, level2mtype(lvl)); + to_wire(R, msg); + } + + enqueueTxBody(CMD_MESSAGE); +} + void ServerConn::handle_ECHO() { // Client requests echo as a keep-alive check diff --git a/src/serverconn.h b/src/serverconn.h index 3b79e6a4d..4be471b73 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -57,6 +57,27 @@ struct ServerOp virtual void show(std::ostream& strm) const =0; }; +// helper for implementing OpBase::logRemote +template +void doLogRemote(Control *self, Level lvl, const std::string& msg) +{ + auto serv = self->server.lock(); + if(!serv) + return; + std::string m(msg); // copy for bind + auto wop(self->op); // copy weak for bind + serv->acceptor_loop.dispatch([wop, lvl, m](){ + // op, chan, or conn may be dead or dissociated by this point... + if(auto oper = wop.lock()) { + if(auto chan = oper->chan.lock()) { + if(auto conn = chan->conn.lock()) { + conn->logRemote(oper->ioid, lvl, m); + } + } + } + }); +} + struct ServerChannelControl : public server::ChannelControl { ServerChannelControl(const std::shared_ptr& conn, const std::shared_ptr& chan); @@ -132,6 +153,8 @@ struct ServerConn final : public ConnBase, public std::enable_shared_from_this& lookupSID(uint32_t sid); + void logRemote(uint32_t ioid, Level lvl, const std::string& msg); + private: #define CASE(Op) virtual void handle_##Op() override final; CASE(ECHO); diff --git a/src/serverget.cpp b/src/serverget.cpp index 8c5eba8fb..39d824219 100644 --- a/src/serverget.cpp +++ b/src/serverget.cpp @@ -250,6 +250,11 @@ struct ServerGPRConnect : public server::ConnectOp }); } + virtual void logRemote(Level lvl, const std::string& msg) override final + { + doLogRemote(this, lvl, msg); + } + const std::weak_ptr server; const std::weak_ptr op; @@ -324,6 +329,11 @@ struct ServerGPRExec : public server::ExecOp return Timer::Pvt::buildOneShot(delay, serv->acceptor_loop.internal(), std::move(fn)); } + virtual void logRemote(Level lvl, const std::string& msg) override final + { + doLogRemote(this, lvl, msg); + } + const std::weak_ptr server; const std::weak_ptr op; diff --git a/src/serverintrospect.cpp b/src/serverintrospect.cpp index 8e67151d4..325549d8c 100644 --- a/src/serverintrospect.cpp +++ b/src/serverintrospect.cpp @@ -113,6 +113,23 @@ struct ServerIntrospectControl : public server::ConnectOp virtual void onGet(std::function&& fn)>&& fn) override final {} virtual void onPut(std::function&& fn, Value&&)>&& fn) override final {} + virtual void logRemote(Level lvl, const std::string& msg) override final + { + auto serv = server.lock(); + if(!serv) + return; + std::string m(msg); // copy for bind + serv->acceptor_loop.dispatch([this, lvl, m](){ + if(auto oper = op.lock()) { + if(auto chan = oper->chan.lock()) { + if(auto conn = chan->conn.lock()) { + conn->logRemote(oper->ioid, lvl, m); + } + } + } + }); + } + const std::weak_ptr server; const std::weak_ptr op; diff --git a/src/servermon.cpp b/src/servermon.cpp index ce87609f3..ca63530a3 100644 --- a/src/servermon.cpp +++ b/src/servermon.cpp @@ -355,6 +355,11 @@ struct ServerMonitorControl : public server::MonitorControlOp }); } + virtual void logRemote(Level lvl, const std::string& msg) override final + { + doLogRemote(this, lvl, msg); + } + const std::weak_ptr server; const std::weak_ptr op; @@ -432,6 +437,11 @@ struct ServerMonitorSetup : public server::MonitorSetupOp }); } + virtual void logRemote(Level lvl, const std::string& msg) override final + { + doLogRemote(this, lvl, msg); + } + const std::weak_ptr server; const std::weak_ptr op;