Skip to content

Commit

Permalink
added tests for s3 decorators
Browse files Browse the repository at this point in the history
  • Loading branch information
Acribbs committed Oct 29, 2024
1 parent 0637d49 commit e523b88
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 346 deletions.
2 changes: 1 addition & 1 deletion cgatcore/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,13 @@ class method (:func:`cached_method`) calls.
import functools
import gzip
import warnings
import pipes
import optparse
import argparse
import textwrap
import random
import uuid
import yaml
import shlex as pipes # Use shlex as a replacement for pipes
# import convenience functions from logging
import logging
import logging.config
Expand Down
49 changes: 23 additions & 26 deletions cgatcore/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ def loadData(infile, outfile):
'''
# cgatcore/pipeline/__init__.py



# Import existing pipeline functionality
from cgatcore.pipeline.control import *
from cgatcore.pipeline.database import *
Expand All @@ -164,7 +166,6 @@ def loadData(infile, outfile):
from cgatcore.pipeline.utils import *
from cgatcore.pipeline.parameters import *


# Import original Ruffus decorators
from ruffus import (
transform,
Expand All @@ -174,36 +175,32 @@ def loadData(infile, outfile):
follows
)

# Import S3-aware decorators and functions
from cgatcore.remote.file_handler import (
s3_transform,
s3_merge,
s3_split,
s3_originate,
s3_follows,
S3Mapper,
s3_aware
)
# Import S3-related classes and functions
from cgatcore.remote.file_handler import S3Pipeline, S3Mapper, s3_path_to_local, suffix

# Expose the S3Mapper instance if it's needed elsewhere
s3_mapper = S3Mapper()
# Create a global instance of S3Pipeline
s3_pipeline = S3Pipeline()

# Expose S3-aware decorators via the S3Pipeline instance
s3_transform = s3_pipeline.s3_transform
s3_merge = s3_pipeline.s3_merge
s3_split = s3_pipeline.s3_split
s3_originate = s3_pipeline.s3_originate
s3_follows = s3_pipeline.s3_follows

# Add S3-related utility functions
def configure_s3(aws_access_key_id=None, aws_secret_access_key=None, region_name=None):
"""
Configure AWS credentials for S3 access.
If credentials are not provided, it will use the default AWS configuration.
"""
import boto3
session = boto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)
s3_mapper.s3.S3 = session.resource('s3')
# Expose S3Mapper instance if needed elsewhere
s3_mapper = s3_pipeline.s3

# Expose S3 configuration function
configure_s3 = s3_pipeline.configure_s3

# Update __all__ to include both standard and S3-aware decorators and functions
__all__ = [
'transform', 'merge', 'split', 'originate', 'follows',
's3_transform', 's3_merge', 's3_split', 's3_originate', 's3_follows',
'S3Pipeline', 'S3Mapper', 's3_path_to_local', 'suffix',
's3_mapper', 'configure_s3'
]
# Add a docstring for the module
__doc__ = """
This module provides pipeline functionality for cgat-core, including support for AWS S3.
Expand Down
54 changes: 29 additions & 25 deletions cgatcore/pipeline/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
HAS_DRMAA = True
except (ImportError, RuntimeError, OSError):
HAS_DRMAA = False
import platform

# global drmaa session
GLOBAL_SESSION = None
Expand Down Expand Up @@ -966,33 +967,36 @@ def run(self, statement_list):

full_statement, job_path = self.build_job_script(statement)

time_command = "gtime" if platform.system() == "Darwin" else "time"

# max_vmem is set to max_rss, not available by /usr/bin/time
full_statement = (
"\\time --output=%s.times "
"-f '"
"exit_status\t%%x\n"
"user_t\t%%U\n"
"sys_t\t%%S\n"
"wall_t\t%%e\n"
"shared_data\t%%D\n"
"io_input\t%%I\n"
"io_output\t%%O\n"
"average_memory_total\t%%K\n"
"percent_cpu\t%%P\n"
"average_rss\t%%t\n"
"max_rss\t%%M\n"
"max_vmem\t%%M\n"
"minor_page_faults\t%%R\n"
"swapped\t%%W\n"
"context_switches_involuntarily\t%%c\n"
"context_switches_voluntarily\t%%w\n"
"average_uss\t%%p\n"
"signal\t%%k\n"
"socket_received\t%%r\tn"
"socket_sent\t%%s\n"
"major_page_fault\t%%F\n"
"unshared_data\t%%D\n' "
"%s") % (job_path, job_path)
f"\\{time_command} --output={job_path}.times "
f"-f '"
f"exit_status\t%x\n"
f"user_t\t%U\n"
f"sys_t\t%S\n"
f"wall_t\t%e\n"
f"shared_data\t%D\n"
f"io_input\t%I\n"
f"io_output\t%O\n"
f"average_memory_total\t%K\n"
f"percent_cpu\t%P\n"
f"average_rss\t%t\n"
f"max_rss\t%M\n"
f"max_vmem\t%M\n"
f"minor_page_faults\t%R\n"
f"swapped\t%W\n"
f"context_switches_involuntarily\t%c\n"
f"context_switches_voluntarily\t%w\n"
f"average_uss\t%p\n"
f"signal\t%k\n"
f"socket_received\t%r\n"
f"socket_sent\t%s\n"
f"major_page_fault\t%F\n"
f"unshared_data\t%D\n' "
f"{job_path}"
)

while 1:
start_time = time.time()
Expand Down
69 changes: 11 additions & 58 deletions cgatcore/remote/__init__.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,18 @@
# cgatcore/remote/__init__.py

import os
import sys
from abc import abstractmethod


class AbstractRemoteObject():
'''This is an abstract class that all RemoteObjects will
inherit from. This is an abstract class to rigidly define
the abstract methods of this RemoteObject class'''

def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs

@abstractmethod
def exists(self):
pass

@abstractmethod
def download(self):
pass

@abstractmethod
def upload(self):
pass

@abstractmethod
def delete_file(self):
pass


# Import S3-specific functionality
try:
from .file_handler import (
s3_transform,
s3_merge,
s3_split,
s3_originate,
s3_follows,
S3Mapper,
s3_aware
)
except ImportError as e:
import warnings

warnings.warn(f"Failed to import S3 functionality from file_handler: {str(e)}. S3 features will be unavailable.")

# If the file_handler module is not available, create dummy functions
def dummy_decorator(*args, **kwargs):
def decorator(func):
return func

return decorator

s3_transform = s3_merge = s3_split = s3_originate = s3_follows = dummy_decorator
s3_aware = lambda func: func

class S3Mapper:
def __init__(self):
pass
from .abstract import AbstractRemoteObject
from .file_handler import S3Pipeline, S3Mapper, s3_path_to_local, suffix

# Create an instance of S3Mapper
s3_mapper = S3Mapper()

# Conditional import for testing
if os.getenv("PYTEST_CURRENT_TEST"):
from tests.mocks import MockS3RemoteObject
from unittest.mock import patch
with patch("cgatcore.remote.aws.S3RemoteObject", new=MockS3RemoteObject):
s3_mapper = S3Mapper() # Use MockS3RemoteObject during tests

__all__ = ['S3Pipeline', 'S3Mapper', 's3_path_to_local', 'suffix']
29 changes: 29 additions & 0 deletions cgatcore/remote/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# cgatcore/remote/abstract.py

from abc import ABC, abstractmethod


class AbstractRemoteObject(ABC):
    """Common interface that every remote-storage object must implement.

    Subclasses (e.g. S3-backed remote objects) provide concrete versions
    of the four abstract operations below.  Because this class derives
    from :class:`abc.ABC` and every operation is marked
    ``@abstractmethod``, instantiating it directly raises ``TypeError``.
    """

    def __init__(self, *args, **kwargs):
        # Stash the constructor arguments untouched so that concrete
        # subclasses can forward them to their backend clients.
        self.args = args
        self.kwargs = kwargs

    @abstractmethod
    def exists(self):
        """Report whether the remote object exists."""

    @abstractmethod
    def download(self):
        """Fetch the remote object to local storage."""

    @abstractmethod
    def upload(self):
        """Push local data to the remote object."""

    @abstractmethod
    def delete_file(self):
        """Remove the remote object."""
Loading

0 comments on commit e523b88

Please sign in to comment.