Skip to content

Modules

scanapi package modules

The modules below are documented automatically from their docstrings.

Core

from_()

Convert from other formats into ScanAPI

Source code in scanapi/cli.py
135
136
137
# Sub-group of `main`, exposed on the command line as "from"; the function is
# spelled `from_` because `from` is a reserved word in Python.
# The docstring below is the group's help text, so it is left untouched.
@main.group(name="from")
def from_():
    """Convert from other formats into ScanAPI"""

from_openapi(input_path, base_url, output_path, log_level, config_path)

Convert an OpenAPI document (JSON or YAML) into ScanAPI format.

 Arguments: OPENAPI_PATH: The OpenAPI file path (JSON or YAML).

 Examples: scanapi from openapi api.json scanapi from openapi api.yaml -o scanapi.yaml

Source code in scanapi/cli.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# `openapi` sub-command of the `from` group.
# The docstring is the command's help text, so it is left untouched.
@from_.command(name="openapi", context_settings=CONTEXT_SETTINGS)
# Positional argument; click checks that the path exists before the body runs.
@click.argument(
    "input_path",
    metavar="OPENAPI_PATH",
    type=click.Path(exists=True),
    required=True,
)
@click.option(
    "-b",
    "--base-url",
    "base_url",
    type=str,
    help="Base URL for the API.",
)
@click.option(
    "-o",
    "--output-path",
    "output_path",
    type=click.Path(),
    help="Output ScanAPI file path (default: scanapi.yaml).",
)
@config_path_option()
@log_level_option()
def from_openapi(input_path, base_url, output_path, log_level, config_path):
    """
    Convert an OpenAPI document (JSON or YAML) into ScanAPI format.

    \b
    Arguments:
        OPENAPI_PATH: The OpenAPI file path (JSON or YAML).

    \b
    Examples:
      scanapi from openapi api.json
      scanapi from openapi api.yaml -o scanapi.yaml
    """
    configure_logging(log_level)
    # The options are handed to `save_preferences` rather than passed along
    # directly: `openapi_to_scanapi` takes no arguments and reads
    # `input_path`, `base_url` and `output_path` from `settings`.
    save_preferences(
        input_path=input_path,
        base_url=base_url,
        output_path=output_path,
        config_path=config_path,
    )
    openapi_to_scanapi()

main()

Automated Testing and Documentation for your REST API.

Source code in scanapi/cli.py
67
68
69
70
# Root click command group of the CLI; sub-commands and sub-groups attach to it
# through `@main.command` / `@main.group`.
# `version("scanapi")` supplies the version string reported by the version option.
@click.group()
@click.version_option(version=version("scanapi"))
def main():
    """Automated Testing and Documentation for your REST API."""

run(spec_path, output_path, no_report, config_path, template, log_level, open_browser)

Automated Testing and Documentation for your REST API.

Source code in scanapi/cli.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# `run` sub-command of `main`.
# The docstring is the command's help text, so it is left untouched.
@main.command(context_settings=CONTEXT_SETTINGS)
# Optional positional argument; when given, click checks that the path exists.
@click.argument("spec_path", type=click.Path(exists=True), required=False)
@click.option(
    "-o",
    "--output-path",
    "output_path",
    type=click.Path(),
    help="Report output path. Default is scanapi-report.html",
)
@click.option(
    "-nr",
    "--no-report",
    "no_report",
    is_flag=True,
    help="Run ScanAPI without generating report.",
)
@click.option(
    "-b",
    "--browser",
    "open_browser",
    is_flag=True,
    help="Open the results file using a browser",
)
@click.option(
    "-t",
    "--template",
    "template",
    type=click.Path(exists=True),
    help="Custom report template path. The template must be a .jinja file.",
)
@config_path_option()
@log_level_option()
def run(
    spec_path,
    output_path,
    no_report,
    config_path,
    template,
    log_level,
    open_browser,
):
    """
    Automated Testing and Documentation for your REST API.
    """
    configure_logging(log_level)
    # The options are handed to `save_preferences` rather than passed along
    # directly: `scan` takes no arguments and reads `spec_path` from `settings`.
    save_preferences(
        spec_path=spec_path,
        output_path=output_path,
        no_report=no_report,
        config_path=config_path,
        template=template,
        open_browser=open_browser,
    )
    scan()

Code Reference https://gist.github.com/joshbode/569627ced3076931b02f

Loader

Bases: SafeLoader

YAML/JSON Loader with !include constructor.

Source code in scanapi/config_loader.py
14
15
16
17
18
19
20
21
22
23
24
class Loader(yaml.SafeLoader):
    """YAML/JSON Loader with `!include` constructor.

    Each instance remembers the directory of the stream it parses in
    ``root``; ``construct_include`` joins included paths onto it.
    """

    def __init__(self, stream: IO) -> None:
        """Record the stream's directory, then initialise the safe loader."""
        try:
            base_dir, _ = os.path.split(stream.name)
        except AttributeError:
            # Streams without a ``name`` attribute fall back to the
            # current directory.
            base_dir = os.path.curdir

        self.root = base_dir
        super().__init__(stream)

__init__(stream)

Initialise Loader.

Source code in scanapi/config_loader.py
17
18
19
20
21
22
23
24
def __init__(self, stream: IO) -> None:
    """Record the stream's directory, then initialise the safe loader."""
    try:
        base_dir, _ = os.path.split(stream.name)
    except AttributeError:
        # Streams without a ``name`` attribute fall back to the
        # current directory.
        base_dir = os.path.curdir

    self.root = base_dir
    super().__init__(stream)

construct_include(loader, node)

Include file referenced at node.

Source code in scanapi/config_loader.py
27
28
29
30
31
32
33
34
35
36
37
38
def construct_include(loader: Loader, node: yaml.Node) -> Any:
    """Load and return the content of the file named by an include node.

    Args:
        loader: Loader currently parsing the document; its ``root`` is the
            directory the included path is joined onto.
        node: YAML node carrying the include value; must be a scalar.

    Returns:
        The data parsed from the included file with ``Loader``.

    Raises:
        BadConfigIncludeError: If ``node`` is not a scalar node.
    """
    if isinstance(node, yaml.ScalarNode):
        target = str(loader.construct_scalar(node))
        resolved = os.path.abspath(os.path.join(loader.root, target))

        with open(resolved) as included:
            return yaml.load(included, Loader)

    rendered_node = yaml.serialize(node).strip()
    raise BadConfigIncludeError(
        f"Include tag value is not a scalar: {rendered_node}"
    )

load_config_file(file_path)

Loads configuration file. If non-empty file exists reads data and returns it.

Source code in scanapi/config_loader.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def load_config_file(file_path):
    """Read a configuration file and return its parsed content.

    Args:
        file_path: Path of the file to load with ``Loader``.

    Returns:
        The parsed data.

    Raises:
        EmptyConfigFileError: If parsing yields a falsy value (e.g. the
            file is empty).
    """
    with open(file_path) as stream:
        logger.info(
            f"Loading file [deep_sky_blue1 underline]{file_path}",
            extra={"highlighter": None},
        )
        content = yaml.load(stream, Loader)

    if content:
        return content

    raise EmptyConfigFileError(file_path)

write_report_path(uri)

Print path to generated documentation

Source code in scanapi/console.py
29
30
31
32
33
34
def write_report_path(uri: str) -> None:
    """Tell the user where the generated documentation can be found.

    Args:
        uri: Location of the generated report, printed underlined.
    """
    message = (
        "The documentation was generated successfully.\n"
        f"It is available at -> [deep_sky_blue1 underline]{uri}\n"
    )
    console.print(message)

write_result(result)

Print the test result to the console output

Source code in scanapi/console.py
17
18
19
20
21
22
23
24
25
26
def write_result(result: dict[str, Any]) -> None:
    """Print the test outcomes of one request result to the console.

    Passed and failed tests are printed; tests with any other status
    produce no output here.

    Args:
        result: Mapping with a ``"tests_results"`` entry. Each test in it
            provides ``"status"`` and ``"name"``; failed tests also
            provide ``"failure"``.
    """
    for test in result["tests_results"]:
        status = test["status"]
        # Statuses are plain strings, so compare by value. An identity check
        # only succeeds when both sides happen to be the same object.
        if status == TestStatus.PASSED:
            console.print(f"[bright_green] [PASSED] [white]{test['name']}")
        elif status == TestStatus.FAILED:
            console.print(
                f"[bright_red] [FAILED] [white]{test['name']}\n"
                f"\t  [bright_red]{test['failure']} is false"
            )

write_results(results)

Print the test results to the console output

Source code in scanapi/console.py
11
12
13
14
def write_results(results: Iterable[dict[str, Any]]) -> None:
    """Print every request result, in order, through ``write_result``.

    Args:
        results: Iterable of request result dicts.
    """
    for request_result in results:
        write_result(request_result)

write_summary()

Write tests summary in console

Source code in scanapi/console.py
37
38
39
40
41
42
43
44
45
def write_summary() -> None:
    """Print the closing summary of the session to the console.

    The elapsed time is rounded to two decimals. A different summary is
    printed when the session recorded at least one failure or error.
    """
    seconds = round(session.elapsed_time().total_seconds(), 2)

    if session.failures > 0 or session.errors > 0:
        _print_summary_with_failures_or_errors(seconds)
    else:
        _print_successful_summary(seconds)

openapi_to_scanapi()

Caller function that tries to convert the specification file and write the report.

Uses prance for resolving and parsing the OpenAPI schema.

Source code in scanapi/convert.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def openapi_to_scanapi() -> None:
    """Convert the configured OpenAPI document and write the ScanAPI file.

    Reads ``input_path``, ``base_url`` and ``output_path`` from ``settings``.
    Uses [prance](https://github.com/RonnyPfannschmidt/prance) for resolving
    and parsing the OpenAPI schema, then dumps the converted specification
    as YAML and lists any variables the converter created.

    Raises:
        SystemExit: With code 1 when the document cannot be parsed or is not
            a valid OpenAPI schema.
    """
    source_path = settings["input_path"]
    api_base_url = settings["base_url"]
    destination = settings["output_path"]

    logger.info(
        f"Loading file [deep_sky_blue1 underline]{source_path}",
        extra={"highlighter": None},
    )

    try:
        parser = prance.ResolvingParser(source_path)
    except scanner.ScannerError as e:
        logger.error(f"Couldn't parse received yaml file: {e!s}")
        raise SystemExit(1)
    except AttributeError as e:
        logger.error(f"Couldn't parse received json file: {e!s}")
        raise SystemExit(1)
    except prance.ValidationError as e:
        logger.error(
            "Invalid OpenAPI schema.\n"
            "Conversion requires a valid OpenAPI 3.x document.\n"
            f"Details: {e!s}"
        )
        raise SystemExit(1)

    parser.parse()
    converter = OpenAPIToScanAPIConverter(cast(dict, parser.specification))
    scanapi_yaml, created_variables = converter.convert(api_base_url)

    with open(destination, "w") as file:
        yaml.dump(
            scanapi_yaml,
            file,
            default_flow_style=False,
            sort_keys=False,
            indent=4,
        )
    console.print(
        f"File successfully converted and exported as [deep_sky_blue1 underline]{destination}"
    )

    if len(created_variables):
        console.print(
            "\nThe following variables were created in the generated ScanAPI YAML file:"
        )
        for name in created_variables:
            console.print("- ${" + name + "}")
        console.print(
            "\nSee [deep_sky_blue1 underline]https://scanapi.dev/docs_v1/specification/custom_variables[/deep_sky_blue1 underline]"
            "\nand [deep_sky_blue1 underline]https://scanapi.dev/docs_v1/specification/environment_variables[/deep_sky_blue1 underline]"
            "\nfor more information."
        )

BadConfigIncludeError

Bases: Exception

Raised when the value of the !include yaml tag is not a scalar.

Source code in scanapi/errors.py
70
71
class BadConfigIncludeError(Exception):
    """Raised when the value of the `!include` yaml tag is not a scalar.

    Raised by ``construct_include`` with a message that contains the
    serialized offending node.
    """

BadConfigurationError

Bases: Exception

Raised when an environment variable was not set or badly configured.

Source code in scanapi/errors.py
52
53
54
55
56
57
58
59
class BadConfigurationError(Exception):
    """Signals an environment variable that is unset or badly configured."""

    def __init__(self, env_var, *args):
        """Build the error message from the offending variable name.

        Args:
            env_var: Name of the environment variable.
            *args: Extra positional arguments forwarded to ``Exception``.
        """
        description = (
            f"{env_var} environment variable not set or badly configured"
        )
        super().__init__(description, *args)

EmptyConfigFileError

Bases: Exception

Raised when the Config File loaded is empty

Source code in scanapi/errors.py
62
63
64
65
66
67
class EmptyConfigFileError(Exception):
    """Signals that a loaded configuration file turned out to be empty."""

    def __init__(self, file_path, *args):
        """Build the error message from the path of the empty file.

        Args:
            file_path: Path of the file that was loaded.
            *args: Extra positional arguments forwarded to ``Exception``.
        """
        super().__init__(f"File '{file_path}' is empty.", *args)

HTTPMethodNotAllowedError

Bases: MalformedSpecError

Raised when the HTTP method in the API spec is invalid

Source code in scanapi/errors.py
 8
 9
10
11
12
13
14
15
16
class HTTPMethodNotAllowedError(MalformedSpecError):
    """Signals an HTTP method in the API spec that is not supported."""

    def __init__(self, method, allowed_methods, *args):
        """Build the error message.

        Args:
            method: The rejected HTTP method.
            allowed_methods: The supported methods, shown in the message.
            *args: Extra positional arguments forwarded to the base class.
        """
        super().__init__(
            f"HTTP method not supported: {method}. "
            f"Supported methods: {allowed_methods}.",
            *args,
        )

InvalidKeyError

Bases: MalformedSpecError

Raised when an invalid key is specified in the API spec

Source code in scanapi/errors.py
19
20
21
22
23
24
25
26
27
class InvalidKeyError(MalformedSpecError):
    """Signals a key in the API spec that is not valid for its scope."""

    def __init__(self, key, scope, available_keys, *args):
        """Build the error message.

        Args:
            key: The rejected key.
            scope: Name of the scope where the key was found.
            available_keys: The keys accepted in that scope.
            *args: Extra positional arguments forwarded to the base class.
        """
        super().__init__(
            f"Invalid key '{key}' at '{scope}' scope. "
            f"Available keys are: {available_keys}",
            *args,
        )

InvalidPythonCodeError

Bases: MalformedSpecError

Raised when python code defined in the API spec raises an error

Source code in scanapi/errors.py
39
40
41
42
43
44
45
46
47
48
49
class InvalidPythonCodeError(MalformedSpecError):
    """Signals that Python code defined in the API spec raised an error."""

    def __init__(self, error_message, code, *args):
        """Store the offending code and build the error message.

        Args:
            error_message: Description of the underlying exception.
            code: The code from the spec; kept on ``self.expression``.
            *args: Extra positional arguments forwarded to the base class.
        """
        self.expression = code
        details = "".join(
            (
                "Invalid Python code defined in the API spec. ",
                f"Exception: {error_message}. ",
                f"Code: {code}.",
            )
        )
        super().__init__(details, *args)

MalformedSpecError

Bases: Exception

Raised when API spec is invalid;

base class for other exceptions

Source code in scanapi/errors.py
1
2
3
4
5
class MalformedSpecError(Exception):
    """Raised when the API spec is invalid.

    Base class for the more specific spec errors, such as
    ``HTTPMethodNotAllowedError``, ``InvalidKeyError``,
    ``InvalidPythonCodeError`` and ``MissingMandatoryKeyError``.
    """

MissingMandatoryKeyError

Bases: MalformedSpecError

Raised when one or more mandatory keys are missing

Source code in scanapi/errors.py
30
31
32
33
34
35
36
class MissingMandatoryKeyError(MalformedSpecError):
    """Signals that one or more mandatory keys are absent from a scope."""

    def __init__(self, missing_keys, scope, *args):
        """Build the error message.

        Args:
            missing_keys: The absent keys; listed sorted and quoted.
            scope: Name of the scope where the keys are missing.
            *args: Extra positional arguments forwarded to the base class.
        """
        quoted = [f"'{key}'" for key in sorted(missing_keys)]
        listing = ", ".join(quoted)
        super().__init__(f"Missing {listing} key(s) at '{scope}' scope", *args)

Code based on solution: https://github.com/pytest-dev/pytest/blob/83891d9022076375cede03bfd8c932d450e6fcf8/src/_pytest/config/init.py#L67

ExitCode

Bases: IntEnum

Encodes the valid exit codes used by ScanAPI.

Source code in scanapi/exit_code.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class ExitCode(enum.IntEnum):
    """Encodes the valid exit codes used by ScanAPI.

    Being an ``IntEnum``, members can be passed directly to ``sys.exit`` or
    ``SystemExit``.
    """

    #: tests passed
    OK = 0
    #: tests failed
    TESTS_FAILED = 1
    #: tests error
    TESTS_ERROR = 2
    #: Error while trying to make a request
    REQUEST_ERROR = 3
    #: ScanAPI was misused
    USAGE_ERROR = 4
    #: an internal error got in the way
    INTERNAL_ERROR = 5

hide_sensitive_info(response)

Takes response and hides the sensitive data replacing the info with the string SENSITIVE_INFORMATION.

Parameters:

Name Type Description Default
response Response

Response containing sensitive information to be hidden.

required
Source code in scanapi/hide_utils.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def hide_sensitive_info(response):
    """Hide sensitive data of a response and of the request that produced it.

    The rules come from the ``report`` section of ``settings``:
    ``hide_request`` is applied to ``response.request`` and
    ``hide_response`` to ``response`` itself. The replacement is delegated
    to ``_hide``.

    Args:
        response (requests.models.Response): Response containing
            sensitive information to be hidden.
    """
    report_cfg = settings.get("report", {})
    request_rules = report_cfg.get("hide_request", {})
    response_rules = report_cfg.get("hide_response", {})

    _hide(response.request, request_rules)
    _hide(response, response_rules)

Reporter

Class that writes the scan report

Attributes:

Name Type Description
output_path str

Report output path

template str

Custom report template path

Source code in scanapi/reporter.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class Reporter:
    """Class that writes the scan report

    Attributes:
        output_path (str, optional): Report output path
        template (str, optional): Custom report template path

    """

    def __init__(
        self,
        output_path: str | None = None,
        template: str | None = None,
    ):
        """Creates a Reporter instance object."""
        self.output_path = pathlib.Path(output_path or "scanapi-report.html")
        self.template = template

    def write(
        self,
        results: Iterable[dict[str, Any]],
        open_in_browser: bool,
    ) -> None:
        """Part of the Reporter instance that is responsible for writing
        scanapi-report.html.

        Args:
            results (generator): generator of dicts resulting of Request run().
        """
        template_path = self.template if self.template else "report.html"
        has_external_template = bool(self.template)
        context = self._build_context(results)

        content = render(template_path, context, has_external_template)

        with open(self.output_path, "w", newline="\n") as doc:
            doc.write(content)

        write_report_path(self.output_path.resolve().as_uri())

        if open_in_browser:
            self._open_in_browser()

    def _open_in_browser(self):
        """Open the results file on a browser"""
        webbrowser.open(self.output_path.resolve().as_uri())

    @staticmethod
    def _build_context(results: Iterable[dict[str, Any]]) -> dict[str, Any]:
        """Build context dict of values required to render template.

        Args:
            results (generator): generator of dicts resulting of Request run().

        Returns:
            dict: values required to render template.

        """
        try:
            scanapi_version = version("scanapi")
        except PackageNotFoundError:
            scanapi_version = "unknown"

        return {
            "now": datetime.datetime.now().replace(microsecond=0),
            "project_name": settings.get("project_name", ""),
            "results": results,
            "session": session,
            "scanapi_version": scanapi_version,
        }

__init__(output_path=None, template=None)

Creates a Reporter instance object.

Source code in scanapi/reporter.py
24
25
26
27
28
29
30
31
def __init__(
    self,
    output_path: str | None = None,
    template: str | None = None,
):
    """Set up the report destination and the optional custom template.

    A falsy ``output_path`` selects ``scanapi-report.html``.
    """
    report_file = output_path if output_path else "scanapi-report.html"
    self.output_path = pathlib.Path(report_file)
    self.template = template

write(results, open_in_browser)

Part of the Reporter instance that is responsible for writing scanapi-report.html.

Parameters:

Name Type Description Default
results generator

generator of dicts resulting of Request run().

required
Source code in scanapi/reporter.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def write(
    self,
    results: Iterable[dict[str, Any]],
    open_in_browser: bool,
) -> None:
    """Render the report, write it to ``output_path`` and announce it.

    Args:
        results (generator): generator of dicts resulting of Request run().
        open_in_browser (bool): when true, open the written report in a
            browser afterwards.
    """
    template_path = self.template if self.template else "report.html"
    has_external_template = bool(self.template)
    context = self._build_context(results)

    content = render(template_path, context, has_external_template)

    # Write as UTF-8 explicitly: the platform default encoding may be
    # unable to represent non-ASCII characters in the rendered report.
    with open(self.output_path, "w", newline="\n", encoding="utf-8") as doc:
        doc.write(content)

    write_report_path(self.output_path.resolve().as_uri())

    if open_in_browser:
        self._open_in_browser()

scan()

Caller function that tries to scan the file and write the report.

Source code in scanapi/scan.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def scan():
    """Caller function that tries to scan the spec file and write the report.

    Loads the file named by ``settings["spec_path"]``, runs the resulting
    endpoint tree, writes the results and the summary, then exits through
    the session.

    Raises:
        SystemExit: With ``ExitCode.USAGE_ERROR`` when the spec file cannot
            be loaded or the spec cannot be built and run.
    """
    spec_path = settings["spec_path"]

    try:
        api_spec = load_config_file(spec_path)
    except FileNotFoundError as e:
        logger.error(f"Could not find API spec file: {spec_path}. {e!s}")
        raise SystemExit(ExitCode.USAGE_ERROR)
    except EmptyConfigFileError as e:
        logger.error(f"API spec file is empty. {e!s}")
        raise SystemExit(ExitCode.USAGE_ERROR)
    except yaml.YAMLError as e:
        logger.error(f"Error loading specification file.\nPyYAML: {e!s}")
        raise SystemExit(ExitCode.USAGE_ERROR)

    try:
        results = EndpointNode(api_spec).run()
    except (
        InvalidKeyError,
        KeyError,
        InvalidPythonCodeError,
    ) as e:
        logger.error(f"Error loading API spec. {e!s}")
        raise SystemExit(ExitCode.USAGE_ERROR)

    _write(results)
    write_summary()
    session.exit()

Session

Class that handles each scanapi session.

Source code in scanapi/session.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Session:
    """Tracks the counters, exit code and start time of one scanapi session."""

    def __init__(self):
        """Start a session with zeroed counters and the clock running."""
        self.successes = self.failures = self.errors = 0
        self.exit_code = ExitCode.OK
        self.started_at = datetime.now()

    @property
    def succeed(self):
        """Whether the session recorded no errors and no failures."""
        if self.errors != 0:
            return False
        return self.failures == 0

    def exit(self):
        """Terminate the process with the code matching the session outcome.

        Errors take precedence over failures; otherwise ``exit_code`` is used.
        """
        if self.errors:
            sys.exit(ExitCode.TESTS_ERROR)

        if self.failures:
            sys.exit(ExitCode.TESTS_FAILED)

        sys.exit(self.exit_code)

    def increment_successes(self):
        """Count one more successful test."""
        self.successes = self.successes + 1

    def increment_failures(self):
        """Count one more failed test."""
        self.failures = self.failures + 1

    def increment_errors(self):
        """Count one more errored test."""
        self.errors = self.errors + 1

    def elapsed_time(self):
        """Return the timedelta elapsed since the session started."""
        current = datetime.now()
        return current - self.started_at

succeed property

Property decorated method that returns whether there were no errors or failures.

__init__()

Constructs a Session object.

Source code in scanapi/session.py
10
11
12
13
14
15
16
def __init__(self):
    """Start a session with zeroed counters and the clock running."""
    self.successes = self.failures = self.errors = 0
    self.exit_code = ExitCode.OK
    self.started_at = datetime.now()

elapsed_time()

Returns the delta of time since session object started.

Source code in scanapi/session.py
48
49
50
def elapsed_time(self):
    """Return the timedelta elapsed since the session started."""
    current = datetime.now()
    return current - self.started_at

exit()

Handles the exiting of the Session.

Source code in scanapi/session.py
26
27
28
29
30
31
32
33
34
def exit(self):
    """Handles the exiting of the Session.

    Exits the process through ``sys.exit``. Errors take precedence over
    failures; when neither was recorded, ``self.exit_code`` is used.
    """
    # Truthy counter means at least one test errored.
    if self.errors:
        sys.exit(ExitCode.TESTS_ERROR)

    if self.failures:
        sys.exit(ExitCode.TESTS_FAILED)

    sys.exit(self.exit_code)

increment_errors()

Increments error count.

Source code in scanapi/session.py
44
45
46
def increment_errors(self):
    """Count one more errored test."""
    self.errors = self.errors + 1

increment_failures()

Increments failure count.

Source code in scanapi/session.py
40
41
42
def increment_failures(self):
    """Count one more failed test."""
    self.failures = self.failures + 1

increment_successes()

Increments success count.

Source code in scanapi/session.py
36
37
38
def increment_successes(self):
    """Count one more successful test."""
    self.successes = self.successes + 1

Settings

Bases: dict

Class for generating Settings dictionary.

Source code in scanapi/settings.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class Settings(dict):
    """Dictionary of preferences built from defaults, config file and CLI."""

    def __init__(self):
        """Create the settings pre-populated with ``DEFAULT_SETTINGS``."""
        super().__init__()
        self.update(DEFAULT_SETTINGS)

    def save_config_file_preferences(self, config_path=None):
        """Merge the preferences found in a config file, if there is one.

        An explicit ``config_path`` wins; otherwise the local config file is
        used when present, then the global one. Without any of them nothing
        changes.
        """
        if config_path:
            chosen = config_path
        elif self.has_local_config_file:
            chosen = LOCAL_CONFIG_PATH
        elif self.has_global_config_file:
            chosen = GLOBAL_CONFIG_PATH
        else:
            chosen = None

        if not chosen:
            return

        self["config_path"] = chosen
        self.update(**load_config_file(chosen))

    def save_click_preferences(self, **preferences):
        """Store every preference whose value is not None."""
        provided = {}
        for name, value in preferences.items():
            if value is not None:
                provided[name] = value
        self.update(**provided)

    def save_preferences(self, **click_preferences):
        """Apply config-file preferences first, then the CLI preferences."""
        self.save_config_file_preferences(click_preferences.get("config_path"))
        self.save_click_preferences(**click_preferences)

    @property
    def has_global_config_file(self):
        """Whether ``GLOBAL_CONFIG_PATH`` points to an existing file."""
        return os.path.isfile(GLOBAL_CONFIG_PATH)

    @property
    def has_local_config_file(self):
        """Whether ``LOCAL_CONFIG_PATH`` points to an existing file."""
        return os.path.isfile(LOCAL_CONFIG_PATH)

has_global_config_file property

Checks if there is a global config file.

has_local_config_file property

Checks if there is a local config file.

__init__()

Constructs a Settings object with default values for all possible preferences.

Source code in scanapi/settings.py
27
28
29
30
31
32
def __init__(self):
    """
    Constructs a Settings object pre-populated with `DEFAULT_SETTINGS`.
    """
    super().__init__()
    # Copy the defaults into this instance.
    self.update(DEFAULT_SETTINGS)

save_click_preferences(**preferences)

Saves all preference items to the Settings object.

Source code in scanapi/settings.py
49
50
51
52
53
54
def save_click_preferences(self, **preferences):
    """Store every preference whose value is not None."""
    provided = {}
    for name, value in preferences.items():
        if value is not None:
            provided[name] = value
    self.update(**provided)

save_config_file_preferences(config_path=None)

Saves the Settings object config file preferences.

Source code in scanapi/settings.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def save_config_file_preferences(self, config_path=None):
    """Merge the preferences found in a config file, if there is one.

    An explicit ``config_path`` wins; otherwise the local config file is
    used when present, then the global one. Without any of them nothing
    changes.
    """
    if config_path:
        chosen = config_path
    elif self.has_local_config_file:
        chosen = LOCAL_CONFIG_PATH
    elif self.has_global_config_file:
        chosen = GLOBAL_CONFIG_PATH
    else:
        chosen = None

    if not chosen:
        return

    self["config_path"] = chosen
    self.update(**load_config_file(chosen))

save_preferences(**click_preferences)

Caller function that begins the saving of Setting preferences.

Source code in scanapi/settings.py
56
57
58
59
60
def save_preferences(self, **click_preferences):
    """Apply config-file preferences first, then the CLI preferences."""
    self.save_config_file_preferences(click_preferences.get("config_path"))
    self.save_click_preferences(**click_preferences)

group_by_top_level_endpoint(results)

Groups results by endpoint name

Parameters:

Name Type Description Default
results iterator

iterator of request results

required

Returns:

Type Description

[iterator]: an iterator with tuples containing the endpoint name

and an iterator for all request results of that endpoint

Source code in scanapi/template_render.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def group_by_top_level_endpoint(results):
    """
    Groups consecutive results by their top-level endpoint name.

    The name is the second ``::``-separated segment of ``endpoint_name``;
    when there is only one segment, that segment is used, or ``"root"`` if
    it is empty.

    Args:
        results (iterator): iterator of request results

    Returns:
        iterator: an iterator with tuples containing the endpoint name
        and an iterator for all request results of that endpoint
    """

    def top_level_name(result):
        segments = result["endpoint_name"].split("::")
        if len(segments) > 1:
            return segments[1]
        return segments[0] or "root"

    return itertools.groupby(results, top_level_name)

render(template_path, context, is_external=False)

Controller function that handles the Jinja2 rendering of the template.

Source code in scanapi/template_render.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def render(template_path, context, is_external=False):
    """Render a Jinja2 template with the given context and return the text.

    Args:
        template_path: Name or path of the template to load.
        context: Mapping expanded into the template's variables.
        is_external: Passed to ``_loader`` to pick the template loader.

    Returns:
        The rendered template.
    """
    env = Environment(
        loader=_loader(is_external),
        autoescape=True,
        extensions=["jinja2_humanize_extension.HumanizeExtension"],
    )
    # Custom filters and globals made available to the templates.
    env.filters.update(
        {
            "curlify": curlify2.to_curl,
            "render_body": render_body,
            "group_by_top_level_endpoint": group_by_top_level_endpoint,
        }
    )
    env.globals["is_bytes"] = lambda o: isinstance(o, bytes)
    return env.get_template(template_path).render(**context)

render_body(request)

Render body according to its request content type.

Source code in scanapi/template_render.py
34
35
36
37
38
39
def render_body(request):
    """Render body according to its request content type.

    JSON and plain-text bodies are returned as text; any other content type
    yields an explanatory message instead.

    Args:
        request: Object exposing ``headers`` (with a ``get`` method) and
            ``body``.

    Returns:
        str: The body as text, an empty string when there is no body, or
        the "Cannot render" message for unsupported content types.
    """
    content_type = request.headers.get("Content-Type")
    # Match on the bare media type so that parameters such as
    # "; charset=utf-8" and letter case do not defeat the check.
    media_type = (content_type or "").split(";", 1)[0].strip().lower()
    if media_type not in ("application/json", "text/plain"):
        return f"Cannot render. Unsupported content type: {content_type}."

    body = request.body
    if body is None:
        return ""
    if isinstance(body, (bytes, bytearray)):
        # Undecodable bytes are replaced rather than aborting the rendering.
        return body.decode(errors="replace")
    # Already text (or another non-bytes value): no decoding needed.
    return str(body)

TestStatus

Class that holds test statuses - passed, failed or error.

Source code in scanapi/test_status.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class TestStatus:
    """Class that holds test statuses - passed, failed or error.

    Encodes the valid test status values as plain strings.
    """

    # The class name starts with "Test"; this flag tells pytest not to
    # collect it as a test class.
    __test__ = False

    #: test passed
    PASSED = "passed"
    #: test failed
    FAILED = "failed"
    #: test error
    ERROR = "error"

__test__ = False class-attribute instance-attribute

Tells pytest not to collect this class as a test class, despite its "Test" prefix.

join_urls(first_url, second_url)

Function that returns one url if two aren't given else joins the two urls and returns them.

Source code in scanapi/utils.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def join_urls(first_url: str | None, second_url: str | None) -> str | None:
    """Function that returns one url if two aren't given else joins the two
    urls and returns them.
    """
    if not first_url:
        return second_url

    if not second_url:
        return first_url

    first_url = first_url.strip("/")
    second_url = second_url.lstrip("/")

    return "/".join([first_url, second_url])

session_with_retry(retry_configuration, verify=True)

Instantiate an HTTP client session with retry support.

Parameters:

Name Type Description Default
retry_configuration dict

Retry configuration for a request. Available for version >= 2.2.0.

required
verify bool

SSL certificates used to verify the identity of requested hosts.

True

Returns:

Type Description
Client

httpx.Client: Configured client session.

Source code in scanapi/utils.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def session_with_retry(
    retry_configuration: dict[str, Any] | None,
    verify: bool = True,
) -> Client:
    """Build a client session that honours the given retry configuration.

    Args:
        retry_configuration (dict): Retry configuration for a
            request; ``None`` is treated as an empty configuration.
            Available for version >= 2.2.0.
        verify (bool): SSL certificates used to verify the
            identity of requested hosts.

    Returns:
        httpx.Client: Configured client session.
    """
    # A missing MAX_RETRIES_KEY entry means "do not retry".
    max_retries = (retry_configuration or {}).get(MAX_RETRIES_KEY, 0)
    transport = HTTPTransport(retries=max_retries)
    return Client(transport=transport, timeout=None, verify=verify)

validate_keys(keys, available_keys, required_keys, scope)

Caller function that validates keys.

Source code in scanapi/utils.py
24
25
26
27
28
29
30
31
32
def validate_keys(
    keys: Iterable[str],
    available_keys: tuple[str, ...],
    required_keys: tuple[str, ...],
    scope: str,
) -> None:
    """Validate ``keys`` against the allowed and the required key sets.

    Delegates to ``_validate_allowed_keys`` and then to
    ``_validate_required_keys``, in that order, forwarding ``scope`` to both.

    Args:
        keys: Keys to validate.
        available_keys: Keys forwarded to ``_validate_allowed_keys``.
        required_keys: Keys forwarded to ``_validate_required_keys``.
        scope: Forwarded to both helpers; presumably identifies the part of
            the spec being validated in error messages (confirm in helpers).
    """
    # NOTE(review): ``keys`` is handed to both helpers, so callers presumably
    # pass a re-iterable collection rather than a one-shot iterator - confirm.
    _validate_allowed_keys(keys, available_keys, scope)
    _validate_required_keys(keys, required_keys, scope)

Converters

OpenAPIToScanAPIConverter

Class responsible for parsing an OpenAPI schema and generating a skeleton ScanAPI yaml file with correct endpoints, request methods and authentication schema.

Attributes:

Name Type Description
specs dict

Dictionary parsed from the OpenAPI schema.

Source code in scanapi/converters/from_openapi.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
class OpenAPIToScanAPIConverter:
    """
    Class responsible for parsing an OpenAPI schema and generating
    a skeleton ScanAPI yaml file with correct endpoints, request methods
    and authentication schema.

    Attributes:
        specs (dict): Dictionary parsed from the OpenAPI schema.
    """

    SECURITY_SCHEME_TYPES = ("http", "oauth2", "bearer")
    VALID_HTTP_METHODS = (
        "GET",
        "POST",
        "PUT",
        "PATCH",
        "DELETE",
        "HEAD",
        "OPTIONS",
    )
    BODY_CONTENT_TYPES = (
        "application/x-www-form-urlencoded",
        "application/json",
        "multipart/form-data",
    )

    def __init__(self, specs: dict):
        self.created_variables: set[str] = set()
        self.specs = specs
        self._validate_openapi_spec_version()

    def _get_spec_version(self) -> str:
        """Reads the version from parsed specification

        Returns:
            str: version string

        Raises:
            ValueError: Couldn't find version key on parsed specification
        """
        spec_version: str | None = None
        if "openapi" in self.specs:
            spec_version = self.specs.get("openapi")
        elif "swagger" in self.specs:
            spec_version = self.specs.get("swagger")

        if spec_version is None:
            raise ValueError(
                "Could not determine OpenAPI/Swagger specification version!"
            )
        return spec_version

    def _validate_openapi_spec_version(self):
        """Checks whether the received OpenAPI specification is supported by the converter.

        Returns:
            None

        Raises:
            ValueError: Unsupported OpenAPI specification version
        """
        specs_version = self._get_spec_version()
        print(f"OpenAPI/Swagger version detected: {specs_version}\n")

        if not specs_version.startswith("3"):
            raise ValueError(
                "OpenAPI/Swagger version 3 is required to use this feature."
            )

    def _get_api_name(self) -> str | None:
        """Reads title from the spec info tag.

        Returns:
            str | None: Title if present; otherwise, None
        """
        info = self.specs.get("info", None)
        if info is None:
            return None
        title = info.get("title", None)
        if title is None:
            return None
        return str(title)

    def _get_security_schemes(self) -> list:
        """Gets existing security schema from the OpenAPI specification,
        described using the `securitySchemes` under the top level `components`.

        Reference: <https://swagger.io/docs/specification/v3_0/authentication/#describing-security>

        Returns:
            list[dict]: name and type of found security schemes.
        """
        security_schemes = []
        if (
            "components" in self.specs
            and "securitySchemes" in self.specs["components"]
        ):
            for name, security_scheme in self.specs["components"][
                "securitySchemes"
            ].items():
                if (
                    security_scheme.get("type")
                    not in self.SECURITY_SCHEME_TYPES
                ):
                    continue
                security_schemes.append(
                    {
                        "name": name,
                        "scheme": security_scheme.get("scheme", "bearer"),
                    }
                )
        return security_schemes

    def _get_required_properties_from_schema(
        self, schema: dict, operation_id: str
    ) -> dict[str, str] | None:
        """
        Formats required properties for a given request body schema as a dictionary.

        Returns:
            dict|None: Keys are property names and values are custom
            variables created using the operation_id and the
            property name.
        """
        required_properties = None
        if "required" in schema:
            required_properties = {}
            for prop_name in schema["required"]:
                property_variable = operation_id + "_" + prop_name
                required_properties[prop_name] = "${" + property_variable + "}"
                self.created_variables.add(property_variable)
        return required_properties

    def _add_variables_to_path(
        self, path: str, params: list, operation_id: str
    ) -> str:
        parsed_path = path
        for param in params:
            if param["in"] == "query":
                continue
            if param["in"] == "path":
                # path param is generally curly braced (/snippets/{id}/) and
                # scanapi expects variable notation (/snippets/${id})
                path_param_name = f"{operation_id}_{param['name']}"
                parsed_path = parsed_path.replace(
                    f"{{{param['name']}}}", "${" + path_param_name + "}"
                )
                self.created_variables.add(path_param_name)
        return parsed_path

    def _get_api_target_body(
        self, operation: dict, operation_id: str
    ) -> dict[str, str] | None:
        """Checks whether the current API target expects a request body. Loops
        over supported body content types and uses the first content type
        with a defined request body.

        Returns:
            dict|None: Keys are property names and values are custom
                variables created using the operation_id and the
                property name.

        """
        api_target_body = None
        content = operation["requestBody"]["content"]
        available_content_types = content.keys()
        # prioritize application/x-www-form-urlencoded over other content types
        for content_type in self.BODY_CONTENT_TYPES:
            if content_type in available_content_types:
                api_target_body = self._get_required_properties_from_schema(
                    content[content_type]["schema"],
                    operation_id,
                )
                return api_target_body
        return api_target_body

    def convert(self, base_url: str) -> tuple:
        """
        Runs the conversion algorithm and returns a YAML convertable dictionary.

        :param specs: dictionary representing the OpenAPI specs
        :param base_url: Base URL for the API

        Returns:
            tuple: converted YAML dictionary and set of created variables
        """
        base_yaml: dict = {
            "endpoints": [{"name": None, "path": base_url, "requests": []}]
        }

        api_name = self._get_api_name()
        base_yaml["endpoints"][0]["name"] = api_name

        security_schemes = self._get_security_schemes()

        paths = self.specs.get("paths", {})
        for path, path_item in paths.items():
            for method, operation in path_item.items():
                if method.upper() not in self.VALID_HTTP_METHODS:
                    continue
                operation_id = get_api_target_name(operation, path, method)
                required_params = get_required_params(operation)
                parsed_path = (
                    path
                    if not required_params
                    else self._add_variables_to_path(
                        path, required_params, operation_id
                    )
                )
                api_target = {
                    "name": operation_id,
                    "path": parsed_path,
                    "method": method,
                }
                api_target["tests"] = get_tests(operation)
                if (
                    "requestBody" in operation
                    and "content" in operation["requestBody"]
                ):
                    api_target_body = self._get_api_target_body(
                        operation, operation_id
                    )
                    if api_target_body is not None:
                        api_target["body"] = api_target_body

                operation_security = operation.get("security", [])
                if len(operation_security) > 0:
                    for security in operation_security:
                        for name, _ in security.items():
                            for security_scheme in security_schemes:
                                if security_scheme["name"] == name:
                                    security_type = security_scheme["scheme"]
                                    if security_type == "bearer":
                                        api_target["headers"] = {
                                            "Authorization": "Bearer ${bearer_token}"
                                        }
                                        self.created_variables.add(
                                            "bearer_token"
                                        )
                                    elif security_type == "basic":
                                        api_target["headers"] = {
                                            "Authorization": "Basic ${basic_auth_token}"
                                        }
                                        self.created_variables.add(
                                            "basic_auth_token"
                                        )
                                    break
                base_yaml["endpoints"][0]["requests"].append(api_target)

        return base_yaml, self.created_variables

convert(base_url)

Runs the conversion algorithm and returns a YAML convertable dictionary.

Parameters: `base_url` – Base URL for the API. The OpenAPI specs are not an argument of this method; they are read from the `specs` attribute set at construction.

Returns:

Name Type Description
tuple tuple

converted YAML dictionary and set of created variables

Source code in scanapi/converters/from_openapi.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def convert(self, base_url: str) -> tuple:
    """
    Build the ScanAPI specification skeleton from the parsed OpenAPI specs.

    A single root endpoint is produced, named after the API title and
    rooted at ``base_url``. Every entry of a path item whose key is a valid
    HTTP method becomes one request of that endpoint, with its tests and,
    when applicable, its body and Authorization header.

    Args:
        base_url: Base URL for the API

    Returns:
        tuple: converted YAML dictionary and set of created variables
    """
    requests: list = []
    root_endpoint = {"name": None, "path": base_url, "requests": requests}
    base_yaml: dict = {"endpoints": [root_endpoint]}
    root_endpoint["name"] = self._get_api_name()

    known_schemes = self._get_security_schemes()

    for path, path_item in self.specs.get("paths", {}).items():
        for method, operation in path_item.items():
            if method.upper() not in self.VALID_HTTP_METHODS:
                continue

            operation_id = get_api_target_name(operation, path, method)
            required_params = get_required_params(operation)
            if required_params:
                parsed_path = self._add_variables_to_path(
                    path, required_params, operation_id
                )
            else:
                parsed_path = path

            api_target = {
                "name": operation_id,
                "path": parsed_path,
                "method": method,
                "tests": get_tests(operation),
            }

            has_body = (
                "requestBody" in operation
                and "content" in operation["requestBody"]
            )
            if has_body:
                body = self._get_api_target_body(operation, operation_id)
                if body is not None:
                    api_target["body"] = body

            for requirement in operation.get("security", []):
                for name, _scopes in requirement.items():
                    # Only the first scheme carrying this name is considered.
                    matched = next(
                        (s for s in known_schemes if s["name"] == name),
                        None,
                    )
                    if matched is None:
                        continue
                    auth_kind = matched["scheme"]
                    if auth_kind == "bearer":
                        api_target["headers"] = {
                            "Authorization": "Bearer ${bearer_token}"
                        }
                        self.created_variables.add("bearer_token")
                    elif auth_kind == "basic":
                        api_target["headers"] = {
                            "Authorization": "Basic ${basic_auth_token}"
                        }
                        self.created_variables.add("basic_auth_token")

            requests.append(api_target)

    return base_yaml, self.created_variables

get_api_target_name(operation, path, method)

Generates a variable friendly name for a request. Prioritizes the summary, then operationId and, if none is defined, fall back to using the method_path format.

Changes all slashes and spaces to underscores.

Returns:

Name Type Description
str str

generated target name

Source code in scanapi/converters/from_openapi.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def get_api_target_name(operation: dict, path: str, method: str) -> str:
    """
    Generates a variable friendly name for a request. Prioritizes the summary,
    then operationId and, if none is defined, fall back to using the method_path format.

    A summary or operationId that is present but empty (``null`` or ``""``)
    is treated as not defined, so the request never ends up named ``None``
    or with an empty name.

    Changes all slashes and spaces to underscores.

    Returns:
        str: generated target name
    """
    name = (
        operation.get("summary")
        or operation.get("operationId")
        or f"{method}_{path}"
    )
    return str(name).replace("/", "_").replace(" ", "_")

get_required_params(operation)

Filters the received operation for required parameters. Path parameters are always required (see https://swagger.io/docs/specification/v3_0/describing-parameters/#path-parameters).

Returns:

Type Description
list

list[dict]: Parameter name and location (in)

Source code in scanapi/converters/from_openapi.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def get_required_params(operation: dict) -> list:
    """
    Collects the parameters of an operation that are flagged as required.

    A parameter is kept only when its ``required`` entry is truthy and both
    its ``name`` and ``in`` entries are present and not None. Path parameters
    are always required by the OpenAPI specification
    (see https://swagger.io/docs/specification/v3_0/describing-parameters/#path-parameters).

    Returns:
        list[dict]: Parameter name and location (in)
    """
    if "parameters" not in operation:
        return []

    return [
        {"name": param["name"], "in": param["in"]}
        for param in operation["parameters"]
        if param.get("required", False)
        and param.get("name") is not None
        and param.get("in") is not None
    ]

get_tests(operation)

Generates basic HTTP status code validation tests based on operation responses.

Focuses on successful responses for minimal smoke testing.

Returns:

Type Description
list

list[dict]: Test name and assertion

Source code in scanapi/converters/from_openapi.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def get_tests(operation: dict) -> list:
    """
    Generates basic HTTP status code validation tests based on operation responses.

    Focuses on successful responses for minimal smoke testing: only
    fully numeric status codes starting with ``2`` produce a test.

    Returns:
        list[dict]: Test name and assertion
    """
    tests = []
    for response_key in operation.get("responses") or {}:
        # YAML parsers turn an unquoted key such as ``200:`` into an int.
        status_code = str(response_key)
        # Skip non-numeric response keys from OpenAPI such as 2XX or default
        # see https://swagger.io/docs/specification/v3_0/describing-responses/#http-status-codes
        if not status_code.isdigit():
            continue
        if not status_code.startswith("2"):
            continue
        tests.append(
            {
                "name": f"status_code_is_{status_code}",
                "assert": f"${{{{response.status_code == {status_code}}}}}",
            }
        )
    return tests

Evaluators

CodeEvaluator

Source code in scanapi/evaluators/code_evaluator.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class CodeEvaluator:
    """Evaluate Python expressions embedded in spec values.

    An expression is written as ``${{<python_code>}}``. It is compiled with
    RestrictedPython's ``compile_restricted`` in ``eval`` mode and evaluated
    against a restricted global context that exposes the modules listed in
    ``ALLOWED_MODULES`` and, when available, the current ``response``.
    """

    # Configuration: modules available in API spec evaluation
    ALLOWED_MODULES = ["datetime", "math", "random", "re", "time", "uuid"]
    # NOTE: the ``python_code`` group is greedy, so when a value holds several
    # ``${{...}}`` blocks on one line the match runs from the first ``${{`` to
    # the last ``}}``.
    python_code_pattern = re.compile(
        r"(?P<something_before>\w*)"
        r"(?P<start>\${{)"
        r"(?P<python_code>.*)"
        r"(?P<end>}})"
        r"(?P<something_after>\w*)"
    )  # ${{<python_code>}}

    @classmethod
    def evaluate(
        cls,
        sequence: Any,
        spec_vars: dict[str, Any],
        is_a_test_case: bool = False,
    ) -> Any:
        """Receive a sequence of characters and evaluate Python code present in
        it.

        Args:
            sequence (str): Sequence of characters to be evaluated. It is
                converted with ``str()`` before being searched.
            spec_vars (dict): Dictionary containing SpecEvaluator variables.
                Only its ``response`` entry is read here.
            is_a_test_case (bool): Indicator for checking whether the
                given evaluation is a test case.

        Returns:
            Any: ``sequence`` itself, unchanged, when it contains no
            ``${{...}}`` block. For a test case, the tuple returned by
            ``_assert_code``:
                - the value the code evaluated to.
                - str | None: None if that value is truthy; otherwise, the
                    tested code.
            Otherwise, the value returned by ``_evaluate_sequence``.

        Raises:
            InvalidPythonCodeError: Raised when receiving invalid Python
                statements (e.g. 1/0).
        """
        match = cls.python_code_pattern.search(str(sequence))

        if not match:
            return sequence

        code = match.group("python_code")
        response = spec_vars.get("response")

        try:
            if is_a_test_case:
                return cls._assert_code(code, response)

            return cls._evaluate_sequence(sequence, match, code, response)
        except Exception as e:
            # NOTE(review): errors coming from _safe_eval are already
            # InvalidPythonCodeError instances; presumably they are caught
            # here too and wrapped a second time - confirm this is intended.
            raise InvalidPythonCodeError(str(e), code)

    @classmethod
    def _get_allowed_modules(cls) -> dict[str, Any]:
        """Dynamically import allowed modules.

        Returns:
            dict: Dictionary of module names to imported modules
        """
        # nosec B301: imports are constrained by cls.ALLOWED_MODULES.
        return {
            name: __import__(name)
            for name in cls.ALLOWED_MODULES
        }

    @classmethod
    def _get_safe_globals(cls, response: Any = None) -> dict[str, Any]:
        """Create a secure global context for code execution.

        The context starts from copies of ``safe_globals`` and
        ``safe_builtins``, then gains the guard hooks, a few extra builtins
        and the allowed modules.

        Args:
            response: Optional response object for test assertions

        Returns:
            dict: Safe global context with restricted access
        """
        # Copies keep the shared safe_globals / safe_builtins objects untouched.
        safe_context: dict[str, Any] = safe_globals.copy()
        safe_context["__builtins__"] = safe_builtins.copy()

        # Add iterator functions for generator expressions and comprehensions
        safe_context["_iter_unpack_sequence_"] = iter
        safe_context["_getiter_"] = iter
        # NOTE(review): attribute access is routed through the plain builtin
        # getattr; RestrictedPython also ships a stricter guard - confirm this
        # choice is deliberate.
        safe_context["_getattr_"] = getattr
        # Enables obj[key] access
        safe_context["_getitem_"] = lambda obj, key: obj[key]
        essential_builtins = {
            "all": all,
            "any": any,
            "len": len,
            "str": str,
        }
        safe_context["__builtins__"].update(essential_builtins)

        # Add allowed modules via dynamic import
        allowed_modules = cls._get_allowed_modules()
        safe_context.update(allowed_modules)

        # Add response object if provided (for test assertions)
        if response is not None:
            safe_context["response"] = response

        return safe_context

    @classmethod
    def _safe_eval(
        cls,
        code: str,
        global_context: dict[str, Any] | None = None,
    ) -> Any:
        """Safely evaluate Python code using RestrictedPython with mode='eval'.

        Failures are logged with ``logger.error`` before being re-raised as
        ``InvalidPythonCodeError``.

        Args:
            code (string): Python code to evaluate
            global_context (dict): Global context for evaluation. When None,
                a fresh context from ``_get_safe_globals()`` is used.

        Returns:
            Result of code evaluation

        Raises:
            InvalidPythonCodeError: If code compilation or execution fails
        """
        if global_context is None:
            global_context = cls._get_safe_globals()

        try:
            # Strip whitespace to avoid IndentationError in RestrictedPython
            clean_code = code.strip()

            # Compile the code with restrictions using mode='eval'
            compiled_code = compile_restricted(
                clean_code, "<string>", mode="eval"
            )
            if compiled_code is None:
                # NOTE(review): raised inside the try block, so presumably the
                # generic handler below catches and re-wraps it - confirm.
                raise InvalidPythonCodeError(
                    "Failed to compile restricted code", code
                )

            # Execute the compiled code securely; RestrictedPython compiles
            # user code with safety restrictions before it reaches eval.
            result = eval(compiled_code, global_context)  # nosec B307
            return result

        except SyntaxError as e:
            logger.error(
                "Syntax error in Python code: '%s' - %s", clean_code, str(e)
            )
            raise InvalidPythonCodeError(f"Syntax error in code: {e}", code)
        except Exception as e:
            logger.error(
                "Runtime error in Python code: '%s' - %s", clean_code, str(e)
            )
            raise InvalidPythonCodeError(str(e), code)

    @classmethod
    def _assert_code(cls, code, response):
        """Assert a Python code statement using RestrictedPython.

        The evaluation's global context is enriched with the response to support
        comprehensions using RestrictedPython for security.

        Args:
            code (string): python code that ScanAPI needs to assert
            response: the response for the current request
            that is being tested

        Returns:
            tuple: a tuple containing:
                -  the value the code evaluated to (truthy when the
                assertion holds)
                -  string: None if that value is truthy, the stripped code
                tested otherwise

        Raises:
            InvalidPythonCodeError: If the code cannot be compiled or
                evaluated. A falsy result does not raise; it is reported
                through the returned tuple.

        """
        global_context = cls._get_safe_globals(response)
        ok = cls._safe_eval(code, global_context)
        return ok, None if ok else code.strip()

    @classmethod
    def _evaluate_sequence(cls, sequence, match, code, response):
        """Evaluate ``code`` and substitute its result into ``sequence``.

        Args:
            sequence: original value containing the ``${{...}}`` block
            match: regex match found in ``sequence`` by
                ``python_code_pattern``
            code (string): python code captured by the match
            response: the response for the current request, if any

        Returns:
            The value returned by ``StringEvaluator.replace_var_with_value``
            when given ``sequence``, the whole matched text and the
            stringified evaluation result.
        """
        # To avoid circular imports
        from scanapi.evaluators.string_evaluator import StringEvaluator

        global_context = cls._get_safe_globals(response)
        result = cls._safe_eval(code, global_context)

        return StringEvaluator.replace_var_with_value(
            sequence, match.group(), str(result)
        )

evaluate(sequence, spec_vars, is_a_test_case=False) classmethod

Receive a sequence of characters and evaluate Python code present in it.

Parameters:

Name Type Description Default
sequence str

Sequence of characters to be evaluated.

required
spec_vars dict

Dictionary containing SpecEvaluator variables.

required
is_a_test_case bool

Indicator for checking whether the given evaluation is a test case.

False

Returns:

Name Type Description
tuple Any

Tuple containing: - bool: True if Python statement is valid. - str | None: None if evaluation is valid; otherwise, the tested code.

Raises:

Type Description
InvalidPythonCodeError

Raised when receiving invalid Python statements (e.g. 1/0).

Source code in scanapi/evaluators/code_evaluator.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@classmethod
def evaluate(
    cls,
    sequence: Any,
    spec_vars: dict[str, Any],
    is_a_test_case: bool = False,
) -> Any:
    """Receive a sequence of characters and evaluate Python code present in
    it.

    Args:
        sequence (str): Sequence of characters to be evaluated.
        spec_vars (dict): Dictionary containing SpecEvaluator variables.
        is_a_test_case (bool): Indicator for checking whether the
            given evaluation is a test case.

    Returns:
        tuple: Tuple containing:
            - bool: True if Python statement is valid.
            - str | None: None if evaluation is valid; otherwise, the
                tested code.

    Raises:
        InvalidPythonCodeError: Raised when receiving invalid Python
            statements (e.g. 1/0).
    """
    match = cls.python_code_pattern.search(str(sequence))

    if not match:
        return sequence

    code = match.group("python_code")
    response = spec_vars.get("response")

    try:
        if is_a_test_case:
            return cls._assert_code(code, response)

        return cls._evaluate_sequence(sequence, match, code, response)
    except Exception as e:
        raise InvalidPythonCodeError(str(e), code)

SpecEvaluator

Evaluate variables and assertions defined in the ScanAPI specification.

This class maintains a registry of spec variables and evaluates expressions defined in the ScanAPI spec. It can also filter response-derived variables and supports evaluation of test case assertions.

Source code in scanapi/evaluators/spec_evaluator.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class SpecEvaluator:
    """Evaluate variables and assertions defined in the ScanAPI specification.

    This class maintains a registry of spec variables and evaluates
    expressions defined in the ScanAPI spec. It can also filter
    response-derived variables and supports evaluation of test case
    assertions.
    """

    def __init__(
        self,
        endpoint: Any,
        spec_vars: dict[str, Any],
        extras: dict[str, Any] | None = None,
        filter_responses: bool = True,
    ) -> None:
        """Initialize a SpecEvaluator.

        Args:
            endpoint (EndpointNode): Endpoint instance for which the
                spec is being evaluated.
            spec_vars (dict): Variables defined in the ScanAPI
                specification.
            extras (dict, optional): Optional extra variables to
                include in the registry.
            filter_responses (bool): Whether to filter out
                response-related variables.
        """
        self.endpoint = endpoint
        self.registry: dict[str, Any] = {}

        self.update(
            spec_vars,
            extras=extras,
            filter_responses=filter_responses,
        )

    def evaluate(self, element: Any) -> Any:
        """Evaluate a spec element.

        Args:
            element (Any): Spec element/expression to evaluate.

        Returns:
            Any: Evaluated value of the element.
        """
        return evaluate(element, self)

    def evaluate_assertion(self, element: Any) -> Any:
        """Evaluate an assertion element.

        Args:
            element (Any): Assertion expression from a test case.

        Returns:
            Any: Result of the evaluated assertion.
        """
        return _evaluate_str(element, self, is_a_test_case=True)

    def update(
        self,
        spec_vars: dict[str, Any],
        extras: dict[str, Any] | None = None,
        filter_responses: bool = False,
    ) -> None:
        """Update the evaluator registry with evaluated spec variables.

        This method evaluates each variable in ``spec_vars``
        (optionally using ``extras`` during evaluation) and updates
        the internal registry.

        Args:
            spec_vars (dict): Mapping of spec variable names to
                expressions/values.
            extras (dict, optional): Optional extra variables to
                include in the registry.
            filter_responses (bool): Whether to filter out
                response-related variables.
        """
        if extras is None:
            extras = {}

        if filter_responses:
            spec_vars = self.filter_response_var(spec_vars)

        values = {
            key: evaluate(value, extras) for key, value in spec_vars.items()
        }

        self.registry.update(extras)
        self.registry.update(values)

    def get(self, key: str, default: Any = None) -> Any:
        """Retrieve a value from the registry.

        Args:
            key (str): Name of the variable to retrieve.
            default (Any): Value to return if the key does not exist.

        Returns:
            Any: Value associated with the given key or
                ``default`` if key is not present.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def __repr__(self) -> str:
        """Return a string representation of the evaluator registry."""
        return self.registry.__repr__()

    def __getitem__(self, key: str) -> Any:
        """Retrieve a variable value from the registry.

        Args:
            key (str): Variable name to retrieve.

        Returns:
            Any: Value for the given key.

        Raises:
            KeyError: If the key is not present in the registry or
                endpoint variables.
        """
        if key in self:
            return self.registry[key]

        all_vars = self.endpoint.get_all_vars()

        if key in all_vars:
            return all_vars[key]

        raise KeyError(key)

    def __delitem__(self, key: str) -> None:
        """Delete a variable from the registry.

        Args:
            key (str): Variable name to delete.

        Raises:
            KeyError: If the key is not present in the registry.
        """
        if key in self:
            del self.registry[key]
        else:
            raise KeyError(key)

    def __contains__(self, key: str) -> bool:
        """Check whether a variable exists in the registry.

        Args:
            key (str): Variable name.

        Returns:
            bool: True if the variable exists in the registry,
                otherwise False.
        """
        return key in self.registry

    def keys(self) -> KeysView[str]:
        """Return a copy of the dictionary keys.

        Returns:
            KeysView[str]: The registry keys.
        """
        return self.registry.keys()

    @classmethod
    def filter_response_var(
        cls,
        spec_vars: dict[str, Any],
    ) -> dict[str, Any]:
        """Return a copy of ``spec_vars`` without response references.

        Any items with a ``response.*`` reference in their value
        are left out.

        Returns:
            dict: Filtered dictionary.
        """
        pattern = re.compile(r"(?:(\s*response\.\w+))")

        return {
            key: value
            for key, value in spec_vars.items()
            if not pattern.search(value)
        }

__contains__(key)

Check whether a variable exists in the registry.

Parameters:

Name Type Description Default
key str

Variable name.

required

Returns:

Name Type Description
bool bool

True if the variable exists in the registry, otherwise False.

Source code in scanapi/evaluators/spec_evaluator.py
158
159
160
161
162
163
164
165
166
167
168
def __contains__(self, key: str) -> bool:
    """Tell whether ``key`` is stored in the evaluator's registry.

    Args:
        key (str): Name of the variable to look for.

    Returns:
        bool: True when the registry holds the key, False otherwise.
    """
    registry = self.registry
    return key in registry

__delitem__(key)

Delete a variable from the registry.

Parameters:

Name Type Description Default
key str

Variable name to delete.

required

Raises:

Type Description
KeyError

If the key is not present in the registry.

Source code in scanapi/evaluators/spec_evaluator.py
144
145
146
147
148
149
150
151
152
153
154
155
156
def __delitem__(self, key: str) -> None:
    """Remove a variable from the registry.

    Args:
        key (str): Name of the variable to remove.

    Raises:
        KeyError: If the registry does not contain the key.
    """
    # Guard clause: reject unknown keys before touching the registry.
    if key not in self:
        raise KeyError(key)

    del self.registry[key]

__getitem__(key)

Retrieve a variable value from the registry.

Parameters:

Name Type Description Default
key str

Variable name to retrieve.

required

Returns:

Name Type Description
Any Any

Value for the given key.

Raises:

Type Description
KeyError

If the key is not present in the registry or endpoint variables.

Source code in scanapi/evaluators/spec_evaluator.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def __getitem__(self, key: str) -> Any:
    """Look up a variable, first locally and then on the endpoint.

    Args:
        key (str): Name of the variable to look up.

    Returns:
        Any: The value stored under ``key``.

    Raises:
        KeyError: If neither the registry nor the endpoint variables
            contain the key.
    """
    if key not in self:
        # Not held locally: fall back to the endpoint's variables.
        inherited = self.endpoint.get_all_vars()

        if key not in inherited:
            raise KeyError(key)

        return inherited[key]

    return self.registry[key]

__init__(endpoint, spec_vars, extras=None, filter_responses=True)

Initialize a SpecEvaluator.

Parameters:

Name Type Description Default
endpoint EndpointNode

Endpoint instance for which the spec is being evaluated.

required
spec_vars dict

Variables defined in the ScanAPI specification.

required
extras dict

Optional extra variables to include in the registry.

None
filter_responses bool

Whether to filter out response-related variables.

True
Source code in scanapi/evaluators/spec_evaluator.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(
    self,
    endpoint: Any,
    spec_vars: dict[str, Any],
    extras: dict[str, Any] | None = None,
    filter_responses: bool = True,
) -> None:
    """Initialize a SpecEvaluator.

    Args:
        endpoint (EndpointNode): Endpoint instance for which the
            spec is being evaluated.
        spec_vars (dict): Variables defined in the ScanAPI
            specification.
        extras (dict, optional): Optional extra variables to
            include in the registry.
        filter_responses (bool): Whether to filter out
            response-related variables.
    """
    self.endpoint = endpoint
    self.registry: dict[str, Any] = {}

    self.update(
        spec_vars,
        extras=extras,
        filter_responses=filter_responses,
    )

__repr__()

Return a string representation of the evaluator registry.

Source code in scanapi/evaluators/spec_evaluator.py
117
118
119
def __repr__(self) -> str:
    """Represent the evaluator by the ``repr`` of its registry."""
    return repr(self.registry)

evaluate(element)

Evaluate a spec element.

Parameters:

Name Type Description Default
element Any

Spec element/expression to evaluate.

required

Returns:

Name Type Description
Any Any

Evaluated value of the element.

Source code in scanapi/evaluators/spec_evaluator.py
46
47
48
49
50
51
52
53
54
55
def evaluate(self, element: Any) -> Any:
    """Evaluate one spec element with this evaluator as variable source.

    Args:
        element (Any): Spec element or expression to evaluate.

    Returns:
        Any: The evaluated value.
    """
    # Inside the class, ``evaluate`` here is the module-level dispatcher.
    evaluated = evaluate(element, self)
    return evaluated

evaluate_assertion(element)

Evaluate an assertion element.

Parameters:

Name Type Description Default
element Any

Assertion expression from a test case.

required

Returns:

Name Type Description
Any Any

Result of the evaluated assertion.

Source code in scanapi/evaluators/spec_evaluator.py
57
58
59
60
61
62
63
64
65
66
def evaluate_assertion(self, element: Any) -> Any:
    """Evaluate a test-case assertion.

    Args:
        element (Any): Assertion expression taken from a test case.

    Returns:
        Any: Whatever ``_evaluate_str`` returns in test-case mode.
    """
    outcome = _evaluate_str(element, self, is_a_test_case=True)
    return outcome

filter_response_var(spec_vars) classmethod

Return a copy of spec_vars without response references.

Any items with a response.* reference in their value are left out.

Returns:

Name Type Description
dict dict[str, Any]

Filtered dictionary.

Source code in scanapi/evaluators/spec_evaluator.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@classmethod
def filter_response_var(
    cls,
    spec_vars: dict[str, Any],
) -> dict[str, Any]:
    """Return a copy of ``spec_vars`` without response references.

    Any item whose value contains a ``response.<attr>`` reference is
    left out. Values are converted with ``str()`` before matching so
    non-string values (numbers, ``None``, containers) are handled
    instead of raising ``TypeError``.

    Args:
        spec_vars (dict): Mapping of variable names to values.

    Returns:
        dict: Filtered dictionary.
    """
    pattern = re.compile(r"(?:(\s*response\.\w+))")

    return {
        key: value
        for key, value in spec_vars.items()
        if not pattern.search(str(value))
    }

get(key, default=None)

Retrieve a value from the registry.

Parameters:

Name Type Description Default
key str

Name of the variable to retrieve.

required
default Any

Value to return if the key does not exist.

None

Returns:

Name Type Description
Any Any

Value associated with the given key or default if key is not present.

Source code in scanapi/evaluators/spec_evaluator.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def get(self, key: str, default: Any = None) -> Any:
    """Fetch a value by key, substituting ``default`` when it is missing.

    Args:
        key (str): Name of the variable to fetch.
        default (Any): Fallback returned when the lookup fails.

    Returns:
        Any: The value found via ``self[key]``, or ``default`` when
            that lookup raises ``KeyError``.
    """
    try:
        found = self[key]
    except KeyError:
        found = default

    return found

keys()

Return a view of the registry keys (not a copy).

Returns:

Type Description
KeysView[str]

KeysView[str]: The registry keys.

Source code in scanapi/evaluators/spec_evaluator.py
170
171
172
173
174
175
176
def keys(self) -> KeysView[str]:
    """Expose the keys of the registry.

    Returns:
        KeysView[str]: The keys view of the registry dict.
    """
    registry = self.registry
    return registry.keys()

update(spec_vars, extras=None, filter_responses=False)

Update the evaluator registry with evaluated spec variables.

This method evaluates each variable in spec_vars (optionally using extras during evaluation) and updates the internal registry.

Parameters:

Name Type Description Default
spec_vars dict

Mapping of spec variable names to expressions/values.

required
extras dict

Optional extra variables to include in the registry.

None
filter_responses bool

Whether to filter out response-related variables.

False
Source code in scanapi/evaluators/spec_evaluator.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def update(
    self,
    spec_vars: dict[str, Any],
    extras: dict[str, Any] | None = None,
    filter_responses: bool = False,
) -> None:
    """Update the evaluator registry with evaluated spec variables.

    This method evaluates each variable in ``spec_vars``
    (optionally using ``extras`` during evaluation) and updates
    the internal registry.

    Args:
        spec_vars (dict): Mapping of spec variable names to
            expressions/values.
        extras (dict, optional): Optional extra variables to
            include in the registry.
        filter_responses (bool): Whether to filter out
            response-related variables.
    """
    if extras is None:
        extras = {}

    if filter_responses:
        spec_vars = self.filter_response_var(spec_vars)

    values = {
        key: evaluate(value, extras) for key, value in spec_vars.items()
    }

    self.registry.update(extras)
    self.registry.update(values)

evaluate(expression, _spec_vars)

Evaluate a spec expression based on its type.

This function is implemented using functools.singledispatch to support different types of expressions (e.g. strings, dictionaries, lists).

Parameters:

Name Type Description Default
expression Any

Expression/value to evaluate.

required
_spec_vars Any

SpecEvaluator instance or variable registry used during evaluation.

required

Returns:

Name Type Description
Any Any

Evaluated result or original expression if no evaluation is needed.

Source code in scanapi/evaluators/spec_evaluator.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@singledispatch
def evaluate(
    expression: Any,
    _spec_vars: Any,
) -> Any:
    """Fallback evaluator: hand the expression back untouched.

    This is the base of a ``functools.singledispatch`` function, so it
    runs for any expression type that has no more specific
    implementation registered.

    Args:
        expression: Expression/value to evaluate.
        _spec_vars: SpecEvaluator instance or variable registry;
            unused by this fallback.

    Returns:
        Any: The ``expression`` argument itself.
    """
    return expression

StringEvaluator

Class that handles environment and custom variables evaluation. It replaces every occurrence of the ${customVariable} or ${ENV} pattern with the corresponding variable's value.

Source code in scanapi/evaluators/string_evaluator.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class StringEvaluator:
    """Substitute environment and custom variables inside strings.

    Placeholders use the ``${NAME}`` syntax. Names without lowercase
    letters are looked up in ``os.environ``; other names are looked up
    in the spec variables passed by the caller.
    """

    # NOTE(review): ``|`` inside the character class is a literal pipe,
    # so names may contain ``|`` as well as word characters and ``-``.
    # NOTE(review): ``something_before`` / ``something_after`` are part of
    # ``match.group()``, which the methods below use as the text to
    # replace, so word characters touching a placeholder (``v${n}``) are
    # replaced together with it. Confirm whether that is intended.
    variable_pattern = re.compile(
        r"(?P<something_before>\w*)"
        r"(?P<start>\${)"
        r"(?P<variable>[\w|-]*)"
        r"(?P<end>})"
        r"(?P<something_after>\w*)"
    )  # ${<variable>}

    @classmethod
    def evaluate(
        cls,
        sequence: str,
        spec_vars: dict[str, Any],
        is_a_test_case: bool = False,
    ) -> Any:
        """Evaluate environment variables, custom variables and code.

        Environment variables are substituted first, then custom
        variables, and the result is handed to
        ``CodeEvaluator.evaluate``.

        Args:
            sequence (string): sequence of characters to be evaluated
            spec_vars (dict): dictionary containing the SpecEvaluator
                variables
            is_a_test_case (bool): indicator for checking if the given
                evaluation is a test case.

        Returns:
            Any: whatever ``CodeEvaluator.evaluate`` returns for the
            substituted sequence.
        """
        sequence = cls._evaluate_env_var(sequence)
        sequence = cls._evaluate_custom_var(sequence, spec_vars)

        return CodeEvaluator.evaluate(sequence, spec_vars, is_a_test_case)

    @classmethod
    def _evaluate_env_var(cls, sequence: str) -> str:
        """Replace environment-variable placeholders in ``sequence``.

        A placeholder is treated as an environment variable when its
        name contains no lowercase letter.

        Args:
            sequence (string): sequence of characters to be evaluated

        Returns:
            sequence (string): sequence of characters with all valid
            environment variables replaced

        Raises:
            BadConfigurationError: if a referenced environment variable
                is not set.
        """
        matches = cls.variable_pattern.finditer(sequence)

        for match in matches:
            variable_name = match.group("variable")

            # Names with lowercase letters are custom variables.
            if any(letter.islower() for letter in variable_name):
                continue

            try:
                variable_value = os.environ[variable_name]
            except KeyError as e:
                raise BadConfigurationError(e) from e

            sequence = cls.replace_var_with_value(
                sequence, match.group(), variable_value
            )

        return sequence

    @classmethod
    def _evaluate_custom_var(
        cls,
        sequence: str,
        spec_vars: dict[str, Any],
    ) -> str:
        """Replace custom-variable placeholders in ``sequence``.

        Placeholders whose name is all uppercase are skipped, as are
        names for which ``spec_vars.get`` returns ``None``. Falsy but
        defined values such as ``0``, ``False`` or ``""`` are
        substituted.

        Args:
            sequence (string): sequence of characters to be evaluated
            spec_vars (dict): dictionary containing the SpecEvaluator variables

        Returns:
            sequence (string): sequence of characters with all valid
            custom variables replaced
        """
        matches = cls.variable_pattern.finditer(sequence)

        for match in matches:
            variable_name = match.group("variable")

            if variable_name.isupper():
                continue

            # Single lookup; only an absent (None) value leaves the
            # placeholder untouched.
            variable_value = spec_vars.get(variable_name)

            if variable_value is None:
                continue

            sequence = cls.replace_var_with_value(
                sequence, match.group(), variable_value
            )

        return sequence

    @classmethod
    def replace_var_with_value(
        cls,
        sequence: str,
        variable: str,
        variable_value: Any,
    ) -> Any:
        """Replace every occurrence of ``variable`` with its value.

        When ``sequence`` consists of nothing but the variable, the raw
        value is returned so its original type is preserved.

        Args:
            sequence (string): sequence of characters to be evaluated
            variable (string): variable to be replaced
            variable_value (any): value that will replace the variable

        Returns:
            Any: ``variable_value`` itself when ``variable`` equals
            ``sequence``; otherwise the sequence with all occurrences
            of the variable replaced by ``str(variable_value)``.
        """
        if variable == sequence:
            return variable_value

        # Plain-text replacement: the value must not be interpreted as a
        # regex replacement template (backslashes, ``\1`` and so on).
        return sequence.replace(variable, str(variable_value))

evaluate(sequence, spec_vars, is_a_test_case=False) classmethod

Receives a sequence of characters and evaluates any custom or environment variables present on it

Parameters:

Name Type Description Default
sequence string

sequence of characters to be evaluated

required
spec_vars dict

dictionary containing the SpecEvaluator variables

required
is_a_test_case bool

indicator for checking if the given evaluation is a test case.

False

Returns:

Name Type Description
tuple Any

a tuple containing: - Boolean: True if python statement is valid - string: None if valid evaluation, tested code otherwise

Source code in scanapi/evaluators/string_evaluator.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def evaluate(
    cls,
    sequence: str,
    spec_vars: dict[str, Any],
    is_a_test_case: bool = False,
) -> Any:
    """Substitute variables in ``sequence`` and evaluate the result.

    Environment variables are replaced first, custom variables second,
    and the outcome is passed on to ``CodeEvaluator.evaluate``.

    Args:
        sequence (string): sequence of characters to be evaluated
        spec_vars (dict): dictionary containing the SpecEvaluator
            variables
        is_a_test_case (bool): indicator for checking if the given
            evaluation is a test case.

    Returns:
        Any: the value returned by ``CodeEvaluator.evaluate``.
    """
    with_env_vars = cls._evaluate_env_var(sequence)
    with_custom_vars = cls._evaluate_custom_var(with_env_vars, spec_vars)

    return CodeEvaluator.evaluate(with_custom_vars, spec_vars, is_a_test_case)

replace_var_with_value(sequence, variable, variable_value) classmethod

Receives a sequence of characters and replaces every occurrence of a variable with its value

Parameters:

Name Type Description Default
sequence string

sequence of characters to be evaluated

required
variable string

variable to be replaced

required
variable_value any

value that will replace the variable

required

Returns:

Name Type Description
sequence string

sequence of characters with all occurrences of the current variable replaced (declared return type: Any)

Source code in scanapi/evaluators/string_evaluator.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@classmethod
def replace_var_with_value(
    cls,
    sequence: str,
    variable: str,
    variable_value: Any,
) -> Any:
    """Replace every occurrence of ``variable`` with its value.

    When ``sequence`` consists of nothing but the variable, the raw
    value is returned so its original type is preserved.

    Args:
        sequence (string): sequence of characters to be evaluated
        variable (string): variable to be replaced
        variable_value (any): value that will replace the variable

    Returns:
        Any: ``variable_value`` itself when ``variable`` equals
        ``sequence``; otherwise the sequence with all occurrences of
        the variable replaced by ``str(variable_value)``.
    """
    if variable == sequence:
        return variable_value

    # Plain-text replacement: the value must not be interpreted as a
    # regex replacement template (backslashes, ``\1`` and so on).
    return sequence.replace(variable, str(variable_value))

Tree

EndpointNode

Class that represents an endpoint. It follows a tree-like structure where each EndpointNode may contain multiple children EndpointNodes.

Attributes:

Name Type Description
spec dict

dictionary containing the endpoint's specifications

parent EndpointNode

the parent node

child_nodes list of EndpointNodes

the children nodes

spec_vars SpecEvaluator

evaluator used to evaluate expressions and store spec variables

Source code in scanapi/tree/endpoint_node.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
class EndpointNode:
    """
    Class that represents an endpoint. It follows a tree-like structure
    where each EndpointNode may contain multiple children EndpointNodes.

    Attributes:
        spec (dict): dictionary containing the endpoint's specifications
        parent (EndpointNode, optional): the parent node
        child_nodes (list of EndpointNodes): the children nodes
        spec_vars (SpecEvaluator): evaluator used to evaluate expressions
                                  and store spec variables
    """

    # Scope name passed to validate_keys for non-root nodes.
    SCOPE = "endpoint"
    # Keys accepted in an endpoint specification.
    ALLOWED_KEYS = (
        ENDPOINTS_KEY,
        HEADERS_KEY,
        NAME_KEY,
        PARAMS_KEY,
        PATH_KEY,
        REQUESTS_KEY,
        DELAY_KEY,
        VARS_KEY,
        OPTIONS_KEY,
    )
    # Option names accepted by the ``options`` property.
    ALLOWED_OPTIONS = (
        "verify",
        "timeout",
    )
    # Keys required on non-root nodes; root nodes require none.
    REQUIRED_KEYS = (NAME_KEY,)
    ROOT_REQUIRED_KEYS = ()

    def __init__(self, spec, parent=None) -> None:
        """Create the node, validate its spec and build its children.

        Args:
            spec (dict): dictionary containing the endpoint's
                specifications.
            parent (EndpointNode, optional): the parent node; ``None``
                for a root node.
        """
        self.spec = spec
        self.parent = parent
        self.child_nodes = []
        # Validation and child construction run before this node's own
        # spec_vars evaluator is created.
        self.__build()
        self.spec_vars = SpecEvaluator(self, spec.get(VARS_KEY, {}))

    def __build(self) -> None:
        """Validate the EndpointNode keys and create children EndpointNodes
        from endpoints in its specifications.
        """
        self._validate()

        self.child_nodes = [
            EndpointNode(spec, parent=self)
            for spec in self.spec.get(ENDPOINTS_KEY, [])
        ]

    def __repr__(self) -> str:
        """Return ``<ClassName full-name>`` for debugging output."""
        return f"<{self.__class__.__name__} {self.name}>"

    @property
    def name(self) -> str:
        """Get the endpoint's name. The name is prepended by the parent's name,
        if it is not a root node.

        Parent and own name are joined with ``::``. When the parent's
        name is empty, only the node's own name is returned.

        Returns:
            str: The endpoint's name.
        """
        name = self.spec.get(NAME_KEY, "")

        if self.is_root or not self.parent.name:
            return str(name)

        return f"{self.parent.name}::{name}"

    @property
    def path(self) -> str:
        """Get the endpoint's path. The path is prepended by the parent's path,
        if it is not a root node. The returned path already has all variables
        evaluated.

        Returns:
            str: The endpoint's url.
        """
        path = str(self.spec.get(PATH_KEY, "")).strip()
        url = join_urls(self.parent.path, path) if self.parent else path

        return str(self.spec_vars.evaluate(url))

    @property
    def options(self) -> dict[str, Any]:
        """Get the keyword arguments used in the endpoint call.
        The options of the call include the parent's options.

        Returns:
            dict: the keyword arguments used in the endpoint call.

        Raises:
            InvalidKeyError: if an option name is not listed in
                ``ALLOWED_OPTIONS``.
        """
        options = self._get_specs(OPTIONS_KEY)
        for option in options:
            if option not in self.ALLOWED_OPTIONS:
                raise InvalidKeyError(option, OPTIONS_KEY, self.ALLOWED_OPTIONS)

        return options

    @property
    def headers(self) -> dict[str, Any]:
        """Get the headers used in the endpoint call. The headers of the
        call include the parent's headers.

        Returns:
            dict: the headers used in the endpoint call.
        """
        return self._get_specs(HEADERS_KEY)

    @property
    def params(self) -> dict[str, Any]:
        """Get the parameters used in the endpoint call. The parameters of the
        call include the parent's parameters.

        Returns:
            dict: the parameters used in the endpoint call.
        """
        return self._get_specs(PARAMS_KEY)

    @property
    def delay(self) -> int:
        """Get the time in milliseconds to be waited before making the endpoint
        call.

        A falsy own value (missing or 0) falls back to the parent's
        attribute named by ``DELAY_KEY``, and to 0 when there is none.

        Returns:
            int: the time to be waited.
        """
        delay = self.spec.get(DELAY_KEY, 0)
        # NOTE(review): presumably DELAY_KEY names this same property on
        # the parent, making the fallback recursive; confirm.
        return int(delay or getattr(self.parent, DELAY_KEY, 0))

    @property
    def is_root(self) -> bool:
        """Check if the EndpointNode is a root node.

        Returns:
            bool: true if the node has no parent, false otherwise.
        """
        return not self.parent

    def propagate_spec_vars(
        self,
        spec_vars: Dict[str, Any],
        extras: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Update the endpoint node spec_vars and propagate those changes
        to the parent nodes. It only includes new variables.

        Variables whose key is already in this node's registry are left
        untouched. The full ``spec_vars`` mapping, not just the new
        subset, is passed on to the parent.

        Args:
            spec_vars (dict): the new spec_vars.
            extras (dict): extra variables used to update the spec_vars.
        """
        new_spec_vars = {
            key: value
            for key, value in spec_vars.items()
            if key not in self.spec_vars.registry
        }

        # Evaluate variables using extras but don't store extras in registry
        extras = extras or {}

        evaluated_vars = {
            key: evaluate(value, extras) for key, value in new_spec_vars.items()
        }

        self.spec_vars.registry.update(evaluated_vars)

        if not self.is_root:
            self.parent.propagate_spec_vars(spec_vars, extras)

    def get_all_vars(self) -> Dict[str, Any]:
        """Get all variables in spec_vars from the node and its parents.

        The node's registry is deep-copied, so the returned dict can be
        modified without affecting the registry.

        Returns:
            dict: dict from the variable's name to its value.
        """
        variables = copy.deepcopy(self.spec_vars.registry)
        if not self.is_root:
            # Parent values are applied last, so on a name clash the
            # parent's value wins.
            # NOTE(review): confirm this precedence is intended.
            variables.update(self.parent.get_all_vars())

        return variables

    def run(self) -> Iterator[dict[str, Any]]:
        """Run the requests of the node and all children nodes.

        Network and timeout errors stop the iteration. Other HTTP
        client errors are logged and the next request is attempted.
        Both kinds set the session exit code to
        ``ExitCode.REQUEST_ERROR``.

        Returns:
            iterator: Iterator that yields the test result of each request.
        """
        for request in self._get_requests():
            try:
                yield request.run()
            except (NetworkError, TimeoutException) as e:
                # These are hard errors and should exit early with an error
                error_message = (
                    f"\nError while connecting for request"
                    f" {request.full_url_path!r}\n{str(e)}\n"
                )
                logger.error(error_message)
                session.exit_code = ExitCode.REQUEST_ERROR
                session.increment_errors()
                return
            except (CookieConflict, HTTPError, InvalidURL, StreamError) as e:
                # Soft error: log it and move on to the next request.
                # NOTE(review): unlike the branch above, the session
                # error counter is not incremented here; confirm.
                error_message = (
                    f"\nError to make request {repr(request.full_url_path)}. "
                    f"\n{str(e)}\n"
                )
                logger.error(error_message)
                session.exit_code = ExitCode.REQUEST_ERROR
                continue

    def _validate(self) -> None:
        """Private method that checks if the specification has any invalid key
        or if there is any required key missing.

        Root nodes are checked against ``ROOT_REQUIRED_KEYS`` and
        ``ROOT_SCOPE``, other nodes against ``REQUIRED_KEYS`` and
        ``SCOPE``.
        """
        required_keys = (
            self.ROOT_REQUIRED_KEYS if self.is_root else self.REQUIRED_KEYS
        )
        scope = ROOT_SCOPE if self.is_root else self.SCOPE

        validate_keys(self.spec.keys(), self.ALLOWED_KEYS, required_keys, scope)

    def _get_specs(self, field_name: str) -> dict[str, Any]:
        """Get a specification of the endpoint.

        The node's own values are merged over the parent's attribute of
        the same name, so the node's values win on a key clash.

        Args:
            field_name (str): name of the specification field.

        Returns:
            dict: a dictionary containing the values of the field.
        """
        values: dict[str, Any] = self.spec.get(field_name, {})
        # getattr on a ``None`` parent yields the ``None`` default.
        parent_values: dict[str, Any] | None = getattr(
            self.parent, field_name, None
        )

        if parent_values:
            return {**parent_values, **values}

        return values

    def _get_requests(self) -> Iterator[RequestNode]:
        """Get all requests from the node and children nodes as RequestNodes.

        The node's own requests come first, followed by each child's
        requests in ``child_nodes`` order.

        Returns:
            iterator: Iterator that yields a RequestNode for
            each request.
        """
        return chain(
            (
                RequestNode(spec, self)
                for spec in self.spec.get(REQUESTS_KEY, [])
            ),
            *(child._get_requests() for child in self.child_nodes),
        )

delay property

Get the time in milliseconds to be waited before making the endpoint call.

Returns:

Name Type Description
int int

the time to wait.

headers property

Get the headers used in the endpoint call. The headers of the call include the parent's headers.

Returns:

Name Type Description
dict dict[str, Any]

the headers used in the endpoint call.

is_root property

Check if the EndpointNode is a root node.

Returns:

Name Type Description
bool bool

true if the node has no parent, false otherwise.

name property

Get the endpoint's name. The name is prepended by the parent's name, if it is not a root node.

Returns:

Name Type Description
str str

The endpoint's name.

options property

Get the keyword arguments used in the endpoint call. The options of the call include the parent's options.

Returns:

Name Type Description
dict dict[str, Any]

the keyword arguments used in the endpoint call.

params property

Get the parameters used in the endpoint call. The parameters of the call include the parent's parameters.

Returns:

Name Type Description
dict dict[str, Any]

the parameters used in the endpoint call.

path property

Get the endpoint's path. The path is prepended by the parent's path, if it is not a root node. The returned path already has all variables evaluated.

Returns:

Name Type Description
str str

The endpoint's url.

__build()

Validate the EndpointNode keys and create children EndpointNodes from endpoints in its specifications.

Source code in scanapi/tree/endpoint_node.py
77
78
79
80
81
82
83
84
85
86
def __build(self):
    """Validate this node, then build one child EndpointNode per nested
    endpoint specification.

    Validation runs first, so no child is created when this node's own
    keys are rejected.
    """
    self._validate()

    children = []
    for child_spec in self.spec.get(ENDPOINTS_KEY, []):
        children.append(EndpointNode(child_spec, parent=self))

    self.child_nodes = children

get_all_vars()

Get all variables in spec_vars from the node and its parents.

Returns:

Name Type Description
dict Dict[str, Any]

dict from the variable's name to its value.

Source code in scanapi/tree/endpoint_node.py
205
206
207
208
209
210
211
212
213
214
215
def get_all_vars(self) -> Dict[str, Any]:
    """Gather the spec_vars of this node together with its ancestors'.

    The node's own registry is deep-copied, so the returned dict can be
    modified without touching the node. Ancestor values are applied on
    top of that copy, which means they win on name clashes.

    Returns:
        dict: dict from the variable's name to its value.
    """
    collected = copy.deepcopy(self.spec_vars.registry)

    if self.is_root:
        return collected

    collected.update(self.parent.get_all_vars())
    return collected

propagate_spec_vars(spec_vars, extras=None)

Update the endpoint node spec_vars and propagate those changes to the parent nodes. It only includes new variables.

Parameters:

Name Type Description Default
spec_vars dict

the new spec_vars.

required
extras dict

extra variables used to update the spec_vars.

None
Source code in scanapi/tree/endpoint_node.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def propagate_spec_vars(
    self,
    spec_vars: Dict[str, Any],
    extras: Optional[Dict[str, Any]] = None,
) -> None:
    """Register new spec_vars on this node and on every ancestor.

    Names already present in a node's registry are left untouched; only
    unknown names are evaluated and stored.

    Args:
        spec_vars (dict): the new spec_vars.
        extras (dict): extra variables made available while evaluating
            the new values. They are not stored in the registry.
    """
    extras = extras or {}
    known = self.spec_vars.registry

    # Evaluate everything first so the registry is only touched once all
    # new values have been computed.
    fresh = {}
    for key, value in spec_vars.items():
        if key in known:
            continue
        fresh[key] = evaluate(value, extras)

    known.update(fresh)

    if self.is_root:
        return

    # Ancestors receive the unfiltered input and apply their own filter.
    self.parent.propagate_spec_vars(spec_vars, extras)

run()

Run the requests of the node and all children nodes.

Returns:

Name Type Description
iterator Iterator[dict[str, Any]]

Iterator that yields the test result of each request.

Source code in scanapi/tree/endpoint_node.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def run(self) -> Iterator[dict[str, Any]]:
    """Run the requests of the node and all children nodes.

    Requests come from ``self._get_requests()`` and are run one at a time;
    each result is yielded as soon as it is available.

    Error handling:
        - ``NetworkError`` / ``TimeoutException``: the error is logged, the
          session exit code is set to ``ExitCode.REQUEST_ERROR``, the
          session error counter is incremented and iteration stops.
        - ``CookieConflict`` / ``HTTPError`` / ``InvalidURL`` /
          ``StreamError``: the error is logged, the session exit code is
          set to ``ExitCode.REQUEST_ERROR`` and the next request is tried.

    Returns:
        iterator: Iterator that yields the test result of each request.
    """
    for request in self._get_requests():
        try:
            # The yield sits inside the try block, so an exception thrown
            # into the generator at this point is handled below as well.
            yield request.run()
        except (NetworkError, TimeoutException) as e:
            # These are hard errors and should exit early with an error
            error_message = (
                f"\nError while connecting for request"
                f" {request.full_url_path!r}\n{str(e)}\n"
            )
            logger.error(error_message)
            session.exit_code = ExitCode.REQUEST_ERROR
            session.increment_errors()
            # Stop the generator: the remaining requests are not attempted.
            return
        except (CookieConflict, HTTPError, InvalidURL, StreamError) as e:
            # Errors in this group only skip the offending request.
            # NOTE(review): unlike the branch above, this one does not call
            # session.increment_errors() - confirm that is intended.
            error_message = (
                f"\nError to make request {repr(request.full_url_path)}. "
                f"\n{str(e)}\n"
            )
            logger.error(error_message)
            session.exit_code = ExitCode.REQUEST_ERROR
            continue

RequestNode

Class that represents a request. It's used as a child of an EndpointNode where each EndpointNode may contain multiple children RequestNode.

Attributes:

Name Type Description
spec dict

dictionary containing the request's specifications

endpoint EndpointNode

the parent node

Source code in scanapi/tree/request_node.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class RequestNode:
    """
    Class that represents a request. It's used as a child of an EndpointNode
    where each EndpointNode may contain multiple children RequestNode.

    The spec keys are validated on construction. Most properties combine
    the request's own specification with the values exposed by the parent
    endpoint and evaluate the result through the endpoint's spec_vars.

    Attributes:
        spec (dict): dictionary containing the request's specifications
        endpoint (EndpointNode): the parent node
    """

    SCOPE = "request"
    # Keys accepted in a request specification.
    ALLOWED_KEYS = (
        BODY_KEY,
        HEADERS_KEY,
        METHOD_KEY,
        NAME_KEY,
        PARAMS_KEY,
        PATH_KEY,
        TESTS_KEY,
        VARS_KEY,
        DELAY_KEY,
        RETRY_KEY,
        OPTIONS_KEY,
    )
    # Keys accepted under the request's own options entry.
    ALLOWED_OPTIONS = ("verify", "timeout")
    ALLOWED_HTTP_METHODS = (
        "GET",
        "POST",
        "PUT",
        "PATCH",
        "DELETE",
        "HEAD",
        "OPTIONS",
    )
    REQUIRED_KEYS = (NAME_KEY,)

    def __init__(self, spec: dict[str, Any], endpoint: Any) -> None:
        self.spec = spec
        self.endpoint = endpoint
        self._validate()

    def __repr__(self):
        # Reading full_url_path also updates the endpoint's spec_vars.
        return f"<{self.__class__.__name__} {self.full_url_path}>"

    def __getitem__(self, item: str) -> Any:
        return self.spec[item]

    @property
    def http_method(self) -> str:
        """Get the upper-cased HTTP method of the request.

        Returns:
            str: the method declared in the spec, "GET" when none is given.

        Raises:
            HTTPMethodNotAllowedError: if the method is not one of
                ``ALLOWED_HTTP_METHODS``.
        """
        method = str(self.spec.get(METHOD_KEY, "get")).upper()
        if method not in self.ALLOWED_HTTP_METHODS:
            raise HTTPMethodNotAllowedError(method, self.ALLOWED_HTTP_METHODS)

        return method

    @property
    def name(self) -> str:
        """Get the request's name as a string."""
        return str(self[NAME_KEY])

    @property
    def full_url_path(self) -> str:
        """Get the evaluated URL of the request.

        The endpoint's path is joined with the request's own path. As a
        side effect, the request's vars are applied to the endpoint's
        spec_vars before the URL is evaluated.

        Returns:
            str: the URL with its variables evaluated.
        """
        base_path = self.endpoint.path
        path = str(self.spec.get(PATH_KEY, ""))
        full_url = join_urls(base_path, path)

        self.endpoint.spec_vars.update(
            self.spec.get(VARS_KEY, {}),
            extras=dict(self.endpoint.spec_vars),
            filter_responses=True,
        )

        return str(self.endpoint.spec_vars.evaluate(full_url))

    @property
    def options(self) -> dict[str, Any]:
        """Get the evaluated options of the request.

        The endpoint's options are merged with the request's own, the
        request's winning on clashes.

        Returns:
            dict: the evaluated options.

        Raises:
            InvalidKeyError: if the request declares an option that is not
                in ``ALLOWED_OPTIONS``.
        """
        endpoint_options = self.endpoint.options
        options = self.spec.get(OPTIONS_KEY, {})

        # Only the request's own options are checked here.
        for option in options:
            if option not in self.ALLOWED_OPTIONS:
                raise InvalidKeyError(option, OPTIONS_KEY, self.ALLOWED_OPTIONS)

        return cast(
            dict[str, Any],
            self.endpoint.spec_vars.evaluate({**endpoint_options, **options}),
        )

    @property
    def headers(self) -> dict[str, Any]:
        """Get the evaluated headers of the request.

        Returns:
            dict: the endpoint's headers merged with the request's own,
            the request's winning on clashes.
        """
        endpoint_headers = self.endpoint.headers
        headers = self.spec.get(HEADERS_KEY, {})

        return cast(
            dict[str, Any],
            self.endpoint.spec_vars.evaluate({**endpoint_headers, **headers}),
        )

    @property
    def params(self) -> dict[str, Any]:
        """Get the evaluated parameters of the request.

        Returns:
            dict: the endpoint's params merged with the request's own,
            the request's winning on clashes.
        """
        endpoint_params = self.endpoint.params
        params = self.spec.get(PARAMS_KEY, {})

        return cast(
            dict[str, Any],
            self.endpoint.spec_vars.evaluate({**endpoint_params, **params}),
        )

    @property
    def delay(self) -> int:
        """Get the delay applied before the request is made.

        ``run`` divides this value by 1000 before sleeping, so it is
        expressed in milliseconds.

        Returns:
            int: the request's own delay, or the endpoint's delay when the
            request's is missing or falsy.
        """
        delay = self.spec.get(DELAY_KEY, 0)
        return int(delay or self.endpoint.delay)

    @property
    def body(self) -> Any:
        """Get the evaluated body of the request (None when not declared)."""
        body = self.spec.get(BODY_KEY)

        return self.endpoint.spec_vars.evaluate(body)

    @property
    def tests(self) -> Iterator[TestingNode]:
        """Get a fresh generator of TestingNode, one per declared test."""
        return (
            TestingNode(spec, self) for spec in self.spec.get(TESTS_KEY, [])
        )

    @property
    def retry(self) -> Any:
        """Get the raw retry specification, or None when not declared."""
        return self.spec.get(RETRY_KEY)

    def run(self) -> dict[str, Any]:
        """Make the HTTP request and generate its test results.

        Returns:
            dict: HTTP response and test results with request node name,
            to be used by the report template.

        """
        # The delay is expressed in milliseconds; time.sleep takes seconds.
        time.sleep(self.delay / 1000)

        method = self.http_method
        url = self.full_url_path
        console.print(f"\n- Making request {method} {url}", highlight=False)

        options = self.options
        # "verify" is handed to the session, not to the request call.
        verify = options.pop("verify", True)
        kwargs = dict(
            headers=self.headers,
            params=self.params,
            json=self.body,
            follow_redirects=False,
            **options,
        )

        # A non-JSON content type means the body is sent as raw data.
        if not self._content_type_is_json(kwargs["headers"]):
            kwargs["data"] = kwargs.pop("json")

        with session_with_retry(self.retry, verify) as session:
            response = session.request(method, url, **kwargs)

        # The response is made available while evaluating the request vars.
        extras = dict(self.endpoint.spec_vars)
        extras["response"] = response

        self.endpoint.propagate_spec_vars(
            self.spec.get(VARS_KEY, {}),
            extras=extras,
        )

        # Tests see the response under the "response" key; the key is
        # removed again once they have run.
        self.endpoint.spec_vars.update({"response": response})
        tests_results = self._run_tests()
        hide_sensitive_info(response)

        del self.endpoint.spec_vars["response"]

        result = {
            "response": response,
            "tests_results": tests_results,
            "no_failure": all(
                test_result["status"] == TestStatus.PASSED
                for test_result in tests_results
            ),
            "request_node_name": self.name,
            "options": self.options,
            "endpoint_name": self.endpoint.name,
        }

        if not settings["no_report"]:
            write_result(result)

        return result

    def _run_tests(self) -> list[dict[str, Any]]:
        """Run all test cases of the request node.

        Returns:
            list: one result dict per test, in declaration order.

        """
        return [test.run() for test in self.tests]

    def _validate(self) -> None:
        """Validate spec keys.

        Returns:
            None

        """
        validate_keys(
            self.spec.keys(), self.ALLOWED_KEYS, self.REQUIRED_KEYS, self.SCOPE
        )

    @staticmethod
    def _content_type_is_json(headers: dict[str, Any]) -> bool:
        """Check headers for any content-type other than application/json.

        Only the media type is compared: parameters such as
        ``; charset=utf-8`` and surrounding whitespace are ignored, so
        ``application/json; charset=utf-8`` still counts as JSON.

        Args:
            headers (dict[str, Any]): request headers

        Returns:
            bool: False if any content-type header names a media type other
            than application/json; True otherwise, including when there is
            no content-type header at all.
        """
        for key, value in headers.items():
            if key.lower() != "content-type":
                continue

            # str() keeps non-string header values from raising here.
            media_type = str(value).split(";", 1)[0].strip().lower()
            if media_type != "application/json":
                return False

        return True

run()

Make the HTTP request and generate test results for the given URL.

Returns:

Name Type Description
dict dict[str, Any]

HTTP response and test results with request node name,

dict[str, Any]

to be used by the report template.

Source code in scanapi/tree/request_node.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def run(self) -> dict[str, Any]:
    """Make the HTTP request and generate its test results.

    The steps run in a fixed order: wait for the configured delay, send
    the request, propagate the request vars to the endpoint, run the
    tests with the response available in spec_vars, then build (and
    optionally write) the result.

    Returns:
        dict: HTTP response and test results with request node name,
        to be used by the report template.

    """
    # The delay is expressed in milliseconds; time.sleep takes seconds.
    time.sleep(self.delay / 1000)

    method = self.http_method
    url = self.full_url_path
    console.print(f"\n- Making request {method} {url}", highlight=False)

    options = self.options
    # "verify" is handed to the session, not to the request call.
    verify = options.pop("verify", True)
    kwargs = dict(
        headers=self.headers,
        params=self.params,
        json=self.body,
        follow_redirects=False,
        **options,
    )

    # A non-JSON content type means the body is sent as raw data.
    if not self._content_type_is_json(kwargs["headers"]):
        kwargs["data"] = kwargs.pop("json")

    with session_with_retry(self.retry, verify) as session:
        response = session.request(method, url, **kwargs)

    # The response is made available while evaluating the request vars.
    extras = dict(self.endpoint.spec_vars)
    extras["response"] = response

    self.endpoint.propagate_spec_vars(
        self.spec.get(VARS_KEY, {}),
        extras=extras,
    )

    # Tests see the response under the "response" key; the key is removed
    # again once they have run.
    self.endpoint.spec_vars.update({"response": response})
    tests_results = self._run_tests()
    # Called only after the tests have run against the response.
    # presumably masks sensitive data before reporting - verify in helper.
    hide_sensitive_info(response)

    del self.endpoint.spec_vars["response"]

    result = {
        "response": response,
        "tests_results": tests_results,
        # True only when every test passed (also True when there are none).
        "no_failure": all(
            test_result["status"] == TestStatus.PASSED
            for test_result in tests_results
        ),
        "request_node_name": self.name,
        # Re-read from the property, so "verify" popped above is included.
        "options": self.options,
        "endpoint_name": self.endpoint.name,
    }

    if not settings["no_report"]:
        write_result(result)

    return result

TestingNode

Represents a test node defined in the ScanAPI specification.

A TestingNode validates a test definition, executing its assertion against the evaluated API response, and reporting the result status.

Source code in scanapi/tree/testing_node.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class TestingNode:
    """A single named assertion attached to a request in a ScanAPI spec.

    The spec keys are validated on construction. ``run`` evaluates the
    assertion through the spec_vars of the request's endpoint and reports
    the resulting status.
    """

    # presumably keeps pytest from collecting this class - TODO confirm
    __test__ = False
    SCOPE = "test"
    ALLOWED_KEYS = (ASSERT_KEY, NAME_KEY)
    REQUIRED_KEYS = (NAME_KEY, ASSERT_KEY)

    def __init__(self, spec: dict[str, Any], request: Any) -> None:
        self.spec = spec
        self.request = request
        self._validate()

    def __getitem__(self, item: str) -> Any:
        return self.spec[item]

    @property
    def name(self) -> str:
        """Name of the test, as a string."""
        return str(self[NAME_KEY])

    @property
    def assertion(self) -> Any:
        """Raw assertion taken from the test specification."""
        return self[ASSERT_KEY]

    @property
    def full_name(self) -> str:
        """Endpoint, request and test names joined by ``::``."""
        request = self.request
        return f"{request.endpoint.name}::{request.name}::{self.name}"

    def run(self) -> dict[str, Any]:
        """Evaluate the assertion and describe the outcome.

        The assertion is evaluated through the endpoint's spec_vars. Any
        exception raised while doing so turns the test into an error
        instead of propagating. The session counters are updated
        according to the final status.

        Returns:
            dict: A dictionary containing:
                - name (str): Full hierarchical name of the test.
                - status (TestStatus): Result of the test execution.
                - failure (any): Assertion failure details, if available.
                - error (str): Error message if an exception was raised.
        """
        error = None

        try:
            outcome = self.request.endpoint.spec_vars.evaluate_assertion(
                self.assertion
            )
            passed, failure = outcome
            status = TestStatus.PASSED if passed else TestStatus.FAILED
        except Exception as exc:
            status, failure, error = TestStatus.ERROR, None, str(exc)

        self._process_result(status)

        return dict(
            name=self.full_name,
            status=status,
            failure=failure,
            error=error,
        )

    @staticmethod
    def _process_result(status: str) -> None:
        """Bump the session counter that matches the test status.

        Args:
            status (str): the status of the test: passed, failed or error.
        """
        if status == TestStatus.ERROR:
            session.increment_errors()
        elif status == TestStatus.FAILED:
            session.increment_failures()
        elif status == TestStatus.PASSED:
            session.increment_successes()

    def _validate(self) -> None:
        """Validate the keys of the test specification."""
        spec_keys = self.spec.keys()
        validate_keys(
            spec_keys, self.ALLOWED_KEYS, self.REQUIRED_KEYS, self.SCOPE
        )

run()

Run the test assertion and return its result.

This method evaluates the assertion defined in the test, updates the global session counters based on the outcome, and returns a dictionary describing the test execution.

Returns:

Name Type Description
dict dict[str, Any]

A dictionary containing: - name (str): Full hierarchical name of the test. - status (TestStatus): Result of the test execution. - failure (any): Assertion failure details, if available. - error (str): Error message if an exception was raised.

Source code in scanapi/tree/testing_node.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def run(self) -> dict[str, Any]:
    """Evaluate the assertion and describe the outcome.

    The assertion is evaluated through the endpoint's spec_vars. Any
    exception raised while doing so turns the test into an error instead
    of propagating. The session counters are updated according to the
    final status.

    Returns:
        dict: A dictionary containing:
            - name (str): Full hierarchical name of the test.
            - status (TestStatus): Result of the test execution.
            - failure (any): Assertion failure details, if available.
            - error (str): Error message if an exception was raised.
    """
    error = None

    try:
        outcome = self.request.endpoint.spec_vars.evaluate_assertion(
            self.assertion
        )
        passed, failure = outcome
        status = TestStatus.PASSED if passed else TestStatus.FAILED
    except Exception as exc:
        status, failure, error = TestStatus.ERROR, None, str(exc)

    self._process_result(status)

    return dict(
        name=self.full_name,
        status=status,
        failure=failure,
        error=error,
    )