diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 80dc1ed..03e0b30 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install pytest requests + pip install pytest requests unittest - name: Test with pytest run: | pytest \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 544e8d1..c658a22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -304,4 +304,11 @@ v2.5.1 (2024-04-02) ### Scheduled removals - **December 2024.**: - - In the `ticloud.DynamicAnalysis.detonate_sample` method the `sample_sha1` parameter will be removed. \ No newline at end of file + - In the `ticloud.DynamicAnalysis.detonate_sample` method the `sample_sha1` parameter will be removed. + + +2.6.1 (2024-07-03) +------------------- + +#### Improvements +- Added more unit tests for all currently available modules. \ No newline at end of file diff --git a/ReversingLabs/SDK/__init__.py b/ReversingLabs/SDK/__init__.py index 3d0295b..dec0aa3 100644 --- a/ReversingLabs/SDK/__init__.py +++ b/ReversingLabs/SDK/__init__.py @@ -5,4 +5,4 @@ A Python SDK for communicating with ReversingLabs services. """ -__version__ = "2.6.0" +__version__ = "2.6.1" diff --git a/ReversingLabs/SDK/ticloud.py b/ReversingLabs/SDK/ticloud.py index 1c36ed0..4278b84 100644 --- a/ReversingLabs/SDK/ticloud.py +++ b/ReversingLabs/SDK/ticloud.py @@ -3255,15 +3255,19 @@ def __detonate(self, platform, sample_hash=None, url_string=None, is_archive=Fal if not isinstance(internet_simulation, bool): raise WrongInputError("internet_simulation parameter must be boolean.") - if internet_simulation: - post_json["rl"]["optional_parameters"] = "internet_simulation=true" - if sample_hash: hash_type = HASH_LENGTH_MAP.get(len(sample_hash)) post_json["rl"][hash_type] = sample_hash + optional_parameters = [] + if sample_name: - post_json["rl"]["sample_name"] = sample_name + optional_parameters.append(f"sample_name={sample_name}") + + if internet_simulation: + optional_parameters.append("internet_simulation=true") + + post_json["rl"]["optional_parameters"] = ", ".join(optional_parameters) elif url_string: post_json["rl"]["url"] = url_string diff --git a/setup.py b/setup.py index 6757f6d..db42e32 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ packages=packages, python_requires=">=3.6", install_requires=requires, - extras_require={"test": ["pytest"]}, + extras_require={"test": ["pytest", "unittest"]}, license="MIT", zip_safe=False, classifiers=[ diff --git a/tests/test_a1000.py b/tests/test_a1000.py index b9c1021..89b1f34 100644 --- a/tests/test_a1000.py +++ b/tests/test_a1000.py @@ -1,9 +1,13 @@ import pytest +from unittest import mock from ReversingLabs.SDK import __version__ from ReversingLabs.SDK.a1000 import CLASSIFICATIONS, AVAILABLE_PLATFORMS, A1000 -from ReversingLabs.SDK.helper import WrongInputError +from ReversingLabs.SDK.helper import WrongInputError, DEFAULT_USER_AGENT +MD5 = "512fca9e83c47fd9c36aa7d50a856396" +SHA1 = "5377d0ed664246a604363f90a2764aa10fa63ad0" +SHA256 = "00f8cd09187d311707b52a1c52018e7cfb5f2f78e47bf9200f16281098741422" EXPECTED_PLATFORMS = ("windows7", "windows10", "macos_11", "windows11", "linux") @@ -44,3 +48,312 @@ def test_a1000_object(): authorization = a1000._headers.get("Authorization") assert authorization == f"Token {token}" + +@pytest.fixture +def requests_mock(): + with mock.patch('ReversingLabs.SDK.a1000.requests', autospec=True) as requests_mock: + yield requests_mock + + +class TestA1000: + host = "https://my.host" + token = "token" + fields = ("id", "sha1", "sha256", "sha512", "md5", "category", "file_type", "file_subtype", + "identification_name", "identification_version", "file_size", "extracted_file_count", + "local_first_seen", "local_last_seen", "classification_origin", "classification_reason", + "classification_source", "classification", "riskscore", "classification_result", "ticore", "tags", + "summary", "ticloud", "aliases", "networkthreatintelligence", "domainthreatintelligence" + ) + + ticore_fields = "sha1, sha256, sha512, md5, imphash, info, application, protection, security, behaviour," \ + " certificate, document, mobile, media, web, email, strings, interesting_strings," \ + " classification, indicators, tags, attack, story" + + @classmethod + def setup_class(cls): + cls.a1000 = A1000(cls.host, token=cls.token) + + def test_sample_from_url(self, requests_mock): + self.a1000.upload_sample_from_url(file_url="https://some.url") + + expected_url = f"{self.host}/api/uploads/" + + requests_mock.post.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=None, + data={"url": "https://some.url", "analysis": "cloud"}, + files=None + ) + + def test_wrong_id(self, requests_mock): + with pytest.raises(WrongInputError, match=r"task_id parameter must be a string."): + self.a1000.get_submitted_url_report(task_id=123, retry=False) + + assert not requests_mock.get.called + + def test_classification(self, requests_mock): + self.a1000.get_classification_v3(sample_hash=SHA1, local_only=True) + + expected_url = f"{self.host}/api/samples/v3/{SHA1}/classification/?localonly=1&av_scanners=0" + + requests_mock.get.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None + ) + + def test_reanalyze(self, requests_mock): + self.a1000.reanalyze_samples_v2( + hash_input=SHA1, + titanium_cloud=True + ) + + data = { + "hash_value": [SHA1], + "analysis": "cloud", + "rl_cloud_sandbox_platform": None + } + + requests_mock.post.assert_called_with( + url=f"{self.host}/api/samples/v2/analyze_bulk/", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=None, + data=data, + files=None + ) + + def test_extracted_files(self, requests_mock): + self.a1000.list_extracted_files_v2(SHA1) + + requests_mock.get.assert_called_with( + url=f"{self.host}/api/samples/v2/{SHA1}/extracted-files/", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None + ) + + def test_download_extracted(self, requests_mock): + self.a1000.download_extracted_files(SHA1) + + requests_mock.get.assert_called_with( + url=f"{self.host}/api/samples/{SHA1}/unpacked/", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None + ) + + def test_delete_file(self, requests_mock): + self.a1000.delete_samples([SHA1, SHA1]) + + data = {"hash_values": [SHA1, SHA1]} + + requests_mock.post.assert_called_with( + url=f"{self.host}/api/samples/v2/delete_bulk/", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=None, + data=data, + files=None + ) + + def test_pdf_report(self, requests_mock): + self.a1000.create_pdf_report(SHA1) + + requests_mock.get.assert_called_with( + url=f"{self.host}/api/pdf/{SHA1}/create", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None + ) + + def test_ticore_report(self, requests_mock): + self.a1000.get_titanium_core_report_v2(SHA1) + + expected_url = f"{self.host}/api/v2/samples/{SHA1}/ticore/?fields={self.ticore_fields}" + + requests_mock.get.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None + ) + + def test_dynamic_report(self, requests_mock): + self.a1000.create_dynamic_analysis_report(SHA1, "pdf") + + expected_url = f"{self.host}/api/rl_dynamic_analysis/export/summary/{SHA1}/pdf/create/" + + requests_mock.get.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None + ) + + def test_wrong_dynamic_params(self, requests_mock): + with pytest.raises(WrongInputError, match=r"report_format parameter must be either 'html' or 'pdf'."): + self.a1000.download_dynamic_analysis_report(SHA1, "xml") + + assert not requests_mock.get.called + + def test_set_classification(self, requests_mock): + self.a1000.set_classification(SHA1, classification="malicious", system="local") + + data = { + "classification": "malicious", + "analysis": "cloud" + } + + expected_url = f"{self.host}/api/samples/{SHA1}/setclassification/local/" + + requests_mock.post.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=None, + data=data, + files=None + ) + + def test_user_tags(self, requests_mock): + self.a1000.post_user_tags(SHA1, ["tag1", "tag2"]) + + post_json = {"tags": ["tag1", "tag2"]} + + expected_url = f"{self.host}/api/tag/{SHA1}/" + + requests_mock.post.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=post_json, + data=None, + files=None + ) + + def test_yara(self, requests_mock): + self.a1000.get_yara_rulesets_on_the_appliance_v2(source="all") + + expected_url = f"{self.host}/api/yara/v2/rulesets/?source=all" + + requests_mock.get.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None + ) + + def test_enable_yara(self, requests_mock): + self.a1000.enable_or_disable_yara_ruleset( + enabled=True, + name="the_ruleset", + publish=True + ) + + data = { + "name": "the_ruleset", + "publish": True, + "analysis": "cloud" + } + + expected_url = f"{self.host}/api/yara/ruleset/enable/" + + requests_mock.post.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=None, + data=data, + files=None + ) + + def test_start_yara_retro(self, requests_mock): + self.a1000.start_or_stop_yara_local_retro_scan("START") + + requests_mock.post.assert_called_with( + url=f"{self.host}/api/uploads/local-retro-hunt/", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=None, + data={"operation": "START"}, + files=None + ) + + def test_wrong_operation(self, requests_mock): + with pytest.raises(WrongInputError, match=r"operation parameter must be either 'START' or 'STOP'"): + self.a1000.start_or_stop_yara_local_retro_scan("BEGIN") + + assert not requests_mock.post.called + + def test_advanced_search(self, requests_mock): + self.a1000.advanced_search_v3(query_string="av-count:5 available:TRUE", sorting_criteria="sha1", sorting_order="desc", page_number=2, records_per_page=5) + + post_json = {"query": "av-count:5 available:TRUE", "ticloud": False, "page": 2, + "records_per_page": 5, "sort": "sha1 desc"} + + requests_mock.post.assert_called_with( + url=f"{self.host}/api/samples/v3/search/", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=post_json, + data=None, + files=None + ) + + def test_list_containers(self, requests_mock): + self.a1000.list_containers_for_hashes([SHA1, SHA1]) + + data = {"hash_values": [SHA1, SHA1]} + + requests_mock.post.assert_called_with( + url=f"{self.host}/api/samples/containers/", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None, + json=None, + data=data, + files=None + ) + + def test_network_report(self, requests_mock): + domain = "some.test.domain" + + self.a1000.network_domain_report(domain) + + expected_url = f"{self.host}/api/network-threat-intel/domain/{domain}/" + + requests_mock.get.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=None + ) diff --git a/tests/test_ticloud.py b/tests/test_ticloud.py index a5587ae..2114548 100644 --- a/tests/test_ticloud.py +++ b/tests/test_ticloud.py @@ -1,8 +1,21 @@ import pytest import requests -from ReversingLabs.SDK.ticloud import TiCloudAPI, CLASSIFICATIONS, AVAILABLE_PLATFORMS, RHA1_TYPE_MAP, resolve_hash_type -from ReversingLabs.SDK.helper import WrongInputError, BadGatewayError +from unittest import mock +from ReversingLabs.SDK.ticloud import TiCloudAPI, FileReputation, AVScanners, FileAnalysis, FileAnalysisNonMalicious, \ + AdvancedSearch, ExpressionSearch, RHA1FunctionalSimilarity, RHA1Analytics, URIStatistics, URIIndex, FileDownload, \ + URLThreatIntelligence, AnalyzeURL, DomainThreatIntelligence, IPThreatIntelligence, FileUpload, DeleteFile, \ + ReanalyzeFile, DataChangeSubscription, DynamicAnalysis, CertificateIndex, RansomwareIndicators, NewMalwareFilesFeed, \ + NewMalwareURIFeed, ImpHashSimilarity, YARAHunting, YARARetroHunting, TAXIIRansomwareFeed, CustomerUsage, NetworkReputation, \ + CLASSIFICATIONS, AVAILABLE_PLATFORMS, RHA1_TYPE_MAP, \ + resolve_hash_type, calculate_hash, NotFoundError +from ReversingLabs.SDK.helper import WrongInputError, BadGatewayError, DEFAULT_USER_AGENT +MD5 = "512fca9e83c47fd9c36aa7d50a856396" +SHA1 = "5377d0ed664246a604363f90a2764aa10fa63ad0" +SHA256 = "00f8cd09187d311707b52a1c52018e7cfb5f2f78e47bf9200f16281098741422" +HOST = "https://example.com" +USERNAME = "username" +PASSWORD = "password" EXPECTED_PLATFORMS = ("windows7", "windows10", "macos11", "windows11", "linux") EXPECTED_RHA1_TYPES = { @@ -22,6 +35,12 @@ } +def test_calculate_hash(): + test_url = "https://some.url.com/document.xml" + + assert calculate_hash(test_url, "sha1") == "6f87faff4a44a3a97827ce42233c58104e9411e0" + + def test_classifications(): assert "KNOWN" in CLASSIFICATIONS, "Are you sure that KNOWN should be removed from allowed classifications?" assert "GOODWARE" not in CLASSIFICATIONS, "Are you sure that GOODWARE should be in allowed classifications?" @@ -73,10 +92,888 @@ def test_hash_resolving(): various_hash_types = [ "5377d0ed664246a604363f90a2764aa10fa63ad0", "00f8cd09187d311707b52a1c52018e7cfb5f2f78e47bf9200f16281098741422", - "efabc8b39de9d1f136abc48dc6e47f30a2ce9245" + "512fca9e83c47fd9c36aa7d50a856396" ] assert resolve_hash_type(sample_hashes=same_hash_types) == "sha1" with pytest.raises(WrongInputError, match=r"Hash on position 1 is a/an sha256"): resolve_hash_type(sample_hashes=various_hash_types) + + +@pytest.fixture +def requests_mock(): + with mock.patch('ReversingLabs.SDK.ticloud.requests', autospec=True) as requests_mock: + yield requests_mock + + +@pytest.fixture +def file_type_mock(): + with mock.patch("ReversingLabs.SDK.ticloud.get_rha1_type", autospec=True) as file_type_mock: + yield file_type_mock + + +class TestFileReputation: + password = "password" + + @classmethod + def setup_class(cls): + cls.file_reputation = FileReputation(HOST, USERNAME, PASSWORD) + + def test_wrong_input_hash(self, requests_mock): + with pytest.raises(WrongInputError, match=r"Only hash string or list of hash strings are allowed"): + self.file_reputation.get_file_reputation(123) + + with pytest.raises(WrongInputError, match=r"The given hash input string is not a valid hexadecimal value."): + self.file_reputation.get_file_reputation([123, 456]) + + assert not requests_mock.get.called + + def test_request_single_hash(self, requests_mock): + requests_mock.get.return_value.status_code = 200 + self.file_reputation.get_file_reputation(SHA1) + + expected_url = (f"{HOST}/api/databrowser/malware_presence/query/sha1/{SHA1}?extended=true&" + f"show_hashes=true&format=json") + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + def test_request_multiple_hashes(self, requests_mock): + requests_mock.post.return_value.status_code = 200 + hashes = [SHA1] * 3 + + self.file_reputation.get_file_reputation(hashes) + + expected_url = f"{HOST}/api/databrowser/malware_presence/bulk_query/json?extended=true&show_hashes=true" + + expected_payload = {"rl": {"query": {"hash_type": "sha1", "hashes": hashes}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + json=expected_payload, + data=None, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + def test_error_status_code(self, requests_mock): + requests_mock.get.return_value.status_code = 404 + + with pytest.raises(NotFoundError): + self.file_reputation.get_file_reputation(SHA1) + + +class TestAVScanners: + @classmethod + def setup_class(cls): + cls.av_scanners = AVScanners(HOST, USERNAME, PASSWORD) + + def test_wrong_input_hash(self, requests_mock): + with pytest.raises(WrongInputError, match=r"Only hash string or list of hash strings are allowed"): + self.av_scanners.get_scan_results(123) + + with pytest.raises(WrongInputError, match=r"The given hash input string is not a valid hexadecimal value."): + self.av_scanners.get_scan_results(f"{SHA1},{SHA1}") + + assert not requests_mock.get.called + + def test_single_hash(self, requests_mock): + requests_mock.get.return_value.status_code = 200 + + self.av_scanners.get_scan_results(SHA1, historical_results=True) + + expected_url = f"{HOST}/api/xref/v2/query/sha1/{SHA1}?format=json&history=true" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestFileAnalysis: + @classmethod + def setup_class(cls): + cls.rldata = FileAnalysis(HOST, USERNAME, PASSWORD) + + def test_wrong_input_hash(self, requests_mock): + with pytest.raises(WrongInputError, match=r"Only hash string or list of hash strings are allowed"): + self.rldata.get_analysis_results(123) + + assert not requests_mock.get.called + + def test_bulk_query(self, requests_mock): + requests_mock.post.return_value.status_code = 200 + + self.rldata.get_analysis_results([SHA256, SHA256]) + + expected_url = f"{HOST}/api/databrowser/rldata/bulk_query/json" + + post_json = {"rl": {"query": {"hash_type": "sha256", "hashes": [SHA256, SHA256]}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestFileAnalysisNonMalicious: + @classmethod + def setup_class(cls): + cls.rldata_nonmal = FileAnalysisNonMalicious(HOST, USERNAME, PASSWORD) + + def test_wrong_input_hash(self, requests_mock): + with pytest.raises(WrongInputError, match=r"The given hash input string is not a valid hexadecimal value."): + self.rldata_nonmal.get_analysis_results(123) + + assert not requests_mock.get.called + + def test_single_query(self, requests_mock): + requests_mock.get.return_value.status_code = 200 + + self.rldata_nonmal.get_analysis_results(SHA1) + + expected_url = f"{HOST}/api/databrowser/rldata/goodware/query/sha1/{SHA1}" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params={"format": "json"} + ) + + +class TestRHA1FunctionalSimilarity: + hash = "21841b32c6165b27dddbd4d6eb3a672defe54271" + + @classmethod + def setup_class(cls): + cls.rha1 = RHA1FunctionalSimilarity(HOST, USERNAME, PASSWORD) + + def test_wrong_input_hash(self, requests_mock): + with pytest.raises(WrongInputError, + match=r"Only hash strings of the following types are allowed as input values:"): + self.rha1.get_similar_hashes(SHA256) + + assert not requests_mock.get.called + + def test_single_query(self, requests_mock, file_type_mock): + requests_mock.get.return_value.status_code = 200 + file_type_mock.return_value = "pe01" + + self.rha1.get_similar_hashes(self.hash, extended_results=True) + + expected_url = f"{HOST}/api/group_by_rha1/v1/query/pe01/{self.hash}?format=json&limit=1000&extended=true" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestRHA1Analytics: + hash = "21841b32c6165b27dddbd4d6eb3a672defe54271" + + @classmethod + def setup_class(cls): + cls.rha1 = RHA1Analytics(HOST, USERNAME, PASSWORD) + + def test_wrong_input_hash(self, requests_mock): + with pytest.raises(WrongInputError, + match=r"Only hash strings of the following types are allowed as input values:"): + self.rha1.get_rha1_analytics(SHA256) + + assert not requests_mock.get.called + + def test_single_query(self, requests_mock, file_type_mock): + requests_mock.get.return_value.status_code = 200 + file_type_mock.return_value = "pe01" + + self.rha1.get_rha1_analytics(self.hash, extended_results=True) + + expected_url = f"{HOST}/api/rha1/analytics/v1/query/pe01/{self.hash}?format=json&extended=true" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestURIStatistics: + test_url = "https://www.softpedia.com/get/Office-tools/Text-editors/Sublime-Text.shtml" + + @classmethod + def setup_class(cls): + cls.uristats = URIStatistics(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + requests_mock.get.return_value.status_code = 200 + + self.uristats.get_uri_statistics(self.test_url) + + expected_url = f"{HOST}/api/uri/statistics/uri_state/sha1/0164af1f2e83a7411a3c8cfd02b1424156a21b6b?format=json" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestURIIndex: + test_url = "https://www.softpedia.com/get/Office-tools/Text-editors/Sublime-Text.shtml" + + @classmethod + def setup_class(cls): + cls.uri_index = URIIndex(HOST, USERNAME, PASSWORD) + + def test_wrong_input(self, requests_mock): + with pytest.raises(WrongInputError, match=r"Only a single email address, URL, DNS name or IPv4 string is " + r"allowed as the uri_input parameter."): + self.uri_index.get_uri_index(123) + + def test_single_query(self, requests_mock): + requests_mock.get.return_value.status_code = 200 + + self.uri_index.get_uri_index(self.test_url, classification="MALICIOUS", + page_sha1="21841b32c6165b27dddbd4d6eb3a672defe54271") + + expected_url = (f"{HOST}/api/uri_index/v1/query/0164af1f2e83a7411a3c8cfd02b1424156a21b6b/21841b32c6165b27dddbd4d6eb3a672defe54271?" + f"format=json&classification=MALICIOUS") + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestAdvancedSearch: + @classmethod + def setup_class(cls): + cls.adv_search = AdvancedSearch(HOST, USERNAME, PASSWORD) + + def test_wrong_input(self, requests_mock): + with pytest.raises(WrongInputError, match=r"records_per_page parameter must be integer with value between 1 and 10000"): + self.adv_search.search("search_query", records_per_page=12000) + + with pytest.raises(WrongInputError, match=r"Sorting criteria must be one of the following options"): + self.adv_search.search("search_query", sorting_criteria="wrong", sorting_order="also_wrong") + + assert not requests_mock.post.called + + def test_single_query(self, requests_mock): + requests_mock.post.return_value.status_code = 200 + + self.adv_search.search(query_string="av-count:5 available:TRUE", sorting_criteria="sha1", sorting_order="desc", page_number=2, records_per_page=5) + + expected_url = f"{HOST}/api/search/v1/query" + + post_json = {"query": "av-count:5 available:TRUE", "page": 2, "records_per_page": 5, "format": "json", "sort": "sha1 desc"} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestExpressionSearch: + @classmethod + def setup_class(cls): + cls.exp_search = ExpressionSearch(HOST, USERNAME, PASSWORD) + + def test_wrong_input(self, requests_mock): + with pytest.raises(WrongInputError, match=r"query parameter must be a list of strings."): + self.exp_search.search(query="av-count:5 available:TRUE") + + with pytest.raises(WrongInputError, match=r"query list must have at least 2 expressions."): + self.exp_search.search(query=["status=MALICIOUS"]) + + with pytest.raises(WrongInputError, match=r"All expressions in the query list must be strings."): + self.exp_search.search(query=["status=MALICIOUS", 123]) + + assert not requests_mock.get.called + + def test_single_query(self, requests_mock): + requests_mock.get.return_value.status_code = 200 + + self.exp_search.search(query=["one=1", "two=2"], date="2024-07-03", page_number=2) + + expected_url = f"{HOST}/api/sample/search/download/v1/query/date/2024-07-03?format=json&page=2&one=1&two=2" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestFileDownload: + @classmethod + def setup_class(cls): + cls.download = FileDownload(HOST, USERNAME, PASSWORD) + + def test_status(self, requests_mock): + requests_mock.post.return_value.status_code = 200 + + self.download.get_download_status(SHA1) + + expected_url = f"{HOST}/api/spex/download/v2/status/bulk_query/json?format=json" + + post_json = {"rl": {"query": {"hash_type": "sha1", "hashes": [SHA1]}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + def test_download(self, requests_mock): + requests_mock.get.return_value.status_code = 200 + + self.download.download_sample(SHA1) + + expected_url = f"{HOST}/api/spex/download/v2/query/sha1/{SHA1}" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestURLThreatIntelligence: + test_url = "https://www.softpedia.com/get/Office-tools/Text-editors/Sublime-Text.shtml" + + @classmethod + def setup_class(cls): + cls.url_ti = URLThreatIntelligence(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + self.url_ti.get_url_report(self.test_url) + + expected_url = f"{HOST}/api/networking/url/v1/report/query/json" + + post_json = {"rl": {"query": {"url": "https://www.softpedia.com/get/Office-tools/Text-editors/Sublime-Text.shtml", "response_format": "json"}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestAnalyzeURL: + test_url = "https://www.softpedia.com/get/Office-tools/Text-editors/Sublime-Text.shtml" + + @classmethod + def setup_class(cls): + cls.analyze_url = AnalyzeURL(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + self.analyze_url.submit_url(url_input=self.test_url) + + expected_url = f"{HOST}/api/networking/url/v1/analyze/query/json" + + post_json = {"rl": {"query": {"url": "https://www.softpedia.com/get/Office-tools/Text-editors/Sublime-Text.shtml", "response_format": "json"}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestDomainThreatIntelligence: + domain = "some.test.domain" + + @classmethod + def setup_class(cls): + cls.domain_ti = DomainThreatIntelligence(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + self.domain_ti.get_domain_report(self.domain) + + expected_url = f"{HOST}/api/networking/domain/report/v1/query/json" + + post_json = {"rl": {"query": {"domain": "some.test.domain", "response_format": "json"}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestIPThreatIntelligence: + ip = "1.1.1.1" + + @classmethod + def setup_class(cls): + cls.ip_ti = IPThreatIntelligence(HOST, USERNAME, PASSWORD) + + def test_wrong_input(self, requests_mock): + with pytest.raises(WrongInputError, match=r"p_address parameter must be string."): + self.ip_ti.get_ip_report(ip_address=1.1) + + assert not requests_mock.post.called + + def test_query(self, requests_mock): + self.ip_ti.get_ip_report(ip_address=self.ip) + + expected_url = f"{HOST}/api/networking/ip/report/v1/query/json" + + post_json = {"rl": {"query": {"ip": "1.1.1.1", "response_format": "json"}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestFileUpload: + @classmethod + def setup_class(cls): + cls.upload = FileUpload(HOST, USERNAME, PASSWORD) + + def test_upload_meta(self, requests_mock): + self.upload._FileUpload__upload_meta( + url="https://mock.url", + sample_name="test_name", + sample_domain="test_domain", + subscribe="data_change", + archive_type=None, + archive_password=None + ) + + expected_url = "https://mock.url/meta" + params = {"subscribe": "data_change"} + meta_xml = ("file_nametest_name" + "test_domain") + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=params, + json=None, + data=meta_xml + ) + + +class TestDeleteFile: + @classmethod + def setup_class(cls): + cls.delete_file = DeleteFile(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + self.delete_file.delete_samples(SHA1, delete_on=1234567) + + expected_url = f"{HOST}/api/delete/sample/v1/query/sha1/{SHA1}?delete_on=1234567" + + requests_mock.delete.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + json=None + ) + + +class TestReanalyzeFile: + @classmethod + def setup_class(cls): + cls.reanalyze = ReanalyzeFile(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + self.reanalyze.reanalyze_samples(SHA1) + + expected_url = f"{HOST}/api/rescan/v1/query/sha1/{SHA1}" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestDataChangeSubscription: + @classmethod + def setup_class(cls): + cls.data_change = DataChangeSubscription(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + self.data_change.subscribe([SHA1, SHA1]) + + expected_url = f"{HOST}/api/subscription/data_change/v1/bulk_query/subscribe/json" + + post_json = {"rl": {"query": {"hash_type": "sha1", "hashes": [SHA1, SHA1]}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestDynamicAnalysis: + @classmethod + def setup_class(cls): + cls.da = DynamicAnalysis(HOST, USERNAME, PASSWORD) + + def test_detonate_file(self, requests_mock): + self.da.detonate_sample( + sample_hash=SHA1, + platform="windows10", + internet_simulation=True, + sample_name="sample_name" + ) + + expected_url = f"{HOST}/api/dynamic/analysis/analyze/v1/query/json" + + post_json = {"rl": {"platform": "windows10", "response_format": "json", "sha1": SHA1, "optional_parameters": "sample_name=sample_name, internet_simulation=true"}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestCertificateIndex: + @classmethod + def setup_class(cls): + cls.ci = CertificateIndex(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + self.ci.get_certificate_information(SHA1, classification="MALICIOUS", next_page_hash=SHA1) + + expected_url = (f"{HOST}/api/certificate/index/v1/query/thumbprint/{SHA1}/page/{SHA1}" + f"?format=json&extended=true&limit=100&classification=MALICIOUS") + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestRansomwareIndicators: + @classmethod + def setup_class(cls): + cls.rf = RansomwareIndicators(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + self.rf.get_indicators( + hours_back=3, + indicator_types=['ipv4', 'hash', 'domain', 'uri'] + ) + + expected_url = f"{HOST}/api/public/v1/ransomware/indicators?withHealth=0&tagFormat=dict&" \ + "hours=3&indicatorTypes=ipv4,hash,domain,uri&onlyFreemium=0" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestNewMalwareFilesFeed: + @classmethod + def setup_class(cls): + cls.feed = NewMalwareFilesFeed(HOST, USERNAME, PASSWORD) + + def test_pull(self, requests_mock): + self.feed.pull_with_timestamp( + time_format="timestamp", + time_value="1234567" + ) + + expected_url = f"{HOST}/api/feed/malware/detection/v1/query/timestamp/1234567?format=json&sample_available=false&limit=1000" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestNewMalwareURIFeed: + @classmethod + def setup_class(cls): + cls.feed = NewMalwareURIFeed(HOST, USERNAME, PASSWORD) + + def test_pull(self, requests_mock): + self.feed.pull_latest() + + expected_url = f"{HOST}/api/feed/malware_uri/v1/query/latest?format=json" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestImpHashSimilarity: + @classmethod + def setup_class(cls): + cls.imphash = ImpHashSimilarity(HOST, USERNAME, PASSWORD) + + def test_imphash(self, requests_mock): + imphash = "abcdefg" + + self.imphash.get_imphash_index(imphash, next_page_sha1=SHA1) + + expected_url = f"{HOST}/api/imphash_index/v1/query/{imphash}/start_sha1/{SHA1}?format=json" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None + ) + + +class TestYARAHunting: + @classmethod + def setup_class(cls): + cls.yara = YARAHunting(HOST, USERNAME, PASSWORD) + + def test_yara(self, requests_mock): + self.yara.create_ruleset( + ruleset_name="name", + ruleset_text="ruleset_text" + ) + + post_json = { + "ruleset_name": "name", + "text": "ruleset_text" + } + + expected_url = f"{HOST}/api/yara/admin/v1/ruleset" + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + def test_wrong_ruleset_text(self, requests_mock): + with pytest.raises(WrongInputError, match=r"ruleset_text parameter must be unicode string."): + self.yara.create_ruleset(ruleset_name="name", ruleset_text=123) + + assert not requests_mock.post.called + + +class TestYARARetroHunting: + @classmethod + def setup_class(cls): + cls.yara = YARARetroHunting(HOST, USERNAME, PASSWORD) + + def test_enable_retro(self, requests_mock): + ruleset_name = "name" + + self.yara.enable_retro_hunt(ruleset_name=ruleset_name) + + expected_url = f"{HOST}/api/yara/admin/v1/ruleset/enable-retro-hunt" + + post_json = {"ruleset_name": ruleset_name} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) + + +class TestTAXIIRansomwareFeed: + @classmethod + def setup_class(cls): + cls.taxii = TAXIIRansomwareFeed(HOST, USERNAME, PASSWORD) + + def test_get_objects(self, requests_mock): + self.taxii.get_objects( + api_root="lite-root", + collection_id="123456" + ) + + query_params = { + "limit": 500, + "added_after": None, + "match[id]": None, + "next": None + } + + expected_url = f"{HOST}/api/taxii/lite-root/collections/123456/objects/" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, 'Accept': 'application/taxii+json;version=2.1'}, + params=query_params + ) + + +class TestCustomerUsage: + @classmethod + def setup_class(cls): + cls.usage = CustomerUsage(HOST, USERNAME, PASSWORD) + + def test_usage(self, requests_mock): + self.usage.daily_usage(single_date="2024-07-03") + + expected_url = f"{HOST}/api/customer_usage/v1/usage/daily" + + requests_mock.get.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params={"date": "2024-07-03", "format": "json", "from": None, "to": None} + ) + + +class TestNetworkReputation: + @classmethod + def setup_class(cls): + cls.net_rep = NetworkReputation(HOST, USERNAME, PASSWORD) + + def test_query(self, requests_mock): + locations = ["some.domain", "another.domain"] + + self.net_rep.get_network_reputation( + network_locations=locations + ) + + expected_url = f"{HOST}/api/networking/reputation/v1/query/json" + + post_json = {"rl": {"query": {"network_locations": [{"network_location": "some.domain"}, {"network_location": "another.domain"}], "response_format": "json"}}} + + requests_mock.post.assert_called_with( + url=expected_url, + auth=(USERNAME, PASSWORD), + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT}, + params=None, + json=post_json, + data=None + ) diff --git a/tests/test_tiscale.py b/tests/test_tiscale.py index 4e69944..7b0be80 100644 --- a/tests/test_tiscale.py +++ b/tests/test_tiscale.py @@ -1,7 +1,8 @@ import pytest +from unittest import mock from ReversingLabs.SDK import __version__ from ReversingLabs.SDK.tiscale import TitaniumScale -from ReversingLabs.SDK.helper import WrongInputError +from ReversingLabs.SDK.helper import WrongInputError, DEFAULT_USER_AGENT def test_tiscale_object(): @@ -22,3 +23,103 @@ def test_tiscale_object(): user_agent = tiscale._headers.get("User-Agent") assert __version__ in user_agent + + +@pytest.fixture +def requests_mock(): + with mock.patch('ReversingLabs.SDK.tiscale.requests', autospec=True) as requests_mock: + yield requests_mock + + +class TestTitaniumScale: + host = "https://my.host" + token = "token" + + @classmethod + def setup_class(cls): + cls.tiscale = TitaniumScale(cls.host, token=cls.token) + + def test_list_tasks(self, requests_mock): + self.tiscale.list_processing_tasks(age=10, custom_token="custom") + + query_params = { + "age": 10, + "token": "Token custom" + } + + expected_url = f"{self.host}/api/tiscale/v1/task" + + requests_mock.get.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=query_params + ) + + def test_task_info(self, requests_mock): + self.tiscale.get_processing_task_info( + task_id=1, + full=True + ) + + query_params = { + "full": "true", + "v13": "false" + } + + expected_url = f"{self.host}/api/tiscale/v1/task/1" + + requests_mock.get.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=query_params + ) + + def test_delete_task(self, requests_mock): + self.tiscale.delete_processing_task( + task_id=1 + ) + + expected_url = f"{self.host}/api/tiscale/v1/task/1" + + requests_mock.delete.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"} + ) + + def test_wrong_task_id(self, requests_mock): + with pytest.raises(WrongInputError, match=r"task_id parameter must be integer."): + self.tiscale.delete_processing_task(task_id="123") + + assert not requests_mock.delete.called + + def test_delete_multiple(self, requests_mock): + self.tiscale.delete_multiple_tasks(age=10) + + query_params = {"age": 10} + + expected_url = f"{self.host}/api/tiscale/v1/task" + + requests_mock.delete.assert_called_with( + url=expected_url, + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"}, + params=query_params + ) + + def test_yara_id(self, requests_mock): + self.tiscale.get_yara_id() + + requests_mock.get.assert_called_with( + url=f"{self.host}/api/tiscale/v1/yara", + verify=True, + proxies=None, + headers={"User-Agent": DEFAULT_USER_AGENT, "Authorization": f"Token {self.token}"} + ) +