-
-
Notifications
You must be signed in to change notification settings - Fork 737
Expand file tree
/
Copy pathargs.py
More file actions
554 lines (467 loc) · 16.7 KB
/
args.py
File metadata and controls
554 lines (467 loc) · 16.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
from argparse import ArgumentParser, Namespace
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Self
from urllib.request import Request, urlopen
from pydantic.dataclasses import dataclass as p_dataclass
from archinstall.lib.crypt import decrypt
from archinstall.lib.menu.util import get_password
from archinstall.lib.models.application import ApplicationConfiguration, ZramConfiguration
from archinstall.lib.models.authentication import AuthenticationConfiguration
from archinstall.lib.models.bootloader import Bootloader, BootloaderConfiguration
from archinstall.lib.models.device import DiskEncryption, DiskLayoutConfiguration
from archinstall.lib.models.locale import LocaleConfiguration
from archinstall.lib.models.mirrors import MirrorConfiguration
from archinstall.lib.models.network import NetworkConfiguration
from archinstall.lib.models.packages import Repository
from archinstall.lib.models.profile import ProfileConfiguration
from archinstall.lib.models.users import Password, User, UserSerialization
from archinstall.lib.output import debug, error, logger, warn
from archinstall.lib.plugins import load_plugin
from archinstall.lib.translationhandler import Language, tr, translation_handler
from archinstall.lib.version import get_version
from archinstall.tui.ui.components import tui
@p_dataclass
class Arguments:
    """Command-line options of the installer, one field per argparse destination.

    ``ArchConfigHandler._parse_args`` creates an instance from
    ``vars(parser.parse_args())``, so the field names and defaults here mirror
    the flags declared in ``ArchConfigHandler._define_arguments``.
    """

    # --config: JSON configuration file
    config: Path | None = None
    # --config-url: URL to a JSON configuration file (only consulted when --config is not given)
    config_url: str | None = None
    # --creds: JSON credentials configuration file
    creds: Path | None = None
    # --creds-url: URL to a JSON credentials configuration file (only consulted when --creds is not given)
    creds_url: str | None = None
    # --creds-decryption-key: decryption key for the credentials file; when unset,
    # _parse_args falls back to the ARCHINSTALL_CREDS_DECRYPTION_KEY environment variable
    creds_decryption_key: str | None = None
    # --silent: disables all prompts; _parse_args resets it to False when neither
    # --config nor --config-url was passed
    silent: bool = False
    # --dry-run / --dry_run: generate a configuration file and exit instead of installing
    dry_run: bool = False
    # --script: script to run for installation
    script: str | None = None
    # --mountpoint: alternate mount point for the installation
    mountpoint: Path = Path('/mnt')
    # --skip-ntp: disables NTP checks during installation
    skip_ntp: bool = False
    # --skip-wkd: disables checking whether the archlinux keyring wkd sync is complete
    skip_wkd: bool = False
    # --skip-boot: disables installation of a boot loader
    skip_boot: bool = False
    # --debug: adds debug info to the log (_parse_args warns that credentials may be written to it)
    debug: bool = False
    # --offline: disables online upstream services such as package search and key-ring auto update
    offline: bool = False
    # --no-pkg-lookups: disables package validation prior to starting the installation
    no_pkg_lookups: bool = False
    # --plugin: file path to a plugin; loaded via load_plugin() in _parse_args
    plugin: str | None = None
    # --skip-version-check: skip the version check when running archinstall
    skip_version_check: bool = False
    # --skip-wifi-check: skip the wifi check when running archinstall
    skip_wifi_check: bool = False
    # --advanced: enables advanced options
    advanced: bool = False
    # --verbose: enables verbose options
    verbose: bool = False
@dataclass
class ArchConfig:
    """Installation configuration assembled from the JSON config and credentials data.

    Instances are normally built with :meth:`from_config`; :meth:`safe_config`
    and :meth:`unsafe_config` turn the object back into plain dictionaries.
    """

    version: str | None = None
    script: str | None = None
    locale_config: LocaleConfiguration | None = None
    archinstall_language: Language = field(default_factory=lambda: translation_handler.get_language_by_abbr('en'))
    disk_config: DiskLayoutConfiguration | None = None
    profile_config: ProfileConfiguration | None = None
    mirror_config: MirrorConfiguration | None = None
    network_config: NetworkConfiguration | None = None
    bootloader_config: BootloaderConfiguration | None = None
    app_config: ApplicationConfiguration | None = None
    auth_config: AuthenticationConfiguration | None = None
    swap: ZramConfiguration | None = None
    hostname: str = 'archlinux'
    kernels: list[str] = field(default_factory=lambda: ['linux'])
    ntp: bool = True
    packages: list[str] = field(default_factory=list)
    parallel_downloads: int = 0
    timezone: str = 'UTC'
    services: list[str] = field(default_factory=list)
    custom_commands: list[str] = field(default_factory=list)

    def unsafe_config(self) -> dict[str, Any]:
        """Return the credential-bearing part of the configuration.

        The result holds only the keys that are actually set: ``users``,
        ``root_enc_password`` and ``encryption_password`` (the disk encryption
        password in plaintext).
        """
        config: dict[str, list[UserSerialization] | str | None] = {}

        if self.auth_config:
            if self.auth_config.users:
                config['users'] = [user.json() for user in self.auth_config.users]

            if self.auth_config.root_enc_password:
                config['root_enc_password'] = self.auth_config.root_enc_password.enc_password

        if self.disk_config:
            disk_encryption = self.disk_config.disk_encryption
            if disk_encryption and disk_encryption.encryption_password:
                config['encryption_password'] = disk_encryption.encryption_password.plaintext

        return config

    def safe_config(self) -> dict[str, Any]:
        """Return the general (non-credential) configuration as a dictionary.

        ``bootloader_config``, ``app_config`` and ``auth_config`` are always
        present (``None`` when unset); the locale, disk, profile, mirror and
        network sections are only added when they are set.
        """
        config: Any = {
            'version': self.version,
            'script': self.script,
            'archinstall-language': self.archinstall_language.json(),
            'hostname': self.hostname,
            'kernels': self.kernels,
            'ntp': self.ntp,
            'packages': self.packages,
            'parallel_downloads': self.parallel_downloads,
            # NOTE(review): emitted as the ZramConfiguration object itself rather
            # than via .json() like the other nested configs - confirm the
            # serializer used by callers handles it.
            'swap': self.swap,
            'timezone': self.timezone,
            'services': self.services,
            'custom_commands': self.custom_commands,
            'bootloader_config': self.bootloader_config.json() if self.bootloader_config else None,
            'app_config': self.app_config.json() if self.app_config else None,
            'auth_config': self.auth_config.json() if self.auth_config else None,
        }

        if self.locale_config:
            config['locale_config'] = self.locale_config.json()

        if self.disk_config:
            config['disk_config'] = self.disk_config.json()

        if self.profile_config:
            config['profile_config'] = self.profile_config.json()

        if self.mirror_config:
            config['mirror_config'] = self.mirror_config.json()

        if self.network_config:
            config['network_config'] = self.network_config.json()

        return config

    @classmethod
    def from_config(cls, args_config: dict[str, Any], args: Arguments) -> Self:
        """Build an ``ArchConfig`` from a parsed configuration dictionary.

        Args:
            args_config: merged config/credentials dictionary. Missing or falsy
                entries leave the corresponding dataclass default in place.
            args: parsed command-line arguments; only ``skip_boot`` is read here
                and forwarded to the bootloader parsers.

        Returns:
            The populated configuration object.

        Note:
            Exceptions raised by the individual ``parse_arg`` helpers are not
            handled here; ``ArchConfigHandler.__init__`` catches ``ValueError``.
        """
        arch_config = cls()

        arch_config.locale_config = LocaleConfiguration.parse_arg(args_config)

        if script := args_config.get('script', None):
            arch_config.script = script

        if archinstall_lang := args_config.get('archinstall-language', None):
            arch_config.archinstall_language = translation_handler.get_language_by_name(archinstall_lang)
            translation_handler.activate(arch_config.archinstall_language)

        if disk_config := args_config.get('disk_config', {}):
            enc_password = args_config.get('encryption_password', '')
            password = Password(plaintext=enc_password) if enc_password else None
            arch_config.disk_config = DiskLayoutConfiguration.parse_arg(disk_config, password)

        # DEPRECATED
        # backwards compatibility for main level disk_encryption entry
        disk_encryption: DiskEncryption | None = None

        if args_config.get('disk_encryption', None) is not None and arch_config.disk_config is not None:
            disk_encryption = DiskEncryption.parse_arg(
                arch_config.disk_config,
                args_config['disk_encryption'],
                Password(plaintext=args_config.get('encryption_password', '')),
            )

            if disk_encryption:
                arch_config.disk_config.disk_encryption = disk_encryption

        if profile_config := args_config.get('profile_config', None):
            arch_config.profile_config = ProfileConfiguration.parse_arg(profile_config)

        if mirror_config := args_config.get('mirror_config', None):
            # The legacy top-level 'additional-repositories' list is only
            # honoured together with a 'mirror_config' entry.
            backwards_compatible_repo = []
            if additional_repositories := args_config.get('additional-repositories', []):
                backwards_compatible_repo = [Repository(r) for r in additional_repositories]

            arch_config.mirror_config = MirrorConfiguration.parse_args(
                mirror_config,
                backwards_compatible_repo,
            )

        if net_config := args_config.get('network_config', None):
            arch_config.network_config = NetworkConfiguration.parse_arg(net_config)

        if bootloader_config_dict := args_config.get('bootloader_config', None):
            arch_config.bootloader_config = BootloaderConfiguration.parse_arg(bootloader_config_dict, args.skip_boot)
        # DEPRECATED: separate bootloader and uki fields (backward compatibility)
        elif bootloader_str := args_config.get('bootloader', None):
            bootloader = Bootloader.from_arg(bootloader_str, args.skip_boot)
            uki = args_config.get('uki', False)
            # Drop the UKI request when the chosen bootloader cannot honour it.
            if uki and not bootloader.has_uki_support():
                uki = False
            arch_config.bootloader_config = BootloaderConfiguration(bootloader=bootloader, uki=uki, removable=True)

        # deprecated: backwards compatibility
        audio_config_args = args_config.get('audio_config', None)
        app_config_args = args_config.get('app_config', None)

        if audio_config_args is not None or app_config_args is not None:
            arch_config.app_config = ApplicationConfiguration.parse_arg(app_config_args, audio_config_args)

        if auth_config_args := args_config.get('auth_config', None):
            arch_config.auth_config = AuthenticationConfiguration.parse_arg(auth_config_args)

        if hostname := args_config.get('hostname', ''):
            arch_config.hostname = hostname

        if kernels := args_config.get('kernels', []):
            arch_config.kernels = kernels

        arch_config.ntp = args_config.get('ntp', True)

        if packages := args_config.get('packages', []):
            arch_config.packages = packages

        if parallel_downloads := args_config.get('parallel_downloads', 0):
            arch_config.parallel_downloads = parallel_downloads

        swap_arg = args_config.get('swap')
        if swap_arg is not None:
            arch_config.swap = ZramConfiguration.parse_arg(swap_arg)

        if timezone := args_config.get('timezone', 'UTC'):
            arch_config.timezone = timezone

        if services := args_config.get('services', []):
            arch_config.services = services

        # DEPRECATED: backwards compatibility
        # The raw JSON value is bound to its own name so that a falsy but
        # non-None entry (e.g. an empty string) can never end up being stored
        # as the root password in place of a Password object.
        root_password: Password | None = None
        if plain_root_password := args_config.get('!root-password', None):
            root_password = Password(plaintext=plain_root_password)

        # An already-encrypted password takes precedence over the plaintext one.
        if root_enc_password := args_config.get('root_enc_password', None):
            root_password = Password(enc_password=root_enc_password)

        if root_password is not None:
            if arch_config.auth_config is None:
                arch_config.auth_config = AuthenticationConfiguration()
            arch_config.auth_config.root_enc_password = root_password

        # DEPRECATED: backwards compatibility
        users: list[User] = []
        if args_users := args_config.get('!users', None):
            users = User.parse_arguments(args_users)

        # 'users' replaces whatever the legacy '!users' entry produced.
        if args_users := args_config.get('users', None):
            users = User.parse_arguments(args_users)

        if users:
            if arch_config.auth_config is None:
                arch_config.auth_config = AuthenticationConfiguration()
            arch_config.auth_config.users = users

        if custom_commands := args_config.get('custom_commands', []):
            arch_config.custom_commands = custom_commands

        return arch_config
class ArchConfigHandler:
    """Parse the command line and the JSON config/credentials into typed objects.

    Construction parses ``sys.argv``, reads the configuration and credentials
    (from file or URL) and exposes the results through :attr:`args` and
    :attr:`config`.
    """

    def __init__(self) -> None:
        """Parse arguments and configuration; exit with status 1 on a ``ValueError``."""
        self._parser: ArgumentParser = self._define_arguments()
        args: Arguments = self._parse_args()
        self._args = args

        config = self._parse_config()

        try:
            self._config = ArchConfig.from_config(config, args)
            self._config.version = get_version()
        except ValueError as err:
            warn(str(err))
            sys.exit(1)

    @property
    def config(self) -> ArchConfig:
        """The configuration built from the config/credentials data."""
        return self._config

    @property
    def args(self) -> Arguments:
        """The parsed command-line arguments."""
        return self._args

    def get_script(self) -> str:
        """Return the script to run: ``--script``, else the config's script, else ``'guided'``."""
        if script := self.args.script:
            return script

        if script := self.config.script:
            return script

        return 'guided'

    def print_help(self) -> None:
        """Print the argparse help text."""
        self._parser.print_help()

    def _define_arguments(self) -> ArgumentParser:
        """Create the argument parser with every supported command-line flag."""
        parser = ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        parser.add_argument(
            '-v',
            '--version',
            action='version',
            default=False,
            version='%(prog)s ' + get_version(),
        )
        parser.add_argument(
            '--config',
            type=Path,
            nargs='?',
            default=None,
            help='JSON configuration file',
        )
        parser.add_argument(
            '--config-url',
            type=str,
            nargs='?',
            default=None,
            help='Url to a JSON configuration file',
        )
        parser.add_argument(
            '--creds',
            type=Path,
            nargs='?',
            default=None,
            help='JSON credentials configuration file',
        )
        parser.add_argument(
            '--creds-url',
            type=str,
            nargs='?',
            default=None,
            help='Url to a JSON credentials configuration file',
        )
        parser.add_argument(
            '--creds-decryption-key',
            type=str,
            nargs='?',
            default=None,
            help='Decryption key for credentials file',
        )
        parser.add_argument(
            '--silent',
            action='store_true',
            default=False,
            help='WARNING: Disables all prompts for input and confirmation. If no configuration is provided, this is ignored',
        )
        parser.add_argument(
            '--dry-run',
            '--dry_run',
            action='store_true',
            default=False,
            help='Generates a configuration file and then exits instead of performing an installation',
        )
        parser.add_argument(
            '--script',
            nargs='?',
            help='Script to run for installation',
            type=str,
        )
        parser.add_argument(
            '--mountpoint',
            type=Path,
            nargs='?',
            default=Path('/mnt'),
            help='Define an alternate mount point for installation',
        )
        parser.add_argument(
            '--skip-ntp',
            action='store_true',
            help='Disables NTP checks during installation',
            default=False,
        )
        parser.add_argument(
            '--skip-wkd',
            action='store_true',
            help='Disables checking if archlinux keyring wkd sync is complete.',
            default=False,
        )
        parser.add_argument(
            '--skip-boot',
            action='store_true',
            help='Disables installation of a boot loader (note: only use this when problems arise with the boot loader step).',
            default=False,
        )
        parser.add_argument(
            '--debug',
            action='store_true',
            default=False,
            help='Adds debug info into the log',
        )
        parser.add_argument(
            '--offline',
            action='store_true',
            default=False,
            help='Disabled online upstream services such as package search and key-ring auto update.',
        )
        parser.add_argument(
            '--no-pkg-lookups',
            action='store_true',
            default=False,
            help='Disabled package validation specifically prior to starting installation.',
        )
        parser.add_argument(
            '--plugin',
            nargs='?',
            type=str,
            default=None,
            help='File path to a plugin to load',
        )
        parser.add_argument(
            '--skip-version-check',
            action='store_true',
            default=False,
            help='Skip the version check when running archinstall',
        )
        parser.add_argument(
            '--skip-wifi-check',
            action='store_true',
            default=False,
            help='Skip wifi check when running archinstall',
        )
        parser.add_argument(
            '--advanced',
            action='store_true',
            default=False,
            help='Enabled advanced options',
        )
        parser.add_argument(
            '--verbose',
            action='store_true',
            default=False,
            help='Enabled verbose options',
        )

        return parser

    def _parse_args(self) -> Arguments:
        """Parse ``sys.argv`` into ``Arguments`` and apply consistency fix-ups.

        Side effects: may emit a warning for ``--debug`` and loads the plugin
        given with ``--plugin``.
        """
        argparse_args = vars(self._parser.parse_args())
        args: Arguments = Arguments(**argparse_args)

        # amend the parameters (check internal consistency)
        # Installation can't be silent if config is not passed
        if args.config is None and args.config_url is None:
            args.silent = False

        if args.debug:
            warn(f'Warning: --debug mode will write certain credentials to {logger.path}!')

        if args.plugin:
            plugin_path = Path(args.plugin)
            load_plugin(plugin_path)

        # Fall back to the environment for the decryption key; an empty
        # variable is treated the same as an unset one.
        if args.creds_decryption_key is None:
            if env_key := os.environ.get('ARCHINSTALL_CREDS_DECRYPTION_KEY'):
                args.creds_decryption_key = env_key

        return args

    def _parse_config(self) -> dict[str, Any]:
        """Load and merge the configuration and credentials into one dictionary.

        A local file takes precedence over the corresponding URL option.
        Credentials are merged after the configuration, so their keys win on
        conflict. ``None`` values are stripped from the result.
        """
        config: dict[str, Any] = {}
        config_data: str | None = None
        creds_data: str | None = None

        if self._args.config is not None:
            config_data = self._read_file(self._args.config)
        elif self._args.config_url is not None:
            config_data = self._fetch_from_url(self._args.config_url)

        if config_data is not None:
            config.update(json.loads(config_data))

        if self._args.creds is not None:
            creds_data = self._read_file(self._args.creds)
        elif self._args.creds_url is not None:
            creds_data = self._fetch_from_url(self._args.creds_url)

        if creds_data is not None:
            json_data = self._process_creds_data(creds_data)
            if json_data is not None:
                config.update(json_data)

        config = self._cleanup_config(config)

        return config

    def _process_creds_data(self, creds_data: str) -> dict[str, Any] | None:
        """Decrypt (when needed) and parse the credentials data.

        Data starting with ``$`` is treated as encrypted. It is decrypted with
        the configured decryption key, or, when none is configured, with a
        password requested interactively until decryption succeeds.

        Returns:
            The parsed credentials, or ``None`` when the password prompt
            returned no password.

        Raises:
            ValueError: decryption failed for a reason other than a wrong
                password, or the (decrypted) data is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``).
            SystemExit: the configured decryption key is wrong.
        """
        if creds_data.startswith('$'):  # encrypted data
            if self._args.creds_decryption_key is not None:
                # Only the decryption is guarded so that a JSON parse error
                # further down is not reported as a decryption failure.
                try:
                    creds_data = decrypt(creds_data, self._args.creds_decryption_key)
                except ValueError as err:
                    if 'Invalid password' in str(err):
                        error(tr('Incorrect credentials file decryption password'))
                        sys.exit(1)

                    debug(f'Error decrypting credentials file: {err}')
                    raise
            else:
                header = tr('Enter credentials file decryption password')
                wrong_pwd_text = tr('Incorrect password')
                prompt = header

                while True:
                    # Bind the current prompt as a default argument so the
                    # lambda does not depend on later rebinding of `prompt`.
                    decryption_pwd: Password | None = tui.run(
                        lambda p=prompt: get_password(  # type: ignore[misc]
                            header=p,
                            allow_skip=False,
                            no_confirmation=True,
                        )
                    )

                    if not decryption_pwd:
                        return None

                    try:
                        creds_data = decrypt(creds_data, decryption_pwd.plaintext)
                        break
                    except ValueError as err:
                        if 'Invalid password' in str(err):
                            debug('Incorrect credentials file decryption password')
                            # Ask again, this time with the failure notice appended.
                            prompt = f'{header}' + f'\n\n{wrong_pwd_text}'
                        else:
                            debug(f'Error decrypting credentials file: {err}')
                            raise

        return json.loads(creds_data)

    def _fetch_from_url(self, url: str) -> str:
        """Download ``url`` and return its body decoded as UTF-8.

        Logs an error and exits with status 1 when the URL has no scheme or
        the request fails.
        """
        if urllib.parse.urlparse(url).scheme:
            try:
                req = Request(url, headers={'User-Agent': 'ArchInstall'})
                with urlopen(req) as resp:
                    return resp.read().decode('utf-8')
            # URLError is the parent of HTTPError, so this also covers DNS
            # failures, refused connections and unsupported URL schemes.
            except urllib.error.URLError as err:
                error(f'Could not fetch JSON from {url}: {err}')
        else:
            error('Not a valid url')

        sys.exit(1)

    def _read_file(self, path: Path) -> str:
        """Return the text content of ``path``; exit with status 1 if it does not exist."""
        if not path.exists():
            error(f'Could not find file {path}')
            sys.exit(1)

        return path.read_text()

    def _cleanup_config(self, config: Namespace | dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``config`` without ``None`` values.

        Nested dictionaries are cleaned recursively; values inside lists are
        left untouched.
        """
        clean_args = {}
        for key, val in config.items():
            if isinstance(val, dict):
                val = self._cleanup_config(val)

            if val is not None:
                clean_args[key] = val

        return clean_args