|
| 1 | +from core.models import DataFile, DataSource |
| 2 | +from dcim.choices import DeviceStatusChoices |
| 3 | +from dcim.models import DeviceType, Location, Manufacturer, Platform, Site |
| 4 | +from django.forms import Form |
| 5 | +from django.utils.translation import gettext_lazy as _ |
| 6 | +from extras.models import Tag |
| 7 | +from netbox.forms import NetBoxModelImportForm |
| 8 | +from tenancy.models import Tenant |
| 9 | +from utilities.forms.fields import CSVChoiceField, CSVModelChoiceField, CSVModelMultipleChoiceField, JSONField |
| 10 | + |
| 11 | +from validity import choices, models |
| 12 | +from validity.api.helpers import SubformValidationMixin |
| 13 | +from .mixins import PollerCleanMixin |
| 14 | + |
| 15 | + |
| 16 | +class SubFormMixin(SubformValidationMixin): |
| 17 | + def clean(self): |
| 18 | + validated_data = {k: v for k, v in self.cleaned_data.items() if not k.startswith("_")} |
| 19 | + self.validate(validated_data) |
| 20 | + return self.cleaned_data |
| 21 | + |
| 22 | + |
| 23 | +class DataSourceMixin(Form): |
| 24 | + data_source = CSVModelChoiceField( |
| 25 | + queryset=DataSource.objects.all(), |
| 26 | + required=False, |
| 27 | + to_field_name="name", |
| 28 | + help_text=_("Data Source"), |
| 29 | + ) |
| 30 | + data_file = CSVModelChoiceField( |
| 31 | + queryset=DataFile.objects.all(), |
| 32 | + required=False, |
| 33 | + to_field_name="path", |
| 34 | + help_text=_("File from Data Source"), |
| 35 | + ) |
| 36 | + |
| 37 | + def clean_data_source(self): |
| 38 | + """ |
| 39 | + Filters data file by known data source |
| 40 | + data_source MUST go before data_file in Meta.fields |
| 41 | + """ |
| 42 | + data_source = self.cleaned_data["data_source"] |
| 43 | + if data_source is not None: |
| 44 | + datafile_field = self.fields["data_file"] |
| 45 | + datafile_field.queryset = datafile_field.queryset.filter(source=data_source) |
| 46 | + return data_source |
| 47 | + |
| 48 | + |
| 49 | +class ComplianceTestImportForm(DataSourceMixin, NetBoxModelImportForm): |
| 50 | + severity = CSVChoiceField(choices=choices.SeverityChoices.choices, help_text=_("Test Severity")) |
| 51 | + selectors = CSVModelMultipleChoiceField( |
| 52 | + queryset=models.ComplianceSelector.objects.all(), |
| 53 | + to_field_name="name", |
| 54 | + help_text=_("Compliance Selector names separated by commas, encased with double quotes"), |
| 55 | + ) |
| 56 | + |
| 57 | + class Meta: |
| 58 | + model = models.ComplianceTest |
| 59 | + fields = ("name", "severity", "description", "selectors", "expression", "data_source", "data_file", "tags") |
| 60 | + |
| 61 | + |
| 62 | +class NameSetImportForm(DataSourceMixin, NetBoxModelImportForm): |
| 63 | + tests = CSVModelMultipleChoiceField( |
| 64 | + queryset=models.ComplianceTest.objects.all(), |
| 65 | + to_field_name="name", |
| 66 | + help_text=_("Compliance Test names separated by commas, encased with double quotes"), |
| 67 | + required=False, |
| 68 | + ) |
| 69 | + |
| 70 | + class Meta: |
| 71 | + model = models.NameSet |
| 72 | + fields = ("name", "description", "_global", "tests", "definitions", "data_source", "data_file") |
| 73 | + |
| 74 | + def __init__(self, *args, headers=None, **kwargs): |
| 75 | + base_fields = {"global": self.base_fields["_global"]} | self.base_fields |
| 76 | + base_fields.pop("_global") |
| 77 | + self.base_fields = base_fields |
| 78 | + super().__init__(*args, headers=headers, **kwargs) |
| 79 | + |
| 80 | + def save(self, commit=True) -> choices.Any: |
| 81 | + if (_global := self.cleaned_data.get("global")) is not None: |
| 82 | + self.instance._global = _global |
| 83 | + return super().save(commit) |
| 84 | + |
| 85 | + |
| 86 | +class SerializerImportForm(SubFormMixin, DataSourceMixin, NetBoxModelImportForm): |
| 87 | + extraction_method = CSVChoiceField( |
| 88 | + choices=choices.ExtractionMethodChoices.choices, help_text=_("Extraction Method") |
| 89 | + ) |
| 90 | + parameters = JSONField( |
| 91 | + help_text=_( |
| 92 | + "JSON-encoded Serializer parameters depending on Extraction Method value. " |
| 93 | + "See REST API to check for specific keys/values" |
| 94 | + ) |
| 95 | + ) |
| 96 | + |
| 97 | + class Meta: |
| 98 | + model = models.Serializer |
| 99 | + fields = ("name", "extraction_method", "template", "parameters", "data_source", "data_file") |
| 100 | + |
| 101 | + |
| 102 | +class ComplianceSelectorImportForm(NetBoxModelImportForm): |
| 103 | + filter_operation = CSVChoiceField( |
| 104 | + choices=choices.BoolOperationChoices.choices, help_text=_("Filter Join Operation") |
| 105 | + ) |
| 106 | + tag_filter = CSVModelMultipleChoiceField( |
| 107 | + queryset=Tag.objects.all(), |
| 108 | + to_field_name="slug", |
| 109 | + help_text=_("Tag slugs separated by commas, encased with double quotes"), |
| 110 | + required=False, |
| 111 | + ) |
| 112 | + manufacturer_filter = CSVModelMultipleChoiceField( |
| 113 | + queryset=Manufacturer.objects.all(), |
| 114 | + to_field_name="slug", |
| 115 | + help_text=_("Manufacturer slugs separated by commas, encased with double quotes"), |
| 116 | + required=False, |
| 117 | + ) |
| 118 | + type_filter = CSVModelMultipleChoiceField( |
| 119 | + queryset=DeviceType.objects.all(), |
| 120 | + to_field_name="slug", |
| 121 | + help_text=_("Device Type slugs separated by commas, encased with double quotes"), |
| 122 | + required=False, |
| 123 | + ) |
| 124 | + platform_filter = CSVModelMultipleChoiceField( |
| 125 | + queryset=Platform.objects.all(), |
| 126 | + to_field_name="slug", |
| 127 | + help_text=_("Platform slugs separated by commas, encased with double quotes"), |
| 128 | + required=False, |
| 129 | + ) |
| 130 | + status_filter = CSVChoiceField(choices=DeviceStatusChoices, help_text=_("Device Status Filter"), required=False) |
| 131 | + location_filter = CSVModelMultipleChoiceField( |
| 132 | + queryset=Location.objects.all(), |
| 133 | + to_field_name="slug", |
| 134 | + help_text=_("Location slugs separated by commas, encased with double quotes"), |
| 135 | + required=False, |
| 136 | + ) |
| 137 | + site_filter = CSVModelMultipleChoiceField( |
| 138 | + queryset=Site.objects.all(), |
| 139 | + to_field_name="slug", |
| 140 | + help_text=_("Site slugs separated by commas, encased with double quotes"), |
| 141 | + required=False, |
| 142 | + ) |
| 143 | + tenant_filter = CSVModelMultipleChoiceField( |
| 144 | + queryset=Tenant.objects.all(), |
| 145 | + to_field_name="slug", |
| 146 | + help_text=("Tenant slugs separated by commas, encased with double quotes"), |
| 147 | + required=False, |
| 148 | + ) |
| 149 | + dynamic_pairs = CSVChoiceField( |
| 150 | + choices=choices.DynamicPairsChoices.choices, required=False, help_text=_("Dynamic Pairs") |
| 151 | + ) |
| 152 | + |
| 153 | + class Meta: |
| 154 | + model = models.ComplianceSelector |
| 155 | + fields = ( |
| 156 | + "name", |
| 157 | + "filter_operation", |
| 158 | + "name_filter", |
| 159 | + "tag_filter", |
| 160 | + "manufacturer_filter", |
| 161 | + "type_filter", |
| 162 | + "platform_filter", |
| 163 | + "status_filter", |
| 164 | + "location_filter", |
| 165 | + "site_filter", |
| 166 | + "tenant_filter", |
| 167 | + "dynamic_pairs", |
| 168 | + "dp_tag_prefix", |
| 169 | + ) |
| 170 | + |
| 171 | + |
| 172 | +class CommandImportForm(SubFormMixin, NetBoxModelImportForm): |
| 173 | + serializer = CSVModelChoiceField( |
| 174 | + queryset=models.Serializer.objects.all(), to_field_name="name", help_text=_("Serializer"), required=False |
| 175 | + ) |
| 176 | + type = CSVChoiceField(choices=choices.CommandTypeChoices.choices, help_text=_("Command Type")) |
| 177 | + parameters = JSONField( |
| 178 | + help_text=_( |
| 179 | + "JSON-encoded Command parameters depending on Type value. See REST API to check for specific keys/values" |
| 180 | + ) |
| 181 | + ) |
| 182 | + |
| 183 | + class Meta: |
| 184 | + model = models.Command |
| 185 | + fields = ("name", "label", "retrieves_config", "serializer", "type", "parameters") |
| 186 | + |
| 187 | + |
| 188 | +class PollerImportForm(PollerCleanMixin, NetBoxModelImportForm): |
| 189 | + connection_type = CSVChoiceField(choices=choices.ConnectionTypeChoices.choices, help_text=_("Connection Type")) |
| 190 | + commands = CSVModelMultipleChoiceField( |
| 191 | + queryset=models.Command.objects.all(), |
| 192 | + to_field_name="label", |
| 193 | + help_text=_("Command labels separated by commas, encased with double quotes"), |
| 194 | + ) |
| 195 | + public_credentials = JSONField(help_text=_("Public Credentials"), required=False) |
| 196 | + private_credentials = JSONField( |
| 197 | + help_text=_( |
| 198 | + "Private Credentials. ATTENTION: encryption depends on Django's SECRET_KEY var, " |
| 199 | + "values from another NetBox may not be decrypted properly" |
| 200 | + ), |
| 201 | + required=False, |
| 202 | + ) |
| 203 | + |
| 204 | + def full_clean(self) -> None: |
| 205 | + return super().full_clean() |
| 206 | + |
| 207 | + class Meta: |
| 208 | + model = models.Poller |
| 209 | + fields = ("name", "connection_type", "commands", "public_credentials", "private_credentials") |
0 commit comments