diff --git a/api_app/analyzers_manager/migrations/0194_analyzer_config_rdap.py b/api_app/analyzers_manager/migrations/0194_analyzer_config_rdap.py new file mode 100644 index 0000000000..f43a0bae0f --- /dev/null +++ b/api_app/analyzers_manager/migrations/0194_analyzer_config_rdap.py @@ -0,0 +1,129 @@ +from django.db import migrations +from django.db.models.fields.related_descriptors import ( + ForwardManyToOneDescriptor, + ForwardOneToOneDescriptor, + ManyToManyDescriptor, + ReverseManyToOneDescriptor, + ReverseOneToOneDescriptor, +) + +plugin = { + "python_module": { + "health_check_schedule": None, + "update_schedule": None, + "module": "rdap.Rdap", + "base_path": "api_app.analyzers_manager.observable_analyzers", + }, + "name": "Rdap", + "description": "Query the public [RDAP](https://about.rdap.org/) bootstrap " + "(rdap.org) for registration data (RFC 9082); the free, unauthenticated WHOIS " + "successor. Supports IPs, domains and URLs.", + "disabled": False, + "soft_time_limit": 30, + "routing_key": "default", + "health_check_status": True, + "type": "observable", + "docker_based": False, + "maximum_tlp": "AMBER", + "observable_supported": ["ip", "domain", "url"], + "supported_filetypes": [], + "run_hash": False, + "run_hash_type": "", + "not_supported_filetypes": [], + "mapping_data_model": {}, + "model": "analyzers_manager.AnalyzerConfig", +} + +params = [] + +values = [] + + +def _get_real_obj(Model, field, value): + def _get_obj(Model, other_model, value): + if isinstance(value, dict): + real_vals = {} + for key, real_val in value.items(): + real_vals[key] = _get_real_obj(other_model, key, real_val) + value = other_model.objects.get_or_create(**real_vals)[0] + # it is just the primary key serialized + else: + if isinstance(value, int): + if Model.__name__ == "PluginConfig": + value = other_model.objects.get(name=plugin["name"]) + else: + value = other_model.objects.get(pk=value) + else: + value = other_model.objects.get(name=value) + return value + + if ( + type(getattr(Model, field)) + in [ + ForwardManyToOneDescriptor, + ReverseManyToOneDescriptor, + ReverseOneToOneDescriptor, + ForwardOneToOneDescriptor, + ] + and value + ): + other_model = getattr(Model, field).get_queryset().model + value = _get_obj(Model, other_model, value) + elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: + other_model = getattr(Model, field).rel.model + value = [_get_obj(Model, other_model, val) for val in value] + return value + + +def _create_object(Model, data): + mtm, no_mtm = {}, {} + for field, value in data.items(): + value = _get_real_obj(Model, field, value) + if type(getattr(Model, field)) is ManyToManyDescriptor: + mtm[field] = value + else: + no_mtm[field] = value + try: + o = Model.objects.get(**no_mtm) + except Model.DoesNotExist: + o = Model(**no_mtm) + o.full_clean() + o.save() + for field, value in mtm.items(): + attribute = getattr(o, field) + if value is not None: + attribute.set(value) + return False + return True + + +def migrate(apps, schema_editor): + Parameter = apps.get_model("api_app", "Parameter") + PluginConfig = apps.get_model("api_app", "PluginConfig") + plugin_data = dict(plugin) + python_path = plugin_data.pop("model") + Model = apps.get_model(*python_path.split(".")) + if not Model.objects.filter(name=plugin_data["name"]).exists(): + exists = _create_object(Model, plugin_data) + if not exists: + for param in params: + _create_object(Parameter, param) + for value in values: + _create_object(PluginConfig, value) + + +def reverse_migrate(apps, schema_editor): + plugin_data = dict(plugin) + python_path = plugin_data.pop("model") + Model = apps.get_model(*python_path.split(".")) + Model.objects.get(name=plugin_data["name"]).delete() + + +class Migration(migrations.Migration): + atomic = False + dependencies = [ + ("api_app", "0073_alter_updatecheckstatus_last_checked_at_and_more"), + ("analyzers_manager", "0193_analyzer_config_cve_exploitability"), + ] + + operations = [migrations.RunPython(migrate, reverse_migrate)] diff --git a/api_app/analyzers_manager/observable_analyzers/rdap.py b/api_app/analyzers_manager/observable_analyzers/rdap.py new file mode 100644 index 0000000000..47cfafb833 --- /dev/null +++ b/api_app/analyzers_manager/observable_analyzers/rdap.py @@ -0,0 +1,76 @@ +# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl +# See the file 'LICENSE' for copying permission. + +import ipaddress +from urllib.parse import urlparse + +import requests + +from api_app.analyzers_manager import classes +from api_app.analyzers_manager.exceptions import AnalyzerRunException +from api_app.choices import Classification + + +class Rdap(classes.ObservableAnalyzer): + """Query the public RDAP bootstrap (https://rdap.org) for an observable's + registration data. + + RDAP (Registration Data Access Protocol, RFC 9082/9083) is the IETF-standard, + free and unauthenticated successor to WHOIS. It returns structured JSON + describing the registration of IP addresses, domains, and URLs (resolved to + their host). The rdap.org bootstrap redirects each query to the authoritative + RDAP server for the object. + """ + + url: str = "https://rdap.org" + + def update(self) -> bool: + # No local data to refresh; rdap.org is queried live on every run. + return False + + @staticmethod + def _path_for_host(host: str) -> str: + """Pick the RDAP endpoint for a host: ``ip/`` for an IP literal + (the host of a URL can be one), ``domain/`` otherwise.""" + try: + ipaddress.ip_address(host) + except ValueError: + return f"domain/{host}" + return f"ip/{host}" + + def run(self): + if self.observable_classification == Classification.IP: + path = f"ip/{self.observable_name}" + elif self.observable_classification == Classification.DOMAIN: + path = f"domain/{self.observable_name}" + elif self.observable_classification == Classification.URL: + hostname = urlparse(self.observable_name).hostname + if not hostname: + raise AnalyzerRunException(f"unable to extract a hostname from URL {self.observable_name}") + path = self._path_for_host(hostname) + else: + raise AnalyzerRunException( + f"{self.observable_classification} is not a supported observable type " + "for RDAP (supported: ip, domain, url)" + ) + + try: + response = requests.get( + f"{self.url}/{path}", + headers={"Accept": "application/rdap+json"}, + timeout=10, + ) + # RDAP returns 404 when the registry holds no record for the object; + # treat that as a clean negative result rather than an error. + if response.status_code == 404: + return {"found": False} + response.raise_for_status() + except requests.RequestException as e: + raise AnalyzerRunException(str(e)) from e + + try: + result = response.json() + except ValueError as e: + raise AnalyzerRunException(f"RDAP server returned a non-JSON response: {e}") from e + result["found"] = True + return result diff --git a/api_app/playbooks_manager/migrations/0068_add_rdap_to_free_to_use.py b/api_app/playbooks_manager/migrations/0068_add_rdap_to_free_to_use.py new file mode 100644 index 0000000000..e45de443f8 --- /dev/null +++ b/api_app/playbooks_manager/migrations/0068_add_rdap_to_free_to_use.py @@ -0,0 +1,34 @@ +# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl +# See the file 'LICENSE' for copying permission. + + +from django.db import migrations + + +def migrate(apps, schema_editor): + playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig") + AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig") + pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS") + pc.analyzers.add(AnalyzerConfig.objects.get(name="Rdap").id) + pc.full_clean() + pc.save() + + +def reverse_migrate(apps, schema_editor): + playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig") + AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig") + pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS") + pc.analyzers.remove(AnalyzerConfig.objects.get(name="Rdap").id) + pc.full_clean() + pc.save() + + +class Migration(migrations.Migration): + dependencies = [ + ("playbooks_manager", "0067_add_cve_exploitability_to_free_to_use"), + ("analyzers_manager", "0194_analyzer_config_rdap"), + ] + + operations = [ + migrations.RunPython(migrate, reverse_migrate), + ] diff --git a/tests/api_app/analyzers_manager/observable_analyzers/test_rdap.py b/tests/api_app/analyzers_manager/observable_analyzers/test_rdap.py new file mode 100644 index 0000000000..e24dc33ed6 --- /dev/null +++ b/tests/api_app/analyzers_manager/observable_analyzers/test_rdap.py @@ -0,0 +1,76 @@ +# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl +# See the file 'LICENSE' for copying permission. + +from unittest.mock import MagicMock, patch + +from api_app.analyzers_manager.exceptions import AnalyzerRunException +from api_app.analyzers_manager.observable_analyzers.rdap import Rdap +from api_app.choices import Classification +from tests import CustomTestCase + + +class RdapTestCase(CustomTestCase): + """Unit tests for the RDAP analyzer (mocked HTTP, no live calls).""" + + @staticmethod + def _analyzer(observable_name, classification): + analyzer = Rdap(config={}) + analyzer.observable_name = observable_name + analyzer.observable_classification = classification + return analyzer + + @staticmethod + def _ok(json_data): + response = MagicMock(status_code=200) + response.raise_for_status.return_value = None + response.json.return_value = json_data + return response + + @patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get") + def test_domain_found(self, mock_get): + mock_get.return_value = self._ok({"objectClassName": "domain", "ldhName": "example.com"}) + result = self._analyzer("example.com", Classification.DOMAIN).run() + self.assertTrue(result["found"]) + self.assertEqual(result["ldhName"], "example.com") + self.assertIn("rdap.org/domain/example.com", mock_get.call_args.args[0]) + + @patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get") + def test_ip_uses_ip_endpoint(self, mock_get): + mock_get.return_value = self._ok({"objectClassName": "ip network"}) + self._analyzer("1.1.1.1", Classification.IP).run() + self.assertIn("rdap.org/ip/1.1.1.1", mock_get.call_args.args[0]) + + @patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get") + def test_url_resolves_to_host_domain(self, mock_get): + mock_get.return_value = self._ok({"objectClassName": "domain"}) + self._analyzer("https://sub.example.com/path?q=1", Classification.URL).run() + self.assertIn("rdap.org/domain/sub.example.com", mock_get.call_args.args[0]) + + @patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get") + def test_url_with_ip_host_uses_ip_endpoint(self, mock_get): + mock_get.return_value = self._ok({"objectClassName": "ip network"}) + self._analyzer("http://1.1.1.1/path", Classification.URL).run() + self.assertIn("rdap.org/ip/1.1.1.1", mock_get.call_args.args[0]) + + @patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get") + def test_non_json_response_raises(self, mock_get): + response = MagicMock(status_code=200) + response.raise_for_status.return_value = None + response.json.side_effect = ValueError("Expecting value") + mock_get.return_value = response + with self.assertRaises(AnalyzerRunException): + self._analyzer("example.com", Classification.DOMAIN).run() + + @patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get") + def test_not_found_returns_clean_negative(self, mock_get): + mock_get.return_value = MagicMock(status_code=404) + result = self._analyzer("does-not-exist.invalid", Classification.DOMAIN).run() + self.assertEqual(result, {"found": False}) + + def test_unsupported_classification_raises(self): + with self.assertRaises(AnalyzerRunException): + self._analyzer("deadbeefdeadbeef", Classification.HASH).run() + + def test_url_without_hostname_raises(self): + with self.assertRaises(AnalyzerRunException): + self._analyzer("not-a-url", Classification.URL).run()