Coverage for src / c41811 / config / safe_writer.py: 100%
130 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 01:06 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 01:06 +0000
1# cython: language_level = 3 # noqa: ERA001
4# noinspection SpellCheckingInspection, GrazieInspection
5"""
6 从 https://github.com/untitaker/python-atomicwrites 修改而来
8 .. code-block:: text
9 :caption: 原始版权声明
11 Copyright (c) 2015-2016 Markus Unterwaditzer
13 Permission is hereby granted, free of charge, to any person obtaining a copy of
14 this software and associated documentation files (the "Software"), to deal in
15 the Software without restriction, including without limitation the rights to
16 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
17 of the Software, and to permit persons to whom the Software is furnished to do
18 so, subject to the following conditions:
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 SOFTWARE.
31.. versionadded:: 0.2.0
32"""
34import os
35import shutil
36import sys
37import time
38from abc import ABC
39from abc import abstractmethod
40from collections.abc import Generator
41from contextlib import AbstractContextManager
42from contextlib import contextmanager
43from contextlib import suppress
44from enum import IntEnum
45from numbers import Real
46from pathlib import Path
47from threading import Lock
48from typing import IO
49from typing import TYPE_CHECKING
50from typing import Any
51from typing import TextIO
52from typing import cast
53from typing import overload
54from typing import override
55from weakref import WeakValueDictionary
57import portalocker
59if TYPE_CHECKING:
60 from _typeshed import OpenBinaryMode
61 from _typeshed import OpenTextMode
62else:
63 OpenBinaryMode = Any
64 OpenTextMode = Any
66try:
67 import fcntl
68except ImportError:
69 # noinspection SpellCheckingInspection
70 fcntl = None # type: ignore[assignment]
72try:
73 from os import fspath
74except ImportError:
75 fspath = None # type: ignore[assignment]
77type PathLike = str | Path
78type AIO = IO[Any]
def _path2str(x: PathLike) -> str:  # pragma: no cover
    """
    Return ``x`` as a plain string.

    :param x: A string or a :class:`pathlib.Path`
    :type x: PathLike

    :return: ``str(x)`` for ``Path`` instances, otherwise ``x`` itself
    :rtype: str
    """
    return str(x) if isinstance(x, Path) else x
# Default flush primitive; overridden below when ``fcntl`` offers ``F_FULLFSYNC``.
_proper_fsync = os.fsync

if sys.platform != "win32":  # pragma: no cover
    # noinspection SpellCheckingInspection
    if hasattr(fcntl, "F_FULLFSYNC"):

        def _proper_fsync(fd: int) -> None:  # type: ignore[misc]
            """Flush ``fd`` through ``fcntl`` with ``F_FULLFSYNC`` instead of ``os.fsync``."""
            # Background on why a plain fsync is not used here:
            # https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html
            # https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html
            # https://github.com/untitaker/python-atomicwrites/issues/6
            fcntl.fcntl(fd, fcntl.F_FULLFSYNC)  # type: ignore[attr-defined]

    def _sync_directory(directory: str) -> None:
        """Open ``directory`` and fsync it so that file names are written to disk."""
        dir_fd = os.open(directory, 0)
        try:
            _proper_fsync(dir_fd)
        finally:
            os.close(dir_fd)

    def _replace_atomic(src: PathLike, dst: PathLike) -> None:
        """Rename ``src`` onto ``dst`` and then sync the destination directory."""
        os.rename(src, dst)
        target_dir = os.path.normpath(os.path.dirname(dst))
        _sync_directory(target_dir)

    def _move_atomic(src: PathLike, dst: PathLike) -> None:
        """Hard-link ``src`` to ``dst``, remove ``src`` and sync the affected directories."""
        # ``os.link`` fails if ``dst`` already exists, so nothing is overwritten.
        os.link(src, dst)
        os.unlink(src)

        source_dir = os.path.normpath(os.path.dirname(src))
        target_dir = os.path.normpath(os.path.dirname(dst))
        _sync_directory(target_dir)
        # Only sync a second time when the entry really left another directory.
        if source_dir != target_dir:
            _sync_directory(source_dir)
else:  # pragma: no cover
    from ctypes import WinError
    from ctypes import windll

    _MOVEFILE_REPLACE_EXISTING = 0x1
    _MOVEFILE_WRITE_THROUGH = 0x8
    _windows_default_flags = _MOVEFILE_WRITE_THROUGH

    def _handle_errors(rv: Any) -> None:
        """Raise ``WinError()`` when the API return value ``rv`` is falsy."""
        if rv:
            return
        raise WinError()

    def _replace_atomic(src: PathLike, dst: PathLike) -> None:
        """Call ``MoveFileExW`` with the default flags plus ``_MOVEFILE_REPLACE_EXISTING``."""
        flags = _windows_default_flags | _MOVEFILE_REPLACE_EXISTING
        _handle_errors(windll.kernel32.MoveFileExW(_path2str(src), _path2str(dst), flags))

    def _move_atomic(src: PathLike, dst: PathLike) -> None:
        """Call ``MoveFileExW`` with the default flags only."""
        result = windll.kernel32.MoveFileExW(_path2str(src), _path2str(dst), _windows_default_flags)
        _handle_errors(result)
def replace_atomic(src: PathLike, dst: PathLike) -> None:
    """
    Move ``src`` to ``dst``.

    If ``dst`` exists, it is silently overwritten.

    Both paths must live on the same file system for the operation to be atomic.

    :param src: Source path
    :type src: PathLike
    :param dst: Destination path
    :type dst: PathLike
    """
    # The actual work is done by the platform-specific implementation.
    _replace_atomic(src, dst)
def move_atomic(src: PathLike, dst: PathLike) -> None:  # pragma: no cover
    """
    Move ``src`` to ``dst``.

    There may be a time window in which both file system entries exist.

    If ``dst`` already exists, :py:exc:`FileExistsError` is raised.

    Both paths must live on the same file system for the operation to be atomic.

    :param src: Source path
    :type src: PathLike
    :param dst: Destination path
    :type dst: PathLike
    """
    # The actual work is done by the platform-specific implementation.
    _move_atomic(src, dst)
178class ABCTempIOManager[F: AIO](ABC):
179 """管理临时文件"""
181 @abstractmethod
182 def from_file(self, file: F) -> F:
183 """
184 为给定的文件创建一个临时文件
186 :param file: 文件对象
187 :type file: F
189 :return: 临时文件对象
190 :type file: F
191 """
193 @abstractmethod
194 def from_path(self, path: Path | str, mode: str) -> F:
195 """
196 为给定的路径创建一个临时文件
198 :param path: 文件路径
199 :type path: Path | str
200 :param mode: 打开模式
201 :type mode: str
203 :return: 临时文件对象
204 :rtype: F
205 """
207 @staticmethod
208 @abstractmethod
209 def sync(file: F) -> None:
210 """
211 负责在提交之前清除尽可能多的文件缓存
213 :param file: 文件对象
214 :type file: F
215 """
217 @staticmethod
218 @abstractmethod
219 def rollback(file: F) -> None:
220 """
221 清理所有临时资源
223 :param file: 文件对象
224 :type file: F
225 """
227 @staticmethod
228 @abstractmethod
229 def commit(temp_file: F, file: F) -> None:
230 """
231 将临时文件移动到目标位置
233 :param temp_file: 临时文件对象
234 :type temp_file: F
235 :param file: 文件对象
236 :type file: F
237 """
239 @staticmethod
240 @abstractmethod
241 def commit_by_path(temp_file: F, path: PathLike, mode: str) -> None:
242 """
243 将临时文件移动到目标位置
245 :param temp_file: 临时文件对象
246 :type temp_file: F
247 :param path: 文件路径
248 :type path: PathLike
249 :param mode: 打开模式
250 :type mode: str
251 """
254class TempTextIOManager[F: TextIO](ABCTempIOManager[F]):
255 """管理 ``TextIO`` 对象"""
257 def __init__(self, prefix: str = "", suffix: str = ".tmp", **open_kwargs: Any):
258 """
259 :param prefix: 临时文件前缀
260 :type prefix: str
261 :param suffix: 临时文件后缀
262 :type suffix: str
263 :param open_kwargs: 传递给 ``open`` 的额外参数
264 """ # noqa: D205
265 self._prefix = prefix
266 self._suffix = suffix
267 self._open_kwargs = open_kwargs
269 @override
270 def from_file(self, file: F) -> F: # pragma: no cover # 用不上 暂不维护
271 path, name = os.path.split(cast(TextIO, file).name)
272 f = open( # noqa: SIM115
273 os.path.join(path, f"{self._prefix}{name}{self._suffix}"), mode=cast(TextIO, file).mode, **self._open_kwargs
274 )
275 shutil.copyfile(cast(TextIO, file).name, f.name)
276 return cast(F, f)
278 @override
279 def from_path(self, path: Path | str, mode: str) -> F:
280 f_path = f"{path}{self._suffix}"
281 if "r" in mode or os.path.exists(path):
282 shutil.copyfile(path, f_path)
283 return cast(F, open(f_path, mode=mode, **self._open_kwargs))
285 @staticmethod
286 @override
287 def sync(file: F) -> None:
288 if not file.writable():
289 return
290 file.flush()
291 _proper_fsync(file.fileno())
293 @staticmethod
294 @override
295 def rollback(file: F) -> None:
296 os.unlink(cast(TextIO, file).name)
298 @classmethod
299 @override
300 def commit(cls, temp_file: F, file: F) -> None: # pragma: no cover # 用不上 暂不维护
301 if not file.writable():
302 return
303 cls.commit_by_path(temp_file, cast(TextIO, file).name, cast(TextIO, file).mode)
305 @classmethod
306 @override
307 def commit_by_path(cls, temp_file: F, path: PathLike, mode: str) -> None:
308 writeable = any(x in mode for x in "wax+")
309 if not writeable:
310 cls.rollback(temp_file)
311 return
313 overwrite = True
314 if "x" in mode: # pragma: no cover
315 overwrite = False
316 if ("r" in mode) and ("+" not in mode): # pragma: no cover
317 overwrite = False
319 if overwrite:
320 replace_atomic(cast(TextIO, temp_file).name, path)
321 else: # pragma: no cover
322 move_atomic(cast(TextIO, temp_file).name, path)
class LockFlags(IntEnum):
    """File lock flags."""

    # Each member combines a portalocker lock kind with ``LOCK_NB``.
    EXCLUSIVE = portalocker.LOCK_EX | portalocker.LOCK_NB
    SHARED = portalocker.LOCK_SH | portalocker.LOCK_NB
FileLocks: WeakValueDictionary[str, Lock] = WeakValueDictionary()
"""
Maps a file name to the lock that belongs to it
"""
GlobalModifyLock = Lock()
"""
Prevents race conditions while ``FileLocks`` is being modified
"""
342class SafeOpen[F: AIO]:
343 """安全的打开文件"""
345 def __init__(
346 self, io_manager: ABCTempIOManager[Any], timeout: float | None = 1, flag: LockFlags = LockFlags.EXCLUSIVE
347 ) -> None:
348 """
349 :param io_manager: IO管理器
350 :type io_manager: ABCTempIOManager
351 :param timeout: 超时时间
352 :type timeout: float | None
353 :param flag: 锁标志
354 :type flag: LockFlags
355 """ # noqa: D205
356 self._manager = io_manager
357 self._timeout = timeout
358 self._flag = flag
360 @contextmanager
361 def open_path(self, path: str | Path, mode: str) -> Generator[F | None, Any, None]:
362 """
363 打开路径 (上下文管理器)
365 :param path: 文件路径
366 :type path: str | pathlib.Path
367 :param mode: 打开模式
368 :type mode: str
369 :return: 返回值为IO对象的上下文管理器
371 :return: 上下文管理器
372 :rtype: Generator[F | None, Any, None]
373 """
374 with GlobalModifyLock:
375 lock = FileLocks.setdefault(_path2str(path), Lock())
377 if not lock.acquire(timeout=-1 if self._timeout is None else self._timeout): # pragma: no cover
378 msg = "Timeout waiting for file lock"
379 raise TimeoutError(msg)
381 f: F | None = None
382 try:
383 f = self._manager.from_path(path, mode)
384 acquire_lock(cast(AIO, f), self._flag, timeout=cast(Real | None, self._timeout))
385 with cast(AIO, f):
386 yield f
387 self._manager.sync(cast(AIO, f))
388 release_lock(cast(AIO, f))
389 self._manager.commit_by_path(cast(AIO, f), path, mode)
390 except BaseException as err:
391 if f is not None:
392 try:
393 self._manager.rollback(f)
394 except Exception: # pragma: no cover # noqa: BLE001
395 raise err from None
396 raise
397 finally:
398 lock.release()
399 with suppress(Exception):
400 release_lock(f) # type: ignore[arg-type]
402 @contextmanager
403 def open_file(self, file: F) -> Generator[F | None, Any, None]: # pragma: no cover # 用不上 暂不维护
404 """
405 打开文件 (上下文管理器)
407 :param file: 文件对象
408 :type file: IO
410 :return: 返回值为IO对象的上下文管理器
411 :rtype: Generator[F | None, Any, None]
412 """
413 with GlobalModifyLock:
414 lock = FileLocks.setdefault(cast(TextIO, file).name, Lock())
416 if not lock.acquire(timeout=-1 if self._timeout is None else self._timeout):
417 msg = "Timeout waiting for file lock"
418 raise TimeoutError(msg)
420 acquire_lock(file, self._flag, timeout=cast(Real | None, self._timeout), immediately_release=True)
421 f: F | None = None
422 try:
423 f = self._manager.from_file(file)
424 acquire_lock(file, self._flag, timeout=cast(Real | None, self._timeout))
425 with cast(AIO, f):
426 yield f
427 self._manager.sync(cast(AIO, f))
428 release_lock(file)
429 self._manager.commit(cast(AIO, f), file)
430 except BaseException as err:
431 if f is not None:
432 try:
433 self._manager.rollback(f)
434 except Exception: # noqa: BLE001
435 raise err from None
436 raise
437 finally:
438 lock.release()
439 with suppress(Exception):
440 release_lock(file)
443def _timeout_checker(
444 timeout: Real | None = None,
445 interval_increase_speed: Real = 0.03, # type: ignore[assignment]
446 max_interval: Real = 0.5, # type: ignore[assignment]
447) -> Generator[None, Any, Any]: # pragma: no cover # 除了windows其他平台压根不会触发timeout
448 """
449 返回一个可无限迭代对象,在超时时抛出错误,自动处理重试间隔
451 :param timeout: 超时时间
452 :type timeout: Real | None
453 :param interval_increase_speed: 间隔增加速度
454 :type interval_increase_speed: Real
455 :param max_interval: 最大间隔
456 :type max_interval: Real
458 :return: 可迭代对象
459 :rtype: Generator[None, Any, Any]
460 """ # noqa: RUF002
462 def _calc_interval(interval: Real) -> Real:
463 """
464 计算重试间隔
466 :param interval: 当前间隔
467 :type interval: Real
469 :return: 新间隔
470 :rtype: Real
471 """
472 interval = min(cast(Real, interval + interval_increase_speed), max_interval)
473 time.sleep(float(interval))
474 return interval
476 def _inf_loop() -> Generator[None, Any, Any]:
477 """
478 当超时时间为永久时的无限循环,相对性能消耗更低
480 :return: 可迭代对象
481 :rtype: Generator[None, Any, Any]
482 """ # noqa: RUF002
483 interval: Real = 0 # type: ignore[assignment]
484 while True:
485 yield
486 interval = _calc_interval(interval)
488 def _timeout_loop() -> Generator[None, Any, Any]:
489 """
490 拥有超时检测的可迭代对象
492 :return: 可迭代对象
493 :rtype: Generator[None, Any, Any]
494 """
495 start = time.time() + float(interval_increase_speed)
496 interval: Real = 0 # type: ignore[assignment]
497 while cast(Real, time.time() - start) < timeout:
498 yield
499 interval = _calc_interval(interval)
500 msg = "Timeout waiting for file lock"
501 raise TimeoutError(msg)
503 if timeout is None:
504 return _inf_loop()
505 return _timeout_loop()
def acquire_lock(
    file: AIO,
    flags: LockFlags,
    *,
    timeout: Real | None = 1,  # type: ignore[assignment]
    immediately_release: bool = False,
) -> None:
    """
    Acquire a file lock, retrying while the file is already locked.

    :param file: File object
    :type file: IO
    :param flags: Lock type
    :type flags: LockFlags
    :param timeout: Timeout passed to the retry loop, ``None`` retries forever
    :type timeout: Real | None
    :param immediately_release: Whether to release the lock right after acquiring it
    :type immediately_release: bool
    """
    lock_flags = cast(portalocker.LockFlags, flags)
    # The checker paces the retries and raises once the timeout is exceeded,
    # so the loop is only ever left through ``break`` or an exception.
    for _ in _timeout_checker(timeout):
        try:
            portalocker.lock(file, lock_flags)
        except portalocker.AlreadyLocked:
            continue
        break

    if immediately_release:
        release_lock(file)
def release_lock(file: AIO) -> None:
    """
    Release the file lock held on ``file``.

    :param file: File object
    :type file: IO
    """
    # Thin wrapper so callers do not depend on portalocker directly.
    portalocker.unlock(file)
@overload
def safe_open(
    path: str | Path,
    mode: OpenBinaryMode,
    *,
    timeout: float | None = 1,
    flag: LockFlags = LockFlags.EXCLUSIVE,
    io_manager: ABCTempIOManager[Any] | None = None,
    **manager_kwargs: Any,
) -> AbstractContextManager[IO[bytes]]: ...


@overload
def safe_open(
    path: str | Path,
    mode: OpenTextMode,
    *,
    timeout: float | None = 1,
    flag: LockFlags = LockFlags.EXCLUSIVE,
    io_manager: ABCTempIOManager[Any] | None = None,
    **manager_kwargs: Any,
) -> AbstractContextManager[IO[str]]: ...


def safe_open(
    path: str | Path,
    mode: str,
    *,
    timeout: float | None = 1,
    flag: LockFlags = LockFlags.EXCLUSIVE,
    io_manager: ABCTempIOManager[Any] | None = None,
    **manager_kwargs: Any,
) -> AbstractContextManager[AIO | TextIO]:
    """
    Open a file safely.

    :param path: File path
    :type path: str | pathlib.Path
    :param mode: Open mode
    :type mode: str
    :param timeout: Timeout
    :type timeout: float | None
    :param flag: Lock type
    :type flag: LockFlags
    :param io_manager: Temporary file manager; a ``TempTextIOManager`` is created when omitted
    :type io_manager: ABCTempIOManager | None
    :param manager_kwargs: Arguments for the ``TempTextIOManager`` created when ``io_manager`` is omitted
    :type manager_kwargs: dict

    :return: Context manager yielding the IO object
    :rtype: ContextManager[IO | TextIO]
    """
    manager = TempTextIOManager(**manager_kwargs) if io_manager is None else io_manager
    opener = SafeOpen(manager, timeout, flag)
    return cast(AbstractContextManager[AIO | TextIO], opener.open_path(path, mode))
# Names exported by ``from <module> import *``.
__all__ = (
    "FileLocks",
    "GlobalModifyLock",
    "LockFlags",
    "SafeOpen",
    "acquire_lock",
    "release_lock",
    "safe_open",
)