diff --git a/falcon_toolkit/common/auth.py b/falcon_toolkit/common/auth.py index 228a2f6..3097b2c 100644 --- a/falcon_toolkit/common/auth.py +++ b/falcon_toolkit/common/auth.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from caracara import Client + from click import Context class AuthBackend(ABC): @@ -51,7 +52,7 @@ def __init__(self, config: Dict = None): """ @abstractmethod - def authenticate(self) -> Client: + def authenticate(self, ctx: Context) -> Client: """Return a complete OAuth2 object, ready for use with FalconPy.""" @abstractmethod diff --git a/falcon_toolkit/common/auth_backends/public_mssp.py b/falcon_toolkit/common/auth_backends/public_mssp.py index 6b9da1e..42ba425 100644 --- a/falcon_toolkit/common/auth_backends/public_mssp.py +++ b/falcon_toolkit/common/auth_backends/public_mssp.py @@ -4,10 +4,9 @@ user which child CID to authenticate against. """ -import os - from typing import Dict, Optional +import click import keyring from caracara import Client @@ -94,15 +93,17 @@ def dump_config(self) -> Dict[str, object]: keyring. """ config: Dict[str, object] = {} - config["client_id"]: str = self.client_id - config["cloud_name"]: str = self.cloud_name - config["ssl_verify"]: bool = self.ssl_verify - config["proxy"]: Dict[str, str] = self.proxy + config["client_id"] = self.client_id + config["cloud_name"] = self.cloud_name + config["ssl_verify"] = self.ssl_verify + config["proxy"] = self.proxy return config - def authenticate(self) -> Client: + def authenticate(self, ctx: click.Context) -> Client: """Log the Toolkit into Falcon using the settings and keys configured at instance setup.""" + chosen_cid_str = ctx.obj["cid"] + parent_client = Client( client_id=self.client_id, client_secret=self.client_secret, @@ -111,18 +112,39 @@ def authenticate(self) -> Client: proxy=self.proxy, ) child_cids = parent_client.flight_control.get_child_cids() - chosen_cid_str = os.environ.get("FALCON_MSSP_CHILD_CID") - if chosen_cid_str and chosen_cid_str.lower() in child_cids: - chosen_cid = parent_client.flight_control.get_child_cid_data(cids=[chosen_cid_str])[ - chosen_cid_str - ] - else: - child_cids_data = parent_client.flight_control.get_child_cid_data(cids=child_cids) - chosen_cid_str = choose_cid(cids=child_cids_data, prompt_text="MSSP Child CID Search") - chosen_cid = child_cids_data[chosen_cid_str] - - chosen_cid_name = chosen_cid["name"] - print(f"Connecting to {chosen_cid_name}") + + if chosen_cid_str and chosen_cid_str in child_cids: + click.echo( + click.style("Valid member CID ", fg="blue") + + click.style(chosen_cid_str, fg="blue", bold=True) + + click.style(" provided. Skipping CID selection.", fg="blue") + ) + elif chosen_cid_str: + click.echo(click.style("An invalid CID was provided at the command line.", fg="red")) + click.echo("Please search for an alternative CID:") + # Blank out a bad value + chosen_cid_str = None + + if not chosen_cid_str: + if chosen_cid_str and chosen_cid_str.lower() in child_cids: + chosen_cid = parent_client.flight_control.get_child_cid_data( + cids=[chosen_cid_str], + )[chosen_cid_str] + else: + child_cids_data = parent_client.flight_control.get_child_cid_data(cids=child_cids) + if not child_cids_data: + raise RuntimeError( + "No child CIDs accessible. Please check your API credentials." + ) + + chosen_cid_str = choose_cid( + cids=child_cids_data, + prompt_text="MSSP Child CID Search", + ) + chosen_cid = child_cids_data[chosen_cid_str] + + chosen_cid_name = chosen_cid["name"] + print(f"Connecting to {chosen_cid_name}") client = Client( client_id=self.client_id, diff --git a/falcon_toolkit/common/auth_backends/public_single_cid.py b/falcon_toolkit/common/auth_backends/public_single_cid.py index 665507c..41e3602 100644 --- a/falcon_toolkit/common/auth_backends/public_single_cid.py +++ b/falcon_toolkit/common/auth_backends/public_single_cid.py @@ -6,6 +6,7 @@ from typing import Dict, Optional +import click import keyring from caracara import Client @@ -89,7 +90,7 @@ def dump_config(self) -> Dict[str, object]: return config - def authenticate(self) -> Client: + def authenticate(self, ctx: click.Context) -> Client: """Log the Toolkit into Falcon using the settings and keys configured at instance setup.""" client = Client( client_id=self.client_id, diff --git a/falcon_toolkit/containment/cli.py b/falcon_toolkit/containment/cli.py index 03174bf..5465fe7 100644 --- a/falcon_toolkit/containment/cli.py +++ b/falcon_toolkit/containment/cli.py @@ -66,7 +66,7 @@ def cli_containment( ): """Manage the containment status of hosts in Falcon.""" instance = get_instance(ctx) - client: Client = instance.auth_backend.authenticate() + client: Client = instance.auth_backend.authenticate(ctx) ctx.obj["client"] = client device_ids = None diff --git a/falcon_toolkit/falcon.py b/falcon_toolkit/falcon.py index 5dd6420..e1fae28 100755 --- a/falcon_toolkit/falcon.py +++ b/falcon_toolkit/falcon.py @@ -86,11 +86,22 @@ "profile (Falcon Tenant) set up, this parameter is not required." ), ) +@click.option( + "--cid", + envvar="FALCON_TOOLKIT_CID", + type=click.STRING, + default=None, + help=( + "Specify the CID to connect to. Note that this only applies to authentication backends " + "(e.g., MSSP) that support multiple CIDs through the same set of API keys." + ), +) def cli( ctx: click.Context, config_path: str, verbose: bool, profile: str, + cid: str, ): r"""Falcon Toolkit. @@ -207,6 +218,9 @@ def cli( # Pass a profile name down the chain in case one is selected ctx.obj["profile_name"] = profile + # Store the CID in the context for optional use later + ctx.obj["cid"] = cid + @cli.result_callback() def cli_process_result( # pylint: disable=unused-argument diff --git a/falcon_toolkit/hosts/cli.py b/falcon_toolkit/hosts/cli.py index 8b08397..77f9734 100644 --- a/falcon_toolkit/hosts/cli.py +++ b/falcon_toolkit/hosts/cli.py @@ -55,7 +55,7 @@ def cli_host_search( ): """Implement the host_search CLI command.""" instance = get_instance(ctx) - client = instance.auth_backend.authenticate() + client = instance.auth_backend.authenticate(ctx) filters = parse_cli_filters(filter_kv_strings, client) # Handle validation of the CSV export path here, before the search executes in host_search_cmd. diff --git a/falcon_toolkit/maintenance_token/cli.py b/falcon_toolkit/maintenance_token/cli.py index 9db4c60..7c5c088 100644 --- a/falcon_toolkit/maintenance_token/cli.py +++ b/falcon_toolkit/maintenance_token/cli.py @@ -75,7 +75,7 @@ def cli_maintenance_token( ): """Get system maintenance tokens from Falcon.""" instance = get_instance(ctx) - client: Client = instance.auth_backend.authenticate() + client: Client = instance.auth_backend.authenticate(ctx) ctx.obj["client"] = client # Bulk token is a special case we can handle here. diff --git a/falcon_toolkit/policies/cli.py b/falcon_toolkit/policies/cli.py index dcce343..37d33db 100644 --- a/falcon_toolkit/policies/cli.py +++ b/falcon_toolkit/policies/cli.py @@ -57,7 +57,7 @@ def cli_policies( ): """Configure the future profiles commands by getting the context in shape.""" instance = get_instance(ctx) - client: Client = instance.auth_backend.authenticate() + client: Client = instance.auth_backend.authenticate(ctx) ctx.obj["client"] = client if prevention_policies_option: diff --git a/falcon_toolkit/shell/cli.py b/falcon_toolkit/shell/cli.py index 220df33..875a87e 100644 --- a/falcon_toolkit/shell/cli.py +++ b/falcon_toolkit/shell/cli.py @@ -122,7 +122,7 @@ def cli_shell( # pylint: disable=too-many-arguments,too-many-locals start the REPL command loop. This passes control over to the shell, via the Cmd2 library. """ instance = get_instance(ctx) - client = instance.auth_backend.authenticate() + client = instance.auth_backend.authenticate(ctx) # Show online hosts only if queueing is false online_state = None if queueing else OnlineState.ONLINE diff --git a/pyproject.toml b/pyproject.toml index 09ea81c..0c4c8cb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "falcon-toolkit" -version = "3.4.1" +version = "3.4.2" description = "Toolkit to interface with CrowdStrike Falcon via the API" license = "MIT" authors = [