|
| 1 | +#!/usr/bin/env python3 -X utf8 |
| 2 | +# type: ignore |
| 3 | +import argparse |
| 4 | +import sys |
| 5 | +import tempfile |
| 6 | +from logging import _nameToLevel as levelNames |
| 7 | + |
| 8 | +from apifuzzer.fuzz_utils import get_api_definition_from_file, get_api_definition_from_url |
| 9 | +from apifuzzer.fuzzer_target.fuzz_request_sender import FuzzerTarget |
| 10 | +from apifuzzer.openapi_template_generator import OpenAPITemplateGenerator |
| 11 | +from apifuzzer.server_fuzzer import OpenApiServerFuzzer |
| 12 | +from apifuzzer.utils import json_data, set_logger |
| 13 | +from kitty.interfaces.base import EmptyInterface |
| 14 | +from kitty.model import GraphModel |
| 15 | + |
| 16 | + |
| 17 | +class Fuzzer: |
| 18 | + def __init__( |
| 19 | + self, |
| 20 | + api_resources, |
| 21 | + report_dir, |
| 22 | + test_level, |
| 23 | + log_level, |
| 24 | + basic_output, |
| 25 | + alternate_url=None, |
| 26 | + test_result_dst=None, |
| 27 | + auth_headers=None, |
| 28 | + api_definition_url=None, |
| 29 | + junit_report_path=None, |
| 30 | + ): |
| 31 | + self.api_resources = api_resources |
| 32 | + self.base_url = None |
| 33 | + self.alternate_url = alternate_url |
| 34 | + self.templates = None |
| 35 | + self.test_level = test_level |
| 36 | + self.report_dir = report_dir |
| 37 | + self.test_result_dst = test_result_dst |
| 38 | + self.auth_headers = auth_headers if auth_headers else {} |
| 39 | + self.junit_report_path = junit_report_path |
| 40 | + self.logger = set_logger(log_level, basic_output) |
| 41 | + self.logger.info("APIFuzzer initialized") |
| 42 | + self.api_definition_url = api_definition_url |
| 43 | + |
| 44 | + def prepare(self): |
| 45 | + # here we will be able to branch the template generator if we will support other than Swagger / OpenAPI |
| 46 | + template_generator = OpenAPITemplateGenerator( |
| 47 | + self.api_resources, logger=self.logger, api_definition_url=self.api_definition_url |
| 48 | + ) |
| 49 | + template_generator.process_api_resources() |
| 50 | + self.templates = template_generator.templates |
| 51 | + self.base_url = template_generator.compile_base_url(self.alternate_url) |
| 52 | + |
| 53 | + def run(self): |
| 54 | + target = FuzzerTarget( |
| 55 | + name="target", |
| 56 | + base_url=self.base_url, |
| 57 | + report_dir=self.report_dir, |
| 58 | + auth_headers=self.auth_headers, |
| 59 | + junit_report_path=self.junit_report_path, |
| 60 | + ) |
| 61 | + # WAFP: Replaced `WebInterface` with an empty one because it did not work well when tests are executed via |
| 62 | + # pytest-xdist |
| 63 | + interface = EmptyInterface() |
| 64 | + model = GraphModel() |
| 65 | + for template in self.templates: |
| 66 | + model.connect(template.compile_template()) |
| 67 | + fuzzer = OpenApiServerFuzzer() |
| 68 | + fuzzer.set_model(model) |
| 69 | + fuzzer.set_target(target) |
| 70 | + fuzzer.set_interface(interface) |
| 71 | + fuzzer.start() |
| 72 | + try: |
| 73 | + fuzzer.stop() |
| 74 | + except AttributeError: |
| 75 | + pass |
| 76 | + |
| 77 | + |
| 78 | +def str2bool(v): |
| 79 | + if isinstance(v, bool): |
| 80 | + return v |
| 81 | + if v.lower() in ("yes", "true", "t", "y", "1", "True", "T"): |
| 82 | + return True |
| 83 | + if v.lower() in ("no", "false", "f", "n", "0", "False", "F"): |
| 84 | + return False |
| 85 | + raise argparse.ArgumentTypeError("Boolean value expected.") |
| 86 | + |
| 87 | + |
| 88 | +if __name__ == "__main__": |
| 89 | + parser = argparse.ArgumentParser( |
| 90 | + description="API fuzzer configuration", |
| 91 | + formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=20), |
| 92 | + ) |
| 93 | + parser.add_argument( |
| 94 | + "-s", |
| 95 | + "--src_file", |
| 96 | + type=str, |
| 97 | + required=False, |
| 98 | + help="API definition file path. Currently only JSON format is supported", |
| 99 | + dest="src_file", |
| 100 | + ) |
| 101 | + parser.add_argument( |
| 102 | + "--src_url", |
| 103 | + type=str, |
| 104 | + required=False, |
| 105 | + help="API definition url. Currently only JSON format is supported", |
| 106 | + dest="src_url", |
| 107 | + ) |
| 108 | + parser.add_argument( |
| 109 | + "-r", |
| 110 | + "--report_dir", |
| 111 | + type=str, |
| 112 | + required=False, |
| 113 | + help="Directory where error reports will be saved. Default is temporally generated directory", |
| 114 | + dest="report_dir", |
| 115 | + default=tempfile.mkdtemp(), |
| 116 | + ) |
| 117 | + parser.add_argument( |
| 118 | + "--level", |
| 119 | + type=int, |
| 120 | + required=False, |
| 121 | + help="Test deepness: [1,2], higher is the deeper !!!Not implemented!!!", |
| 122 | + dest="level", |
| 123 | + default=1, |
| 124 | + ) |
| 125 | + parser.add_argument( |
| 126 | + "-u", |
| 127 | + "--url", |
| 128 | + type=str, |
| 129 | + required=False, |
| 130 | + help="Use CLI defined url instead compile the url from the API definition. Useful for testing", |
| 131 | + dest="alternate_url", |
| 132 | + default=None, |
| 133 | + ) |
| 134 | + parser.add_argument( |
| 135 | + "-t", |
| 136 | + "--test_report", |
| 137 | + type=str, |
| 138 | + required=False, |
| 139 | + help="JUnit test result xml save path ", |
| 140 | + dest="test_result_dst", |
| 141 | + default=None, |
| 142 | + ) |
| 143 | + parser.add_argument( |
| 144 | + "--log", |
| 145 | + type=str, |
| 146 | + required=False, |
| 147 | + help="Use different log level than the default WARNING", |
| 148 | + dest="log_level", |
| 149 | + default="warning", |
| 150 | + choices=[level.lower() for level in levelNames if isinstance(level, str)], |
| 151 | + ) |
| 152 | + parser.add_argument( |
| 153 | + "--basic_output", |
| 154 | + type=str2bool, |
| 155 | + required=False, |
| 156 | + help="Use basic output for logging (useful if running in jenkins). Example --basic_output=True", |
| 157 | + dest="basic_output", |
| 158 | + default=False, |
| 159 | + ) |
| 160 | + parser.add_argument( |
| 161 | + "--headers", |
| 162 | + type=json_data, |
| 163 | + required=False, |
| 164 | + help='Http request headers added to all request. Example: \'[{"Authorization": "SuperSecret"}, ' |
| 165 | + '{"Auth2": "asd"}]\'', |
| 166 | + dest="headers", |
| 167 | + default=None, |
| 168 | + ) |
| 169 | + args = parser.parse_args() |
| 170 | + api_definition_json = dict() |
| 171 | + if args.src_file: |
| 172 | + api_definition_json = get_api_definition_from_file(args.src_file) |
| 173 | + elif args.src_url: |
| 174 | + api_definition_json = get_api_definition_from_url(args.src_url) |
| 175 | + else: |
| 176 | + argparse.ArgumentTypeError("No API definition source provided -s, --src_file or --src_url should be defined") |
| 177 | + sys.exit(1) |
| 178 | + prog = Fuzzer( |
| 179 | + api_resources=api_definition_json, |
| 180 | + report_dir=args.report_dir, |
| 181 | + test_level=args.level, |
| 182 | + alternate_url=args.alternate_url, |
| 183 | + test_result_dst=args.test_result_dst, |
| 184 | + log_level=args.log_level, |
| 185 | + basic_output=args.basic_output, |
| 186 | + auth_headers=args.headers, |
| 187 | + api_definition_url=args.src_url, |
| 188 | + junit_report_path=args.test_result_dst, |
| 189 | + ) |
| 190 | + prog.prepare() |
| 191 | + prog.run() |
0 commit comments