-
Notifications
You must be signed in to change notification settings - Fork 134
/
all.py
133 lines (113 loc) · 4.36 KB
/
all.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from pathlib import Path
from typing import Any, Optional, Sequence, Union
import click
from pygitguardian.iac_models import IaCScanParameters, IaCScanResult
from ggshield.cmd.iac.scan.iac_scan_common_options import (
add_iac_scan_common_options,
update_context,
)
from ggshield.cmd.iac.scan.iac_scan_utils import (
IaCSkipScanResult,
augment_unignored_issues,
create_output_handler,
handle_scan_error,
)
from ggshield.cmd.utils.common_decorators import exception_wrapper
from ggshield.cmd.utils.common_options import directory_argument
from ggshield.cmd.utils.context_obj import ContextObj
from ggshield.cmd.utils.files import check_directory_not_ignored
from ggshield.core.dirs import get_project_root_dir
from ggshield.core.git_hooks.ci.supported_ci import SupportedCI
from ggshield.core.scan import ScanContext, ScanMode
from ggshield.core.text_utils import display_info
from ggshield.verticals.iac.collection.iac_path_scan_collection import (
IaCPathScanCollection,
)
from ggshield.verticals.iac.filter import get_iac_files_from_path
# Changes to arguments must be propagated to default_command
@click.command()
@add_iac_scan_common_options()
@directory_argument
@click.pass_context
@exception_wrapper
def scan_all_cmd(
    ctx: click.Context,
    exit_zero: bool,
    minimum_severity: str,
    ignore_policies: Sequence[str],
    ignore_paths: Sequence[str],
    directory: Optional[Path] = None,
    **kwargs: Any,
) -> int:
    """
    Scan a directory for all IaC vulnerabilities in the current state.
    The scan is successful if no IaC vulnerability (known or new) was found.
    """
    # NOTE: the docstring above doubles as the command's help text, so it is
    # user-facing; edit it with that in mind.

    # Without an explicit directory argument, scan the current working directory.
    directory = Path().resolve() if directory is None else directory

    # Store the command-line options on the context before scanning, since
    # iac_scan_all() reads its settings from the context object.
    update_context(ctx, exit_zero, minimum_severity, ignore_policies, ignore_paths)

    result = iac_scan_all(ctx, directory, scan_mode=ScanMode.DIRECTORY_ALL)

    # Post-process the scan result using the user configuration
    # (see augment_unignored_issues for the details).
    user_config = ContextObj.get(ctx).config.user_config
    augment_unignored_issues(user_config, result)

    # The value returned by the display step is the command's return value.
    return display_iac_scan_all_result(ctx, directory, result)
def iac_scan_all(
    ctx: click.Context,
    directory: Path,
    scan_mode: ScanMode,
    ci_mode: Optional[SupportedCI] = None,
) -> Union[IaCScanResult, IaCSkipScanResult, None]:
    """
    Collect the IaC files found under `directory` and send them for scanning.

    :param ctx: click context; the ContextObj attached to it provides the
        configuration, the exclusion regexes and the API client.
    :param directory: directory to scan.
    :param scan_mode: scan mode reported in the request headers. With
        ScanMode.PRE_PUSH_ALL, `ignore_git_staged` is enabled when listing files.
    :param ci_mode: optional CI environment; when given, it is appended to the
        reported scan mode and sent as a "Ci-Mode" extra header.
    :return: an IaCSkipScanResult when no IaC file was found, the
        IaCScanResult returned by the API on success, or None when the API
        answered with anything else (after handing it to handle_scan_error).
    """
    ctx_obj = ContextObj.get(ctx)
    cfg = ctx_obj.config

    check_directory_not_ignored(directory, ctx_obj.exclusion_regexes)

    iac_files = get_iac_files_from_path(
        path=directory,
        exclusion_regexes=ctx_obj.exclusion_regexes,
        # Verbose output is bypassed here: only IaC files should be displayed,
        # and that is done below.
        verbose=False,
        # If the repository is a git repository, ignore untracked files
        ignore_git=False,
        ignore_git_staged=(scan_mode == ScanMode.PRE_PUSH_ALL),
    )
    if not iac_files:
        # Nothing to scan: tell the caller the scan was skipped.
        return IaCSkipScanResult()

    # Paths are sent relative to the project root.
    root = get_project_root_dir(directory)
    relative_paths = [
        str(iac_file.resolve().relative_to(root)) for iac_file in iac_files
    ]

    user_config = cfg.user_config
    if user_config.verbose:
        display_info("> Scanned files")
        for filepath in relative_paths:
            display_info(f"- {click.format_filename(filepath)}")

    client = ctx_obj.client

    # Deduplicate the ignored policy names through a set.
    ignored_policy_names = list(
        {ignored.policy for ignored in user_config.iac.ignored_policies}
    )
    scan_parameters = IaCScanParameters(
        ignored_policy_names,
        user_config.iac.minimum_severity,
    )

    # Unsorted paths would change the tar bytes order between two calls of
    # this function. A different bytes order gives a different tarfile
    # hash_key, which results in a GIM cache bypass.
    relative_paths.sort()

    # In CI, the reported scan mode is "<scan mode>/<ci mode>".
    if ci_mode is None:
        reported_scan_mode = scan_mode
    else:
        reported_scan_mode = f"{scan_mode.value}/{ci_mode.value}"

    extra_headers = None
    if ci_mode:
        extra_headers = {"Ci-Mode": str(ci_mode.value)}

    scan_context = ScanContext(
        command_path=ctx.command_path,
        scan_mode=reported_scan_mode,
        extra_headers=extra_headers,
        target_path=directory,
    )

    scan = client.iac_directory_scan(
        root,
        relative_paths,
        scan_parameters,
        scan_context.get_http_headers(),
    )

    if isinstance(scan, IaCScanResult):
        return scan

    # Anything other than an IaCScanResult is treated as an error response.
    handle_scan_error(client, scan)
    return None
def display_iac_scan_all_result(
    ctx: click.Context,
    directory: Path,
    result: Union[IaCScanResult, IaCSkipScanResult, None],
) -> int:
    """
    Pass the outcome of an "all" IaC scan to the output handler.

    :param ctx: click context used to build the output handler.
    :param directory: scanned directory; its string form is used as the id of
        the scan collection.
    :param result: value returned by iac_scan_all().
    :return: the value returned by the output handler.
    """
    handler = create_output_handler(ctx)

    if not isinstance(result, IaCSkipScanResult):
        # Either a real scan result or None (failed scan): wrap it in a
        # collection and let the handler process it.
        collection = IaCPathScanCollection(id=str(directory), result=result)
        return handler.process_scan(collection)

    # The scan was skipped (no IaC file found).
    return handler.process_skip_scan()