Coverage for src / c41811 / config / safe_writer.py: 100%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 01:06 +0000

1# cython: language_level = 3 # noqa: ERA001 

2 

3 

4# noinspection SpellCheckingInspection, GrazieInspection 

5""" 

6 从 https://github.com/untitaker/python-atomicwrites 修改而来 

7 

8 .. code-block:: text 

9 :caption: 原始版权声明 

10 

11 Copyright (c) 2015-2016 Markus Unterwaditzer 

12 

13 Permission is hereby granted, free of charge, to any person obtaining a copy of 

14 this software and associated documentation files (the "Software"), to deal in 

15 the Software without restriction, including without limitation the rights to 

16 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

17 of the Software, and to permit persons to whom the Software is furnished to do 

18 so, subject to the following conditions: 

19 

20 The above copyright notice and this permission notice shall be included in all 

21 copies or substantial portions of the Software. 

22 

23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

24 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

25 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

26 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

27 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

28 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

29 SOFTWARE. 

30 

31.. versionadded:: 0.2.0 

32""" 

33 

34import os 

35import shutil 

36import sys 

37import time 

38from abc import ABC 

39from abc import abstractmethod 

40from collections.abc import Generator 

41from contextlib import AbstractContextManager 

42from contextlib import contextmanager 

43from contextlib import suppress 

44from enum import IntEnum 

45from numbers import Real 

46from pathlib import Path 

47from threading import Lock 

48from typing import IO 

49from typing import TYPE_CHECKING 

50from typing import Any 

51from typing import TextIO 

52from typing import cast 

53from typing import overload 

54from typing import override 

55from weakref import WeakValueDictionary 

56 

57import portalocker 

58 

59if TYPE_CHECKING: 

60 from _typeshed import OpenBinaryMode 

61 from _typeshed import OpenTextMode 

62else: 

63 OpenBinaryMode = Any 

64 OpenTextMode = Any 

65 

66try: 

67 import fcntl 

68except ImportError: 

69 # noinspection SpellCheckingInspection 

70 fcntl = None # type: ignore[assignment] 

71 

72try: 

73 from os import fspath 

74except ImportError: 

75 fspath = None # type: ignore[assignment] 

76 

77type PathLike = str | Path 

78type AIO = IO[Any] 

79 

80 

81def _path2str(x: PathLike) -> str: # pragma: no cover 

82 if isinstance(x, Path): 

83 return str(x) 

84 return x 

85 

86 

# Default flush primitive; replaced below when the platform exposes F_FULLFSYNC.
_proper_fsync = os.fsync

if sys.platform != "win32":  # pragma: no cover
    # noinspection SpellCheckingInspection
    if hasattr(fcntl, "F_FULLFSYNC"):

        def _proper_fsync(fd: int) -> None:  # type: ignore[misc]
            """Flush ``fd`` with ``fcntl(F_FULLFSYNC)`` instead of ``os.fsync``."""
            # https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html
            # https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html
            # https://github.com/untitaker/python-atomicwrites/issues/6
            fcntl.fcntl(fd, fcntl.F_FULLFSYNC)  # type: ignore[attr-defined]

    def _sync_directory(directory: str) -> None:
        """Open ``directory``, flush it with ``_proper_fsync`` and close it again."""
        # Ensure that filenames are written to disk
        fd = os.open(directory, 0)
        try:
            _proper_fsync(fd)
        finally:
            os.close(fd)

    def _replace_atomic(src: PathLike, dst: PathLike) -> None:
        """Rename ``src`` onto ``dst``, then sync the directory containing ``dst``."""
        os.rename(src, dst)
        # normpath turns an empty dirname (bare file name) into "."
        _sync_directory(os.path.normpath(os.path.dirname(dst)))

    def _move_atomic(src: PathLike, dst: PathLike) -> None:
        """Hard-link ``src`` to ``dst``, remove ``src``, then sync the affected directories."""
        os.link(src, dst)
        os.unlink(src)

        src_dir = os.path.normpath(os.path.dirname(src))
        dst_dir = os.path.normpath(os.path.dirname(dst))
        _sync_directory(dst_dir)
        # Only sync the source directory separately when it is a different one
        if src_dir != dst_dir:
            _sync_directory(src_dir)
else:  # pragma: no cover
    from ctypes import WinError
    from ctypes import windll

    # Flag values passed to MoveFileExW
    _MOVEFILE_REPLACE_EXISTING = 0x1
    _MOVEFILE_WRITE_THROUGH = 0x8
    _windows_default_flags = _MOVEFILE_WRITE_THROUGH

    def _handle_errors(rv: Any) -> None:
        """Raise ``WinError()`` when the API return value ``rv`` is falsy."""
        if not rv:
            raise WinError()

    def _replace_atomic(src: PathLike, dst: PathLike) -> None:
        """Call ``MoveFileExW`` with the default flags plus ``_MOVEFILE_REPLACE_EXISTING``."""
        _handle_errors(
            windll.kernel32.MoveFileExW(
                _path2str(src), _path2str(dst), _windows_default_flags | _MOVEFILE_REPLACE_EXISTING
            )
        )

    def _move_atomic(src: PathLike, dst: PathLike) -> None:
        """Call ``MoveFileExW`` with the default flags only (no replace-existing flag)."""
        _handle_errors(windll.kernel32.MoveFileExW(_path2str(src), _path2str(dst), _windows_default_flags))

141 

142 

def replace_atomic(src: PathLike, dst: PathLike) -> None:
    """
    Move ``src`` to ``dst``.

    If ``dst`` exists, it is silently overwritten.

    Both paths must be on the same filesystem for the operation to be atomic.

    :param src: Source
    :type src: PathLike
    :param dst: Destination
    :type dst: PathLike
    """
    # Delegates to the platform-specific implementation selected at import time
    _replace_atomic(src, dst)

157 

158 

def move_atomic(src: PathLike, dst: PathLike) -> None:  # pragma: no cover
    """
    Move ``src`` to ``dst``.

    There may be a time window in which both filesystem entries exist.

    If ``dst`` already exists, :py:exc:`FileExistsError` is raised.

    Both paths must be on the same filesystem for the operation to be atomic.

    :param src: Source
    :type src: PathLike
    :param dst: Destination
    :type dst: PathLike
    """
    # Delegates to the platform-specific implementation selected at import time
    _move_atomic(src, dst)

176 

177 

class ABCTempIOManager[F: AIO](ABC):
    """Abstract interface for managing temporary files."""

    @abstractmethod
    def from_file(self, file: F) -> F:
        """
        Create a temporary file for the given file object.

        :param file: File object
        :type file: F

        :return: Temporary file object
        :rtype: F
        """

    @abstractmethod
    def from_path(self, path: Path | str, mode: str) -> F:
        """
        Create a temporary file for the given path.

        :param path: File path
        :type path: Path | str
        :param mode: Open mode
        :type mode: str

        :return: Temporary file object
        :rtype: F
        """

    @staticmethod
    @abstractmethod
    def sync(file: F) -> None:
        """
        Flush as much of the file's cached data as possible before committing.

        :param file: File object
        :type file: F
        """

    @staticmethod
    @abstractmethod
    def rollback(file: F) -> None:
        """
        Clean up all temporary resources.

        :param file: File object
        :type file: F
        """

    @staticmethod
    @abstractmethod
    def commit(temp_file: F, file: F) -> None:
        """
        Move the temporary file to the target location.

        :param temp_file: Temporary file object
        :type temp_file: F
        :param file: File object
        :type file: F
        """

    @staticmethod
    @abstractmethod
    def commit_by_path(temp_file: F, path: PathLike, mode: str) -> None:
        """
        Move the temporary file to the target location.

        :param temp_file: Temporary file object
        :type temp_file: F
        :param path: File path
        :type path: PathLike
        :param mode: Open mode
        :type mode: str
        """

252 

253 

254class TempTextIOManager[F: TextIO](ABCTempIOManager[F]): 

255 """管理 ``TextIO`` 对象""" 

256 

257 def __init__(self, prefix: str = "", suffix: str = ".tmp", **open_kwargs: Any): 

258 """ 

259 :param prefix: 临时文件前缀 

260 :type prefix: str 

261 :param suffix: 临时文件后缀 

262 :type suffix: str 

263 :param open_kwargs: 传递给 ``open`` 的额外参数 

264 """ # noqa: D205 

265 self._prefix = prefix 

266 self._suffix = suffix 

267 self._open_kwargs = open_kwargs 

268 

269 @override 

270 def from_file(self, file: F) -> F: # pragma: no cover # 用不上 暂不维护 

271 path, name = os.path.split(cast(TextIO, file).name) 

272 f = open( # noqa: SIM115 

273 os.path.join(path, f"{self._prefix}{name}{self._suffix}"), mode=cast(TextIO, file).mode, **self._open_kwargs 

274 ) 

275 shutil.copyfile(cast(TextIO, file).name, f.name) 

276 return cast(F, f) 

277 

278 @override 

279 def from_path(self, path: Path | str, mode: str) -> F: 

280 f_path = f"{path}{self._suffix}" 

281 if "r" in mode or os.path.exists(path): 

282 shutil.copyfile(path, f_path) 

283 return cast(F, open(f_path, mode=mode, **self._open_kwargs)) 

284 

285 @staticmethod 

286 @override 

287 def sync(file: F) -> None: 

288 if not file.writable(): 

289 return 

290 file.flush() 

291 _proper_fsync(file.fileno()) 

292 

293 @staticmethod 

294 @override 

295 def rollback(file: F) -> None: 

296 os.unlink(cast(TextIO, file).name) 

297 

298 @classmethod 

299 @override 

300 def commit(cls, temp_file: F, file: F) -> None: # pragma: no cover # 用不上 暂不维护 

301 if not file.writable(): 

302 return 

303 cls.commit_by_path(temp_file, cast(TextIO, file).name, cast(TextIO, file).mode) 

304 

305 @classmethod 

306 @override 

307 def commit_by_path(cls, temp_file: F, path: PathLike, mode: str) -> None: 

308 writeable = any(x in mode for x in "wax+") 

309 if not writeable: 

310 cls.rollback(temp_file) 

311 return 

312 

313 overwrite = True 

314 if "x" in mode: # pragma: no cover 

315 overwrite = False 

316 if ("r" in mode) and ("+" not in mode): # pragma: no cover 

317 overwrite = False 

318 

319 if overwrite: 

320 replace_atomic(cast(TextIO, temp_file).name, path) 

321 else: # pragma: no cover 

322 move_atomic(cast(TextIO, temp_file).name, path) 

323 

324 

class LockFlags(IntEnum):
    """File lock flags."""

    # portalocker exclusive-lock flag combined with its LOCK_NB flag
    EXCLUSIVE = portalocker.LOCK_EX | portalocker.LOCK_NB
    # portalocker shared-lock flag combined with its LOCK_NB flag
    SHARED = portalocker.LOCK_SH | portalocker.LOCK_NB

330 

331 

# Maps a file name to the in-process lock used for it; values are weakly referenced.
FileLocks: WeakValueDictionary[str, Lock] = WeakValueDictionary()
"""
存储文件名对应的锁
"""
# Held while entries of ``FileLocks`` are looked up or inserted, to avoid races.
GlobalModifyLock = Lock()
"""
防止修改 ``FileLocks`` 时发生竞态条件
"""

340 

341 

342class SafeOpen[F: AIO]: 

343 """安全的打开文件""" 

344 

345 def __init__( 

346 self, io_manager: ABCTempIOManager[Any], timeout: float | None = 1, flag: LockFlags = LockFlags.EXCLUSIVE 

347 ) -> None: 

348 """ 

349 :param io_manager: IO管理器 

350 :type io_manager: ABCTempIOManager 

351 :param timeout: 超时时间 

352 :type timeout: float | None 

353 :param flag: 锁标志 

354 :type flag: LockFlags 

355 """ # noqa: D205 

356 self._manager = io_manager 

357 self._timeout = timeout 

358 self._flag = flag 

359 

360 @contextmanager 

361 def open_path(self, path: str | Path, mode: str) -> Generator[F | None, Any, None]: 

362 """ 

363 打开路径 (上下文管理器) 

364 

365 :param path: 文件路径 

366 :type path: str | pathlib.Path 

367 :param mode: 打开模式 

368 :type mode: str 

369 :return: 返回值为IO对象的上下文管理器 

370 

371 :return: 上下文管理器 

372 :rtype: Generator[F | None, Any, None] 

373 """ 

374 with GlobalModifyLock: 

375 lock = FileLocks.setdefault(_path2str(path), Lock()) 

376 

377 if not lock.acquire(timeout=-1 if self._timeout is None else self._timeout): # pragma: no cover 

378 msg = "Timeout waiting for file lock" 

379 raise TimeoutError(msg) 

380 

381 f: F | None = None 

382 try: 

383 f = self._manager.from_path(path, mode) 

384 acquire_lock(cast(AIO, f), self._flag, timeout=cast(Real | None, self._timeout)) 

385 with cast(AIO, f): 

386 yield f 

387 self._manager.sync(cast(AIO, f)) 

388 release_lock(cast(AIO, f)) 

389 self._manager.commit_by_path(cast(AIO, f), path, mode) 

390 except BaseException as err: 

391 if f is not None: 

392 try: 

393 self._manager.rollback(f) 

394 except Exception: # pragma: no cover # noqa: BLE001 

395 raise err from None 

396 raise 

397 finally: 

398 lock.release() 

399 with suppress(Exception): 

400 release_lock(f) # type: ignore[arg-type] 

401 

402 @contextmanager 

403 def open_file(self, file: F) -> Generator[F | None, Any, None]: # pragma: no cover # 用不上 暂不维护 

404 """ 

405 打开文件 (上下文管理器) 

406 

407 :param file: 文件对象 

408 :type file: IO 

409 

410 :return: 返回值为IO对象的上下文管理器 

411 :rtype: Generator[F | None, Any, None] 

412 """ 

413 with GlobalModifyLock: 

414 lock = FileLocks.setdefault(cast(TextIO, file).name, Lock()) 

415 

416 if not lock.acquire(timeout=-1 if self._timeout is None else self._timeout): 

417 msg = "Timeout waiting for file lock" 

418 raise TimeoutError(msg) 

419 

420 acquire_lock(file, self._flag, timeout=cast(Real | None, self._timeout), immediately_release=True) 

421 f: F | None = None 

422 try: 

423 f = self._manager.from_file(file) 

424 acquire_lock(file, self._flag, timeout=cast(Real | None, self._timeout)) 

425 with cast(AIO, f): 

426 yield f 

427 self._manager.sync(cast(AIO, f)) 

428 release_lock(file) 

429 self._manager.commit(cast(AIO, f), file) 

430 except BaseException as err: 

431 if f is not None: 

432 try: 

433 self._manager.rollback(f) 

434 except Exception: # noqa: BLE001 

435 raise err from None 

436 raise 

437 finally: 

438 lock.release() 

439 with suppress(Exception): 

440 release_lock(file) 

441 

442 

443def _timeout_checker( 

444 timeout: Real | None = None, 

445 interval_increase_speed: Real = 0.03, # type: ignore[assignment] 

446 max_interval: Real = 0.5, # type: ignore[assignment] 

447) -> Generator[None, Any, Any]: # pragma: no cover # 除了windows其他平台压根不会触发timeout 

448 """ 

449 返回一个可无限迭代对象,在超时时抛出错误,自动处理重试间隔 

450 

451 :param timeout: 超时时间 

452 :type timeout: Real | None 

453 :param interval_increase_speed: 间隔增加速度 

454 :type interval_increase_speed: Real 

455 :param max_interval: 最大间隔 

456 :type max_interval: Real 

457 

458 :return: 可迭代对象 

459 :rtype: Generator[None, Any, Any] 

460 """ # noqa: RUF002 

461 

462 def _calc_interval(interval: Real) -> Real: 

463 """ 

464 计算重试间隔 

465 

466 :param interval: 当前间隔 

467 :type interval: Real 

468 

469 :return: 新间隔 

470 :rtype: Real 

471 """ 

472 interval = min(cast(Real, interval + interval_increase_speed), max_interval) 

473 time.sleep(float(interval)) 

474 return interval 

475 

476 def _inf_loop() -> Generator[None, Any, Any]: 

477 """ 

478 当超时时间为永久时的无限循环,相对性能消耗更低 

479 

480 :return: 可迭代对象 

481 :rtype: Generator[None, Any, Any] 

482 """ # noqa: RUF002 

483 interval: Real = 0 # type: ignore[assignment] 

484 while True: 

485 yield 

486 interval = _calc_interval(interval) 

487 

488 def _timeout_loop() -> Generator[None, Any, Any]: 

489 """ 

490 拥有超时检测的可迭代对象 

491 

492 :return: 可迭代对象 

493 :rtype: Generator[None, Any, Any] 

494 """ 

495 start = time.time() + float(interval_increase_speed) 

496 interval: Real = 0 # type: ignore[assignment] 

497 while cast(Real, time.time() - start) < timeout: 

498 yield 

499 interval = _calc_interval(interval) 

500 msg = "Timeout waiting for file lock" 

501 raise TimeoutError(msg) 

502 

503 if timeout is None: 

504 return _inf_loop() 

505 return _timeout_loop() 

506 

507 

def acquire_lock(
    file: AIO,
    flags: LockFlags,
    *,
    timeout: Real | None = 1,  # type: ignore[assignment]
    immediately_release: bool = False,
) -> None:
    """
    Acquire a file lock, retrying until it succeeds or the timeout elapses.

    :param file: File object
    :type file: IO
    :param flags: Lock type
    :type flags: LockFlags
    :param timeout: Timeout in seconds, ``None`` to retry without limit
    :type timeout: Real | None
    :param immediately_release: Whether to release the lock right after acquiring it
    :type immediately_release: bool
    """
    lock_flags = cast(portalocker.LockFlags, flags)
    attempts = _timeout_checker(timeout)
    for _ in attempts:
        try:
            portalocker.lock(file, lock_flags)
        except portalocker.AlreadyLocked:
            # Somebody else holds the lock; the iterator paces the next attempt
            continue
        break

    if not immediately_release:
        return
    release_lock(file)

533 

534 

def release_lock(file: AIO) -> None:
    """
    Release a file lock.

    :param file: File object
    :type file: IO
    """
    # Thin wrapper around portalocker's unlock call
    portalocker.unlock(file)

543 

544 

@overload
def safe_open(
    path: str | Path,
    mode: OpenBinaryMode,
    *,
    timeout: float | None = 1,
    flag: LockFlags = LockFlags.EXCLUSIVE,
    io_manager: ABCTempIOManager[Any] | None = None,
    **manager_kwargs: Any,
) -> AbstractContextManager[IO[bytes]]: ...


@overload
def safe_open(
    path: str | Path,
    mode: OpenTextMode,
    *,
    timeout: float | None = 1,
    flag: LockFlags = LockFlags.EXCLUSIVE,
    io_manager: ABCTempIOManager[Any] | None = None,
    **manager_kwargs: Any,
) -> AbstractContextManager[IO[str]]: ...


def safe_open(
    path: str | Path,
    mode: str,
    *,
    timeout: float | None = 1,
    flag: LockFlags = LockFlags.EXCLUSIVE,
    io_manager: ABCTempIOManager[Any] | None = None,
    **manager_kwargs: Any,
) -> AbstractContextManager[AIO | TextIO]:
    """
    Open a file safely.

    :param path: File path
    :type path: str | pathlib.Path
    :param mode: Open mode
    :type mode: str
    :param timeout: Timeout in seconds
    :type timeout: float | None
    :param flag: Lock type
    :type flag: LockFlags
    :param io_manager: Temporary file manager; a ``TempTextIOManager`` is created when omitted
    :type io_manager: ABCTempIOManager | None
    :param manager_kwargs: Arguments for the default temporary file manager
    :type manager_kwargs: dict

    :return: Context manager yielding the IO object
    :rtype: ContextManager[IO | TextIO]
    """
    # ``manager_kwargs`` are only consumed when the default manager has to be built
    manager = TempTextIOManager(**manager_kwargs) if io_manager is None else io_manager
    opener = SafeOpen(manager, timeout, flag)
    return cast(AbstractContextManager[AIO | TextIO], opener.open_path(path, mode))

600 

601 

# Names exported by a star-import of this module.
__all__ = (
    "FileLocks",
    "GlobalModifyLock",
    "LockFlags",
    "SafeOpen",
    "acquire_lock",
    "release_lock",
    "safe_open",
)