[BUGFIX] various bugfixes (#371)
* provide fixes & reflect changelog

* add integrity tests

* add tests

* add fix file content call

* test file uploader

* remove unused funcs

* add context uploader tests

* fix upload logic

* add exec messages while upload

* fix echo

* finalize fixes

* fix readme

* prep release
renardeinside authored Aug 4, 2022
1 parent a7e702a commit db628f3
Showing 11 changed files with 77 additions and 37 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/onpush.yml
@@ -52,6 +52,10 @@ jobs:
git config --global init.defaultBranch main # to avoid verbose deprecation messages in CI pipeline
pytest tests/unit --cov dbx -n auto
- name: Run integrity tests
run: |
python -m dbx --help
python -c "import dbx; print(dbx.__version__)"
- name: Publish test coverage
if: startsWith(matrix.os,'ubuntu')
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -8,10 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

[Please read through the Keep a Changelog (~5min)](https://keepachangelog.com/en/1.0.0/).

## [X.Y.Z] - YYYY-MM-DD
## [0.6.10] - 2022-08-04

## Added
- Added support for `python_wheel_task` in `dbx execute`

## Fixed
- Error when `.dbx/project.json` does not exist
- Error when the requested `environment` is not provided in the project file
- Path handling when `--upload-via-context` is used on Windows

----
> Unreleased changes must be tracked above this line.
> When releasing, Copy the changelog to below this line, with proper version and date.
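
The last item under Fixed maps to the `file.as_posix()` change in dbx/api/context.py further down. A minimal, standard-library-only sketch (not part of this commit) of why the POSIX form matters once a Windows path is interpolated into the command string sent to the cluster:

from pathlib import PureWindowsPath

# Illustration only: backslashes from a raw Windows path would act as escape
# sequences inside the generated remote command; as_posix() avoids that.
local_file = PureWindowsPath("C:\\some\\file")
print(str(local_file))        # C:\some\file
print(local_file.as_posix())  # C:/some/file
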
2 changes: 1 addition & 1 deletion README.rst
@@ -95,7 +95,7 @@ Limitations
* | :code:`dbx` currently doesn't provide interactive debugging capabilities.
| If you want to use interactive debugging, you can use `Databricks Connect <https://docs.databricks.com/dev-tools/databricks-connect.html>`_ + :code:`dbx` for deployment operations.
* :code:`dbx execute` only supports Python-based projects which use :code:`spark_python_task` (Notebooks or Repos are not supported in :code:`dbx execute`).
* :code:`dbx execute` only supports Python-based projects which use :code:`spark_python_task` or :code:`python_wheel_task`. Notebooks or Repos are not supported in :code:`dbx execute`.

* :code:`dbx execute` can only be used on clusters with Databricks ML Runtime 7.X or higher.

2 changes: 1 addition & 1 deletion dbx/__init__.py
@@ -1 +1 @@
__version__ = "0.6.9"
__version__ = "0.6.10"
25 changes: 17 additions & 8 deletions dbx/api/configure.py
@@ -53,11 +53,11 @@ def __init__(self, file_path: Optional[Path] = INFO_FILE_PATH):
@property
def _file_content(self) -> Dict[str, EnvironmentInfo]:
if not self._file.exists():
return {}
else:
_raw: Dict = JsonUtils.read(self._file).get("environments", {})
_typed = {name: EnvironmentInfo(**value) for name, value in _raw.items()}
return _typed
raise FileNotFoundError(f"dbx project file not found at absolute path {self._file.absolute()}")

_raw: Dict = JsonUtils.read(self._file).get("environments", {})
_typed = {name: EnvironmentInfo(**value) for name, value in _raw.items()}
return _typed

@_file_content.setter
def _file_content(self, content: Dict[str, EnvironmentInfo]):
@@ -73,9 +73,18 @@ def get(self, name: str) -> Optional[EnvironmentInfo]:
return self._file_content.get(name)

def create(self, name: str, environment_info: EnvironmentInfo):
_new = self._file_content.copy()
_new.update({name: environment_info})
self._file_content = _new
if self._file.exists():
_new = self._file_content.copy()
_new.update({name: environment_info})
self._file_content = _new
else:
self._file_content = {name: environment_info}

def create_or_update(self, name: str, environment_info: EnvironmentInfo):
if self._file.exists() and self.get(name):
self.update(name, environment_info)
else:
self.create(name, environment_info)


class ConfigurationManager:
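
With the reworked `_file_content`, a missing `.dbx/project.json` now surfaces as a FileNotFoundError instead of an empty environment mapping. A usage sketch consistent with the new test_configure_non_existent_project test further down (the environment name "default" is just an example):

from dbx.api.configure import ConfigurationManager

try:
    env_info = ConfigurationManager().get("default")
except FileNotFoundError as exc:
    # raised when .dbx/project.json does not exist; run `dbx configure` first
    print(exc)
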
17 changes: 8 additions & 9 deletions dbx/api/context.py
@@ -1,5 +1,4 @@
import json
import pathlib
import time
from base64 import b64encode
from pathlib import Path
@@ -13,7 +12,7 @@


class LocalContextManager:
context_file_path: pathlib.Path = LOCK_FILE_PATH
context_file_path: Path = LOCK_FILE_PATH

@classmethod
def set_context(cls, context_id: str) -> None:
@@ -119,8 +118,8 @@ class RichExecutionContextClient:
def __init__(self, v2_client: ApiClient, cluster_id: str, language: str = "python"):
self._client = LowLevelExecutionContextClient(v2_client, cluster_id, language)

def install_package(self, package_file: Path):
installation_command = f"%pip install --force-reinstall {package_file.absolute()}"
def install_package(self, package_file: str):
installation_command = f"%pip install --force-reinstall {package_file}"
self._client.execute_command(installation_command, verbose=False)

def setup_arguments(self, arguments: List[Any]):
@@ -147,12 +146,12 @@ def execute_entry_point(self, package_name: str, entry_point: str):
def client(self):
return self._client

def get_temp_dir(self) -> Path:
def get_temp_dir(self) -> str:
command = """
from tempfile import mkdtemp
print(mkdtemp())
"""
return Path(self._client.execute_command(command, verbose=False))
return self._client.execute_command(command, verbose=False)

def remove_dir(self, _dir: str):
command = f"""
@@ -161,17 +160,17 @@ def remove_dir(self, _dir: str):
"""
self._client.execute_command(command, verbose=False)

def upload_file(self, file: Path, prefix_dir: Path) -> Path:
def upload_file(self, file: Path, prefix_dir: str) -> str:
_contents = file.read_bytes()
contents = b64encode(_contents)
command = f"""
from pathlib import Path
from base64 import b64decode
DBX_UPLOAD_CONTENTS = b64decode({contents})
file_path = Path("{prefix_dir}") / "{file}"
file_path = Path("{prefix_dir}") / "{file.as_posix()}"
if not file_path.parent.exists():
file_path.parent.mkdir(parents=True)
file_path.write_bytes(DBX_UPLOAD_CONTENTS)
print(file_path)
"""
return Path(self._client.execute_command(command, verbose=False))
return self._client.execute_command(command, verbose=False)
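
The context-based upload ships the file as base64 inside a small decode-and-write snippet executed on the cluster, and the switch to `file.as_posix()` keeps that snippet valid for Windows-style local paths. A self-contained sketch of the round trip (illustrative only; `exec` stands in here for the remote execution context):

from base64 import b64encode
from pathlib import Path

local_file = Path("example.txt")        # hypothetical local file, created so the sketch runs
local_file.write_text("hello from dbx")
prefix_dir = "/tmp/dbx-sketch"          # in the real flow this comes from get_temp_dir()

contents = b64encode(local_file.read_bytes())
command = f"""
from pathlib import Path
from base64 import b64decode
DBX_UPLOAD_CONTENTS = b64decode({contents})
file_path = Path("{prefix_dir}") / "{local_file.as_posix()}"
if not file_path.parent.exists():
    file_path.parent.mkdir(parents=True)
file_path.write_bytes(DBX_UPLOAD_CONTENTS)
print(file_path)
"""
exec(command)  # on a real cluster this command is sent via execute_command instead
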
8 changes: 5 additions & 3 deletions dbx/api/execute.py
@@ -74,10 +74,12 @@ def install_package(self):
package_file = get_package_file()
if not package_file:
raise FileNotFoundError("Project package was not found. Please check that /dist directory exists.")
dbx_echo("Installing package")
dbx_echo("Uploading package")
driver_package_path = self._file_uploader.upload_and_provide_path(package_file, as_fuse=True)
self._client.install_package(Path(driver_package_path))
dbx_echo("Package installation finished")
dbx_echo("Uploading package - done")
dbx_echo("Installing package")
self._client.install_package(driver_package_path)
dbx_echo("Installing package - done")

def preprocess_task_parameters(self, parameters: List[str]):
dbx_echo(f"Processing task parameters: {parameters}")
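
`install_package` now receives the already-resolved remote path as a plain string: the wheel is uploaded first (converted to a FUSE path), then installed on the driver with `%pip install --force-reinstall`. A compressed sketch of that sequence using mocks, in the spirit of the unit tests added in this commit (file names and paths are made up):

from unittest.mock import MagicMock

uploader = MagicMock()
uploader.upload_and_provide_path.return_value = "/dbfs/some/prefix/dist/project-0.6.10-py3-none-any.whl"
client = MagicMock()

# upload, then install - mirrors the install_package flow shown above
driver_package_path = uploader.upload_and_provide_path("dist/project-0.6.10-py3-none-any.whl", as_fuse=True)
client.install_package(driver_package_path)
client.install_package.assert_called_once_with("/dbfs/some/prefix/dist/project-0.6.10-py3-none-any.whl")
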
2 changes: 2 additions & 0 deletions dbx/utils/common.py
@@ -32,6 +32,8 @@ def transfer_profile_name(info: EnvironmentInfo):

def prepare_environment(env_name: str) -> ApiClient:
info = ConfigurationManager().get(env_name)
if not info:
raise Exception(f"Environment {env_name} is not provided in the project file")
transfer_profile_name(info)
MlflowStorageConfigurationManager.prepare(info)
return DatabricksClientProvider.get_v2_client()
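
`prepare_environment` now fails with an explicit message when the requested environment is absent from the project file, instead of handing `None` to the downstream configuration steps. Illustrative usage (the environment name is hypothetical):

from dbx.utils.common import prepare_environment

try:
    api_client = prepare_environment("staging")  # hypothetical environment name
except Exception as exc:
    print(exc)  # e.g. "Environment staging is not provided in the project file"
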
15 changes: 4 additions & 11 deletions dbx/utils/file_uploader.py
@@ -5,7 +5,6 @@
import mlflow
from retry import retry

from dbx.api.configure import ConfigurationManager
from dbx.api.context import RichExecutionContextClient
from dbx.utils import dbx_echo

@@ -33,9 +32,11 @@ def upload_and_provide_path(self, local_file_path: Path, as_fuse: Optional[bool]
if local_file_path in self._uploaded_files:
remote_path = self._uploaded_files[local_file_path]
else:
dbx_echo(f"Uploading local file {local_file_path}")
self._upload_file(local_file_path)
remote_path = "/".join([self._base_uri, str(local_file_path.as_posix())])
self._uploaded_files[local_file_path] = remote_path
dbx_echo(f"Uploading local file {local_file_path} - done")

remote_path = remote_path.replace("dbfs:/", "/dbfs/") if as_fuse else remote_path
return remote_path
@@ -49,30 +50,22 @@ class MlflowFileUploader(AbstractFileUploader):
@staticmethod
@retry(tries=3, delay=1, backoff=0.3)
def _upload_file(file_path: Path):
dbx_echo(f"Uploading file {file_path}")
posix_path = PurePosixPath(file_path.as_posix())
parent = str(posix_path.parent) if str(posix_path.parent) != "." else None
dbx_echo(f"Uploading file {file_path} - done")
mlflow.log_artifact(str(file_path), parent)


class ContextBasedUploader(AbstractFileUploader):
def __init__(self, client: RichExecutionContextClient):
self._client = client
temp_dir = self._client.get_temp_dir()
super().__init__(base_uri=str(temp_dir))
super().__init__(base_uri=temp_dir)

def _verify_fuse_support(self):
dbx_echo("Skipping the FUSE check since context-based uploader is used")

@staticmethod
def _get_current_experiment_id(env_name: str) -> str:
info = ConfigurationManager().get(env_name)
return mlflow.get_experiment_by_name(info.workspace_dir).experiment_id

def _upload_file(self, local_file_path: Path):
temp_remote_file_path = self._client.upload_file(local_file_path, Path(self._base_uri))
dbx_echo(f"File uploaded to temp location {temp_remote_file_path}")
self._client.upload_file(local_file_path, self._base_uri)

def __del__(self):
try:
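
The `as_fuse` flag in `upload_and_provide_path` simply rewrites the `dbfs:/` scheme into the `/dbfs/` mount form, which is what `test_fuse_support` below exercises; reduced to its essence:

# The as_fuse conversion from upload_and_provide_path, in isolation.
remote_path = "dbfs:/some/prefix/some/local/file"
fuse_path = remote_path.replace("dbfs:/", "/dbfs/")
assert fuse_path == "/dbfs/some/prefix/some/local/file"
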
12 changes: 12 additions & 0 deletions tests/unit/api/test_configure.py
@@ -0,0 +1,12 @@
import shutil
from pathlib import Path

import pytest

from dbx.api.configure import ConfigurationManager


def test_configure_non_existent_project(temp_project: Path):
shutil.rmtree(temp_project / ".dbx")
with pytest.raises(FileNotFoundError):
ConfigurationManager().get("default")
19 changes: 16 additions & 3 deletions tests/unit/utils/test_file_uploader.py
@@ -1,15 +1,15 @@
from pathlib import PurePosixPath, PureWindowsPath, Path
from unittest.mock import patch
from unittest.mock import patch, MagicMock

import pytest

from dbx.utils.file_uploader import MlflowFileUploader
from dbx.utils.file_uploader import MlflowFileUploader, ContextBasedUploader

TEST_ARTIFACT_PATHS = ["s3://some/prefix", "dbfs:/some/prefix", "adls://some/prefix", "gs://some/prefix"]


@patch("mlflow.log_artifact", return_value=None)
def test_uploader(_):
def test_mlflow_uploader(_):
local_paths = [PurePosixPath("/some/local/file"), PureWindowsPath("C:\\some\\file")]

for artifact_uri in TEST_ARTIFACT_PATHS:
@@ -20,6 +20,19 @@ def test_uploader(_):
assert expected_path == resulting_path


def test_context_uploader():
local_paths = [PurePosixPath("/some/local/file"), PureWindowsPath("C:\\some\\file")]
client = MagicMock()
base_uri = "/tmp/some/path"
client.get_temp_dir = MagicMock(return_value=base_uri)

for local_path in local_paths:
uploader = ContextBasedUploader(client)
resulting_path = uploader.upload_and_provide_path(local_path)
expected_path = "/".join([base_uri, str(local_path.as_posix())])
assert expected_path == resulting_path


@patch("mlflow.log_artifact", return_value=None)
def test_fuse_support(_):
local_path = Path("/some/local/file")

