forked from pantsbuild/pants
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathk8s_parser.py
More file actions
225 lines (185 loc) · 7.09 KB
/
k8s_parser.py
File metadata and controls
225 lines (185 loc) · 7.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import annotations
import json
import logging
import pkgutil
from collections import defaultdict
from dataclasses import dataclass
from pathlib import PurePath
from typing import Any
from pants.backend.helm.utils.yaml import YamlPath
from pants.backend.python.subsystems.python_tool_base import PythonToolRequirementsBase
from pants.backend.python.target_types import EntryPoint
from pants.backend.python.util_rules import pex
from pants.backend.python.util_rules.pex import (
VenvPex,
VenvPexProcess,
VenvPexRequest,
create_venv_pex,
)
from pants.backend.python.util_rules.pex_environment import PexEnvironment
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType
from pants.engine.fs import CreateDigest, FileContent, FileEntry
from pants.engine.intrinsics import create_digest, execute_process
from pants.engine.rules import collect_rules, implicitly, rule
from pants.option.option_types import DictOption
from pants.util.logging import LogLevel
from pants.util.strutil import pluralize, softwrap
logger = logging.getLogger(__name__)
_HELM_K8S_PARSER_SOURCE = "k8s_parser_main.py"
_HELM_K8S_PARSER_PACKAGE = "pants.backend.helm.subsystems"
class HelmKubeParserSubsystem(PythonToolRequirementsBase):
    """Subsystem configuring the Python tool that analyses K8S manifests rendered by Helm.

    The tool is built from the bundled `k8s_parser_main.py` script plus the
    `hikaru` model packages pinned below; users may register additional CRD
    source files via the `crd` option.
    """

    options_scope = "helm-k8s-parser"
    help_short = "Analyses K8S manifests rendered by Helm."

    # hikaru core plus one model package per supported Kubernetes API version.
    default_requirements = [
        "hikaru>=1.1.0",
        "hikaru-model-28",
        "hikaru-model-27",
        "hikaru-model-26",
        "hikaru-model-25",
        "hikaru-model-24",
        "hikaru-model-23",
    ]

    register_interpreter_constraints = True

    crd = DictOption[str](
        help=softwrap(
            # Fixed grammar in user-facing help: "definitions be made" -> "definitions to be made".
            """
            Additional custom resource definitions to be made available to all Helm processes
            or during value interpolation.

            Example:

                [helm-k8s-parser.crd]
                "filename1"="classname1"
                "filename2"="classname2"
            """
        ),
        default={},
    )

    default_lockfile_resource = (_HELM_K8S_PARSER_PACKAGE, "k8s_parser.lock")
@dataclass(frozen=True)
class _HelmKubeParserTool:
pex: VenvPex
crd: dict[str] = defaultdict
@rule
async def build_k8s_parser_tool(
    k8s_parser: HelmKubeParserSubsystem,
    pex_environment: PexEnvironment,
) -> _HelmKubeParserTool:
    """Package the bundled parser script (plus any configured CRD sources) into a venv PEX.

    Raises:
        ValueError: if the bundled parser source or a configured CRD file is missing/empty.
    """
    parser_sources = pkgutil.get_data(_HELM_K8S_PARSER_PACKAGE, _HELM_K8S_PARSER_SOURCE)
    if not parser_sources:
        raise ValueError(
            f"Unable to find source to {_HELM_K8S_PARSER_SOURCE!r} in {_HELM_K8S_PARSER_PACKAGE}"
        )

    parser_file_content = FileContent(
        path="__k8s_parser.py", content=parser_sources, is_executable=True
    )

    digest_sources = [parser_file_content]
    modulename_classname = []
    # Each configured CRD file becomes an extra module in the sandbox; the
    # (module_name, class_name) pairs are forwarded to the parser as JSON.
    for index, (file, classname) in enumerate(k8s_parser.crd.items()):
        # BUGFIX: use a context manager instead of a leaked `open(...).read()`.
        with open(file, "rb") as crd_file:
            crd_sources = crd_file.read()
        if not crd_sources:
            raise ValueError(
                # BUGFIX: "customer" -> "custom" in the error message.
                f"Unable to find source to custom resource definition in {_HELM_K8S_PARSER_PACKAGE}"
            )
        # BUGFIX: previously used hash(file), which is randomized per interpreter
        # run (PYTHONHASHSEED), producing non-deterministic module names and hence
        # non-reproducible sandbox digests. A stable enumeration index fixes that.
        unique_name = f"_crd_source_{index}"
        parser_file_content_source = FileContent(
            path=f"{unique_name}.py", content=crd_sources, is_executable=False
        )
        digest_sources.append(parser_file_content_source)
        modulename_classname.append((unique_name, classname))

    parser_digest = await create_digest(CreateDigest(digest_sources))

    # We use copies of site packages because hikaru gets confused with symlinked packages.
    # The core hikaru package tries to load the packages containing the kubernetes-versioned
    # models using the __path__ attribute of the core package, which doesn't work when the
    # packages are symlinked from inside the namespace-handling dirs in the PEX.
    use_site_packages_copies = True

    parser_pex = await create_venv_pex(
        VenvPexRequest(
            k8s_parser.to_pex_request(
                main=EntryPoint(PurePath(parser_file_content.path).stem),
                sources=parser_digest,
            ),
            pex_environment.in_sandbox(working_directory=None),
            site_packages_copies=use_site_packages_copies,
        ),
        **implicitly(),
    )
    return _HelmKubeParserTool(parser_pex, json.dumps(modulename_classname))
@dataclass(frozen=True)
class ParseKubeManifestRequest(EngineAwareParameter):
    """Request to analyse a single rendered Kubernetes manifest file."""

    file: FileEntry

    def debug_hint(self) -> str | None:
        """Identify this request in engine logs by the manifest's path."""
        hint = self.file.path
        return hint

    def metadata(self) -> dict[str, Any] | None:
        """Expose the file entry to engine-aware metadata reporting."""
        return dict(file=self.file)
@dataclass(frozen=True)
class ParsedImageRefEntry:
    """Location and raw text of one container image reference found in a manifest."""

    # Index of the YAML document within the (potentially multi-document) manifest file.
    document_index: int
    # YAML path of the field holding the image reference inside that document.
    path: YamlPath
    # The image reference exactly as written; parsing/normalisation happens elsewhere.
    unparsed_image_ref: str
@dataclass(frozen=True)
class ParsedKubeManifest(EngineAwareReturnType):
    """Result of parsing one manifest: the file name plus every image reference found."""

    filename: str
    found_image_refs: tuple[ParsedImageRefEntry, ...]

    def level(self) -> LogLevel | None:
        """Report this result at debug verbosity."""
        return LogLevel.DEBUG

    def message(self) -> str | None:
        """Summarise how many image references were found in this file."""
        count = pluralize(len(self.found_image_refs), "image reference")
        return f"Found {count} in file {self.filename}"

    def metadata(self) -> dict[str, Any] | None:
        """Structured metadata for engine-aware reporting."""
        return dict(
            filename=self.filename,
            found_image_refs=self.found_image_refs,
        )
@rule(desc="Parse Kubernetes resource manifest")
async def parse_kube_manifest(
    request: ParseKubeManifestRequest, tool: _HelmKubeParserTool
) -> ParsedKubeManifest:
    """Run the parser PEX over one manifest file and collect the image references it reports."""
    file_digest = await create_digest(CreateDigest([request.file]))

    result = await execute_process(
        **implicitly(
            VenvPexProcess(
                tool.pex,
                argv=[request.file.path, tool.crd],
                input_digest=file_digest,
                description=f"Analyzing Kubernetes manifest {request.file.path}",
                level=LogLevel.DEBUG,
            )
        )
    )

    # Guard clause: surface the parser's stderr verbatim on failure.
    if result.exit_code != 0:
        parser_error = result.stderr.decode("utf-8")
        raise Exception(
            softwrap(
                f"""
                Could not parse Kubernetes manifests in file: {request.file.path}.

                {parser_error}
                """
            )
        )

    # Each stdout line is expected to be "<document-index>,<yaml-path>,<image-ref>".
    image_refs: list[ParsedImageRefEntry] = []
    for line in result.stdout.decode("utf-8").splitlines():
        parts = line.split(",")
        if len(parts) != 3:
            raise Exception(
                softwrap(
                    f"""Unexpected output from k8s parser when parsing file {request.file.path}:

                    {line}
                    """
                )
            )
        doc_index, yaml_path, image_ref = parts
        image_refs.append(
            ParsedImageRefEntry(
                document_index=int(doc_index),
                path=YamlPath.parse(yaml_path),
                unparsed_image_ref=image_ref,
            )
        )

    return ParsedKubeManifest(filename=request.file.path, found_image_refs=tuple(image_refs))
def rules():
    """Return this backend's rules together with the pex helper rules it depends on."""
    return list(collect_rules()) + list(pex.rules())