Skip to content

Enable TNU to use Azure auth for DRS operations #399 #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ __pycache__/
/.idea
/*.iml

# Default virtual environment name used by PyCharm
/.venv

# JS/node/npm/web dev files
node_modules
npm-debug.log
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
azure-identity >= 1.12.0, < 2
google-cloud-storage >= 1.38.0, < 2
gs-chunked-io >= 0.5.1, < 0.6
firecloud
Expand Down
19 changes: 19 additions & 0 deletions terra_notebook_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
from dataclasses import dataclass
from enum import Enum

WORKSPACE_NAME = os.environ.get('WORKSPACE_NAME', None)
WORKSPACE_NAMESPACE = os.environ.get('WORKSPACE_NAMESPACE') # This env var is set in Terra Cloud Environments
Expand Down Expand Up @@ -26,3 +28,20 @@
DRS_RESOLVER_URL = MARTHA_URL
else:
DRS_RESOLVER_URL = DRSHUB_URL


class ExecutionEnvironment(Enum):
    """
    Kind of environment terra-notebook-utils is executing in.

    Member values are plain integers. A trailing comma after a value would
    silently turn it into a one-element tuple, so none is used here.
    """
    TERRA_WORKSPACE = 1  # Executing in a Terra Workspace (on any supported platform)
    OTHER = 2  # Executing outside of a Terra Workspace (e.g., local system)


class ExecutionPlatform(Enum):
    """
    Cloud platform hosting the compute environment terra-notebook-utils is executing in.

    Member values are plain integers. A trailing comma after a value would
    silently turn it into a one-element tuple, so none is used here.
    """
    AZURE = 1  # Executing in an Azure compute environment
    GOOGLE = 2  # Executing in a Google compute environment
    UNKNOWN = 3  # Execution platform not identified (e.g., local system)


@dataclass
class ExecutionContext:
    """
    Pairing of the execution environment and execution platform
    identified for the running terra-notebook-utils process.
    """
    # Terra Workspace vs. anything else (e.g., local system).
    execution_environment: ExecutionEnvironment
    # Azure, Google, or unknown compute platform.
    execution_platform: ExecutionPlatform
70 changes: 70 additions & 0 deletions terra_notebook_utils/azure_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Microsoft Azure identity/auth support.

See:
https://azuresdkdocs.blob.core.windows.net/$web/python/azure-identity/1.12.0/index.html
https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python
https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/azure/identity/_credentials/default.py
"""
import logging
import os
from typing import Optional

from azure.identity import DefaultAzureCredential
from terra_notebook_utils.logger import logger


# Single instance of DefaultAzureCredential that is initialized lazily.
# The instance is treated as threadsafe and reusable.
# The Azure documentation is silent on thread safety.
# Based on scanning the code, it appears to be threadsafe.
# See: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/
# azure-identity/azure/identity/_credentials/default.py
_AZURE_CREDENTIAL: Optional[DefaultAzureCredential] = None


def _set_azure_identity_logging_level(level) -> None:
    """
    Set the logging level for the modules participating in the Azure default credential flow.

    The loggers are addressed by their dotted module names rather than by
    reaching into the private ``azure.identity._credentials`` submodules, so
    this keeps working even if one of those private modules is not (or no
    longer) imported as a side effect of importing ``azure.identity``.

    :param level: logging level to apply (e.g., ``logging.WARNING``)
    """
    azure_logger_names = (
        "azure.identity._credentials.environment",
        "azure.identity._credentials.managed_identity",
        "azure.identity._credentials.chained",
        "azure.core.pipeline.policies.http_logging_policy",
    )
    for logger_name in azure_logger_names:
        logging.getLogger(logger_name).setLevel(level)


# Suppress extraneous azure-identity INFO level logging
_set_azure_identity_logging_level(logging.WARNING)


def _get_default_credential() -> DefaultAzureCredential:
    """
    Return the shared DefaultAzureCredential, creating it on first use.

    Note (from the original author): lazy creation is not strictly required,
    since no exception is raised on instantiation even if Azure credentials
    are not configured.

    :return: Reference to the module-wide instance of DefaultAzureCredential
    """
    # Open question kept from the original: should a more sophisticated
    # Singleton pattern be used instead?
    global _AZURE_CREDENTIAL
    if _AZURE_CREDENTIAL:
        return _AZURE_CREDENTIAL
    _AZURE_CREDENTIAL = DefaultAzureCredential()
    return _AZURE_CREDENTIAL


def get_azure_access_token() -> str:
    """
    Return an Azure access token.

    A non-empty 'TERRA_NOTEBOOK_AZURE_ACCESS_TOKEN' environment variable takes
    precedence; otherwise a token is requested from the default Azure
    credential for the Azure management scope.

    :return: the access token string
    :raises ClientAuthenticationError: per the original docstring, when the
        default credential flow fails to authenticate
    """
    configured_token = os.environ.get('TERRA_NOTEBOOK_AZURE_ACCESS_TOKEN')
    if configured_token:
        logger.debug("Using Azure token configured using 'TERRA_NOTEBOOK_AZURE_ACCESS_TOKEN'")
        return configured_token

    logger.debug("Requesting Azure default credentials token.")
    credential = _get_default_credential()
    access_token = credential.get_token("https://management.azure.com/.default")
    logger.debug("Using Azure default credentials token.")
    return access_token.token
5 changes: 3 additions & 2 deletions terra_notebook_utils/drs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from terra_notebook_utils.blobstore.progress import Indicator
from terra_notebook_utils.blobstore import Blob, copy_client, BlobNotFoundError
from terra_notebook_utils.logger import logger
from terra_notebook_utils.terra_auth import get_terra_access_token


DRSInfo = namedtuple("DRSInfo", "credentials access_url bucket_name key name size updated checksums")
Expand Down Expand Up @@ -46,7 +47,7 @@ def enable_requester_pays(workspace_name: Optional[str]=WORKSPACE_NAME,
rawls_url = (f"https://rawls.dsde-{TERRA_DEPLOYMENT_ENV}.broadinstitute.org/api/workspaces/"
f"{workspace_namespace}/{encoded_workspace}/enableRequesterPaysForLinkedServiceAccounts")
logger.info("Enabling requester pays for your workspace. This will only take a few seconds...")
access_token = gs.get_access_token()
access_token = get_terra_access_token()

headers = {
'authorization': f"Bearer {access_token}",
Expand All @@ -61,7 +62,7 @@ def enable_requester_pays(workspace_name: Optional[str]=WORKSPACE_NAME,

def get_drs(drs_url: str, fields: List[str]) -> Response:
"""Request DRS information from DRS Resolver."""
access_token = gs.get_access_token()
access_token = get_terra_access_token()

headers = {
'authorization': f"Bearer {access_token}",
Expand Down
4 changes: 2 additions & 2 deletions terra_notebook_utils/logger.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import json
import logging
from logging import Logger

logger = logging.getLogger(__name__)
logger: Logger = logging.getLogger(__name__)
70 changes: 70 additions & 0 deletions terra_notebook_utils/terra_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Support for auth with Terra backend services.
"""
from azure.core.exceptions import ClientAuthenticationError
from google.auth.exceptions import DefaultCredentialsError

from terra_notebook_utils import azure_auth, gs, ExecutionPlatform
from terra_notebook_utils.logger import logger
from terra_notebook_utils.utils import get_execution_context


class AuthenticationError(Exception):
    """Raised when no token could be obtained to auth with Terra backend services."""


class TerraAuthTokenProvider:
    """
    Supplies auth bearer tokens suitable for use with Terra backend services.
    """

    def __init__(self):
        # Looked up once at construction; token requests branch on its platform.
        self.execution_context = get_execution_context()

    @staticmethod
    def _identify_valid_access_token() -> str:
        """
        Probe the Terra supported auth providers for a usable bearer token.

        Google is tried first, then Azure. The first token obtained is
        returned; a provider failure is logged at debug level and the next
        provider is tried.

        :return: auth bearer token suitable for use with Terra backend services
        :raises: AuthenticationError if neither provider yields a token
        """
        try:
            logger.debug("Attempting to obtain a Google access token to use with Terra backend services.")
            token_from_google = gs.get_access_token()
        except DefaultCredentialsError as google_error:
            logger.debug("Failed to obtain a Google access token to use with Terra backend services.",
                         exc_info=google_error)
        else:
            logger.debug("Using Google access token to use with Terra backend services.")
            return token_from_google

        try:
            logger.debug("Attempting to obtain a Azure access token to use with Terra backend services.")
            token_from_azure = azure_auth.get_azure_access_token()
        except ClientAuthenticationError as azure_error:
            logger.debug("Failed to obtain a Azure access token to use with Terra backend services.",
                         exc_info=azure_error)
        else:
            logger.debug("Using Azure access token to use with Terra backend services.")
            return token_from_azure

        raise AuthenticationError("Failed to obtain a Google or Azure token to auth with Terra backend services.")

    def get_terra_access_token(self) -> str:
        """
        Return a bearer token chosen according to the identified execution platform.

        On Google or Azure the matching provider is used directly; for any
        other platform value each provider is probed in turn.

        :return: auth bearer token suitable for use with Terra backend services
        """
        platform = self.execution_context.execution_platform
        if platform == ExecutionPlatform.GOOGLE:
            logger.debug("Using Google default credentials to auth with Terra services.")
            return gs.get_access_token()
        if platform == ExecutionPlatform.AZURE:
            logger.debug("Using Azure default credentials to auth with Terra services.")
            return azure_auth.get_azure_access_token()
        return self._identify_valid_access_token()


# Single instance of TerraAuthTokenProvider.
TERRA_AUTH_TOKEN_PROVIDER = TerraAuthTokenProvider()


def get_terra_access_token() -> str:
    """
    Return an auth bearer token suitable for use with Terra backend services.

    Delegates to the module-level TerraAuthTokenProvider instance.

    :raises: AuthenticationError
    """
    provider = TERRA_AUTH_TOKEN_PROVIDER
    return provider.get_terra_access_token()
25 changes: 25 additions & 0 deletions terra_notebook_utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import json
import os
import threading
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
from typing import Any, Callable, Dict, Optional, Iterable, Set

import jmespath

from terra_notebook_utils import ExecutionEnvironment, ExecutionPlatform, ExecutionContext


class _AsyncContextManager:
"""Context manager for asynchronous execution. Wait on exit for all jobs to complete."""
Expand Down Expand Up @@ -63,3 +66,25 @@ def is_notebook() -> bool:
return "ZMQInteractiveShell" == get_ipython().__class__.__name__ # type: ignore
except NameError:
return False


@lru_cache()
def get_execution_context() -> ExecutionContext:
    """
    Identify information about the context in which terra-notebook-utils is executing.

    The result is cached, so the environment is inspected only on the first call.

    TODO Improve the information available and algorithm to identify these values accurately!
    """
    # Workaround for currently insufficient information: assume the execution
    # environment is Terra, as that is the most common and important case.
    environment = ExecutionEnvironment.TERRA_WORKSPACE

    # Workaround for currently insufficient information: a "gs://" workspace
    # bucket means Google; anything else is assumed to be Azure.
    bucket = os.environ.get('WORKSPACE_BUCKET', None)
    platform = (
        ExecutionPlatform.GOOGLE
        if bucket and bucket.startswith("gs://")
        else ExecutionPlatform.AZURE
    )
    return ExecutionContext(environment, platform)