Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add server to client remote log #99

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions src/clientconn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,23 +459,18 @@ void Connection::handle_MESSAGE()
if(!M.good())
throw std::runtime_error(SB()<<M.file()<<':'<<M.line()<<" Decode error for Message");

auto lvl(mtype2level(mtype));
const char *chan = "<no channel>";

auto it = opByIOID.find(ioid);
if(it==opByIOID.end()) {
log_debug_printf(connsetup, "Server %s Message on non-existent ioid\n", peerName.c_str());
return;
}
auto op = it->second.handle.lock();

Level lvl;
switch(mtype) {
case 0: lvl = Level::Info; break;
case 1: lvl = Level::Warn; break;
case 2: lvl = Level::Err; break;
default: lvl = Level::Crit; break;
if(it!=opByIOID.end()) {
if(auto op = it->second.handle.lock()) {
chan = op->chan->name.c_str();
}
}

log_printf(remote, lvl, "%s : %s\n",
op && op->chan ? op->chan->name.c_str() : "<dead>", msg.c_str());
chan, msg.c_str());
}

void Connection::tickEcho()
Expand Down
27 changes: 27 additions & 0 deletions src/pvaproto.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <event2/buffer.h>
#include <pvxs/version.h>
#include <pvxs/sharedArray.h>
#include <pvxs/log.h>
#include "utilpvt.h"

namespace pvxs {namespace impl {
Expand Down Expand Up @@ -697,6 +698,32 @@ void from_wire(Buf& buf, Header& H)
}
}

// mapping between CMD_MESSAGE mtype and logging Level

inline
Level mtype2level(uint8_t mtype)
{
switch(mtype) {
case 0: return Level::Info;
case 1: return Level::Warn;
case 2: return Level::Err;
default: return Level::Crit;
}
}

inline
uint8_t level2mtype(Level lvl)
{
switch(lvl) {
case Level::Debug:
case Level::Info: return 0;
case Level::Warn: return 1;
case Level::Err: return 2;
case Level::Crit: return 3;
}
return 3;
}

}} // namespace pvxs::impl

#endif // PVAPROTO_H
6 changes: 3 additions & 3 deletions src/pvxs/source.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct ServerConn;
namespace server {

//! Handle when an operation is being setup
struct PVXS_API ConnectOp : public OpBase {
struct PVXS_API ConnectOp : public OpBase, public RemoteLogger {
protected:
Value _pvRequest;
public:
Expand Down Expand Up @@ -72,7 +72,7 @@ struct MonitorStat {
};

//! Handle for active subscription
struct PVXS_API MonitorControlOp : public OpBase {
struct PVXS_API MonitorControlOp : public OpBase, public RemoteLogger {
MonitorControlOp(const std::string& name,
const std::shared_ptr<const ClientCredentials>& cred, op_t op)
:OpBase(name, cred, op)
Expand Down Expand Up @@ -134,7 +134,7 @@ struct PVXS_API MonitorControlOp : public OpBase {
};

//! Handle for subscription which is being setup
struct PVXS_API MonitorSetupOp : public OpBase {
struct PVXS_API MonitorSetupOp : public OpBase, public RemoteLogger {
protected:
Value _pvRequest;
public:
Expand Down
10 changes: 9 additions & 1 deletion src/pvxs/srvcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <pvxs/data.h>

namespace pvxs {
enum struct Level;
namespace server {

/** Credentials presented by a client.
Expand Down Expand Up @@ -91,9 +92,16 @@ struct PVXS_API OpBase {
OpBase& operator=(const OpBase&) = delete;
virtual ~OpBase() =0;
};
struct PVXS_API RemoteLogger {
virtual ~RemoteLogger() =0;

//! Request log message to peer
//! @since UNRELEASED
virtual void logRemote(Level lvl, const std::string& msg) =0;
};

//! Handle when an operation is being executed
struct PVXS_API ExecOp : public OpBase {
struct PVXS_API ExecOp : public OpBase, public RemoteLogger {
//! Issue a reply without data. (eg. to complete a PUT)
virtual void reply() =0;
//! Issue a reply with data. For a GET or RPC (or PUT/Get)
Expand Down
1 change: 1 addition & 0 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ void Source::show(std::ostream& strm)
}

OpBase::~OpBase() {}
RemoteLogger::~RemoteLogger() {}

ChannelControl::~ChannelControl() {}

Expand Down
17 changes: 17 additions & 0 deletions src/serverconn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,23 @@ const std::shared_ptr<ServerChan>& ServerConn::lookupSID(uint32_t sid)
return it->second;
}

void ServerConn::logRemote(uint32_t ioid, Level lvl, const std::string& msg)
{
// TODO: respect TX throttle
if(!connection())
return;
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(sendBE, txBody.get());
to_wire(R, ioid);
to_wire(R, level2mtype(lvl));
to_wire(R, msg);
}

enqueueTxBody(CMD_MESSAGE);
}

void ServerConn::handle_ECHO()
{
// Client requests echo as a keep-alive check
Expand Down
23 changes: 23 additions & 0 deletions src/serverconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,27 @@ struct ServerOp
virtual void show(std::ostream& strm) const =0;
};

// helper for implementing OpBase::logRemote
template<typename Control>
void doLogRemote(Control *self, Level lvl, const std::string& msg)
{
auto serv = self->server.lock();
if(!serv)
return;
std::string m(msg); // copy for bind
auto wop(self->op); // copy weak for bind
serv->acceptor_loop.dispatch([wop, lvl, m](){
// op, chan, or conn may be dead or dissociated by this point...
if(auto oper = wop.lock()) {
if(auto chan = oper->chan.lock()) {
if(auto conn = chan->conn.lock()) {
conn->logRemote(oper->ioid, lvl, m);
}
}
}
});
}

struct ServerChannelControl : public server::ChannelControl
{
ServerChannelControl(const std::shared_ptr<ServerConn>& conn, const std::shared_ptr<ServerChan>& chan);
Expand Down Expand Up @@ -132,6 +153,8 @@ struct ServerConn final : public ConnBase, public std::enable_shared_from_this<S

const std::shared_ptr<ServerChan>& lookupSID(uint32_t sid);

void logRemote(uint32_t ioid, Level lvl, const std::string& msg);

private:
#define CASE(Op) virtual void handle_##Op() override final;
CASE(ECHO);
Expand Down
10 changes: 10 additions & 0 deletions src/serverget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ struct ServerGPRConnect : public server::ConnectOp
});
}

virtual void logRemote(Level lvl, const std::string& msg) override final
{
doLogRemote(this, lvl, msg);
}

const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerGPR> op;

Expand Down Expand Up @@ -324,6 +329,11 @@ struct ServerGPRExec : public server::ExecOp
return Timer::Pvt::buildOneShot(delay, serv->acceptor_loop.internal(), std::move(fn));
}

virtual void logRemote(Level lvl, const std::string& msg) override final
{
doLogRemote(this, lvl, msg);
}

const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerGPR> op;

Expand Down
17 changes: 17 additions & 0 deletions src/serverintrospect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ struct ServerIntrospectControl : public server::ConnectOp
virtual void onGet(std::function<void(std::unique_ptr<server::ExecOp>&& fn)>&& fn) override final {}
virtual void onPut(std::function<void(std::unique_ptr<server::ExecOp>&& fn, Value&&)>&& fn) override final {}

virtual void logRemote(Level lvl, const std::string& msg) override final
{
auto serv = server.lock();
if(!serv)
return;
std::string m(msg); // copy for bind
serv->acceptor_loop.dispatch([this, lvl, m](){
if(auto oper = op.lock()) {
if(auto chan = oper->chan.lock()) {
if(auto conn = chan->conn.lock()) {
conn->logRemote(oper->ioid, lvl, m);
}
}
}
});
}

const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerIntrospect> op;

Expand Down
10 changes: 10 additions & 0 deletions src/servermon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ struct ServerMonitorControl : public server::MonitorControlOp
});
}

virtual void logRemote(Level lvl, const std::string& msg) override final
{
doLogRemote(this, lvl, msg);
}

const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<MonitorOp> op;

Expand Down Expand Up @@ -432,6 +437,11 @@ struct ServerMonitorSetup : public server::MonitorSetupOp
});
}

virtual void logRemote(Level lvl, const std::string& msg) override final
{
doLogRemote(this, lvl, msg);
}

const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<MonitorOp> op;

Expand Down
Loading