import warnings as _warnings
import io as _io
import os as _os
-import shutil as _shutil
+try:
+ import shutil as _shutil
+ _rmtree = _shutil.rmtree
+except ImportError:
+ import sys as _sys
+ import stat as _stat
+ # version vulnerable to race conditions
+ def _rmtree_unsafe(path, onerror):
+ try:
+ if _os.path.islink(path):
+ # symlinks to directories are forbidden, see bug #1669
+ raise OSError("Cannot call rmtree on a symbolic link")
+ except OSError:
+ onerror(_os.path.islink, path, _sys.exc_info())
+ # can't continue even if onerror hook returns
+ return
+ names = []
+ try:
+ names = _os.listdir(path)
+ except OSError:
+ onerror(_os.listdir, path, _sys.exc_info())
+ for name in names:
+ fullname = _os.path.join(path, name)
+ try:
+ mode = _os.lstat(fullname).st_mode
+ except OSError:
+ mode = 0
+ if _stat.S_ISDIR(mode):
+ _rmtree_unsafe(fullname, onerror)
+ else:
+ try:
+ _os.unlink(fullname)
+ except OSError:
+ onerror(_os.unlink, fullname, _sys.exc_info())
+ try:
+ _os.rmdir(path)
+ except OSError:
+ onerror(_os.rmdir, path, _sys.exc_info())
+
+ # Version using fd-based APIs to protect against races
+ def _rmtree_safe_fd(topfd, path, onerror):
+ names = []
+ try:
+ names = _os.listdir(topfd)
+ except OSError as err:
+ err.filename = path
+ onerror(_os.listdir, path, _sys.exc_info())
+ for name in names:
+ fullname = _os.path.join(path, name)
+ try:
+ orig_st = _os.stat(name, dir_fd=topfd, follow_symlinks=False)
+ mode = orig_st.st_mode
+ except OSError:
+ mode = 0
+ if _stat.S_ISDIR(mode):
+ try:
+ dirfd = _os.open(name, _os.O_RDONLY, dir_fd=topfd)
+ except OSError:
+ onerror(_os.open, fullname, _sys.exc_info())
+ else:
+ try:
+ if _os.path.samestat(orig_st, _os.fstat(dirfd)):
+ _rmtree_safe_fd(dirfd, fullname, onerror)
+ try:
+ _os.rmdir(name, dir_fd=topfd)
+ except OSError:
+ onerror(_os.rmdir, fullname, _sys.exc_info())
+ else:
+ try:
+ # This can only happen if someone replaces
+ # a directory with a symlink after the call to
+ # stat.S_ISDIR above.
+ raise OSError("Cannot call rmtree on a symbolic "
+ "link")
+ except OSError:
+ onerror(_os.path.islink, fullname, _sys.exc_info())
+ finally:
+ _os.close(dirfd)
+ else:
+ try:
+ _os.unlink(name, dir_fd=topfd)
+ except OSError:
+ onerror(_os.unlink, fullname, _sys.exc_info())
+
+ _use_fd_functions = ({_os.open, _os.stat, _os.unlink, _os.rmdir} <=
+ _os.supports_dir_fd and
+ _os.listdir in _os.supports_fd and
+ _os.stat in _os.supports_follow_symlinks)
+
+ def _rmtree(path, ignore_errors=False, onerror=None):
+ """Recursively delete a directory tree.
+
+ If ignore_errors is set, errors are ignored; otherwise, if onerror
+ is set, it is called to handle the error with arguments (func,
+ path, exc_info) where func is platform and implementation dependent;
+ path is the argument to that function that caused it to fail; and
+ exc_info is a tuple returned by sys.exc_info(). If ignore_errors
+ is false and onerror is None, an exception is raised.
+
+ """
+ if ignore_errors:
+ def onerror(*args):
+ pass
+ elif onerror is None:
+ def onerror(*args):
+ raise
+ if _use_fd_functions:
+ # While the unsafe rmtree works fine on bytes, the fd based does not.
+ if isinstance(path, bytes):
+ path = _os.fsdecode(path)
+ # Note: To guard against symlink races, we use the standard
+ # lstat()/open()/fstat() trick.
+ try:
+ orig_st = _os.lstat(path)
+ except Exception:
+ onerror(_os.lstat, path, _sys.exc_info())
+ return
+ try:
+ fd = _os.open(path, _os.O_RDONLY)
+ except Exception:
+ onerror(_os.lstat, path, _sys.exc_info())
+ return
+ try:
+ if _os.path.samestat(orig_st, _os.fstat(fd)):
+ _rmtree_safe_fd(fd, path, onerror)
+ try:
+ _os.rmdir(path)
+ except OSError:
+ onerror(_os.rmdir, path, _sys.exc_info())
+ else:
+ try:
+ # symlinks to directories are forbidden, see bug #1669
+ raise OSError("Cannot call rmtree on a symbolic link")
+ except OSError:
+ onerror(_os.path.islink, path, _sys.exc_info())
+ finally:
+ _os.close(fd)
+ else:
+ return _rmtree_unsafe(path, onerror)
+
import errno as _errno
from random import Random as _Random
import sys as _sys
else:
raise
- _shutil.rmtree(name, onerror=onerror)
+ _rmtree(name, onerror=onerror)
@classmethod
def _cleanup(cls, name, warn_message):