Skip to content

Commit

Permalink
adding some import scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
himynamesdave committed Oct 16, 2024
1 parent 72b7790 commit a6c9c95
Show file tree
Hide file tree
Showing 7 changed files with 525 additions and 4 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ You can access the django admin UI at:

http://127.0.0.1:8005/admin

### Add data

By default, the `ARANGODB_DATABASE``

### Running in production

Note, if you intend on using this in production, you should also modify the variables in the `.env` file for `POSTGRES_DB`, `POSTGRES_USER`, `POSTGRES_PASS`, `DJANGO_SECRET` and `DEBUG` (to `False`)

## Quickstart

To get up and running quickly head to `/utilities/README.md` for some scripts that will automate the backfill of data.

## Support

[Minimal support provided via the DOGESEC community](https://community.dogesec.com/).
Expand Down
84 changes: 84 additions & 0 deletions utilities/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Vulmatch Utilities

Run these to backfill Vulmatch with data.


## Enrichment backfill

### CPE

Generally very old CPEs are no longer observed, but that does not mean that recent CVEs will reference them.

To be safe backfill all CPEs (beware, this is over 1 million records).

If you want, you can also specify an earliest CPE date in the script below. Be aware though, if CVEs reference CPEs you havent imported (because they have a modified time earlier than that specified) you will miss the CVE -> CPE joins.

### ATT&CK Enterprise

Import all available versions (recommended)

```shell
python3 import_attack_enterprise_archive.py
```

Import specific versions

```shell
python3 import_attack_enterprise_archive.py 1.0 14.1 15.0 15.1
```

### ATT&CK ICS

Import all available versions (recommended)

```shell
python3 import_attack_ics_archive.py
```

Import specific versions

```shell
python3 import_attack_ics_archive.py 14.1 15.0 15.1
```

### ATT&CK Mobile

Import all available versions (recommended)

```shell
python3 import_attack_mobile_archive.py
```

Import specific versions

```shell
python3 import_attack_mobile_archive.py 15.1 11.1-beta
```

### CWE

Import all available versions (recommended)

```shell
python3 import_cwe_archive.py
```

Import specific versions

```shell
python3 import_cwe_archive.py 4.14 4.15
```

### CAPEC

Import all available versions (recommended)

```shell
python3 import_capec_archive.py
```

Import specific versions

```shell
python3 import_capec_archive.py 3.8 3.9
```
86 changes: 86 additions & 0 deletions utilities/import_attack_enterprise_archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import argparse
import requests
import time

# Base URLs of the API
BASE_URL = 'http://127.0.0.1:8005/api/v1/attack-enterprise/'
JOB_STATUS_URL = 'http://127.0.0.1:8005/api/v1/jobs/'

# List of all available versions
ALL_VERSIONS = [
"1.0", "2.0", "3.0", "4.0", "5.0", "5.1", "5.2", "6.0", "6.1", "6.2", "6.3",
"7.0", "7.1", "7.2", "8.0", "8.1", "8.2", "9.0", "10.0", "10.1", "11.0",
"11.1", "11.2", "11.3", "12.0", "12.1", "13.0", "13.1", "14.0", "14.1",
"15.0", "15.1"
]

# Function to post version and get job ID
def post_version(version):
url = BASE_URL
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}

# Replace . with _ for version formatting
version_str = str(version).replace('.', '_')
data = {
"version": version_str
}

print(f"Sending POST request for version: {version_str}")
response = requests.post(url, headers=headers, json=data)

# Print full request and response for debugging
print(f"Request Data: {data}")
print(f"Response: {response.status_code}, {response.text}")

# Accept both 200 OK and 201 Created as successful responses
if response.status_code in [200, 201]:
response_data = response.json()
return response_data['id'] # Return job ID
else:
raise Exception(f"Failed to submit version {version}: {response.status_code} - {response.text}")

# Function to check job status
def check_job_status(job_id):
url = f"{JOB_STATUS_URL}{job_id}/"

while True:
print(f"Checking job status for job ID: {job_id}")
response = requests.get(url)

# Print full request and response for debugging
print(f"Job Status Response: {response.status_code}, {response.text}")

if response.status_code == 200:
response_data = response.json()
if response_data['state'] == 'completed':
print(f"Job {job_id} completed.")
return
else:
print(f"Job {job_id} still in progress. Waiting...")
time.sleep(30) # Wait 30 seconds before checking again
else:
raise Exception(f"Failed to check job status: {response.status_code} - {response.text}")

def main():
# Parse CLI arguments
parser = argparse.ArgumentParser(description="Post versions and track job status.")
parser.add_argument('versions', nargs='*', type=float, help="List of versions to post as numbers (e.g., 14.1, 15.0). If not provided, all versions will be imported.")
args = parser.parse_args()

# Use provided versions or default to all if none are provided
versions = sorted(args.versions) if args.versions else sorted(ALL_VERSIONS)

# Post each version and check job status
for version in versions:
try:
job_id = post_version(version)
check_job_status(job_id)
except Exception as e:
print(f"Error occurred: {e}")
break

if __name__ == "__main__":
main()
85 changes: 85 additions & 0 deletions utilities/import_attack_ics_archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import argparse
import requests
import time

# Base URLs of the API
BASE_URL = 'http://127.0.0.1:8005/api/v1/attack-ics/'
JOB_STATUS_URL = 'http://127.0.0.1:8005/api/v1/jobs/'

# List of all available versions
ALL_VERSIONS = [
"8.0", "8.1", "8.2", "9.0", "10.0", "10.1", "11.0",
"11.1", "11.2", "11.3", "12.0", "12.1", "13.0", "13.1", "14.0", "14.1",
"15.0", "15.1"
]

# Function to post version and get job ID
def post_version(version):
url = BASE_URL
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}

# Replace . with _ for version formatting
version_str = str(version).replace('.', '_')
data = {
"version": version_str
}

print(f"Sending POST request for version: {version_str}")
response = requests.post(url, headers=headers, json=data)

# Print full request and response for debugging
print(f"Request Data: {data}")
print(f"Response: {response.status_code}, {response.text}")

# Accept both 200 OK and 201 Created as successful responses
if response.status_code in [200, 201]:
response_data = response.json()
return response_data['id'] # Return job ID
else:
raise Exception(f"Failed to submit version {version}: {response.status_code} - {response.text}")

# Function to check job status
def check_job_status(job_id):
url = f"{JOB_STATUS_URL}{job_id}/"

while True:
print(f"Checking job status for job ID: {job_id}")
response = requests.get(url)

# Print full request and response for debugging
print(f"Job Status Response: {response.status_code}, {response.text}")

if response.status_code == 200:
response_data = response.json()
if response_data['state'] == 'completed':
print(f"Job {job_id} completed.")
return
else:
print(f"Job {job_id} still in progress. Waiting...")
time.sleep(30) # Wait 30 seconds before checking again
else:
raise Exception(f"Failed to check job status: {response.status_code} - {response.text}")

def main():
# Parse CLI arguments
parser = argparse.ArgumentParser(description="Post versions and track job status.")
parser.add_argument('versions', nargs='*', type=float, help="List of versions to post as numbers (e.g., 14.1, 15.0). If not provided, all versions will be imported.")
args = parser.parse_args()

# Use provided versions or default to all if none are provided
versions = sorted(args.versions) if args.versions else sorted(ALL_VERSIONS)

# Post each version and check job status
for version in versions:
try:
job_id = post_version(version)
check_job_status(job_id)
except Exception as e:
print(f"Error occurred: {e}")
break

if __name__ == "__main__":
main()
86 changes: 86 additions & 0 deletions utilities/import_attack_mobile_archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import argparse
import requests
import time

# Base URLs of the API
BASE_URL = 'http://127.0.0.1:8005/api/v1/attack-mobile/'
JOB_STATUS_URL = 'http://127.0.0.1:8005/api/v1/jobs/'

# List of all available versions
ALL_VERSIONS = [
"1.0", "2.0", "3.0", "4.0", "5.0", "5.1", "5.2", "6.0", "6.1", "6.2", "6.3",
"7.0", "7.1", "7.2", "8.0", "8.1", "8.2", "9.0", "10.0", "10.1", "11.0-beta",
"11.1-beta", "11.2-beta", "11.3", "12.0", "12.1", "13.0", "13.1", "14.0", "14.1",
"15.0", "15.1"
]

# Function to post version and get job ID
def post_version(version):
url = BASE_URL
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}

# Replace . with _ for version formatting
version_str = str(version).replace('.', '_')
data = {
"version": version_str
}

print(f"Sending POST request for version: {version_str}")
response = requests.post(url, headers=headers, json=data)

# Print full request and response for debugging
print(f"Request Data: {data}")
print(f"Response: {response.status_code}, {response.text}")

# Accept both 200 OK and 201 Created as successful responses
if response.status_code in [200, 201]:
response_data = response.json()
return response_data['id'] # Return job ID
else:
raise Exception(f"Failed to submit version {version}: {response.status_code} - {response.text}")

# Function to check job status
def check_job_status(job_id):
url = f"{JOB_STATUS_URL}{job_id}/"

while True:
print(f"Checking job status for job ID: {job_id}")
response = requests.get(url)

# Print full request and response for debugging
print(f"Job Status Response: {response.status_code}, {response.text}")

if response.status_code == 200:
response_data = response.json()
if response_data['state'] == 'completed':
print(f"Job {job_id} completed.")
return
else:
print(f"Job {job_id} still in progress. Waiting...")
time.sleep(30) # Wait 30 seconds before checking again
else:
raise Exception(f"Failed to check job status: {response.status_code} - {response.text}")

def main():
# Parse CLI arguments
parser = argparse.ArgumentParser(description="Post versions and track job status.")
parser.add_argument('versions', nargs='*', type=str, help="List of versions to post (e.g., 14.1, 15.0, 11.1-beta). If not provided, all versions will be imported.")
args = parser.parse_args()

# Use provided versions or default to all if none are provided
versions = sorted(args.versions) if args.versions else sorted(ALL_VERSIONS, key=lambda v: [int(x) if x.isdigit() else x for x in v.replace('-', '.').split('.')])

# Post each version and check job status
for version in versions:
try:
job_id = post_version(version)
check_job_status(job_id)
except Exception as e:
print(f"Error occurred: {e}")
break

if __name__ == "__main__":
main()
Loading

0 comments on commit a6c9c95

Please sign in to comment.