From 6f2f0befaa94a59596c92b469ebf3a3e93785397 Mon Sep 17 00:00:00 2001 From: lenkan Date: Wed, 11 Dec 2024 14:21:56 +0100 Subject: [PATCH 1/7] feat: handle sigterm --- src/keria/app/cli/commands/start.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/keria/app/cli/commands/start.py b/src/keria/app/cli/commands/start.py index c7f2166..21543d1 100644 --- a/src/keria/app/cli/commands/start.py +++ b/src/keria/app/cli/commands/start.py @@ -8,10 +8,11 @@ import argparse import logging import os +import signal +from hio.base import doing from keri import __version__ from keri import help -from keri.app import directing from keria.app import agenting @@ -89,8 +90,8 @@ def launch(args): logger = help.ogler.getLogger() - logger.info("******* Starting Agent for %s listening: admin/%s, http/%s " - ".******", args.name, args.admin, args.http) + logger.info("Starting Agent for %s listening: admin/%s, http/%s, boot/%s", args.name, args.admin, args.http, args.boot) + logger.info("PID: %s", os.getpid()) agency = agenting.setup(name=args.name or "ahab", base=args.base or "", @@ -111,8 +112,15 @@ def launch(args): bootPassword=args.bootPassword, bootUsername=args.bootUsername) - directing.runController(doers=agency, expire=0.0) + tock = 0.03125 + doist = doing.Doist(limit=0.0, tock=tock, real=True) + def handleSignal(sig, frame): + logger.info("Received signal %s", signal.strsignal(sig)) + doist.exit() - logger.info("******* Ended Agent for %s listening: admin/%s, http/%s" - ".******", args.name, args.admin, args.http) + signal.signal(signal.SIGTERM, handleSignal) + + doist.do(doers=agency) + + logger.info("Agent %s gracefully stopped", args.name) From 4d683933c7d559bd5fbf977b3c32d514db1fc743 Mon Sep 17 00:00:00 2001 From: lenkan Date: Wed, 11 Dec 2024 14:34:17 +0100 Subject: [PATCH 2/7] remove redundant runController call --- src/keria/app/cli/keria.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/keria/app/cli/keria.py b/src/keria/app/cli/keria.py index a48e7e4..fd136e8 100644 --- a/src/keria/app/cli/keria.py +++ b/src/keria/app/cli/keria.py @@ -5,14 +5,9 @@ """ import multicommand -from keri import help -from keri.app import directing from keria.app.cli import commands -logger = help.ogler.getLogger() - - def main(): parser = multicommand.create_parser(commands) args = parser.parse_args() @@ -22,8 +17,7 @@ def main(): return try: - doers = args.handler(args) - directing.runController(doers=doers, expire=0.0) + args.handler(args) except Exception as ex: # print(f"ERR: {ex}") From 0383dfb5d674c0e6eb099bf12026c42ab81e0517 Mon Sep 17 00:00:00 2001 From: Kent Bull Date: Sun, 22 Dec 2024 13:22:23 -0700 Subject: [PATCH 3/7] feat: shut down agents gracefully --- scripts/keri/cf/demo-witness-oobis.json | 5 ++++- src/keria/app/agenting.py | 25 ++++++++++++++----------- src/keria/app/cli/commands/sig-fix.py | 3 +-- src/keria/app/cli/commands/start.py | 24 +++++++++++++++++------- 4 files changed, 36 insertions(+), 21 deletions(-) diff --git a/scripts/keri/cf/demo-witness-oobis.json b/scripts/keri/cf/demo-witness-oobis.json index 10d91fd..c003eab 100755 --- a/scripts/keri/cf/demo-witness-oobis.json +++ b/scripts/keri/cf/demo-witness-oobis.json @@ -7,6 +7,9 @@ "iurls": [ "http://127.0.0.1:5642/oobi/BBilc4-L3tFUnfM_wJr4S4OJanAv_VmF_dJNN6vkf2Ha/controller?name=Wan&tag=witness", "http://127.0.0.1:5643/oobi/BLskRTInXnMxWaGqcpSyMgo0nYbalW99cGZESrz3zapM/controller?name=Wes&tag=witness", - "http://127.0.0.1:5644/oobi/BIKKuvBwpmDVA4Ds-EpL5bt9OqPzWPja2LigFYZN2YfX/controller?name=Wil&tag=witness" + "http://127.0.0.1:5644/oobi/BIKKuvBwpmDVA4Ds-EpL5bt9OqPzWPja2LigFYZN2YfX/controller?name=Wil&tag=witness", + "http://127.0.0.1:5645/oobi/BM35JN8XeJSEfpxopjn5jr7tAHCE5749f0OobhMLCorE/controller?name=Wit&tag=witness", + "http://127.0.0.1:5646/oobi/BIj15u5V11bkbtAxMA7gcNJZcax-7TgaBMLsQnMHpYHP/controller?name=Wub&tag=witness", + "http://127.0.0.1:5647/oobi/BF2rZTW79z4IXocYRQnjjsOuvFUQv-ptCf8Yltd7PfsM/controller?name=Wyz&tag=witness" ] } \ No newline at end of file diff --git a/src/keria/app/agenting.py b/src/keria/app/agenting.py index 1fc8a21..0158790 100644 --- a/src/keria/app/agenting.py +++ b/src/keria/app/agenting.py @@ -6,13 +6,13 @@ """ from base64 import b64decode import json -import os import datetime from dataclasses import asdict from urllib.parse import urlparse, urljoin from types import MappingProxyType import falcon +import lmdb from falcon import media from hio.base import doing from hio.core import http, tcp @@ -249,19 +249,22 @@ def delete(self, agent): del self.agents[agent.caid] def shut(self, agent): - logger.info(f"closing idle agent {agent.caid}") + logger.info(f"Shutting down agent {agent.caid}") agent.remove(agent.doers) self.remove([agent]) del self.agents[agent.caid] - agent.hby.ks.close(clear=False) - agent.seeker.close(clear=False) - agent.exnseeker.close(clear=False) - agent.monitor.opr.close(clear=False) - agent.notifier.noter.close(clear=False) - agent.rep.mbx.close(clear=False) - agent.registrar.rgy.close() - agent.mgr.rb.close(clear=False) - agent.hby.close(clear=False) + try: + agent.hby.ks.close(clear=False) + agent.seeker.close(clear=False) + agent.exnseeker.close(clear=False) + agent.monitor.opr.close(clear=False) + agent.notifier.noter.close(clear=False) + agent.rep.mbx.close(clear=False) + agent.registrar.rgy.close() + agent.mgr.rb.close(clear=False) + agent.hby.close(clear=False) + except lmdb.Error as ex: # Sometimes LMDB will throw an error if the DB is already closed + logger.error(f"Error closing databases for agent {agent.caid}: {ex}") def get(self, caid): if caid in self.agents: diff --git a/src/keria/app/cli/commands/sig-fix.py b/src/keria/app/cli/commands/sig-fix.py index a208211..8406996 100644 --- a/src/keria/app/cli/commands/sig-fix.py +++ b/src/keria/app/cli/commands/sig-fix.py @@ -6,9 +6,8 @@ """ import argparse -from hio import help from hio.base import doing -from keri import kering +from keri import help, kering from keri.app import habbing from keri.app.cli.common import existing from keri.core import serdering, coring diff --git a/src/keria/app/cli/commands/start.py b/src/keria/app/cli/commands/start.py index 21543d1..d4fd231 100644 --- a/src/keria/app/cli/commands/start.py +++ b/src/keria/app/cli/commands/start.py @@ -78,22 +78,23 @@ dest="bootUsername", default=os.getenv("KERIA_EXPERIMENTAL_BOOT_USERNAME")) +logger = help.ogler.getLogger() + def getListVariable(name): value = os.getenv(name) return value.split(";") if value else None def launch(args): help.ogler.level = logging.getLevelName(args.loglevel) + logger.setLevel(help.ogler.level) if(args.logfile != None): help.ogler.headDirPath = args.logfile help.ogler.reopen(name=args.name, temp=False, clear=True) - logger = help.ogler.getLogger() - logger.info("Starting Agent for %s listening: admin/%s, http/%s, boot/%s", args.name, args.admin, args.http, args.boot) logger.info("PID: %s", os.getpid()) - agency = agenting.setup(name=args.name or "ahab", + doers = agenting.setup(name=args.name or "ahab", base=args.base or "", bran=args.bran, adminPort=args.admin, @@ -111,16 +112,25 @@ def launch(args): durls=getListVariable("KERIA_DURLS"), bootPassword=args.bootPassword, bootUsername=args.bootUsername) + agency = None + for doer in doers: + if isinstance(doer, agenting.Agency): + agency = doer + break tock = 0.03125 doist = doing.Doist(limit=0.0, tock=tock, real=True) - def handleSignal(sig, frame): - logger.info("Received signal %s", signal.strsignal(sig)) + def handle_sigterm(sig, frame): + agents = list(agency.agents.keys()) + logger.info("Shutting down due to %s | stopping %s agents", signal.strsignal(sig), len(agents)) + for caid in agents: + agency.shut(agency.agents[caid]) + logger.info("Shutting down main Doist loop") doist.exit() - signal.signal(signal.SIGTERM, handleSignal) + signal.signal(signal.SIGTERM, handle_sigterm) - doist.do(doers=agency) + doist.do(doers=doers) logger.info("Agent %s gracefully stopped", args.name) From dc25299fb23baedf8807e088506e6a01a624db17 Mon Sep 17 00:00:00 2001 From: Kent Bull Date: Sat, 11 Jan 2025 08:45:38 -0700 Subject: [PATCH 4/7] feat: add graceful shutdown handler and clean up agent start code --- setup.py | 1 + src/keria/app/agenting.py | 167 +++++++++++++++++++++++----- src/keria/app/cli/commands/start.py | 86 +++++--------- src/keria/app/serving.py | 39 +++++++ tests/app/test_agenting.py | 71 ++++++++++-- 5 files changed, 271 insertions(+), 93 deletions(-) create mode 100644 src/keria/app/serving.py diff --git a/setup.py b/setup.py index 3ffc026..c116f55 100644 --- a/setup.py +++ b/setup.py @@ -92,6 +92,7 @@ tests_require=[ 'coverage>=5.5', 'pytest>=6.2.4', + 'requests==2.32.3' ], setup_requires=[ ], diff --git a/src/keria/app/agenting.py b/src/keria/app/agenting.py index 0158790..127798d 100644 --- a/src/keria/app/agenting.py +++ b/src/keria/app/agenting.py @@ -4,22 +4,24 @@ keria.app.agenting module """ +import logging +import os from base64 import b64decode import json import datetime -from dataclasses import asdict +from dataclasses import asdict, dataclass, field +from typing import List from urllib.parse import urlparse, urljoin from types import MappingProxyType import falcon import lmdb from falcon import media -from hio.base import doing +from hio.base import doing, Doer from hio.core import http, tcp from hio.help import decking -from keri import kering -from keri import core +from keri import core, kering, help from keri.app.notifying import Notifier from keri.app.storing import Mailboxer @@ -43,6 +45,7 @@ from . import aiding, notifying, indirecting, credentialing, ipexing, delegating from . import grouping as keriagrouping +from .serving import GracefulShutdownDoer from ..peer import exchanging as keriaexchanging from .specing import AgentSpecResource from ..core import authing, longrunning, httping @@ -52,23 +55,135 @@ logger = ogler.getLogger() - -def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=None, configDir=None, - keypath=None, certpath=None, cafilepath=None, cors=False, releaseTimeout=None, curls=None, - iurls=None, durls=None, bootUsername=None, bootPassword=None): - """ Set up an ahab in Signify mode """ - - agency = Agency(name=name, base=base, bran=bran, configFile=configFile, configDir=configDir, releaseTimeout=releaseTimeout, curls=curls, iurls=iurls, durls=durls) +@dataclass +class KERIAServerConfig: + """ + Provides a dataclass to define server config so it is easy to test with multiprocess. + Dataclasses are Pickleable and can be passed to a new process. + """ + # HTTP ports to use. + # Admin port number the admin HTTP server listens on. + # Default is 3901. KERIA_ADMIN_PORT also sets this + adminPort: int = 3901 + # Local port number the HTTP server listens on. + # Default is 3902. KERIA_HTTP_PORT also sets this + httpPort: int | None = 3092 + # Boot port number the Boot HTTP server listens on. + # WARNING: This port needs to be secured. + # Default is 3903. KERIA_BOOT_PORT also sets this. + bootPort: int = 3903 + + # Agency master controller information and configuration + # Name of controller. Default is 'keria'. + name: str = "keria" + # additional optional prefix to file location of KERI keystore + base: str = "" + # 21 character encryption passcode for keystore (is not saved) + bran: str = None + # configuration filename + configFile: str = "keria" + # directory override for configuration data + configDir: str = None + + # TLS key material + # TLS server private key file + keyPath: str = None + # TLS server signed certificate (public key) file + certPath: str = None + # TLS server CA certificate chain + caFilePath: str = None + + # Logging configuration + # Set log level to DEBUG | INFO | WARNING | ERROR | CRITICAL. + # Default is CRITICAL + logLevel: str = "CRITICAL" + # path of the log file. If not defined, logs will not be written to the file. + logFile: str = None + + # Agency configuration + # Use CORS headers in the HTTP responses. Default is False + cors: bool = True + # Timeout for releasing agents. Default is 86400 seconds (24 hours) + releaseTimeout: int = 86400 + # Controller Service Endpoint Location OOBI URLs to resolve at startup of each Agent. Makes a 'controller' EndRole and LocScheme in the database for each URL + curls: List[str] = field(default_factory=list) + # General Introduction OOBI URLs to resolve at startup of each Agent. For things like witnesses, watchers, mailboxes, and TEL observers. + iurls: List[str] = field(default_factory=list) + # Data OOBI URLs resolved at startup of each Agent. For things like ACDC schemas, ACDCs (credentials), or other CESR streams. + durls: List[str] = field(default_factory=list) + + # Experimental configuration + # Experimental password for boot endpoint. Enables HTTP Basic Authentication for the boot endpoint. Only meant to be used for testing purposes. + bootPassword: str = None + # Experimental username for boot endpoint. Enables HTTP Basic Authentication for the boot endpoint. Only meant to be used for testing purposes. + bootUsername: str = None + +def runAgency(config: KERIAServerConfig): + """Runs a KERIA Agency with the given Doers by calling Doist.do(). Useful for testing.""" + help.ogler.level = logging.getLevelName(config.logLevel) + logger.setLevel(help.ogler.level) + if config.logFile is not None: + help.ogler.headDirPath = config.logFile + help.ogler.reopen(name=config.name, temp=False, clear=True) + + logger.info("Starting Agent for %s listening: admin/%s, http/%s, boot/%s", + config.name, config.adminPort, config.httpPort, config.bootPort) + logger.info("PID: %s", os.getpid()) + + doist = agencyDoist(setupDoers(config)) + doist.do() + +def agencyDoist(doers: List[Doer]): + """Creates a Doist for the Agency doers and adds a graceful shutdown handler. Useful for testing.""" + tock = 0.03125 + doist = doing.Doist(limit=0.0, tock=tock, real=True) + doers.append(GracefulShutdownDoer(doist=doist, agency=getAgency(doers))) + doist.doers = doers + return doist + +def getAgency(doers): + """Get the agency from a list of Doers. Used to get the Agency for the graceful agent shutdown.""" + for doer in doers: + if isinstance(doer, Agency): + return doer + return None + +def setupDoers(config: KERIAServerConfig): + """ + Sets up the HIO coroutines the KERIA agent server is composed of including three HTTP servers for a KERIA agent server: + 1. Boot server for bootstrapping agents. Signify calls this with a signed inception event. + 2. Admin server for administrative tasks like creating agents. + 3. HTTP server for all other agent operations. + """ + agency = Agency( + name=config.name, + base=config.base, + bran=config.bran, + configFile=config.configFile, + configDir=config.configDir, + releaseTimeout=config.releaseTimeout, + curls=config.curls, + iurls=config.iurls, + durls=config.durls + ) + allowed_cors_headers = [ + 'cesr-attachment', + 'cesr-date', + 'content-type', + 'signature', + 'signature-input', + 'signify-resource', + 'signify-timestamp' + ] bootApp = falcon.App(middleware=falcon.CORSMiddleware( allow_origins='*', allow_credentials='*', - expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input', - 'signify-resource', 'signify-timestamp'])) + expose_headers=allowed_cors_headers)) - bootServer = createHttpServer(bootPort, bootApp, keypath, certpath, cafilepath) + bootServer = createHttpServer(config.bootPort, bootApp, config.keyPath, config.certPath, config.caFilePath) if not bootServer.reopen(): - raise RuntimeError(f"cannot create boot http server on port {bootPort}") + raise RuntimeError(f"Cannot create boot HTTP server on port {config.bootPort}") bootServerDoer = http.ServerDoer(server=bootServer) - bootEnd = BootEnd(agency, username=bootUsername, password=bootPassword) + bootEnd = BootEnd(agency, username=config.bootUsername, password=config.bootPassword) bootApp.add_route("/boot", bootEnd) bootApp.add_route("/health", HealthEnd()) @@ -77,17 +192,16 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No app = falcon.App(middleware=falcon.CORSMiddleware( allow_origins='*', allow_credentials='*', - expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input', - 'signify-resource', 'signify-timestamp'])) - if cors: + expose_headers=allowed_cors_headers)) + if config.cors: app.add_middleware(middleware=httping.HandleCORS()) app.add_middleware(authing.SignatureValidationComponent(agency=agency, authn=authn, allowed=["/agent"])) app.req_options.media_handlers.update(media.Handlers()) app.resp_options.media_handlers.update(media.Handlers()) - adminServer = createHttpServer(adminPort, app, keypath, certpath, cafilepath) + adminServer = createHttpServer(config.adminPort, app, config.keyPath, config.certPath, config.caFilePath) if not adminServer.reopen(): - raise RuntimeError(f"cannot create admin http server on port {adminPort}") + raise RuntimeError(f"cannot create admin HTTP server on port {config.adminPort}") adminServerDoer = http.ServerDoer(server=adminServer) doers = [agency, bootServerDoer, adminServerDoer] @@ -100,20 +214,19 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No keriaexchanging.loadEnds(app=app) ipexing.loadEnds(app=app) - if httpPort: + if config.httpPort: happ = falcon.App(middleware=falcon.CORSMiddleware( allow_origins='*', allow_credentials='*', - expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input', - 'signify-resource', 'signify-timestamp'])) + expose_headers=allowed_cors_headers)) happ.req_options.media_handlers.update(media.Handlers()) happ.resp_options.media_handlers.update(media.Handlers()) ending.loadEnds(agency=agency, app=happ) indirecting.loadEnds(agency=agency, app=happ) - server = createHttpServer(httpPort, happ, keypath, certpath, cafilepath) + server = createHttpServer(config.httpPort, happ, config.keyPath, config.certPath, config.caFilePath) if not server.reopen(): - raise RuntimeError(f"cannot create local http server on port {httpPort}") + raise RuntimeError(f"cannot create local http server on port {config.httpPort}") httpServerDoer = http.ServerDoer(server=server) doers.append(httpServerDoer) @@ -124,7 +237,7 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No specEnd.addRoutes(happ) happ.add_route("/spec.yaml", specEnd) - print("The Agency is loaded and waiting for requests...") + logger.info("The Agency is loaded and waiting for requests...") return doers diff --git a/src/keria/app/cli/commands/start.py b/src/keria/app/cli/commands/start.py index d4fd231..99336d5 100644 --- a/src/keria/app/cli/commands/start.py +++ b/src/keria/app/cli/commands/start.py @@ -1,16 +1,13 @@ # -*- encoding: utf-8 -*- """ KERIA -keria.cli.keria.commands module +keria.cli.keria.commands.start module -Witness command line interface +KERIA Agent server start command line interface (CLI) command """ import argparse -import logging import os -import signal -from hio.base import doing from keri import __version__ from keri import help @@ -78,59 +75,34 @@ dest="bootUsername", default=os.getenv("KERIA_EXPERIMENTAL_BOOT_USERNAME")) -logger = help.ogler.getLogger() -def getListVariable(name): - value = os.getenv(name) - return value.split(";") if value else None +logger = help.ogler.getLogger() def launch(args): - help.ogler.level = logging.getLevelName(args.loglevel) - logger.setLevel(help.ogler.level) - if(args.logfile != None): - help.ogler.headDirPath = args.logfile - help.ogler.reopen(name=args.name, temp=False, clear=True) - - logger.info("Starting Agent for %s listening: admin/%s, http/%s, boot/%s", args.name, args.admin, args.http, args.boot) - logger.info("PID: %s", os.getpid()) - - doers = agenting.setup(name=args.name or "ahab", - base=args.base or "", - bran=args.bran, - adminPort=args.admin, - httpPort=args.http, - bootPort=args.boot, - configFile=args.configFile, - configDir=args.configDir, - keypath=args.keypath, - certpath=args.certpath, - cafilepath=args.cafilepath, - cors=os.getenv("KERI_AGENT_CORS", "false").lower() in ("true", "1"), - releaseTimeout=int(os.getenv("KERIA_RELEASER_TIMEOUT", "86400")), - curls=getListVariable("KERIA_CURLS"), - iurls=getListVariable("KERIA_IURLS"), - durls=getListVariable("KERIA_DURLS"), - bootPassword=args.bootPassword, - bootUsername=args.bootUsername) - agency = None - for doer in doers: - if isinstance(doer, agenting.Agency): - agency = doer - break - - tock = 0.03125 - doist = doing.Doist(limit=0.0, tock=tock, real=True) - - def handle_sigterm(sig, frame): - agents = list(agency.agents.keys()) - logger.info("Shutting down due to %s | stopping %s agents", signal.strsignal(sig), len(agents)) - for caid in agents: - agency.shut(agency.agents[caid]) - logger.info("Shutting down main Doist loop") - doist.exit() - - signal.signal(signal.SIGTERM, handle_sigterm) - - doist.do(doers=doers) - + agenting.runAgency(agenting.KERIAServerConfig( + name=args.name or "ahab", + base=args.base or "", + bran=args.bran, + adminPort=args.admin, + httpPort=args.http, + bootPort=args.boot, + configFile=args.configFile, + configDir=args.configDir, + keyPath=args.keypath, + certPath=args.certpath, + caFilePath=args.cafilepath, + logLevel=args.loglevel, + logFile=args.logfile, + cors=os.getenv("KERI_AGENT_CORS", "false").lower() in ("true", "1"), + releaseTimeout=int(os.getenv("KERIA_RELEASER_TIMEOUT", "86400")), + curls=getListVariable("KERIA_CURLS"), + iurls=getListVariable("KERIA_IURLS"), + durls=getListVariable("KERIA_DURLS"), + bootPassword=args.bootPassword, + bootUsername=args.bootUsername + )) logger.info("Agent %s gracefully stopped", args.name) + +def getListVariable(name): + value = os.getenv(name) + return value.split(";") if value else None \ No newline at end of file diff --git a/src/keria/app/serving.py b/src/keria/app/serving.py new file mode 100644 index 0000000..f2374a9 --- /dev/null +++ b/src/keria/app/serving.py @@ -0,0 +1,39 @@ +import signal + +from hio.base import doing, Doist +from keri import help + +logger = help.ogler.getLogger() + +class GracefulShutdownDoer(doing.DoDoer): + def __init__(self, doist, agency, **kwa): + self.doist: Doist = doist + self.agency = agency + self.shutdown_flag = False + + # Register signal handler + signal.signal(signal.SIGTERM, self.handle_sigterm) + signal.signal(signal.SIGINT, self.handle_sigterm) + logger.info("Registered signal handlers for SIGTERM and SIGINT") + + super().__init__(doers=[self.shutdown], **kwa) + + def handle_sigterm(self, signum, frame): + logger.info(f"Received signal {signum}, initiating graceful shutdown.") + self.shutdown_flag = True + + def shutdown_agents(self, agents): + logger.info("Stopping %s agents", len(agents)) + for caid in agents: + self.agency.shut(self.agency.agents[caid]) + + @doing.doize() + def shutdown(self, tymth, tock=0.0): + self.wind(tymth) + while not self.shutdown_flag: + yield tock + + # Once shutdown_flag is set, exit the Doist loop + self.shutdown_agents(list(self.agency.agents.keys())) + logger.info(f"Shutting down main Doist loop") + self.doist.exit() \ No newline at end of file diff --git a/tests/app/test_agenting.py b/tests/app/test_agenting.py index e0338f3..ba84938 100644 --- a/tests/app/test_agenting.py +++ b/tests/app/test_agenting.py @@ -3,30 +3,32 @@ KERIA keria.app.agenting module -Testing the Mark II Agent +Testing the Mark II Agent (KERIA) """ -from base64 import b64encode import json +import multiprocessing import os import shutil - -import pytest +import signal +import time +from base64 import b64encode import falcon import hio +import pytest +import requests from falcon import testing from hio.base import doing, tyming from hio.core import http, tcp from hio.help import decking +from keri import core from keri import kering from keri.app import habbing, configing, indirecting, oobiing, querying from keri.app.agenting import Receiptor, WitnessReceiptor -from keri import core -from keri.core import coring, serdering +from keri.core import serdering from keri.core.coring import MtrDex from keri.db import basing, dbing from keri.help import nowIso8601 -from keri.vc import proving from keri.vdr import credentialing from keria.app import agenting, aiding @@ -35,14 +37,65 @@ def test_setup_no_http(): - doers = agenting.setup(name="test", bran=None, adminPort=1234, bootPort=5678) + doers = agenting.setupDoers(agenting.KERIAServerConfig( + name="test", + adminPort=1234, + bootPort=5678, + httpPort=None, + )) assert len(doers) == 3 assert isinstance(doers[0], agenting.Agency) is True def test_setup(): - doers = agenting.setup("test", bran=None, adminPort=1234, bootPort=5678, httpPort=9999) + doers = agenting.setupDoers(agenting.KERIAServerConfig( + name="test", + adminPort=1234, + bootPort=5678, + httpPort=9999, + )) assert len(doers) == 4 +def wait_for_server(port, timeout=10): + """Poll server until it responds or until timeout""" + url=f"http://127.0.0.1:{port}/health" + start_time=time.time() + while time.time() - start_time < timeout: + try: + response = requests.get(url) + if response.status_code == 200: + return True # Server is up + except requests.ConnectionError: + pass # Server not ready yet + time.sleep(0.25) # Retry every 1/4 second + return False # Timeout + +def test_shutdown_signals(): + config = agenting.KERIAServerConfig( + adminPort=3333, + bootPort=4444, + httpPort=5555, + ) + + # Test SIGTERM + agency_process = multiprocessing.Process(target=agenting.runAgency, args=[config]) + agency_process.start() + assert wait_for_server(config.bootPort), "Agency did not start as expected." + + os.kill(agency_process.pid, signal.SIGTERM) # Send SigTerm to the process, signal 15 + agency_process.join(timeout=10) + assert not agency_process.is_alive(), "SIGTERM: Agency process did not shut down as expected." + assert agency_process.exitcode == 0, f"SIGTERM: Agency exited with non-zero exit code {agency_process.exitcode}" + + # Test SIGINT + agency_process = multiprocessing.Process(target=agenting.runAgency, args=[config]) + agency_process.start() + assert wait_for_server(config.bootPort), "Agency did not start as expected." + + os.kill(agency_process.pid, signal.SIGINT) # Sends SigInt to the process, signal 2 + agency_process.join(timeout=10) + assert not agency_process.is_alive(), "SIGINT: Agency process did not shut down as expected." + assert agency_process.exitcode == 0, f"SIGINT: Agency exited with non-zero exit code {agency_process.exitcode}" + def test_load_ends(helpers): with helpers.openKeria() as (agency, agent, app, client): From 42ac5c82e6cb75b52b0060a9991ce5ecf6d3fb5f Mon Sep 17 00:00:00 2001 From: Kent Bull Date: Sat, 11 Jan 2025 10:34:33 -0700 Subject: [PATCH 5/7] refactor: DoDoer -> Doer for GracefulShutdownDoer This uses lifecycle methods appropriately to set up the signal handlers, mark the GracefulShutdownDoer as done when the shutdown flag is set, and tear down the Doist in the Doer.exit() method --- src/keria/app/serving.py | 64 ++++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/src/keria/app/serving.py b/src/keria/app/serving.py index f2374a9..6b05d8e 100644 --- a/src/keria/app/serving.py +++ b/src/keria/app/serving.py @@ -5,35 +5,67 @@ logger = help.ogler.getLogger() -class GracefulShutdownDoer(doing.DoDoer): +class GracefulShutdownDoer(doing.Doer): + """ + Shuts all Agency agents down before exiting the Doist loop, performing a graceful shutdown. + Sets up signal handlers in the Doer.enter lifecycle method and exits the Doist scheduler loop in Doer.exit + Checks for the signals in the Doer.recur lifecycle method. + """ def __init__(self, doist, agency, **kwa): + """ + Parameters: + doist (Doist): The Doist running this Doer + agency (Agency): The Agency containing Agent instances to be gracefully shut down + kwa (dict): Additional keyword arguments for Doer initialization + """ self.doist: Doist = doist self.agency = agency - self.shutdown_flag = False + self.shutdown_received = False - # Register signal handler - signal.signal(signal.SIGTERM, self.handle_sigterm) - signal.signal(signal.SIGINT, self.handle_sigterm) - logger.info("Registered signal handlers for SIGTERM and SIGINT") - - super().__init__(doers=[self.shutdown], **kwa) + super().__init__(**kwa) def handle_sigterm(self, signum, frame): - logger.info(f"Received signal {signum}, initiating graceful shutdown.") - self.shutdown_flag = True + """Handler function for SIGTERM""" + logger.info(f"Received SIGTERM, initiating graceful shutdown.") + self.shutdown_received = True + + def handle_sigint(self, signum, frame): + """Handler function for SIGINT""" + logger.info(f"Received SIGINT, initiating graceful shutdown.") + self.shutdown_received = True def shutdown_agents(self, agents): + """Helper function to shut down the agents.""" logger.info("Stopping %s agents", len(agents)) for caid in agents: self.agency.shut(self.agency.agents[caid]) - @doing.doize() - def shutdown(self, tymth, tock=0.0): - self.wind(tymth) - while not self.shutdown_flag: - yield tock + def enter(self): + """ + Sets up signal handlers. + Lifecycle method called once when the Doist running this Doer enters the context for this Doer. + """ + # Register signal handler + signal.signal(signal.SIGTERM, self.handle_sigterm) + signal.signal(signal.SIGINT, self.handle_sigint) + logger.info("Registered signal handlers for SIGTERM and SIGINT") + + def recur(self, tock=0.0): + """Generator coroutine checking once per tock for shutdown flag""" + # Checks once per tock if the shutdown flag has been set and if so initiates the shutdown process + while not self.shutdown_received: + yield tock # will iterate forever in here until shutdown flag set # Once shutdown_flag is set, exit the Doist loop self.shutdown_agents(list(self.agency.agents.keys())) + + return True # Returns a "done" status + # Causes the Doist scheduler to call .exit() lifecycle method below, killing the doist loop + + def exit(self): + """ + Exits the Doist loop. + Lifecycle method called once when the Doist running this Doer exits the context for this Doer. + """ logger.info(f"Shutting down main Doist loop") - self.doist.exit() \ No newline at end of file + self.doist.exit() From b722e095f4982d40e56904f96b6de4083d5917da Mon Sep 17 00:00:00 2001 From: Kent Bull Date: Sun, 12 Jan 2025 21:03:15 -0700 Subject: [PATCH 6/7] refactor: remove redundant SigInt handler Since the Doist loop listens for the KeyboardInterrupt exception then it effectively handles the Ctrl-C SigInt interrupt already which means the shutdown handler only has to handle the SigTerm signal. --- src/keria/app/serving.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/keria/app/serving.py b/src/keria/app/serving.py index 6b05d8e..0b18f82 100644 --- a/src/keria/app/serving.py +++ b/src/keria/app/serving.py @@ -8,8 +8,8 @@ class GracefulShutdownDoer(doing.Doer): """ Shuts all Agency agents down before exiting the Doist loop, performing a graceful shutdown. - Sets up signal handlers in the Doer.enter lifecycle method and exits the Doist scheduler loop in Doer.exit - Checks for the signals in the Doer.recur lifecycle method. + Sets up signal handler in the Doer.enter lifecycle method and exits the Doist scheduler loop in Doer.exit + Checks for the shutdown flag being set in the Doer.recur lifecycle method. """ def __init__(self, doist, agency, **kwa): """ @@ -29,11 +29,6 @@ def handle_sigterm(self, signum, frame): logger.info(f"Received SIGTERM, initiating graceful shutdown.") self.shutdown_received = True - def handle_sigint(self, signum, frame): - """Handler function for SIGINT""" - logger.info(f"Received SIGINT, initiating graceful shutdown.") - self.shutdown_received = True - def shutdown_agents(self, agents): """Helper function to shut down the agents.""" logger.info("Stopping %s agents", len(agents)) @@ -47,8 +42,7 @@ def enter(self): """ # Register signal handler signal.signal(signal.SIGTERM, self.handle_sigterm) - signal.signal(signal.SIGINT, self.handle_sigint) - logger.info("Registered signal handlers for SIGTERM and SIGINT") + logger.info("Registered signal handlers for SIGTERM") def recur(self, tock=0.0): """Generator coroutine checking once per tock for shutdown flag""" From 170398cdd32e95184acb38ae3a5be5983d4b4868 Mon Sep 17 00:00:00 2001 From: Kent Bull Date: Mon, 13 Jan 2025 08:32:45 -0700 Subject: [PATCH 7/7] ci: add requests to tests --- .github/workflows/python-app-ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-app-ci.yml b/.github/workflows/python-app-ci.yml index 34b39b8..a53b535 100644 --- a/.github/workflows/python-app-ci.yml +++ b/.github/workflows/python-app-ci.yml @@ -28,7 +28,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 pytest hio + pip install flake8 pytest hio requests if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: Lint changes run: | @@ -51,7 +51,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install pytest pytest-cov hio + pip install pytest pytest-cov hio requests if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: Run core KERIA tests run: |