Skip to content

Commit

Permalink
Merge pull request #186 from AikidoSec/AIK-3587
Browse files Browse the repository at this point in the history
AIK-3587 Expand path traversal support for byte-like files and PurePath classes
  • Loading branch information
willem-delbare authored Sep 12, 2024
2 parents 8a6b7f7 + cfa3c95 commit 626939b
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 34 deletions.
5 changes: 5 additions & 0 deletions aikido_zen/helpers/path_to_string.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Helper function file"""

from urllib.parse import urlparse
from pathlib import PurePath


def path_to_string(path):
Expand All @@ -19,4 +20,8 @@ def path_to_string(path):
return path.decode("utf-8")
except UnicodeDecodeError:
return None
if isinstance(path, PurePath):
# Stringify PurePath. This can still allow path traversal but in extremely
# limited cases so it's safe to just stringify for now.
return str(path)
return None
17 changes: 17 additions & 0 deletions aikido_zen/helpers/path_to_string_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from pathlib import Path, PurePath
import pytest
from .path_to_string import path_to_string

Expand Down Expand Up @@ -30,3 +31,19 @@ def test_path_to_string_with_other_types():
assert path_to_string(123) is None # Integer input
assert path_to_string([]) is None # List input
assert path_to_string({}) is None # Dictionary input


def test_path_to_string_with_pure_path():
assert path_to_string(PurePath("./", "/folder", "/test.py")) == "/test.py"
assert path_to_string(PurePath("./", "/folder", "test2.py")) == "/folder/test2.py"
assert path_to_string(PurePath(".", ".", ".")) == "."
assert path_to_string(PurePath()) == "."
assert path_to_string(PurePath("test1", "test2", "test3")) == "test1/test2/test3"


def test_path_to_string_with_path():
assert path_to_string(Path("./", "/folder", "/test.py")) == "/test.py"
assert path_to_string(Path("./", "/folder", "test2.py")) == "/folder/test2.py"
assert path_to_string(Path(".", ".", ".")) == "."
assert path_to_string(Path()) == "."
assert path_to_string(Path("test1", "test2", "test3")) == "test1/test2/test3"
3 changes: 2 additions & 1 deletion aikido_zen/sinks/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Sink module for `builtins`, python's built-in function
"""

from pathlib import PurePath
import copy
import aikido_zen.importhook as importhook
import aikido_zen.vulnerabilities as vulns
Expand All @@ -20,7 +21,7 @@ def on_builtins_import(builtins):

def aikido_new_open(*args, **kwargs):
# args[0] is the filename
if len(args) > 0 and isinstance(args[0], str):
if len(args) > 0 and isinstance(args[0], (str, bytes, PurePath)):
vulns.run_vulnerability_scan(
kind="path_traversal", op="builtins.open", args=(args[0],)
)
Expand Down
9 changes: 7 additions & 2 deletions aikido_zen/sinks/os.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import copy
from pathlib import PurePath
import aikido_zen.importhook as importhook
import aikido_zen.vulnerabilities as vulns

Expand Down Expand Up @@ -42,11 +43,15 @@ def generate_aikido_function(op, former_func):
"""

def aikido_new_func(*args, op=op, former_func=former_func, **kwargs):
if len(args) > 0 and isinstance(args[0], str): # args[0] : filename
if len(args) > 0 and isinstance(
args[0], (str, bytes, PurePath)
): # args[0] : filename
vulns.run_vulnerability_scan(
kind="path_traversal", op=f"os.{op}", args=(args[0],)
)
if len(args) > 1 and isinstance(args[1], str): # args[1] : Could be dst folder
if len(args) > 1 and isinstance(
args[1], (str, bytes, PurePath)
): # args[1] : Could be dst folder
vulns.run_vulnerability_scan(
kind="path_traversal", op=f"os.{op}", args=(args[1],)
)
Expand Down
40 changes: 16 additions & 24 deletions aikido_zen/sinks/tests/asyncpg_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from unittest.mock import patch
import aikido_zen.sinks.asyncpg
import aikido_zen.sinks.os
from aikido_zen.background_process.comms import reset_comms
from aikido_zen.vulnerabilities.sql_injection.dialects import Postgres

Expand Down Expand Up @@ -33,12 +34,10 @@ async def test_conn_execute(database_conn):
called_with_kind = mock_run_vulnerability_scan.call_args[1]["kind"]
assert called_with_args[0] == query
assert isinstance(called_with_args[1], Postgres)
mock_run_vulnerability_scan.assert_called_once()
assert called_with_op == "asyncpg.connection.Connection.execute"
assert called_with_kind == "sql_injection"

await conn.close()
mock_run_vulnerability_scan.assert_called_once()


@pytest.mark.asyncio
Expand All @@ -54,10 +53,8 @@ async def test_conn_fetchrow(database_conn):
called_with_args = mock_run_vulnerability_scan.call_args[1]["args"]
assert called_with_args[0] == query
assert isinstance(called_with_args[1], Postgres)
mock_run_vulnerability_scan.assert_called_once()

await conn.close()
mock_run_vulnerability_scan.assert_called_once()


@pytest.mark.asyncio
Expand All @@ -73,10 +70,8 @@ async def test_conn_fetch(database_conn):
called_with_args = mock_run_vulnerability_scan.call_args[1]["args"]
assert called_with_args[0] == query
assert isinstance(called_with_args[1], Postgres)
mock_run_vulnerability_scan.assert_called_once()

await conn.close()
mock_run_vulnerability_scan.assert_called_once()


@pytest.mark.asyncio
Expand All @@ -92,10 +87,8 @@ async def test_conn_fetchval(database_conn):
called_with_args = mock_run_vulnerability_scan.call_args[1]["args"]
assert called_with_args[0] == query
assert isinstance(called_with_args[1], Postgres)
mock_run_vulnerability_scan.assert_called_once()

await conn.close()
mock_run_vulnerability_scan.assert_called_once()


@pytest.mark.asyncio
Expand All @@ -109,13 +102,12 @@ async def test_conn_execute_parameterized(database_conn):
await conn.execute(query, "Doggo", False)
calls = mock_run_vulnerability_scan.call_args_list

first_call_args = calls[0][1]["args"]
assert first_call_args[0] == query
assert isinstance(first_call_args[1], Postgres)

second_call_args = calls[1][1]["args"]
assert second_call_args[0] == query
assert isinstance(second_call_args[1], Postgres)
counter = 0
for call in calls:
args = call[1]["args"]
if args[0] == query:
counter += 1
assert counter == 2

await conn.close()

Expand All @@ -132,14 +124,15 @@ async def test_conn_transaction(database_conn):
await conn.execute(query)

calls = mock_run_vulnerability_scan.call_args_list

first_call_args = calls[0][1]["args"]
assert first_call_args[0] == "BEGIN;"
assert isinstance(first_call_args[1], Postgres)

second_call_args = calls[1][1]["args"]
assert second_call_args[0] == query
assert isinstance(second_call_args[1], Postgres)
begin_in_calls = False
query_in_calls = False
for call in calls:
args = call[1]["args"]
if args[0] == "BEGIN;":
begin_in_calls = True
if args[0] == query:
query_in_calls = True
assert begin_in_calls == query_in_calls == True

await conn.close()

Expand All @@ -160,6 +153,5 @@ async def test_conn_cursor(database_conn):
called_with_args = mock_run_vulnerability_scan.call_args[1]["args"]
assert called_with_args[0] == "BEGIN;"
assert isinstance(called_with_args[1], Postgres)
mock_run_vulnerability_scan.assert_called_once()

await conn.close()
23 changes: 23 additions & 0 deletions aikido_zen/sinks/tests/builtins_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from unittest.mock import patch
import aikido_zen.sinks.builtins
from pathlib import Path, PurePath

kind = "path_traversal"
op = "builtins.open"
Expand All @@ -24,6 +25,23 @@ def test_open():
args = ("ltlwtjl_tlnekt.py",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

with pytest.raises(FileNotFoundError):
open(b"afljqlqfefjq.py")
args = (b"afljqlqfefjq.py",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

path = Path("./test", "/test.py")
with pytest.raises(FileNotFoundError):
open(path)
args = (path,)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

path2 = PurePath("./test", "/test.py")
with pytest.raises(FileNotFoundError):
open(path2)
args = (path2,)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_open_with_builtins_import():
with patch(
Expand All @@ -45,6 +63,11 @@ def test_open_with_builtins_import():
args = ("ltlwtjl_tlnekt.py",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

with pytest.raises(FileNotFoundError):
builtins.open(b"shleklelkwge.py")
args = (b"shleklelkwge.py",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_open_invalid_input():
with patch(
Expand Down
40 changes: 38 additions & 2 deletions aikido_zen/sinks/tests/os_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from pathlib import Path, PurePath
from unittest.mock import patch
import aikido_zen.sinks.os

Expand All @@ -22,6 +23,18 @@ def test_ospath_commands():
args = ("aqkqjefbkqlleq_qkvfjksaicuaviel",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

os.path.realpath(b"te2st/test2")
op = "os.path.realpath"
args = (b"te2st/test2",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

path1 = Path("./test", "test2", "test3")
os.path.realpath(path1)

op = "os.path.realpath"
args = (path1,)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_os_commands():
with patch(
Expand Down Expand Up @@ -69,13 +82,35 @@ def test_os_commands():
args = ("test_path.pathy",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

with pytest.raises(FileNotFoundError):
os.unlink(b"wjlewjrlke")
op = "os.unlink"
args = (b"wjlewjrlke",)
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

os.rename("qlkgkjbnlzheioe_kjbfkjeiLJ", "lkflkenlnlgksnk_aknflkenfk")
op = "os.rename"
args = ("qlkgkjbnlzheioe_kjbfkjeiLJ",)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)
args = ("lkflkenlnlgksnk_aknflkenfk",)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)

with pytest.raises(FileNotFoundError):
os.rename(b"akflaflkqkajlqjoiq", b"kfjlfehfkakj")
op = "os.rename"
args = (b"akflaflkqkajlqjoiq",)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)
args = (b"kfjlfehfkakj",)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)

with pytest.raises(FileNotFoundError):
os.rename("akflaflkqkajlqjoiq", b"aflkkelwwgw")
op = "os.rename"
args = ("akflaflkqkajlqjoiq",)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)
args = (b"aflkkelwwgw",)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)

os.rmdir("lkflkenlnlgksnk_aknflkenfk")
op = "os.rmdir"
args = ("lkflkenlnlgksnk_aknflkenfk",)
Expand All @@ -95,12 +130,13 @@ def test_os_commands():
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)
os.remove("aqwfkjqkjqwfhoie_kjfhejlwqk")

path1 = PurePath("kkjehrqlknl_qk3rqlrjgna")
with pytest.raises(FileNotFoundError):
os.link("qkfhwifqqlejfke_qlfjboqvdbshjw", "kkjehrqlknl_qk3rqlrjgna")
os.link("qkfhwifqqlejfke_qlfjboqvdbshjw", path1)
op = "os.link"
args = ("qkfhwifqqlejfke_qlfjboqvdbshjw",)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)
args = ("kkjehrqlknl_qk3rqlrjgna",)
args = (path1,)
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)

os.walk("qlfjwqqfqelfknef_kwlkgrlkbwkalwd")
Expand Down
10 changes: 5 additions & 5 deletions aikido_zen/sinks/tests/shutil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_shutil_copymode():
op = "shutil.copymode"
args1 = ("Makefile",)
args2 = ("test2",)
assert len(mock_run_vulnerability_scan.call_args_list) == 2
assert len(mock_run_vulnerability_scan.call_args_list) == 3
call_1 = mock_run_vulnerability_scan.call_args_list[0]
call_2 = mock_run_vulnerability_scan.call_args_list[1]

Expand All @@ -76,7 +76,7 @@ def test_shutil_copystat():
op = "shutil.copystat"
args1 = ("Makefile",)
args2 = ("test2",)
assert len(mock_run_vulnerability_scan.call_args_list) == 2
assert len(mock_run_vulnerability_scan.call_args_list) == 3
call_1 = mock_run_vulnerability_scan.call_args_list[0]
call_2 = mock_run_vulnerability_scan.call_args_list[1]

Expand Down Expand Up @@ -115,7 +115,7 @@ def test_shutil_move():
op = "shutil.move"
args1 = ("test2",)
args2 = ("test3",)
assert len(mock_run_vulnerability_scan.call_args_list) == 2
assert len(mock_run_vulnerability_scan.call_args_list) == 4
call_1 = mock_run_vulnerability_scan.call_args_list[0]
call_2 = mock_run_vulnerability_scan.call_args_list[1]

Expand All @@ -134,7 +134,7 @@ def test_shutil_copy():
op = "builtins.open"
args1 = ("Makefile",)
args2 = ("test2",)
assert len(mock_run_vulnerability_scan.call_args_list) == 4
assert len(mock_run_vulnerability_scan.call_args_list) == 5
call_1 = mock_run_vulnerability_scan.call_args_list[0]
call_2 = mock_run_vulnerability_scan.call_args_list[1]

Expand All @@ -154,7 +154,7 @@ def test_shutil_copy2():
op = "builtins.open"
args1 = ("Makefile",)
args2 = ("test2",)
assert len(mock_run_vulnerability_scan.call_args_list) == 4
assert len(mock_run_vulnerability_scan.call_args_list) == 5
call_1 = mock_run_vulnerability_scan.call_args_list[0]
call_2 = mock_run_vulnerability_scan.call_args_list[1]

Expand Down

0 comments on commit 626939b

Please sign in to comment.