Coverage for src / c41811 / config / validators.py: 100%

282 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-06 06:04 +0000

1# cython: language_level = 3 # noqa: ERA001 

2 

3 

4"""配置验证器""" 

5 

6import dataclasses 

7import re 

8import types 

9import warnings 

10from collections import OrderedDict 

11from collections.abc import Callable 

12from collections.abc import Iterable 

13from collections.abc import Mapping 

14from contextlib import suppress 

15from copy import deepcopy 

16from dataclasses import dataclass 

17from enum import Enum 

18from typing import Any 

19from typing import NamedTuple 

20from typing import Never 

21from typing import TypeAliasType 

22from typing import cast 

23from typing import overload 

24from typing import override 

25 

26from pydantic import BaseModel 

27from pydantic import ConfigDict 

28from pydantic import ValidationError 

29from pydantic import create_model 

30 

31# noinspection PyProtectedMember 

32from pydantic.fields import FieldInfo 

33from pydantic_core import core_schema 

34 

35from .abc import ABCIndexedConfigData 

36from .abc import ABCPath 

37from .basic.component import ComponentConfigData 

38from .basic.mapping import MappingConfigData 

39from .basic.object import NoneConfigData 

40from .errors import ComponentMemberMismatchError 

41from .errors import ConfigDataTypeError 

42from .errors import ConfigOperate 

43from .errors import KeyInfo 

44from .errors import RequiredPathNotFoundError 

45from .errors import UnknownErrorDuringValidateError 

46from .path import AttrKey 

47from .path import IndexKey 

48from .path import Path 

49from .utils import Ref 

50from .utils import Unset 

51from .utils import UnsetType 

52from .utils import singleton 

53 

54 

class ValidatorTypes(Enum):
    """Validator types"""

    DEFAULT = None

    #: .. versionchanged:: 0.2.0
    #:    Renamed ``IGNORE`` to ``NO_VALIDATION``
    #:
    #: .. versionchanged:: 0.3.0
    #:    Renamed ``NO_VALIDATION`` to ``CUSTOM``
    CUSTOM = "custom"

    PYDANTIC = "pydantic"

    #: .. versionadded:: 0.2.0
    COMPONENT = "component"

72 

73 

74@dataclass(kw_only=True) 

75class ValidatorOptions: 

76 # noinspection GrazieInspection 

77 """ 

78 验证器选项 

79 

80 .. versionchanged:: 0.3.0 

81 重命名 ``ValidatorFactoryConfig`` 为 ``ValidatorOptions`` 

82 """ 

83 

84 allow_modify: bool = True 

85 """ 

86 是否允许在填充默认值时同步填充源数据 

87 

88 .. versionchanged:: 0.1.2 

89 重命名 ``allow_create`` 为 ``allow_modify`` 

90 

91 .. versionchanged:: 0.2.0 

92 现在默认为 :py:const:`True` 

93 """ 

94 skip_missing: bool = False 

95 """ 

96 是否忽略不存在的路径 

97 

98 .. versionchanged:: 0.2.0 

99 重命名 ``ignore_missing`` 为 ``skip_missing`` 

100 """ 

101 

102 extra: dict[str, Any] = dataclasses.field(default_factory=dict) 

103 

104 

# Shorthand alias: a MappingConfigData parameterized with Any.
type MCD = MappingConfigData[Any]
# Shorthand alias: an ABCIndexedConfigData parameterized with Any.
type ICD = ABCIndexedConfigData[Any]

107 

108 

109# noinspection PyNewStyleGenericSyntax 

110def _remove_skip_missing[D: dict[str, Any] | list[Any]](data: D) -> D: 

111 """ 

112 递归删除值为 :py:const:`SkipMissing` 的项 

113 

114 :param data: 配置数据 

115 :type data: dict | list 

116 

117 .. versionadded:: 0.3.0 

118 """ 

119 if isinstance(data, dict): 

120 return type(data)((k, _remove_skip_missing(v)) for k, v in data.items() if v is not SkipMissing) 

121 if isinstance(data, list): 

122 return type(data)(_remove_skip_missing(item) for item in data if item is not SkipMissing) 

123 return data 

124 

125 

126def _process_pydantic_exceptions(err: ValidationError) -> Exception: 

127 """ 

128 转换包装 pydantic 的异常 

129 

130 :param err: pydantic 的异常 

131 :type err: ValidationError 

132 

133 :return: 转换后的异常 

134 :rtype: Exception 

135 """ 

136 e = err.errors()[0] 

137 

138 locate = list(e["loc"]) 

139 locate_keys: list[AttrKey | IndexKey] = [] 

140 for key in locate: 

141 if isinstance(key, str): 

142 locate_keys.append(AttrKey(key)) 

143 elif isinstance(key, int): 

144 locate_keys.append(IndexKey(key)) 

145 else: # pragma: no cover 

146 msg = "Cannot convert pydantic index to string" 

147 raise UnknownErrorDuringValidateError(msg) from err 

148 

149 kwargs: dict[str, Any] = { 

150 "key_info": KeyInfo(path=Path(locate_keys), current_key=locate_keys[-1], index=len(locate_keys) - 1) 

151 } 

152 

153 class ErrInfo(NamedTuple): 

154 err_type: type[Exception] | Callable[..., Exception] 

155 kwargs: dict[str, Any] 

156 

157 err_input = e["input"] 

158 err_msg = e["msg"] 

159 

160 types_kwarg: dict[str, Callable[[], ErrInfo]] = { 

161 "missing": lambda: ErrInfo(RequiredPathNotFoundError, {"operate": ConfigOperate.Read}), 

162 "model_type": lambda: ErrInfo( 

163 ConfigDataTypeError, 

164 { 

165 "required_type": ( 

166 Never if (match := re.match(r"Input should be (.*)", err_msg)) is None else match.group(1) 

167 ), 

168 "current_type": type(err_input), 

169 }, 

170 ), 

171 "int_type": lambda: ErrInfo(ConfigDataTypeError, {"required_type": int, "current_type": type(err_input)}), 

172 "int_parsing": lambda: ErrInfo(ConfigDataTypeError, {"required_type": int, "current_type": type(err_input)}), 

173 "string_type": lambda: ErrInfo(ConfigDataTypeError, {"required_type": str, "current_type": type(err_input)}), 

174 "dict_type": lambda: ErrInfo(ConfigDataTypeError, {"required_type": dict, "current_type": type(err_input)}), 

175 "literal_error": lambda: ErrInfo(RequiredPathNotFoundError, {"operate": ConfigOperate.Write}), 

176 } 

177 

178 err_type_processor = types_kwarg.get(e["type"]) 

179 if err_type_processor is None: # pragma: no cover 

180 raise UnknownErrorDuringValidateError(**kwargs, error=e) from err 

181 err_info = err_type_processor() 

182 return err_info.err_type(**(kwargs | err_info.kwargs)) 

183 

184 

@singleton
class SkipMissingType:
    """
    Special value used to mark that a value is allowed to be missing

    .. versionchanged:: 0.2.0
       Renamed ``IgnoreMissingType`` to ``SkipMissingType``
    """

    @staticmethod
    def __get_pydantic_core_schema__(*_: Any) -> core_schema.ChainSchema:
        # Build a schema that can never match so that `SkipMissing | int` works as expected,
        # i.e. pydantic treats it as plain `int`
        none_step = core_schema.none_schema()
        subclass_step = core_schema.is_subclass_schema(type)
        return core_schema.chain_schema([none_step, subclass_step])

    @override
    def __str__(self) -> str:
        return "<SkipMissing>"

202 

203 

# Module-level instance of SkipMissingType; compared by identity (``is SkipMissing``) elsewhere in this module.
SkipMissing = SkipMissingType()
# Everything this module accepts as a type hint; mirrors the ``typehint_types`` tuple built in
# DefaultValidatorFactory.__init__.
type AnyTypeHint = type | types.UnionType | types.EllipsisType | types.GenericAlias | TypeAliasType

206 

207 

208@dataclass(init=False) 

209class FieldDefinition[T: AnyTypeHint]: 

210 """ 

211 字段定义,包含类型注解和默认值 

212 

213 .. versionchanged:: 0.1.4 

214 新增 ``allow_recursive`` 字段 

215 

216 .. versionchanged:: 0.3.0 

217 新增对 :py:class:`TypeAliasType` 支持 

218 """ # noqa: RUF002 

219 

220 @overload 

221 def __init__(self, annotation: T, default: Any, *, allow_recursive: bool = True): ... 

222 

223 @overload 

224 def __init__(self, annotation: T, *, default_factory: Callable[[], Any], allow_recursive: bool = True): ... 

225 

226 def __init__( 

227 self, 

228 annotation: T, 

229 default: Any = Unset, 

230 *, 

231 default_factory: Callable[[], Any] | UnsetType = Unset, 

232 allow_recursive: bool = True, 

233 ): 

234 # noinspection GrazieInspection 

235 """ 

236 :param annotation: 用于类型检查的类型 

237 :type annotation: T 

238 :param default: 字段默认值 

239 :type default: Any 

240 :param default_factory: 字段默认值工厂 

241 :type default_factory: Callable[[], Any] | UnsetType 

242 :param allow_recursive: 是否允许递归处理字段值 

243 :type allow_recursive: bool 

244 

245 .. versionchanged:: 0.2.0 

246 重命名参数 ``type_`` 为 ``value`` 

247 

248 重命名参数 ``annotation`` 为 ``default`` 

249 

250 添加参数 ``default_factory`` 

251 """ # noqa: D205 

252 kwargs: dict[str, Any] = {} 

253 if default is not Unset: 

254 kwargs["default"] = default 

255 if default_factory is not Unset: 

256 kwargs["default_factory"] = default_factory 

257 

258 if len(kwargs) != 1: 

259 msg = "take one of arguments 'default' or 'default_factory'" 

260 raise ValueError(msg) 

261 

262 value = default 

263 if not isinstance(default, FieldInfo): 

264 value = FieldInfo(**kwargs) 

265 

266 self.annotation = annotation 

267 self.value = value 

268 self.allow_recursive = allow_recursive 

269 

270 annotation: T 

271 """ 

272 用于类型检查的类型 

273 """ 

274 value: FieldInfo 

275 """ 

276 字段值 

277 """ 

278 allow_recursive: bool 

279 """ 

280 是否允许递归处理字段值 

281 

282 .. versionadded:: 0.1.4 

283 """ 

284 

285 

# Probe model used by _is_mapping: constructing it succeeds only when ``value``
# passes pydantic validation as ``type[Mapping]``.
class MappingType(BaseModel):
    value: type[Mapping]  # type: ignore[type-arg]

288 

289 

# Probe model used by _allow_recursive: constructing it succeeds only when ``value``
# passes pydantic validation as ``Mapping[str, Any]``.
class NestedMapping(BaseModel):
    value: Mapping[str, Any]

292 

293 

def _is_mapping(typ: Any) -> bool:
    """
    Tell whether ``typ`` is a :py:class:`~collections.abc.Mapping` type

    :py:data:`~typing.Any` always counts; everything else is probed by building a ``MappingType`` from it.

    :param typ: The type to check
    :type typ: Any

    :return: Whether it is a :py:class:`~collections.abc.Mapping` type
    :rtype: bool
    """
    if typ is Any:
        return True

    try:
        MappingType(value=typ)
    except (ValidationError, TypeError):
        accepted = False
    else:
        accepted = True
    return accepted

311 

312 

def _allow_recursive(typ: Any) -> bool:
    """
    Tell whether a field value may be processed recursively (allowed when all keys are strings)

    The value is probed by building a ``NestedMapping`` from it.

    :param typ: The value to check
    :type typ: Any

    :return: Whether the field value may be processed recursively
    :rtype: bool
    """
    try:
        NestedMapping(value=typ)
    except (ValidationError, TypeError):
        accepted = False
    else:
        accepted = True
    return accepted

328 

329 

def _check_overwriting_exists_path(
    key: str, value: Any, fmt_data: MappingConfigData[Any], typehint_types: tuple[type, ...]
) -> bool:
    """
    Check whether a path that already exists in the validator is being overwritten

    A warning is emitted when the path exists but the new and the existing value are not both mappings.

    :param key: Path
    :type key: str
    :param value: New value
    :type value: Any
    :param fmt_data: Validator
    :type fmt_data: MappingConfigData[Any]
    :param typehint_types: Types that count as type hints
    :type typehint_types: tuple[type, ...]

    :return: :py:const:`True` only if the path exists and both the new and the existing value are
        Mapping types or Any
    :rtype: bool

    .. versionadded:: 0.3.0
    """
    # Nothing is overwritten when the path is not there yet
    if key not in fmt_data:
        return False

    # Compare by type: an existing value that is not itself a type hint is replaced by its type
    existing = fmt_data.retrieve(key)
    existing_hint = existing if issubclass(type(existing), typehint_types) else type(existing)

    if not (_is_mapping(value) and _is_mapping(existing_hint)):
        # Otherwise warn that a validator path is being overwritten unexpectedly
        warnings.warn(
            f"Overwriting exists validator path with unexpected type '{value}'(new) and '{existing_hint}'(exists)",
            stacklevel=2,
        )
        return False

    # Both are Mapping subclasses or Any: the caller records the parent path and skips further handling
    return True

366 

367 

368@overload 

369def _convert2definition[D: FieldDefinition[Any]](value: D, typehint_types: tuple[type, ...]) -> D: ... 

370 

371 

372@overload 

373def _convert2definition(value: FieldInfo, typehint_types: tuple[type, ...]) -> FieldDefinition[Any]: ... 

374 

375 

376@overload 

377def _convert2definition[T: AnyTypeHint](value: T, typehint_types: tuple[type, ...]) -> FieldDefinition[T]: ... 

378 

379 

380def _convert2definition(value: Any, typehint_types: tuple[type, ...]) -> FieldDefinition[Any]: 

381 """ 

382 将键值换为字段定义 

383 

384 :param value: 键 

385 :type value: Any 

386 :param typehint_types: 类型提示类型 

387 :type typehint_types: tuple[type, ...] 

388 

389 .. versionadded:: 0.3.0 

390 """ 

391 # foo = FieldInfo() # noqa: ERA001 

392 if isinstance(value, FieldInfo): 

393 # foo: FieldInfo().annotation = FieldInfo() # noqa: ERA001 

394 return FieldDefinition(value.annotation, value) 

395 # foo: int # noqa: ERA001 

396 # 如果是仅类型就填上空值 

397 if issubclass(type(value), typehint_types): 

398 # foo: int = FieldInfo() # noqa: ERA001 

399 return FieldDefinition(value, FieldInfo()) 

400 # foo = FieldDefinition(int, FieldInfo()) # noqa: ERA001 

401 # 已经是处理好的字段定义不需要特殊处理 

402 if isinstance(value, FieldDefinition): 

403 return value 

404 # foo = 1 # noqa: ERA001 

405 # 如果是仅默认值就补上类型 

406 # foo: int = 1 # noqa: ERA001 

407 return FieldDefinition(type(value), FieldInfo(default=value)) 

408 

409 

class DefaultValidatorFactory[D: MCD]:
    """Default validator factory"""

    def __init__(self, validator: Iterable[str] | Mapping[str, Any], validator_options: ValidatorOptions):
        # noinspection GrazieInspection
        """
        :param validator: Data used to generate the validator
        :type validator: Iterable[str] | Mapping[str, Any]
        :param validator_options: Validator options
        :type validator_options: ValidatorOptions

        :raise TypeError: If ``validator`` is neither a Mapping nor an Iterable

        Extra validator options
        -----------------------
        .. list-table::
           :widths: auto

           * - Key
             - Description
             - Default
             - Type
           * - model_config_key
             - When the :py:mod:`pydantic` :py:class:`~pydantic.main.BaseModel` is compiled internally,
               the model configuration is stored as a nested dict, so make sure this key does not clash
               with the name of any sub-model inside it
             - ".__model_config__"
             - Any

        .. versionchanged:: 0.1.2
           Validators may mix path strings and nested dicts

        .. versionchanged:: 0.1.4
           Validators may have non-string keys (sub-validators containing non-string keys are not
           processed recursively)
        """  # noqa: D205
        validator = deepcopy(validator)
        if isinstance(validator, Mapping):  # Check Mapping first because a Mapping can also be an Iterable
            ...
        elif isinstance(validator, Iterable):
            # Pre-process into
            # k: Any  # noqa: ERA001
            validator = OrderedDict((k, Any) for k in validator)
        else:
            msg = f"Invalid validator type '{type(validator).__name__}'"
            raise TypeError(msg)
        self.validator = validator
        self.validator_options = validator_options

        self.typehint_types = (type, types.UnionType, types.EllipsisType, types.GenericAlias, TypeAliasType)
        self.model_config_key = validator_options.extra.get("model_config_key", ".__model_config__")
        self._compile()
        # Annotation only: the attribute itself is assigned inside _compile()
        self.model: type[BaseModel]

    def _fmt_mapping_key(self, validator: Mapping[str, Any]) -> tuple[Mapping[str, Any], set[str | ABCPath[Any]]]:
        # noinspection GrazieInspection
        """
        Format the validator keys

        :param validator: Mapping validator
        :type validator: Mapping[str, Any]

        :return: The formatted mapping and the Mapping parent paths that were overwritten
        :rtype: tuple[Mapping[str, Any], set[str | ABCPath[Any]]]

        .. versionchanged:: 0.3.0
           Moved the overwrite check into function :py:func:`_check_overwriting_exists_path`
        """
        iterator = iter(validator.items())
        key: str = None  # type: ignore[assignment]
        value: Any = None

        def _next() -> bool:
            """
            Fetch the next key/value pair into ``key`` and ``value``

            :return: Whether the iterator is exhausted
            :rtype: bool
            """
            nonlocal key, value
            with suppress(StopIteration):
                key, value = next(iterator)
                return False
            # Only reached when next() raised StopIteration and it was suppressed
            return True

        # Return early if there is nothing to process
        if _next():
            return {}, set()

        fmt_data: MappingConfigData[OrderedDict[str, Any]] = MappingConfigData(OrderedDict())
        parent_set: set[str | ABCPath[Any]] = set()
        while True:
            # If the parent path of an already recorded path is passed in, check whether the new and
            # the old value are both Mapping subclasses or Any
            # If so, add the parent path straight to parent_set and skip the remaining steps
            if _check_overwriting_exists_path(key, value, fmt_data, self.typehint_types):
                parent_set.add(key)
                if _next():  # Advance to the next key/value pair
                    break
                continue

            # Process the field value recursively when that is allowed
            if _allow_recursive(value):
                value, inner_path_set = self._fmt_mapping_key(value)
                parent_set.update(f"{key}\\.{inner_path}" for inner_path in inner_path_set)

            # Record this key/value pair
            try:
                fmt_data.modify(key, value)
            except ConfigDataTypeError as err:  # Some parent path is not a Mapping
                relative_path = Path(err.key_info.relative_keys)
                # If the old type is a Mapping subclass or Any, creating the new key is allowed
                if not _is_mapping(fmt_data.retrieve(relative_path)):
                    raise err from None
                fmt_data.modify(relative_path, OrderedDict())
                parent_set.add(relative_path)
                fmt_data.modify(key, value)  # Record this key/value pair again

            # Fetch the next key/value pair
            if _next():
                break

        return fmt_data.data, parent_set

    def _mapping2model(self, mapping: Mapping[str, Any], model_config: dict[str, Any]) -> type[BaseModel]:
        """
        Convert a Mapping into a Model

        :param mapping: The Mapping to convert
        :type mapping: Mapping[str, Any]
        :param model_config: Nested model configuration; the entry under ``self.model_config_key`` is used as
            the config of the model being built and the entry under a field's key is passed on to that
            field's sub-model
        :type model_config: dict[str, Any]

        :return: The converted Model
        :rtype: type[BaseModel]

        .. versionchanged:: 0.3.0
           Moved the field definition conversion into function :py:func:`_convert2definition`
        """
        fmt_data: OrderedDict[str, Any] = OrderedDict()
        for key, value in mapping.items():
            # Convert the key/value pair into a field definition
            definition = _convert2definition(value, self.typehint_types)

            # Process Mapping values recursively
            if all(
                (
                    definition.allow_recursive,
                    _allow_recursive(definition.value.default),
                    # foo.bar = {}  # noqa: ERA001
                    # In this case no recursive parsing is done, i.e. all keys (foo.bar.*) are captured;
                    # parsing it would ignore all content, i.e. return foo.bar={}
                    definition.value.default,
                )
            ):
                model_cls = self._mapping2model(
                    mapping=definition.value.default, model_config=model_config.get(key, {})
                )
                definition = FieldDefinition(model_cls, FieldInfo(default_factory=model_cls))

            # When missing keys are skipped, fill required fields with the special value
            if all((self.validator_options.skip_missing, definition.value.is_required())):
                definition = FieldDefinition(definition.annotation | SkipMissingType, FieldInfo(default=SkipMissing))

            fmt_data[key] = (definition.annotation, definition.value)

        # Create the validation model
        # noinspection PyInvalidCast
        return create_model(
            f"{type(self).__name__}.RuntimeTemplate",
            __config__=cast(ConfigDict, model_config.get(self.model_config_key, {})),
            **fmt_data,
        )

    def _compile(self) -> None:
        """Compile the template"""
        fmt_validator, parent_set = self._fmt_mapping_key(self.validator)
        # Every parent path that occurs repeatedly allows extra keys beneath it
        model_config: MCD = MappingConfigData()
        for path in parent_set:
            model_config.modify(path, {self.model_config_key: {"extra": "allow"}})

        self.model = self._mapping2model(fmt_validator, model_config.data)

    # noinspection PyTypeHints
    def __call__(self, config_ref: Ref[D | NoneConfigData]) -> D:
        """
        Validate the configuration data

        :param config_ref: Reference to the configuration data
        :type config_ref: Ref[D | NoneConfigData]

        :return: The validated configuration data
        :rtype: D

        :raise Exception: The exception returned by :py:func:`_process_pydantic_exceptions` when
            pydantic validation fails
        """
        # A NoneConfigData is swapped for an empty MappingConfigData inside the reference
        if isinstance(config_ref.value, NoneConfigData):
            config_ref.value = MappingConfigData()  # type: ignore[assignment]
        data: D = config_ref.value  # type: ignore[assignment]

        try:
            dict_obj = self.model(**data.data).model_dump()
        except ValidationError as err:
            raise _process_pydantic_exceptions(err) from err

        # Handle SkipMissing items
        if self.validator_options.skip_missing:
            dict_obj = _remove_skip_missing(dict_obj)

        # Completely replace the original data
        if self.validator_options.allow_modify:
            data._data = dict_obj  # noqa: SLF001
            return data
        return data.from_data(dict_obj)

615 

616 

617# noinspection PyTypeHints 

618def pydantic_validator[D: MCD]( 

619 validator: type[BaseModel], cfg: ValidatorOptions 

620) -> Callable[[Ref[D | NoneConfigData]], D]: 

621 """ 

622 验证器选项 ``skip_missing`` 无效 

623 

624 :param validator: :py:class:`~pydantic.main.BaseModel` 的子类 

625 :type validator: type[BaseModel] 

626 :param cfg: 验证器选项 

627 :type cfg: ValidatorOptions 

628 

629 :return: 验证器 

630 :rtype: Callable[[Ref[D | NoneConfigData]], D] 

631 """ 

632 if not issubclass(validator, BaseModel): 

633 msg = f"Expected a subclass of BaseModel for parameter 'validator', but got '{validator.__name__}'" 

634 raise TypeError(msg) 

635 if cfg.skip_missing: 

636 warnings.warn("skip_missing is not supported in pydantic validator", stacklevel=2) 

637 

638 # noinspection PyTypeHints 

639 def _builder(config_ref: Ref[D | NoneConfigData]) -> D: 

640 """ 

641 验证配置数据 

642 

643 :param config_ref: 配置数据引用 

644 :type config_ref: Ref[D | NoneConfigData] 

645 

646 :return: 验证后的配置数据 

647 :rtype: D 

648 """ 

649 if isinstance(config_ref.value, NoneConfigData): 

650 config_ref.value = MappingConfigData() # type: ignore[assignment] 

651 data: D = config_ref.value # type: ignore[assignment] 

652 

653 try: 

654 dict_obj = validator(**data).model_dump() 

655 except ValidationError as err: 

656 raise _process_pydantic_exceptions(err) from err 

657 

658 # 完全替换原始数据 

659 if cfg.allow_modify: 

660 data._data = dict_obj # noqa: SLF001 

661 return data 

662 return data.from_data(dict_obj) 

663 

664 return _builder 

665 

666 

667class ComponentValidatorFactory[D: ComponentConfigData[Any, Any]]: 

668 """ 

669 组件验证器工厂 

670 

671 .. versionadded:: 0.2.0 

672 """ 

673 

674 def __init__(self, validator: Mapping[str | None, Callable[[Ref[ICD]], ICD]], validator_options: ValidatorOptions): 

675 """ 

676 :param validator: 组件验证器 

677 :type validator: Mapping[str | None, Callable[[Ref[ICD]], ICD]] 

678 :param validator_options: 验证器选项 

679 :type validator_options: ValidatorOptions 

680 

681 额外验证器选项 

682 ----------------------- 

683 

684 .. list-table:: 

685 :widths: auto 

686 

687 * - 键名 

688 - 描述 

689 - 默认值 

690 - 类型 

691 * - allow_initialize 

692 - 是否允许初始化不存在的组件成员(注意! 现在的实现方式会强制初始化成员为 :py:class:`MappingConfigData`) 

693 - True 

694 - bool 

695 * - meta_validator 

696 - 组件元数据验证器 

697 - 尝试从传入的组件元数据获得,若不存在(值为None)则放弃验证 

698 - Callable[[ComponentMeta, ValidatorOptions], ComponentMeta] 

699 

700 .. versionchanged:: 0.3.0 

701 更改参数 ``validator`` 类型为 ``Mapping[str | None, Callable[[Ref[ICD]], ICD]]`` 

702 并移除因此冗余的移除额外验证器选项 ``validator_factory`` 

703 """ # noqa: RUF002, D205 

704 self.validator_options = validator_options 

705 self.validators = validator 

706 

707 def _validate_member_metadata(self, component_data: D) -> dict[str, ICD]: 

708 """ 

709 验证组件成员元数据 

710 

711 :param component_data: 组件数据 

712 :type component_data: D 

713 

714 :return: 验证后的组件数据 

715 :rtype: dict[str | None, ICD] 

716 """ 

717 validated_members: dict[str, ICD] = {} 

718 for member, validator in self.validators.items(): 

719 if member is None: 

720 continue 

721 

722 member_not_exists = member not in component_data 

723 member_data_ref: Ref[ICD] 

724 if member_not_exists and self.validator_options.extra.get("allow_initialize", True): 

725 member_data_ref = Ref(MappingConfigData()) 

726 if self.validator_options.allow_modify: 

727 component_data[member] = member_data_ref.value 

728 elif member_not_exists: 

729 raise ComponentMemberMismatchError(missing={member}, redundant=set()) 

730 else: 

731 member_data_ref = Ref(component_data[member]) 

732 validated_member = validator(member_data_ref) 

733 validated_members[member] = validated_member 

734 

735 # 完全替换成员数据 

736 if self.validator_options.allow_modify: 

737 component_data[member] = validated_member 

738 return validated_members 

739 

740 def __call__(self, config_ref: Ref[D | NoneConfigData]) -> D: 

741 """ 

742 验证配置数据 

743 

744 :param config_ref: 配置数据引用 

745 :type config_ref: Ref[D | NoneConfigData] 

746 

747 :return: 验证后的配置数据 

748 :rtype: D 

749 

750 .. versionchanged:: 0.3.0 

751 拆分验证成员元数据到方法 :py:meth:`_validate_member_metadata` 

752 """ 

753 if isinstance(config_ref.value, NoneConfigData): 

754 config_ref.value = ComponentConfigData() # type: ignore[assignment] 

755 

756 component_ref: Ref[D] = config_ref # type: ignore[assignment] 

757 component_data = component_ref.value 

758 

759 validated_members: dict[str, ICD] = self._validate_member_metadata(component_data) 

760 

761 meta = deepcopy(component_data.meta) 

762 if None in self.validators: 

763 meta.config = self.validators[None](Ref(meta.config)) 

764 

765 # noinspection PyUnresolvedReferences 

766 meta_validator = None if meta.parser is None else meta.parser.validator 

767 meta_validator = self.validator_options.extra.get("meta_validator", meta_validator) 

768 if meta_validator is not None: 

769 meta = meta_validator(meta, self.validator_options) 

770 

771 # 完全替换元数据 

772 if self.validator_options.allow_modify: 

773 component_data._meta = meta # noqa: SLF001 

774 

775 return component_data.from_data(meta, validated_members) 

776 

777 

# Names exported by ``from <this module> import *``.
__all__ = (
    "ComponentValidatorFactory",
    "DefaultValidatorFactory",
    "FieldDefinition",
    "ValidatorOptions",
    "ValidatorTypes",
    "pydantic_validator",
)