Skip to content

Commit 56100d9

Browse files
Add the YARAHunting and YARARetroHunting classes
1 parent 9d24f8f commit 56100d9

File tree

1 file changed

+377
-0
lines changed

1 file changed

+377
-0
lines changed

ReversingLabs/SDK/ticloud.py

+377
Original file line numberDiff line numberDiff line change
@@ -2628,6 +2628,383 @@ def pull_latest(self):
26282628
return response
26292629

26302630

2631+
class ImpHashSimilarity(TiCloudAPI):
2632+
"""TCA-0302"""
2633+
2634+
__SINGLE_QUERY_ENDPOINT = "/api/imphash_index/v1/query/{hash_value}"
2635+
2636+
def __init__(self, host, username, password, verify=True, proxies=None, user_agent=DEFAULT_USER_AGENT,
2637+
allow_none_return=False):
2638+
super(ImpHashSimilarity, self).__init__(host, username, password, verify, proxies, user_agent=user_agent,
2639+
allow_none_return=allow_none_return)
2640+
2641+
self._url = "{host}{{endpoint}}".format(host=self._host)
2642+
2643+
def get_imphash_index(self, imphash, next_page_sha1=None):
2644+
"""Accepts an imphash and returns a list of SHA-1 hashes of files sharing that imphash.
2645+
:param imphash: imphash string
2646+
:type imphash: str
2647+
:param next_page_sha1: SHA-1 string on the next page of results
2648+
:type next_page_sha1: str or None
2649+
:return: response
2650+
:rtype: requests.Response
2651+
"""
2652+
if not isinstance(imphash, str):
2653+
raise WrongInputError("imphash parameter must be string.")
2654+
2655+
endpoint = self.__SINGLE_QUERY_ENDPOINT.format(hash_value=imphash)
2656+
2657+
if next_page_sha1:
2658+
validate_hashes(
2659+
hash_input=[next_page_sha1],
2660+
allowed_hash_types=(SHA1,)
2661+
)
2662+
2663+
endpoint = "{base}/start_sha1/{next_page_sha1}".format(
2664+
base=endpoint,
2665+
next_page_sha1=next_page_sha1
2666+
)
2667+
2668+
endpoint = "{path}?format=json".format(path=endpoint)
2669+
2670+
url = self._url.format(endpoint=endpoint)
2671+
2672+
response = self._get_request(url=url)
2673+
2674+
self._raise_on_error(response)
2675+
2676+
return response
2677+
2678+
def get_imphash_index_aggregated(self, imphash, max_results=5000):
2679+
"""Accepts an imphash and returns a list of SHA-1 hashes of files sharing that imphash.
2680+
This method automatically handles paging and returns a list of results instead of a Response object.
2681+
:param imphash: imphash string
2682+
:type imphash: str
2683+
:param max_results: maximum number of results to be returned in the list
2684+
:type max_results: int
2685+
:return: list of results
2686+
:rtype: list
2687+
"""
2688+
if not isinstance(max_results, int):
2689+
raise WrongInputError("max_results parameter must be integer.")
2690+
2691+
results = []
2692+
next_page_sha1 = None
2693+
2694+
while True:
2695+
response = self.get_imphash_index(
2696+
imphash=imphash,
2697+
next_page_sha1=next_page_sha1
2698+
)
2699+
2700+
response_json = response.json()
2701+
2702+
sha1_list = response_json.get("rl").get("imphash_index").get("sha1_list", [])
2703+
results.extend(sha1_list)
2704+
2705+
next_page_sha1 = response_json.get("rl").get("imphash_index").get("next_page_sha1")
2706+
2707+
if len(results) > max_results or not next_page_sha1:
2708+
break
2709+
2710+
return results[:max_results]
2711+
2712+
2713+
class YARAHunting(TiCloudAPI):
2714+
"""TCA-0303"""
2715+
2716+
__RULESET_ENDPOINT = "/api/yara/admin/v1/ruleset"
2717+
__YARA_MATCHES_ENDPOINT = "/api/feed/yara/v1/query/{time_format}/{time_value}"
2718+
2719+
def __init__(self, host, username, password, verify=True, proxies=None, user_agent=DEFAULT_USER_AGENT,
2720+
allow_none_return=False):
2721+
super(YARAHunting, self).__init__(host, username, password, verify, proxies, user_agent=user_agent,
2722+
allow_none_return=allow_none_return)
2723+
2724+
self._url = "{host}{{endpoint}}".format(host=self._host)
2725+
2726+
def create_ruleset(self, ruleset_name, ruleset_text, sample_available=None):
2727+
"""Creates a new YARA ruleset.
2728+
The ruleset_text parameter needs to be a stringified YARA ruleset / a Unicode string.
2729+
The sample_available parameter defines which samples will be returned:
2730+
- True: only samples available for download
2731+
- False: only samples not available for download
2732+
- None: all samples
2733+
:param ruleset_name: name of the ruleset
2734+
:type ruleset_name: str
2735+
:param ruleset_text: YARA ruleset text
2736+
:type ruleset_text: str
2737+
:param sample_available: which samples to return
2738+
:type sample_available: bool or None
2739+
:return: response
2740+
:rtype: requests.Response
2741+
"""
2742+
if not isinstance(ruleset_name, str):
2743+
raise WrongInputError("ruleset_name parameter must be string.")
2744+
2745+
if not isinstance(ruleset_text, str):
2746+
raise WrongInputError("ruleset_text parameter must be unicode string.")
2747+
2748+
post_json = {
2749+
"ruleset_name": ruleset_name,
2750+
"text": ruleset_text
2751+
}
2752+
2753+
if sample_available is not None:
2754+
if not isinstance(sample_available, bool):
2755+
raise WrongInputError("sample_available parameter must be be either None or boolean.")
2756+
2757+
post_json["sample_available"] = sample_available
2758+
2759+
url = self._url.format(endpoint=self.__RULESET_ENDPOINT)
2760+
2761+
response = self._post_request(url=url, post_json=post_json)
2762+
2763+
self._raise_on_error(response)
2764+
2765+
return response
2766+
2767+
def delete_ruleset(self, ruleset_name):
2768+
"""Deletes a YARA ruleset.
2769+
:param ruleset_name: name of the ruleset
2770+
:type ruleset_name: str
2771+
:return: response
2772+
:rtype: requests.Response
2773+
"""
2774+
if not isinstance(ruleset_name, str):
2775+
raise WrongInputError("ruleset_name parameter must be string.")
2776+
2777+
endpoint = "{base}/{ruleset_name}".format(
2778+
base=self.__RULESET_ENDPOINT,
2779+
ruleset_name=ruleset_name
2780+
)
2781+
2782+
url = self._url.format(endpoint=endpoint)
2783+
2784+
response = self._delete_request(url=url)
2785+
2786+
self._raise_on_error(response)
2787+
2788+
return response
2789+
2790+
def get_ruleset_info(self, ruleset_name=None):
2791+
"""Get information for a specific YARA ruleset or all YARA rulesets in the collection.
2792+
:param ruleset_name: name of the ruleset; if set to None, all rulesets are returned
2793+
:type ruleset_name: str or None
2794+
:return: response
2795+
:rtype: requests.Response
2796+
"""
2797+
endpoint = self.__RULESET_ENDPOINT
2798+
2799+
if ruleset_name is not None:
2800+
if not isinstance(ruleset_name, str):
2801+
raise WrongInputError("ruleset_name parameter must be string.")
2802+
2803+
endpoint = "{base}/{ruleset_name}".format(
2804+
base=self.__RULESET_ENDPOINT,
2805+
ruleset_name=ruleset_name
2806+
)
2807+
2808+
url = self._url.format(endpoint=endpoint)
2809+
2810+
response = self._get_request(url=url)
2811+
2812+
self._raise_on_error(response)
2813+
2814+
return response
2815+
2816+
def get_ruleset_text(self, ruleset_name):
2817+
"""Get the text of a YARA ruleset.
2818+
:param ruleset_name: name of the ruleset
2819+
:type ruleset_name: str
2820+
:return: response
2821+
:rtype: requests.Response
2822+
"""
2823+
if not isinstance(ruleset_name, str):
2824+
raise WrongInputError("ruleset_name parameter must be string.")
2825+
2826+
endpoint = "{base}/{ruleset_name}/text".format(
2827+
base=self.__RULESET_ENDPOINT,
2828+
ruleset_name=ruleset_name
2829+
)
2830+
2831+
url = self._url.format(endpoint=endpoint)
2832+
2833+
response = self._get_request(url=url)
2834+
2835+
self._raise_on_error(response)
2836+
2837+
return response
2838+
2839+
def yara_matches_feed(self, time_format, time_value):
2840+
"""Returns a recordset of YARA ruleset matches in the specified time range.
2841+
:param time_format: possible values: 'utc' or 'timestamp'
2842+
:type time_format: str
2843+
:param time_value: results will be retrieved from the specified time up until the current moment;
2844+
accepted formats are Unix timestamp string and 'YYYY-MM-DDThh:mm:ss'
2845+
:type time_value: str
2846+
:return: response
2847+
:rtype: requests.Response
2848+
"""
2849+
if time_format not in ("utc", "timestamp"):
2850+
raise WrongInputError("time_format parameter must be one of the following values: 'utc', 'timestamp'")
2851+
2852+
if not isinstance(time_value, str):
2853+
raise WrongInputError("time_value parameter must be string.")
2854+
2855+
base = self.__YARA_MATCHES_ENDPOINT.format(
2856+
time_format=time_format,
2857+
time_value=time_value
2858+
)
2859+
2860+
endpoint = "{base}?format=json".format(base=base)
2861+
2862+
url = self._url.format(endpoint=endpoint)
2863+
2864+
response = self._get_request(url=url)
2865+
2866+
self._raise_on_error(response)
2867+
2868+
return response
2869+
2870+
2871+
class YARARetroHunting(TiCloudAPI):
2872+
"""TCA-0319"""
2873+
2874+
__RULESET_ENDPOINT = "/api/yara/admin/v1/ruleset"
2875+
__YARA_RETRO_MATCHES_ENDPOINT = "/api/feed/yara/retro/v1/query/{time_format}/{time_value}"
2876+
2877+
def __init__(self, host, username, password, verify=True, proxies=None, user_agent=DEFAULT_USER_AGENT,
2878+
allow_none_return=False):
2879+
super(YARARetroHunting, self).__init__(host, username, password, verify, proxies, user_agent=user_agent,
2880+
allow_none_return=allow_none_return)
2881+
2882+
self._url = "{host}{{endpoint}}".format(host=self._host)
2883+
2884+
def __retro_hunt_action(self, path_suffix, ruleset_name):
2885+
"""Private method for retro hunt actions."""
2886+
if not isinstance(ruleset_name, str):
2887+
raise WrongInputError("ruleset_name parameter must be string.")
2888+
2889+
if path_suffix in ("enable-retro-hunt", "start-retro-hunt", "cancel-retro-hunt"):
2890+
endpoint = "{base}/{path_suffix}".format(
2891+
base=self.__RULESET_ENDPOINT,
2892+
path_suffix=path_suffix
2893+
)
2894+
2895+
post_json = {"ruleset_name": ruleset_name}
2896+
2897+
url = self._url.format(endpoint=endpoint)
2898+
2899+
response = self._post_request(url=url, post_json=post_json)
2900+
2901+
elif path_suffix == "status-retro-hunt":
2902+
endpoint = "{base}/{ruleset_name}/{path_suffix}".format(
2903+
base=self.__RULESET_ENDPOINT,
2904+
ruleset_name=ruleset_name,
2905+
path_suffix=path_suffix
2906+
)
2907+
2908+
url = self._url.format(endpoint=endpoint)
2909+
2910+
response = self._get_request(url=url)
2911+
2912+
else:
2913+
raise WrongInputError("The supplied path_suffix is not valid.")
2914+
2915+
self._raise_on_error(response)
2916+
2917+
return response
2918+
2919+
def enable_retro_hunt(self, ruleset_name):
2920+
"""Enables the retro hunt for the specified ruleset that has been submitted to TitaniumCloud
2921+
prior to deployment of YARA retro.
2922+
:param ruleset_name: name of the ruleset
2923+
:type ruleset_name: str
2924+
:return: response
2925+
:rtype: requests.Response
2926+
"""
2927+
response = self.__retro_hunt_action(
2928+
path_suffix="enable-retro-hunt",
2929+
ruleset_name=ruleset_name
2930+
)
2931+
2932+
return response
2933+
2934+
def start_retro_hunt(self, ruleset_name):
2935+
"""Enables the retro hunt for the specified ruleset.
2936+
:param ruleset_name: name of the ruleset
2937+
:type ruleset_name: str
2938+
:return: response
2939+
:rtype: requests.Response
2940+
"""
2941+
response = self.__retro_hunt_action(
2942+
path_suffix="start-retro-hunt",
2943+
ruleset_name=ruleset_name
2944+
)
2945+
2946+
return response
2947+
2948+
def check_status(self, ruleset_name):
2949+
"""Checks the retro hunt status for the specified ruleset.
2950+
:param ruleset_name: name of the ruleset
2951+
:type ruleset_name: str
2952+
:return: response
2953+
:rtype: requests.Response
2954+
"""
2955+
response = self.__retro_hunt_action(
2956+
path_suffix="status-retro-hunt",
2957+
ruleset_name=ruleset_name
2958+
)
2959+
2960+
return response
2961+
2962+
def cancel_retro_hunt(self, ruleset_name):
2963+
"""Cancels the retro hunt for the specified ruleset.
2964+
:param ruleset_name: name of the ruleset
2965+
:type ruleset_name: str
2966+
:return: response
2967+
:rtype: requests.Response
2968+
"""
2969+
response = self.__retro_hunt_action(
2970+
path_suffix="cancel-retro-hunt",
2971+
ruleset_name=ruleset_name
2972+
)
2973+
2974+
return response
2975+
2976+
def yara_retro_matches_feed(self, time_format, time_value):
2977+
"""Returns a recordset of YARA ruleset matches in the specified time range.
2978+
:param time_format: possible values: 'utc' or 'timestamp'
2979+
:type time_format: str
2980+
:param time_value: results will be retrieved from the specified time up until the current moment;
2981+
accepted formats are Unix timestamp string and 'YYYY-MM-DDThh:mm:ss'
2982+
:type time_value: str
2983+
:return: response
2984+
:rtype: requests.Response
2985+
"""
2986+
if time_format not in ("utc", "timestamp"):
2987+
raise WrongInputError("time_format parameter must be one of the following values: 'utc', 'timestamp'")
2988+
2989+
if not isinstance(time_value, str):
2990+
raise WrongInputError("time_value parameter must be string.")
2991+
2992+
base = self.__YARA_RETRO_MATCHES_ENDPOINT.format(
2993+
time_format=time_format,
2994+
time_value=time_value
2995+
)
2996+
2997+
endpoint = "{base}?format=json".format(base=base)
2998+
2999+
url = self._url.format(endpoint=endpoint)
3000+
3001+
response = self._get_request(url=url)
3002+
3003+
self._raise_on_error(response)
3004+
3005+
return response
3006+
3007+
26313008
def _update_hash_object(input_source, hash_object):
26323009
"""Accepts a string or an opened file in 'rb' mode and a created hashlib hash object and
26333010
returns an updated hashlib hash object.

0 commit comments

Comments
 (0)