Skip to content

Commit

Permalink
REFACTOR: Rpyc services to remove vulnerabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
SMoraisAnsys committed Jun 19, 2024
1 parent 8d94ca8 commit 318ba5b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 13 deletions.
18 changes: 18 additions & 0 deletions pyaedt/misc/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,21 @@ def current_student_version():
if "SV" in version_key:
return version_key
return ""


def is_safe_path(path, allowed_extensions=None):
"""Validate if a path is safe to use."""
# Ensure path is an existing file or directory
if not os.path.exists(path) or not os.path.isfile(path):
return False

# Restrict to allowed file extensions:
if allowed_extensions:
if not any(path.endswith(extension) for extension in allowed_extensions):
return False

# Ensure path does not contain dangerous characters
if any(char in path for char in (";", "|", "&", "$", "<", ">", "`")):
return False

return True
52 changes: 39 additions & 13 deletions pyaedt/rpc/rpyc_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
from pyaedt.generic.general_methods import is_ironpython
from pyaedt.generic.settings import is_linux
from pyaedt import is_windows
from pyaedt.misc.misc import is_safe_path

Check warning on line 17 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L17

Added line #L17 was not covered by tests

if is_linux and is_ironpython:
import subprocessdotnet as subprocess
import subprocessdotnet as subprocess # nosec

Check warning on line 20 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L20

Added line #L20 was not covered by tests
else:
import subprocess
import subprocess # nosec

Check warning on line 22 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L22

Added line #L22 was not covered by tests

if not is_ironpython:
import rpyc
Expand Down Expand Up @@ -270,24 +271,37 @@ def exposed_run_script(self, script, aedt_version="2021.2", ansysem_path=None, n
elif os.path.exists(script):
script_file = script
else:
return "File wrong or wrong commands."
return "Wrong file or wrong commands."
if not is_safe_path(script_file):
return "Script file {} not safe.".format(script_file)

Check warning on line 276 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L274-L276

Added lines #L274 - L276 were not covered by tests
executable = "ansysedt.exe"
if is_linux and not ansysem_path and not env_path(aedt_version):
ansysem_path = os.getenv("PYAEDT_SERVER_AEDT_PATH", "")
if env_path(aedt_version) or ansysem_path:
if not ansysem_path:
ansysem_path = env_path(aedt_version)

ng_feature = " -features=SF159726_SCRIPTOBJECT"
exe_path = os.path.join(ansysem_path, executable)
if not is_safe_path(exe_path):
return "Ansys EM path not safe."
command = [exe_path]
if non_graphical:
command.append("-ng")
ng_feature = "-features=SF159726_SCRIPTOBJECT"

Check warning on line 289 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L283-L289

Added lines #L283 - L289 were not covered by tests
if self._beta_options:
for opt in range(self._beta_options.__len__()):
if self._beta_options[opt] not in ng_feature:
ng_feature += "," + self._beta_options[opt]
if non_graphical:
ng_feature += ",SF6694_NON_GRAPHICAL_COMMAND_EXECUTION -ng"
command = os.path.join(ansysem_path, executable) + ng_feature + " -RunScriptAndExit " + script_file
p = subprocess.Popen(command)
p.wait()
ng_feature += ",SF6694_NON_GRAPHICAL_COMMAND_EXECUTION"
command.append(ng_feature)
command = [exe_path, ng_feature, "-RunScriptAndExit", script_file]
try:
p = subprocess.Popen(command) # nosec
p.wait()
except subprocess.CalledProcessError as e:
msg = "Command failed with error: {}".format(e)
logger.error(msg)
return msg

Check warning on line 304 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L295-L304

Added lines #L295 - L304 were not covered by tests
return "Script Executed."

else:
Expand Down Expand Up @@ -871,7 +885,13 @@ def aedt_grpc(port=None, beta_options=None, use_aedt_relative_path=False, non_gr
from pyaedt.generic.general_methods import grpc_active_sessions
sessions = grpc_active_sessions()
if not port:
port = check_port(random.randint(18500, 20000))
# TODO: Remove once IronPython is deprecated
if is_ironpython:
port = check_port(random.randint(18500, 20000)) # nosec

Check warning on line 890 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L889-L890

Added lines #L889 - L890 were not covered by tests
else:
import secrets
secure_random = secrets.SystemRandom()
port = check_port(secure_random.randint(18500, 20000))

Check warning on line 894 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L892-L894

Added lines #L892 - L894 were not covered by tests

if port == 0:
print("Error. No ports are available.")
Expand Down Expand Up @@ -1097,7 +1117,7 @@ def on_disconnect(self, connection):
try:
edb.close_edb()
except Exception:
pass
logger.warning("Error when trying to close EDB.")

Check warning on line 1120 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L1120

Added line #L1120 was not covered by tests

def start_service(self, port):
"""Connect to remove service manager and run a new server on specified port.
Expand Down Expand Up @@ -1157,5 +1177,11 @@ def exposed_stop_service(self, port):

@staticmethod
def exposed_check_port():
port_number = random.randint(18500, 20000)
return check_port(port_number)
# TODO: Remove once IronPython is deprecated
if is_ironpython: # nosec
port = check_port(random.randint(18500, 20000)) # nosec

Check warning on line 1182 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L1181-L1182

Added lines #L1181 - L1182 were not covered by tests
else:
import secrets
secure_random = secrets.SystemRandom()
port = check_port(secure_random.randint(18500, 20000))
return port

Check warning on line 1187 in pyaedt/rpc/rpyc_services.py

View check run for this annotation

Codecov / codecov/patch

pyaedt/rpc/rpyc_services.py#L1184-L1187

Added lines #L1184 - L1187 were not covered by tests

0 comments on commit 318ba5b

Please sign in to comment.