Source code for fs

# -*- coding: utf-8 -*-
"""Provide a wrapper around common filesystem operations, including
iterating over the files in a directory structure and copying,
moving and linking them. Wherever possible iterators are used,
making it possible to act on large directory structures without
loading them all into memory.
"""
from __future__ import with_statement
from __future__ import print_function
from __future__ import unicode_literals

import os, sys
import codecs
import collections
import contextlib
import filecmp
import fnmatch
import msvcrt
import operator
import re
import struct
import threading
import time
import zipfile

import ntsecuritycon
import pywintypes
import winerror
import win32api
import win32con
import win32event
import win32file
import win32net
import win32netcon
import winioctlcon


if not hasattr(winerror, 'ERROR_BAD_RECOVERY_POLICY'):
    winerror.ERROR_BAD_RECOVERY_POLICY = 6012

from winsys._compat import *
from winsys import constants, core, exc, handles, security as security_, utils, _kernel32

sep = unicode(os.sep)
seps = "/\\"

[docs]class x_fs(exc.x_winsys): "Base for all fs-related exceptions"
[docs]class x_no_such_file(x_fs): "Raised when a file could not be found"
[docs]class x_too_many_files(x_fs): "Raised when more than one file matches a name"
[docs]class x_invalid_name(x_fs): "Raised when a filename contains invalid characters"
[docs]class x_no_certificate(x_fs): "Raised when encryption is attempted without a certificate"
[docs]class x_not_ready(x_fs): "Raised when a device is not ready"
class x_sharing_violation(x_fs): "Raised when a sharing violation occurs" WINERROR_MAP = { winerror.ERROR_ACCESS_DENIED : exc.x_access_denied, winerror.ERROR_PATH_NOT_FOUND : x_no_such_file, winerror.ERROR_FILE_NOT_FOUND : x_no_such_file, winerror.ERROR_BAD_NETPATH : x_no_such_file, winerror.ERROR_INVALID_NAME : x_invalid_name, winerror.ERROR_BAD_RECOVERY_POLICY : x_no_certificate, winerror.ERROR_NOT_READY : x_not_ready, winerror.ERROR_INVALID_HANDLE : exc.x_invalid_handle, winerror.ERROR_SHARING_VIOLATION : x_sharing_violation, 2310 : x_no_such_file, } wrapped = exc.wrapper(WINERROR_MAP, x_fs) def ignore_access_errors(exc_info): return exc_info[0] is exc.x_access_denied PyHANDLE = pywintypes.HANDLEType FILE_ACCESS = constants.Constants.from_pattern("FILE_*", namespace=ntsecuritycon) FILE_ACCESS.update(constants.STANDARD_ACCESS) FILE_ACCESS.update(constants.GENERIC_ACCESS) FILE_ACCESS.update(constants.ACCESS) FILE_ACCESS.doc("File-specific access rights") FILE_SHARE = constants.Constants.from_pattern("FILE_SHARE_*", namespace=win32file) FILE_SHARE.doc("Ways of sharing a file for reading, writing, &c.") FILE_CREATION = constants.Constants.from_list([ "CREATE_ALWAYS", "CREATE_NEW", "OPEN_ALWAYS", "OPEN_EXISTING", "TRUNCATE_EXISTING" ], namespace=win32con) FILE_CREATION.doc("Options when creating a file") FILE_FLAG = constants.Constants.from_pattern("FILE_FLAG_*", namespace=win32con) FILE_FLAG.doc("File flags") FILE_NOTIFY_CHANGE = constants.Constants.from_pattern("FILE_NOTIFY_CHANGE_*", namespace=win32con) FILE_NOTIFY_CHANGE.doc("Notification types to watch for when a file changes") FILE_ACTION = constants.Constants.from_dict(dict( ADDED = 1, REMOVED = 2, MODIFIED = 3, RENAMED_OLD_NAME = 4, RENAMED_NEW_NAME = 5 )) FILE_ACTION.doc("Results of a file change") FILE_ATTRIBUTE = constants.Constants.from_pattern("FILE_ATTRIBUTE_*", namespace=win32file) FILE_ATTRIBUTE.update(dict( ENCRYPTED=0x00004000, REPARSE_POINT=0x00000400, SPARSE_FILE=0x00000200, VIRTUAL=0x00010000, NOT_CONTENT_INDEXES=0x00002000, )) FILE_ATTRIBUTE.doc("Attributes applying to any file") PROGRESS = constants.Constants.from_pattern("PROGRESS_*", namespace=win32file) PROGRESS.doc("States within a file move/copy progress") MOVEFILE = constants.Constants.from_pattern("MOVEFILE_*", namespace=win32file) MOVEFILE.doc("Options when moving a file") VOLUME_FLAG = constants.Constants.from_dict(dict( FILE_CASE_SENSITIVE_SEARCH = 0x00000001, FILE_CASE_PRESERVED_NAMES = 0x00000002, FILE_UNICODE_ON_DISK = 0x00000004, FILE_PERSISTENT_ACLS = 0x00000008, FILE_FILE_COMPRESSION = 0x00000010, FILE_VOLUME_QUOTAS = 0x00000020, FILE_SUPPORTS_SPARSE_FILES = 0x00000040, FILE_SUPPORTS_REPARSE_POINTS = 0x00000080, FILE_SUPPORTS_REMOTE_STORAGE = 0x00000100, FILE_VOLUME_IS_COMPRESSED = 0x00008000, FILE_SUPPORTS_OBJECT_IDS = 0x00010000, FILE_SUPPORTS_ENCRYPTION = 0x00020000, FILE_NAMED_STREAMS = 0x00040000, FILE_READ_ONLY_VOLUME = 0x00080000, FILE_SEQUENTIAL_WRITE_ONCE = 0x00100000, FILE_SUPPORTS_TRANSACTIONS = 0x00200000 ), pattern="FILE_*") VOLUME_FLAG.doc("Characteristics of a volume") DRIVE_TYPE = constants.Constants.from_pattern("DRIVE_*", namespace=win32file) DRIVE_TYPE.doc("Types of drive") COMPRESSION_FORMAT = constants.Constants.from_dict(dict( NONE = 0x0000, DEFAULT = 0x0001, LZNT1 = 0x0002 )) COMPRESSION_FORMAT.doc("Ways in which a file can be compressed") FSCTL = constants.Constants.from_pattern("FSCTL_*", namespace=winioctlcon) FSCTL.doc("Types of fsctl operation") STYPE = constants.Constants.from_pattern("STYPE_*", namespace=win32netcon) PyHANDLE = pywintypes.HANDLEType LEGAL_FILECHAR = r"[^\?\*\\\:\/]" LEGAL_FILECHARS = LEGAL_FILECHAR + "+" LEGAL_VOLCHAR = r"[^\?\*\\\/]" LEGAL_VOLCHARS = LEGAL_VOLCHAR + "+" UNC = sep * 4 + LEGAL_FILECHARS + sep * 2 + LEGAL_FILECHARS HEXDIGIT = "[0-9a-f]" DRIVE = r"[A-Za-z]:" VOLUME_PREAMBLE = sep * 4 + r"\?" + sep * 2 VOLUME = VOLUME_PREAMBLE + "Volume\{%s{8}-%s{4}-%s{4}-%s{4}-%s{12}\}" % (HEXDIGIT, HEXDIGIT, HEXDIGIT, HEXDIGIT, HEXDIGIT) DRIVE_VOLUME = VOLUME_PREAMBLE + DRIVE PREFIX = r"((?:%s|%s|%s|%s)\\?)" % (UNC, DRIVE, VOLUME, DRIVE_VOLUME) PATHSEG = "(" + LEGAL_FILECHARS + ")" + sep * 2 + "?" PATHSEGS = "(?:%s)*" % PATHSEG FILEPATH = PREFIX + PATHSEGS
[docs]def get_parts(filepath): r"""Helper function to regularise a file path and then to pick out its drive and path components. Attempt to match the first part of the string against known path leaders:: <drive>:\ \\?\<drive>:\ \\?\Volume{xxxx-...}\ \\server\share\ If that fails, assume the path is relative. ============================= ====================================== Path Parts ============================= ====================================== c:/ ["c:\\", ""] c:/t ["c:\\", "t"] c:/t/ ["c:\\", "t"] c:/t/test.txt ["c:\\", "t", "test.txt"] c:/t/s/test.txt ["c:\\", "t", "s", "test.txt"] c:test.txt ["c:", "test.txt"] s/test.txt ["", "s", "test.txt"] \\\\server\\share ["\\\\server\\share\\", ""] \\\\server\\share\\a.txt ["\\\\server\\share\\", "a.txt"] \\\\server\\share\\t\\a.txt ["\\\\server\\share\\", "t", "a.txt"] \\\\?\\c:\test.txt ["\\\\?\\c:\\", "test.txt"] \\\\?\\Volume{xxxx-..}\\t.txt ["\\\\?\Volume{xxxx-..}\\", "t.txt"] ============================= ====================================== The upshot is that the first item in the list returned is always the root, one of: a slash-terminated drive/volume/share for an absolute path; an empty string for a relative path; or a drive-colon for a drive-relative path. All other items before the last one represent the directories along the way. The last item is the filename or directory name. The original filepath can usually be reconstructed as:: from winsys import fs filepath = "c:/temp/abc.txt" parts = fs.get_parts(filepath) assert parts[0] + "\\".join(parts[1:]) == filepath The exception is when a root (UNC or volume) is given without a trailing slash. This is added in. Or if a directory is given with a trailing slash. This is stripped off. """ filepath = filepath.replace("/", sep) prefix_match = re.match(PREFIX, filepath) if prefix_match: prefix = prefix_match.group(1) rest = filepath[len(prefix):].rstrip(sep) # # Special-case the un-rooted drive letter # so that paths of the form <drive>:<path> # are still valid, indicating <path> in the # current directory on <drive> # if prefix.startswith("\\") or not prefix.endswith(":"): prefix = prefix.rstrip(sep) + sep return [prefix] + rest.split(sep) else: return [""] + filepath.split(sep)
[docs]def normalised(filepath): """Convert any path or path-like object into the length-unlimited unicode equivalent. This should avoid issues with maximum path length and the like. """ # # os.path.abspath will return a sep-terminated string # for the root directory and a non-sep-terminated # string for all other paths. # filepath = unicode(filepath) if filepath.startswith(2 * sep): return filepath else: is_dir = filepath[-1] in seps abspath = os.path.abspath(filepath) return ("\\\\?\\" + abspath) + (sep if is_dir and not abspath.endswith(sep) else "")
def read(handle): wrapped(win32file.ReadFile, handle)
[docs]def handle(filepath, write=False, exclusive=False, async=False, attributes=None, sec=None): """Return a file handle either for querying (the default case) or for writing -- including writing directories :param filepath: anything whose unicode representation is a valid file path :param write: is the handle to be used for writing [True] :param attributes: anything accepted by :const:`FILE_ATTRIBUTE` :return: an open file handle for reading or writing, including directories """ attributes = FILE_ATTRIBUTE.constant(attributes) if attributes is None: attributes = FILE_ATTRIBUTE.NORMAL | FILE_FLAG.BACKUP_SEMANTICS if async: attributes |= FILE_FLAG.OVERLAPPED return wrapped( win32file.CreateFile, normalised(filepath), (FILE_ACCESS.READ | FILE_ACCESS.WRITE) if write else FILE_ACCESS.READ, 0 if exclusive else (FILE_SHARE.READ | FILE_SHARE.WRITE) if write else FILE_SHARE.READ, sec, FILE_CREATION.OPEN_ALWAYS if write else FILE_CREATION.OPEN_EXISTING, attributes, None )
@contextlib.contextmanager def Handle(handle_or_filepath, write=False): """Return the handle passed or on newly-created for the filepath, making sure to close it afterwards. :param handle_or_filepath: an existing handle or something accepted by :func:`handle` :param write: passed through to :func:`handle` """ if isinstance(handle_or_filepath, PyHANDLE): handle_supplied = True hFile = handle_or_filepath else: handle_supplied = False hFile = handle(handle_or_filepath, write) yield hFile if not handle_supplied: hFile.close()
[docs]def relative_to(filepath1, filepath2): """Return filepath2 relative to filepath1. Both names are normalised first. ================ ================ ================ filepath1 filepath2 result ================ ================ ================ c:/a/b.txt c:/a b.txt ---------------- ---------------- ---------------- c:/a/b/c.txt c:/a b/c.txt ---------------- ---------------- ---------------- c:/a/b/c.txt c:/a/b c.txt ================ ================ ================ :param filepath1: a file or directory :param filepath2: a directory :returns: filepath2 relative to filepath1 """ # # filepath2 must always be a directory; filepath1 may # be a file or a directory. # return utils.relative_to(filepath1, filepath2.rstrip(seps) + sep)
[docs]class FilePath(unicode): r"""A unicode subtype which knows about path structures on Windows. The path itself need not exist on any filesystem, but it has to match the rules which would make it possible. FilePaths can be absolute or relative. The only difference is that the root attribute is empty for relative paths. They can be added to each other or to other unicode strings which will use os.path.join semantics. A FilePath offers quick access to the different parts of the path: * parts - a list of the components (cf :func:`fs.get_parts`) * root - the drive or UNC server/share ending in a backslash unless a drive-relative path * filename - final component * name - same as filename (convenience) * dirname - all path components before the last * path - combination of root and dirname * parent - combination of root and all path components before second penultimate; raises an exception is FilePath is relative. * base - base part of filename (ie the piece before the dot) * ext - ext part of filename (ie the dot and the piece after) =================== ========== ========= ========= ========= =========== =========== ===== ==== Path root filename name dirname path parent base ext =================== ========== ========= ========= ========= =========== =========== ===== ==== \\\\a\b\c\d.txt \\\\a\b\ d.txt d.txt c \\\\a\b\c \\\\a\b\c d .txt c:\\boot.ini c:\\ boot.ini boot.ini _ c:\\ c:\\ boot .ini boot.ini _ boot.ini boot.ini _ _ x_fs boot .ini c:\\t c:\\ t t _ c:\\ c:\\ t _ c:\\t\ c:\\ t t _ c:\\ c:\\ t _ c:\\t\a.txt c:\\ a.txt a.txt t c:\\t c:\\t a .txt c:a.txt c: a.txt a.txt _ c: x_fs a .txt a.txt _ a.txt a.txt _ _ x_fs a .txt =================== ========== ========= ========= ========= =========== =========== ===== ==== """ def __new__(meta, filepath): fp = unicode.__new__(meta, filepath.replace("/", sep)) fp._parts = None fp._root = None fp._filename = None fp._name = None fp._dirname= None fp._path = None fp._parent = None fp._base = None fp._ext = None return fp def dump(self, level=0): sys.stdout.write(self.dumped(level=level)) def dumped(self, level=0): output = [] output.append(self) output.append("parts: %s" % self.parts) output.append("root: %s" % self.root) output.append("dirname: %s" % self.dirname) output.append("name: %s" % self.name) output.append("path: %s" % self.path) output.append("filename: %s" % self.filename) output.append("base: %s" % self.base) output.append("ext: %s" % self.ext) if self.root and self.parent: output.append("parent: %s" % self.parent) return utils.dumped("\n".join(output), level) @classmethod
[docs] def factory(cls, filepath): """Designed to be redefined in a subclass so that the :meth:`__add__` and :meth:`__radd__` methods can return the appropriate type. """ return cls(filepath)
def __add__(self, other): """Have FilepathA + FilepathB return FilepathA/FilepathB:: from winsys import fs fs.dir(r"c:\windows") + "notepad.exe" # returns fs.File("c:/windows/notepad.exe") """ return self.__class__.factory(os.path.join(unicode(self), unicode(other))) def __eq__(self, other): """Files compare equal if their case-insensitive paths are equal """ return self.lower() == unicode(other).lower() def __hash__(self): """Entries hash equal equal if their case-insensitive paths hash equal """ return hash(self.lower()) def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self) def __radd__(self, other): return self.__class__.factory(os.path.join(unicode(other), unicode(self))) def _get_parts(self): if self._parts is None: self._parts = get_parts(self) return self._parts parts = property(_get_parts) def _get_root(self): if self._root is None: self._root = self.__class__.factory(self.parts[0]) return self._root root = property(_get_root) def _get_filename(self): if self._filename is None: self._filename = self.parts[-1] return self._filename filename = property(_get_filename) def _get_dirname(self): if self._dirname is None: self._dirname = sep.join(self.parts[1:-1]) return self._dirname dirname = property(_get_dirname) def _get_path(self): if self._path is None: self._path = self.__class__.factory((self.root or "") + self.dirname) return self._path path = property(_get_path) def _get_parent(self): if self.is_relative(): raise x_fs(None, "FilePath.parent", "Cannot find parent for relative path") if self._parent is None: parent_dir = [p for p in self.parts if p][:-1] if parent_dir: self._parent = self.__class__.factory(parent_dir[0] + sep.join(parent_dir[1:])) else: self._parent = None return self._parent parent = property(_get_parent) def _get_name(self): if self._name is None: self._name = self.parts[-1] or self.parts[-2] return self._name name = property(_get_name) def _get_base(self): if self._base is None: self._base = os.path.splitext(self.filename)[0] return self._base base = property(_get_base) def _get_ext(self): if self._ext is None: self._ext = os.path.splitext(self.filename)[1] return self._ext ext = property(_get_ext)
[docs] def is_relative(self): """A filepath is relative if it has no root or if its root is a drive letter only (without a directory backslash).""" return not self.root.endswith(sep)
[docs] def relative_to(self, other): """Return this filepath as relative to another. cf :func:`utils.relative_to` """ return self.__class__.factory(relative_to(self, unicode(other)))
[docs] def absolute(self): """Return an absolute version of the current FilePath, whether relative or not. Use :func:`os.path.abspath` semantics. """ return self.__class__.factory(os.path.abspath(self))
abspath = absolute
[docs] def changed(self, root=None, dirname=None, filename=None, base=None, infix=None, ext=None): """Return a new :class:`FilePath` with one or more parts changed. This is particularly convenient for, say, changing the extension of a file or producing a version on another path, eg:: from winsys import fs, shell BACKUP_DRIVE = "D:\\" for f in fs.flat(shell.special_folder("personal"), "*.doc"): f.copy(f.changed(root=BACKUP_DRIVE)) """ if filename and (base or ext or infix): raise x_fs(None, "FilePath.changed", "You cannot change filename *and* base or ext or infix") if ext: ext = "." + ext.lstrip(".") parts = self.parts _filename = parts[-1] _base, _ext = os.path.splitext(_filename) if not (base or ext): base, ext = os.path.splitext(filename or _filename) if infix: base += infix return self.__class__.from_parts(root or parts[0], dirname or sep.join(parts[1:-1]), base or _base, ext or _ext)
@classmethod
[docs] def from_parts(cls, root, dirname, base, ext): """Recreate a filepath from its constituent parts. No real validation is done; it is assumed that the parameters are valid parts of a filepath. """ return cls(root + sep.join(os.path.normpath(dirname).split(sep) + [base+ext]))
filepath = FilePath
[docs]class Drive(core._WinSysObject): """Wraps a drive letter, offering access to its :attr:`Drive.volume` and the ability to :meth:`mount` or :meth:`dismount` it on a particular volume. """ def __init__(self, drive): self.name = drive.lower().rstrip(seps).rstrip(":") + ":" + sep self.type = wrapped(win32file.GetDriveTypeW, self.name) def as_string(self): return "Drive %s" % self.name def _get_root(self): """Return a :class:`fs.Dir` object corresponding to the root directory of this drive. """ return Dir(self.name) root = property(_get_root) def _get_volume(self): """Any volume currently mounted on this drive root. NB A drive can be referred to without a volume mounted, eg to call its :meth:`mount` method. """ try: return volume(self.name) except x_no_such_file: return None volume = property(_get_volume)
[docs] def mount(self, vol): """Mount the specified volume in this drive. :param vol: anything accepted by :func:`volume` :returns: `self` """ self.root.mount(vol) return self
[docs] def dismount(self): """Dismount this drive from its volume :returns: `self` """ self.root.dismount() return self
def dumped(self, level): output = [] output.append("name: %s" % self.name) output.append("type (DRIVE_TYPE): %s" % DRIVE_TYPE.name_from_value(self.type)) if self.volume: output.append("volume:\n%s" % self.volume.dumped(level)) mount_points = [(mount_point, volume) for (mount_point, volume) in mounts() if mount_point.startswith(self.name)] output.append("mount_points:\n%s" % utils.dumped_list(("%s => %s" % i for i in mount_points), level)) return utils.dumped("\n".join(output), level)
[docs]class Volume(core._WinSysObject): """Wraps a filesystem volume, giving access to useful information such as the filesystem and a list of drives mounted on it. Also offers the ability to mount or dismount. Attributes: * :attr:`label` * :attr:`serial_number` * :attr:`maximum_component_length` * :attr:`flags` - combination of :const:`VOLUME_FLAG` * :attr:`file_system_name` * :attr:`mounts` """ def __init__(self, volume): self.name = volume def as_string(self): return self.name def _get_info(self): try: return wrapped(win32api.GetVolumeInformation, self.name) except x_not_ready: return [None, None, None, 0, None] def _get_label(self): """The user-assigned label set by the DOS LABEL command """ return self._get_info()[0] label = property(_get_label) def _get_serial_number(self): """A software serial number, not the hardware serial number assigned by the device manufacturer. """ value = self._get_info()[1] if value is None: return None else: serial_number, = struct.unpack("L", struct.pack("l", value)) return serial_number serial_number = property(_get_serial_number) def _get_maximum_component_length(self): """The maximum length any one component of the file system name can reach. For NTFS this is 255, meaning that any one segment of of the path can be no longer than 255 chars. """ return self._get_info()[2] maximum_component_length = property(_get_maximum_component_length) def _get_flags(self): """An attribute set corresponding to some combination of :const:`VOLUME_FLAG`""" return constants.Attributes(self._get_info()[3], VOLUME_FLAG) flags = property(_get_flags) def _get_file_system_name(self): """The name of the file system present on this volume, eg NTFS""" return self._get_info()[4] file_system_name = property(_get_file_system_name) def _get_mounts(self): """An iterator of the :class:`Dir` objects which mount this volume. NB Windows restrictions mean that no more than one drive root directory can mount the same volume simultaneously. But it is possible for a volume to be mounted on, eg, e:\ and c:\mounts\e at the same time. """ return (dir(m) for m in wrapped(win32file.GetVolumePathNamesForVolumeName, self.name)) mounts = property(_get_mounts) def dumped(self, level): output = [] output.append("volume: %s" % self.name) output.append("mounted at:\n%s" % utils.dumped_list(self.mounts, level)) output.append("label: %s" % self.label) if self.serial_number is not None: output.append("serial_number: %08x" % self.serial_number) output.append("maximum_component_length: %s" % self.maximum_component_length) output.append("flags (VOLUME_FLAG):\n%s" % self.flags.dumped(level)) output.append("file_system_name: %s" % self.file_system_name) return utils.dumped("\n".join(output), level)
[docs] def mount(self, filepath): """Mount this volume on a particular filepath :param filepath: anything accepted by :func:`dir` """ return mount(filepath, self)
[docs] def dismount(self, filepath): """Dismount this volume from a particular filepath :param filepath: anything accepted by :func:`dir` """ dir(filepath).dismount()
class Share(core._WinSysObject): """Wraps a drive share """ def __init__(self, servername, sharename): self.servername =(servername or ".").strip("\\") self.sharename = sharename.strip("\\") def as_string(self): return r"\\%s\%s" % (self.servername, self.sharename) def __bool__(self): try: wrapped(win32net.NetShareGetInfo, self.servername, self.sharename, 2) except x_no_such_file: return False else: return True __nonzero__ = __bool__ def __getattr__(self, attr): info = self._get_info() try: return info[attr] except KeyError: raise AttributeError def _get_info(self): return wrapped(win32net.NetShareGetInfo, self.servername, self.sharename, 502) def _get_path(self): return dir(self._get_info().get("path")) path = property(_get_path) def _get_security(self): return security_.security(self._get_info().get('security_descriptor')) security = property(_get_security) def dumped(self, level=0, show_security=True): output = [] output.append(self.as_string()) output.append("type: (%d) %s" % (self.type, STYPE.names_from_value(self.type))) output.append("remark: %s" % self.remark) output.append("path: %s" % self.path) if show_security: security = self.security if security: output.append(security_.dumped(level)) return utils.dumped("\n".join(output), level) def create(self, servername, sharename, path, remark=core.UNSET, security=core.UNSET): remark = "" if remark is core.UNSET else remark security = security_.security(None if security is core.UNSET else security) info = dict( netname=sharename, path=path, remark=remark, security=security.pyobject().SECURITY_DESCRIPTOR if security else None ) wrapped(win32net.NetShareAdd, servername, 502, info) return self.__class__(servername, sharename) def clone(self, servername=core.UNSET, sharename=core.UNSET, path=core.UNSET, remark=core.UNSET, security=core.UNSET): if servername is core.UNSET and sharename is core.UNSET: raise x_fs("At least one of servername and sharename must be specified") return self.__class__.create( servername = self.servername if servername is core.UNSET else servername, sharename = self.sharename if sharename is core.UNSET else sharename, path = self.path if path is core.UNSET else path, remark = self.remark if remark is core.UNSET else remark, security = self.security if security is core.UNSET else security ) def delete(self): wrapped(win32net.NetShareDel, self.servername, self.sharename)
[docs]class Entry(FilePath, core._WinSysObject): """Heart of the fs module. This is a subtype of :class:`FilePath` and therefore of the base unicode type and is the parent of the :class:`Dir` and :class:`File` classes and contains all the functionality common to both. It is rarely instantiated itself, altho' it's possible to do so. Attributes: * :attr:`readable` * :attr:`filepath` * :attr:`created_at` * :attr:`accessed_at` * :attr:`written_at` * :attr:`uncompressed_size` * :attr:`size` * :attr:`attributes` * :attr:`id` * :attr:`n_links` * :attr:`attributes - an :class:`constants.Attributes` object representing combinations of :const:`FILE_ATTRIBUTE` Common functionality: * Entries compare (eq, lt, etc.) according to their full case-insensitive filepath. To do a content-wise comparison, use :meth:`equal_contents`. * Entries are True according to their existence on a filesystem * The str representation is the filepath utf8-encoded; unicode is the filepath itself * Adding one path to another will use os.path.join semantics """ def __new__(meta, filepath, _file_info=core.UNSET): fp = FilePath.__new__(meta, filepath) fp._normpath = normalised(fp) # # An Entry can be initialised from a Win32 FIND_FILES object, in which # case cache the information available for speed. # if _file_info is core.UNSET: fp._attributes = core.UNSET fp._created_at = core.UNSET fp._accessed_at = core.UNSET fp._written_at = core.UNSET fp._size = core.UNSET fp._reparse_tag = core.UNSET else: fp._attributes = constants.Attributes(_file_info[0], FILE_ATTRIBUTE) fp._created_at = utils.from_pytime(_file_info[1]) fp._accessed_at = utils.from_pytime(_file_info[2]) fp._written_at = utils.from_pytime(_file_info[3]) fp._size = utils._longword(_file_info[5], _file_info[4]) fp._reparse_tag = _file_info[6] return fp
[docs] def as_string(self): return self.encode("utf8")
[docs] def dumped(self, level=0, show_security=False): output = [] output.append(self) output.append(super(Entry, self).dumped(level)) output.append("readable: %s" % self.readable) if self.readable: output.append("id: %s" % self.id) output.append("n_links: %s" % self.n_links) output.append("created_at: %s" % self.created_at) output.append("accessed_at: %s" % self.accessed_at) output.append("written_at: %s" % self.written_at) output.append("uncompressed_size: %s" % self.uncompressed_size) output.append("size: %s" % self.size) output.append("Attributes:") output.append(self.attributes.dumped(level)) is_directory = self.attributes.directory else: is_directory = False if is_directory: vol = self.mounted_by() if vol: output.append("Mount point for:") output.append(vol.dumped(level)) if show_security: try: s = self.security() except win32file.error(exception): (errno, errctx, errmsg) = exception.args if errno == winerror.ERROR_ACCESS_DENIED: pass else: output.append("Security:\n" + s.dumped(level)) if is_directory: vol = self.mounted_by() if vol: output.append("Mount point for:") output.append(vol.dumped(level)) return utils.dumped("\n".join(output), level)
def __add__(self, other): return entry(super(Entry, self).__add__(other)) def __radd__(self, other): return entry(super(Entry, self).__radd__(other)) def __nonzero__(self): """Determine whether the file exists (at least from the POV of the current user) on the filesystem so that it can be checked with if fs.entry ("..."): """ return(wrapped(win32file.GetFileAttributesW, self._normpath) != -1) __bool__ = __nonzero__ @classmethod
[docs] def factory(cls, filepath): return entry(filepath)
def _get_readable(self): # # Check whether the user has at least enough # permissions to open the file for reading. # try: with Handle(self): pass except: return False else: return True readable = property(_get_readable)
[docs] def get_created_at(self): """Get and store the latest creation time from the filesystem. Note that this forces a re-read of the metadata.""" self._created_at = utils.from_pytime(wrapped(win32file.GetFileAttributesExW, self._normpath)[1]) return self._created_at
def _get_created_at(self): """Get the creation time from the original file read or the latest from the filesystem if none was stored.""" if self._created_at is core.UNSET: return self.get_created_at() else: return self._created_at def _set_created_at(self, created_at, handle=None): with Handle(handle or self.normapth, True) as handle: created_at = pywintypes.Time(time.mktime(created_at.timetuple())) wrapped(win32file.SetFileTime, handle, created_at, None, None) created_at = property(_get_created_at, _set_created_at)
[docs] def get_accessed_at(self): """Get and store the latest access time from the filesystem. Note that this forces a re-read of the metadata""" self._accessed_at = utils.from_pytime(wrapped(win32file.GetFileAttributesExW, self._normpath)[3]) return self._accessed_at
def _get_accessed_at(self): """Get the access time from the original file read or the latest from the filesystem if none was stored.""" if self._accessed_at is core.UNSET: return self.get_accessed_at() else: return self._accessed_at def _set_accessed_at(self, accessed_at, handle=None): with Handle(handle or self, True) as handle: accessed_at = pywintypes.Time(time.mktime(accessed_at.timetuple())) wrapped(win32file.SetFileTime, handle, None, accessed_at, None) accessed_at = property(_get_accessed_at, _set_accessed_at)
[docs] def get_written_at(self): """Get and store the latest modification time from the filesystem. Note that this forces a re-read of the metadata""" self._written_at = utils.from_pytime(wrapped(win32file.GetFileAttributesExW, self._normpath)[2]) return self._written_at
def _get_written_at(self): """Get and store the modification time from the original file read or the latest from the filesystem if none was stored.""" if self._written_at is core.UNSET: return self.get_written_at() else: return self._written_at def _set_written_at(self, written_at, handle=None): with Handle(handle or self, True) as handle: written_at = pywintypes.Time(time.mktime(written_at.timetuple())) wrapped(win32file.SetFileTime, handle, None, None, written_at) written_at = property(_get_written_at, _set_written_at) def _get_uncompressed_size(self, handle=None): """Get the size of the file data, ignoring any filesystem compression which may have been applied. """ with Handle(handle or self) as handle: return wrapped(win32file.GetFileSize, handle) uncompressed_size = property(_get_uncompressed_size)
[docs] def get_size(self): """Get and store the latest (possibly compressed) size from the filesystem. Note that this forces a re-read of the metadata""" self._size = wrapped(_kernel32.GetCompressedFileSize, self._normpath) return self._size
def _get_size(self): """Get and store the (possibly compressed) size from the original file read or the latest from the filesystem if none was stored.""" if self._size is core.UNSET: return self.get_size() else: return self._size size = property(_get_size)
[docs] def get_attributes(self): """Get and store the latest file attributes from the filesystem. Note that this forces a re-read of the metadata""" self._attributes = constants.Attributes(wrapped(win32file.GetFileAttributesExW, self._normpath)[0], FILE_ATTRIBUTE) return self._attributes
def _get_attributes(self): """Get and store the file attributes from the original file read or the latest from the filesystem if none was stored.""" if self._attributes is core.UNSET: return self.get_attributes() else: return self._attributes attributes = property(_get_attributes) def _get_id(self): """Return an id for this file which can be used to compare it to another while both files are open to determine if both are the same physical file.""" with Handle(self) as hFile: file_information = wrapped(win32file.GetFileInformationByHandle, hFile) volume_serial_number = file_information[4] index_lo, index_hi = file_information[8:10] return volume_serial_number + (utils._longword(index_hi, index_lo) * 2 << 31) id = property(_get_id) def _get_n_links(self): """Determine how many links point to this file. >1 indicates that the file is hardlinked. """ with Handle(self) as hFile: file_information = wrapped(win32file.GetFileInformationByHandle, hFile) return file_information[7] n_links = property(_get_n_links) def _set_file_attribute(self, key, value): try: attr = FILE_ATTRIBUTE.constant(key) except KeyError: raise AttributeError(key) if value: wrapped(win32file.SetFileAttributesW, normalised(self), self.attributes.flags | attr) else: wrapped(win32file.SetFileAttributesW, normalised(self), self.attributes.flags & ~attr) def _get_archive(self): "Is the archive bit set on the file?" return self.attributes.archive def _set_archive(self, value): self._set_file_attribute("archive", value) archive = property(_get_archive, _set_archive) def _get_compressed(self): "Is the file compressed?" return self.attributes.compressed compressed = property(_get_compressed) def _get_directory(self): "Is the file a directory?" return self.attributes.directory directory = property(_get_directory) def _get_encrypted(self): "Is the file encrypted?" return self.attributes.encrypted encrypted = property(_get_encrypted) def _get_hidden(self): "Is the file hidden?" return self.attributes.hidden def _set_hidden(self, value): self._set_file_attribute("hidden", value) hidden = property(_get_hidden, _set_hidden) def _get_normal(self): "Is the file normal?" return self.attributes.normal def _set_normal(self, value): wrapped(win32file.SetFileAttributesW, normalised(self), FILE_ATTRIBUTE.NORMAL) normal = property(_get_normal, _set_normal) def _get_not_content_indexed(self): "Should the file's content not be indexed?" return self.attributes.not_content_indexed def _set_not_content_indexed(self, value): self._set_file_attribute("not_content_indexed", value) not_content_indexed = property(_get_not_content_indexed, _set_not_content_indexed) def _get_offline(self): "Is the file offline?" return self.attributes.offline def _set_offline(self, value): self._set_file_attribute("offline", value) offline = property(_get_offline, _set_offline) def _get_readonly(self): "Is the file readonly?" return self.attributes.readonly def _set_readonly(self, value): self._set_file_attribute("readonly", value) readonly = property(_get_readonly, _set_readonly) def _get_reparse_point(self): "Is the file a reparse point?" return self.attributes.reparse_point reparse_point = property(_get_reparse_point) def _get_sparse_file(self): "Is the file sparse?" return self.attributes.sparse_file sparse_file = property(_get_sparse_file) def _get_system(self): "Is the file a system file?" return self.attributes.system def _set_system(self, value): self._set_file_attribute("system", value) system = property(_get_system, _set_system) def _get_temporary(self): "Is the file a temporary file?" return self.attributes.temporary def _set_temporary(self, value): self._set_file_attribute("temporary", value) temporary = property(_get_temporary, _set_temporary) def _get_virtual(self): "Is the file a virtual file?" return self.attributes.virtual virtual = property(_get_virtual)
[docs] def like(self, pattern): """Return true if this filename's name (not the path) matches `pattern` according to `fnmatch`, eg:: from winsys import fs for f in fs.files (): if f.directory and f.like("test_*"): print f :param pattern: an `fnmatch` pattern :returns: True if this file matches `pattern` """ return fnmatch.fnmatch(self.name, pattern)
[docs] def ancestors(self): """Iterate over this entry's ancestors, yielding the :class:`Dir` object corresponding to each one. :returns: yield a :class:`Dir` object for each ancestor """ if self.parent: yield self.parent for ancestor in self.parent.ancestors(): yield ancestor
[docs] def handle(self, mode="r"): return handles.handle(handle(self, write="w" in mode, exclusive="x" in mode))
[docs] def security(self, options=security_.Security.DEFAULT_OPTIONS): """Return a :class:`security_.Security` object corresponding to this entry's security attributes. Note that the returning object is a context manager so a common pattern is:: # # Find all private key files and ensure that only # the owner has any access. # from winsys import fs for f in fs.flat("*.ppk"): with f.security() as s: s.break_inheritance() s.dacl = [(s.owner, "F", "ALLOW")] :param options: cf :func:`security_.security` :returns: a :class:`security_.Security` object which may be used as a context manager """ return security_.security(self, options=options)
[docs] def compress(self): """Compress this entry; if it is a file, it will be compressed, if it is a directory it will be marked so that any new files added to it will be compressed automatically. :returns: self """ with Handle(self, True) as hFile: compression_type = struct.pack("H", COMPRESSION_FORMAT.DEFAULT) wrapped(win32file.DeviceIoControl, hFile, FSCTL.SET_COMPRESSION, compression_type, None, None) return self
[docs] def uncompress(self): """Uncompress this entry; if it is a file, it will be uncompressed, if it is a directory it will be marked so that any new files added to it will not be compressed automatically. :returns: self """ with Handle(self, True) as hFile: compression_type = struct.pack("H", COMPRESSION_FORMAT.NONE) wrapped(win32file.DeviceIoControl, hFile, FSCTL.SET_COMPRESSION, compression_type, None, None) return self
[docs] def encrypt(self): """FIXME: Need to work out how to create certificates for this :returns: self """ wrapped(win32file.EncryptFile, self._normpath) return self
[docs] def unencrypt(self): """FIXME: Need to work out how to create certificates for this :returns: self """ wrapped(win32file.DecryptFile, self._normpath) return self
[docs] def encryption_users(self): """FIXME: Need to work out how to create certificates for this """ return ( (security_.principal(sid), hashblob, info) for (sid, hashblob, info) in wrapped(win32file.QueryUsersOnEncryptedFile, self._normpath) )
[docs] def move(self, other, callback=None, callback_data=None, clobber=False): """Move this entry to the file/directory represented by other. If other is a directory, self :param other: anything accepted by :func:`entry` :param callback: a function which will receive a total size & total transferred :param callback_data: passed as extra data to callback :param clobber: whether to overwrite the other file if it exists :returns: a :class:`File` object corresponding to the target file """ other_file = entry(other) # # If the target is already a directory, the result of the move will # be a file or a directory inside that directory. Otherwise the # result will be a new file or directory inside the target's # parent. # if other_file and other_file.directory: target_filepath = self.factory(other_file + self.filename) else: target_filepath = self.factory(other_file) flags = MOVEFILE.WRITE_THROUGH if clobber: flags |= MOVEFILE.REPLACE_EXISTING wrapped( win32file.MoveFileWithProgress, self._normpath, normalised(target_filepath), progress_wrapper(callback), callback_data, flags ) return entry(unicode(target_filepath))
rename = move
[docs] def take_control(self, principal=core.UNSET): """Give the logged-on user full control to a file. This may need to be preceded by a call to :func:`take_ownership` so that the user gains WRITE_DAC permissions. :param principal: anything accepted by :func:`principal` [logged-on user] """ if principal is core.UNSET: principal = security_.me() # # Specify only DACL when reading as we may have no more rights # than that, and we don't need any more. # with self.security(options="D") as s: s.dacl.append((principal, "F", "ALLOW"))
[docs] def take_ownership(self, principal=core.UNSET): """Set the new owner of the file to be the logged-on user. This is no more than a slight shortcut to the equivalent security operations. If you specify a principal (other than the logged-in user, the default) you may need to have enabled SE_RESTORE privilege. Even the logged-in user may need to have enabled SE_TAKE_OWNERSHIP if that user has not been granted the appropriate security by the ACL:: from winsys import fs, security f = fs.file("c:/temp/temp.txt") assert f with security_.change_privileges(["take_ownership"]): f.take_ownership() f.take_control() :param principal: anything accepted by :func:`principal` [logged-on user] """ if principal is core.UNSET: principal = security_.me() # # Specify no options when reading as we may have no rights # whatsoever on the security descriptor and be relying on # the take_ownership privilege. # with self.security(options=None) as s: s.owner = principal
[docs]class File(Entry): ## ## 3 ways in which 2 files can be equal: ## Their filepaths are equal ## Their ids (inodes) are equal ## Their contents are equal ##
[docs] def open_(self, mode="r", attributes=None, sec=None): """EXPERIMENTAL: Use the `codecs.open` function to open this file as a Python file object. Positional and keyword arguments are passed straight through to the codecs function. :param mode: any of the usual Python modes :param attributes: anything accepted by :const:`FILE_ATTRIBUTE` :param sec: anything accepted by :func:`Security.security` """ mode = mode.lower() if mode else "r" self.hFile = handle(self, "r" not in mode, sec) flags = 0 if "t" in mode or "b" not in mode: flags |= os.O_TEXT if "r" in mode: flags |= os.O_RDONLY elif "a" in mode or "w" in mode: flags |= os.O_APPEND self.fd = msvcrt.open_osfhandle(self.hFile, flags) return os.fdopen(self.fd, mode)
[docs] def delete(self): """Delete this file :returns: self """ wrapped(win32file.DeleteFileW, self._normpath) return self
[docs] def copy(self, other, callback=None, callback_data=None): """Copy this file to another file or directory. If other is a directory, this file is copied into it, otherwise this file is copied over it. :param other: anything accepted by :func:`entry` :param callback: function receiving total size, total so far, callback_data :param callback_data: passed to callback :returns: :class:`File` object representing other """ other_file = entry(other) if other_file and other_file.directory: target_filepath = other_file + self.filename else: target_filepath = other_file wrapped( win32file.CopyFileEx, self._normpath, normalised(target_filepath), progress_wrapper(callback), callback_data ) return file(target_filepath)
[docs] def equal_contents(self, other): """Compare only the contents of the two files, ignoring everything else. Note that filecmp.cmp fails early. :param other: anything accepted by :func:`file` :returns: True if the files are equal in contents """ return filecmp.cmp(self, other, False)
[docs] def equals(self, other, compare_contents=False): """Is this file equal in size, dates and attributes to another. if `compare_contents` is True, use filecmp to compare the contents of the files. Note that filecmp.cmp fails early. :param other: anything accepted by :func:`file` :compare_contents: True to compare contents, False otherwise :returns: True if the files are equal in size, modification date, attributes and contents """ other = entry(other) if self.size != other.size: return False if self.written_at != other.written_at: return False if self.attributes != other.attributes: return False if compare_contents: if not filecmp.cmp(self, other, False): return False return True
[docs] def create(self, security=None): """Create this file optionally with specific security_. If the file already exists it will not be overwritten. :param security: a :class:`security_.Security` object :returns: this object """ wrapped( win32file.CreateFile, self._normpath, FILE_ACCESS.WRITE, 0, None if security is None else security_.pyobject(), FILE_CREATION.OPEN_ALWAYS, 0, None ).close() return self
[docs] def zip(self, zip_filename=core.UNSET, mode="w", compression=zipfile.ZIP_DEFLATED, allow_zip64=False): """Zip the file up into a zipfile. By default, the zipfile will have the name of the file with ".zip" appended and will be a sibling of the file. Also by default a new zipfile will be created, overwriting any existing one, and standard compression will be used. The filename will be stored without any directory information. A different zipfile can be specific as the zip_filename parameter, and this can be appended to (if it exists) by specifying "a" as the mode param. :param zip_filename: The name of the resulting zipfile [this file with the extension changed to .zip] :param mode: mode (usually "w") to pass to the zipfile constructor :param compression: compression level (usually DEFLATED) :param allow_zip64: passed to the zipfile constructor to allow > 2Gb files :returns: a :class:`File` object corresponding to the zipfile created """ if zip_filename is core.UNSET: zip_filename = self.changed(ext=".zip") z = zipfile.ZipFile(zip_filename, mode, compression, allow_zip64) z.write(self, arcname=self.filename) z.close() return file(zip_filename)
[docs] def bytes(self): """Return the contents of the file as a bytes object (str in 2.x) :returns: a bytes/str object corresponding to the contents of the file """ with open(self, "rb") as f: return f.read()
[docs] def text(self, encoding="ascii"): """Return the contents of the file as a text object (unicode in 2.x) :param encoding: valid encoding to pass to codecs.open :returns: a text/unicode object corresponding to the contents of the file """ with codecs.open(self, "r", encoding=encoding) as f: return f.read()
touch = create
[docs]class Dir(Entry): def __new__(meta, filepath, *args, **kwargs): # # Ensure that a directory always ends in a backslash # return Entry.__new__(meta, filepath.rstrip(seps) + sep, *args, **kwargs)
[docs] def is_empty(self): r"""Returns True if this directory is empty, False otherwise. Will fail if the directory does not yet exist. """ for _ in self: return True else: return False
[docs] def compress(self, apply_to_contents=True, callback=None): """Flag this directory so that any new files are automatically compressed. If apply_to_contents is True, iterate over all subdirectories and their files, compressing likewise. :param apply_to_contents: whether to compress all existing subdirectories and their files :param callback: called for each subdirectory / file compressed :returns: this directory """ Entry.compress(self) if apply_to_contents: for dirpath, dirs, files in self.walk(): for dir in dirs: if callback: callback(dir) dir.compress(False) for file in files: if callback: callback(file) file.compress() return self
[docs] def uncompress(self, apply_to_contents=True, callback=None): """Flag this directory so that any new files are automatically not compressed. If apply_to_contents is True, iterate over all subdirectories and their files, uncompressing likewise. :param apply_to_contents: whether to uncompress all existing subdirectories and their files :param callback: called for each subdirectory / file uncompressed :returns: this directory """ Entry.uncompress(self) if apply_to_contents: for dirpath, dirs, files in self.walk(): for dir in dirs: if callback: callback(dir) dir.uncompress(False) for file in files: if callback: callback(file) file.uncompress() return self
[docs] def encrypt(self, apply_to_contents=True): Entry.encrypt(self) if apply_to_contents: for dirpath, dirs, files in self.walk(): for dir in dirs: dir.encrypt(False) for file in files: file.encrypt() return self
[docs] def unencrypt(self, apply_to_contents=True): Entry.unencrypt(self) if apply_to_contents: for dirpath, dirs, files in self.walk(): for dir in dirs: dir.unencrypt(False) for file in files: file.unencrypt() return self
[docs] def disable_encryption(self): wrapped(win32security.EncryptionDisable, self._normpath, True) return self
[docs] def enable_encryption(self): wrapped(win32security.EncryptionDisable, self._normpath, False) return self
[docs] def create(self, security_descriptor=None): """Create this directory, optionally specifying a security descriptor. If the directory already exists, silently succeed. All intervening directories are automatically created if they do not already exist. If any exists but is a file rather than a directory, an exception is raised. :param security_descriptor: anything accepted by :func:`security_.security` :returns: a :class:`Dir` representing the newly-created directory """ security_descriptor = security_.security(security_descriptor) parts = self.parts root, pieces = parts[0], parts[1:] for i, piece in enumerate(pieces): path = normalised(root + sep.join(pieces[:i+1])) f = entry(path) if f: if not f.directory: raise x_fs(errctx="Dir.create", errmsg="%s exists and is not a directory" % f) else: wrapped( win32file.CreateDirectory, path, security_descriptor.pyobject() if security_descriptor else None ) return self.factory(path)
[docs] def mkdir(self, dirname, security_descriptor=None): r"""Create :dirname: as a subdirectory of this directory, specifying a security descriptor. This is implemented in terms of :meth:`create` by concatenating this directory and dirname and calling .create on the resulting :class:`Dir` object. :param dirname: a relative path :param security_descriptor: anything accepted by :func:`security_.security` :returns: a :class:`Dir` representing the newly-created directory """ return self.dir(dirname).create(security_descriptor=security_descriptor)
[docs] def entries(self, pattern="*", *args, **kwargs): r"""Iterate over all entries -- files & directories -- in this directory. Implemented via :func:`files` :param pattern: a \|-separated list of wildcards to match """ return files("|".join(self + p for p in pattern.split("|")), *args, **kwargs)
__iter__ = entries
[docs] def file(self, name): """Return a :class:`File` object representing a file called name inside this directory. """ return file(self + name)
[docs] def dir(self, name): """Return a :class:`Dir` object representing a Directory called name inside this directory. """ return dir(self + name)
[docs] def files(self, pattern="*", *args, **kwargs): """Iterate over all files in this directory which match pattern, yielding a :class:`File` object for each one. Implemented via :meth:`Dir.entries`. :param pattern: a \|-separated list of wildcards to match """ return (f for f in self.entries(pattern, *args, **kwargs) if isinstance(f, File))
[docs] def dirs(self, pattern="*", *args, **kwargs): """Iterate over all directories in this directory which match pattern, yielding a :class:`Dir` object for each one. Implemented via :meth:`Dir.entries`. :param pattern: a \|-separated list of wildcards to match """ return (f for f in self.entries(pattern, *args, **kwargs) if isinstance(f, Dir))
[docs] def walk(self, depthfirst=False, error_handler=None): """Mimic os.walk, iterating over each directory and the files within in. Each iteration yields: :class:`Dir`, (generator for :class:`Dir` objects), (generator for :class:`File` objects) :param depthfirst: Whether to use breadth-first (the default) or depth-first traversal :param error_handler: Whether to continue traversing in the face of access-denied errors """ top = self dirs, nondirs = [], [] for f in self.entries(error_handler=error_handler): if isinstance(f, Dir): dirs.append(f) else: nondirs.append(f) if not depthfirst: yield top, dirs, nondirs for d in dirs: for x in d.walk(depthfirst=depthfirst, error_handler=error_handler): yield x if depthfirst: yield top, dirs, nondirs
[docs] def flat(self, pattern="*", includedirs=False, depthfirst=False, error_handler=None): """Iterate over this directory and all its subdirectories, yielding one :class:`File` object on each iteration, and optionally :class:`Dir` objects as well. :param pattern: limit the files returned by filename :includedirs: whether to yield directories as well as files [False] :depthfirst: as for :meth:`Dir.walk` :error_handler: as for :meth:`Dir.walk` """ patterns = pattern.split("|") walker = self.walk( depthfirst=depthfirst, error_handler=error_handler ) for dirpath, dirs, files in walker: if includedirs: for dir in dirs: for pattern in patterns: if dir.like(pattern): yield dir break for file in files: for pattern in patterns: if file.like(pattern): yield file break
[docs] def mounted_by(self): """Return the volume mounted on this directory, or None. :returns: a :class:`Volume` object or :const:`None` """ for dir, vol in mounts(): if dir == self: return vol
[docs] def mount(self, vol): """Mount a volume on this directory. The directory must be empty or an exception is raised. eg:: from winsys import fs fs.dir("c:/temp").mkdir("c_drive").mount("c:") :param vol: anything accepted by :func:`volume` :returns: this :class:`Dir` """ for f in self.flat(includedirs=True): raise x_fs(errctx="Dir.mount", errmsg="You can't mount to a non-empty directory") vol = volume(vol) #~ for m in vol.mounts: #~ if not m.dirname: #~ raise x_fs(errctx="Dir.mount", errmsg="Volume %s already has a drive letter %s" %(vol, m.root)) wrapped(win32file.SetVolumeMountPoint, self, vol.name) return self
[docs] def dismount(self): """Dismount whatever volume is mounted at this directory :returns: this :class:`Dir` """ wrapped(win32file.DeleteVolumeMountPoint, self._normpath) return self
[docs] def copy(self, target_filepath, callback=None, callback_data=None): """Copy this directory to another, which must be a directory if it exists. If it does exist, this directory's contents will be copied inside it; if it does not exist, this directory will become it. NB To copy this directory inside another, set the `target_filepath` to `other_directory + self.name`. :param target_filepath: anything accepted by :func:`entry` :param callback: cf :meth:`File.copy` :param callback_data: cf :meth:`File.copy` :returns: a :class:`Dir` object representing target_filepath """ target = entry(target_filepath.rstrip(sep) + sep) if target and not target.directory: raise x_no_such_file(None, "Dir.copy", "%s exists but is not a directory") if not target: target.create() for dirpath, dirs, files in self.walk(): for d in dirs: target_dir = Dir(target + d.relative_to(self)) target_dir.create() for f in files: target_file = File(target + f.relative_to(self)) f.copy(target_file, callback, callback_data) return target
[docs] def delete(self, recursive=False): """Delete this directory, optionally including its children. :param recursive: whether to remove all subdirectories and files first :returns: this :class:`Dir` """ if recursive: for dirpath, dirs, files in self.walk(depthfirst=True): for d in dirs: d.delete(recursive=True) for f in files: f.delete() wrapped(win32file.RemoveDirectory, self._normpath) return self
[docs] def watch(self, *args, **kwargs): """Return a directory watcher, as per :func:`watch` """ return watch(self, *args, **kwargs)
[docs] def zip(self, zip_filename=core.UNSET, mode="w", compression=zipfile.ZIP_DEFLATED): """Zip the directory up into a zip file. By default, the file will have the name of the directory with ".zip" appended and will be a sibling of the directory. Also by default a new zipfile will be created, overwriting any existing one, and standard compression will be used. Filenames are stored as relative to this dir. A different zip filename can be specific as the zip_filename parameter, and this can be appended to (if it exists) by specifying "a" as the mode param. The created / appended zip file is returned. :param zip_filename: The name of the zip file to hold the archive of this directory and its children. [directory.zip] :param mode: cf zipfile.ZipFile :param compressions: cf zipfile.ZipFile :returns: a :class:`File` object representing the resulting zip file """ if zip_filename is core.UNSET: zip_filename = os.path.join(self.parent, self.name + ".zip") z = zipfile.ZipFile(zip_filename, mode=mode, compression=compression) try: for f in self.flat(): z.write(f, f.relative_to(self)) finally: z.close() return file(zip_filename)
rmdir = delete
class SharedDir(Dir): def mounted_by(self): raise NotImplementedError def mount(self, vol): raise NotImplementedError def dumped(self, level=0): raise NotImplementedError def files(pattern="*", ignore=[".", ".."], error_handler=None): """Iterate over files and directories matching pattern, which can include a path. Calls win32file.FindFilesIterator under the covers, which uses FindFirstFile / FindNextFile. :pattern: A string with one or more wildcard patterns, pipe-separated :ignore: A container of specific paths to ignore :error_handler: a callable which returns True if the iteration is to continue, false otherwise """ for p in pattern.split("|"): for f in _files(p, ignore=ignore, error_handler=error_handler): yield f def _files(pattern="*", ignore=[".", ".."], error_handler=None): # # special-case ".": FindFilesIterator treats a directory # name as an invitation to return only that directory. # It treats . as an invitation to return all the files # in that directory # if pattern == '.': yield Dir(".") raise StopIteration try: iterator = wrapped(win32file.FindFilesIterator, pattern) except x_no_such_file: # # If this occurs, it means there's a bizarre problem # with a filename windows won't handle. Just stop # iteration. # core.warn("Ignored no-such-file on first iteration of %s", pattern) raise StopIteration except: # # If we get a trappable error at this point, there's nowhere # else to go: raise StopIteration to exit cleanly # if error_handler and error_handler(sys.exc_info()): raise StopIteration else: raise parts = get_parts(unicode(pattern)) dirpath = parts[0] + sep.join(parts[1:-1]) while True: try: file_info = next(iterator) filename = file_info[8] if filename in ignore: continue filepath = os.path.join(dirpath, filename) yield entry(filepath, file_info) except StopIteration: break except x_no_such_file: # # If this occurs, it means there's a bizarre problem # with a filename windows won't handle. Just stop # iteration. # core.warn("Ignored no-such-file on first iteration of %s", pattern) raise StopIteration except: # # If the error_handler chooses to swallow this error, carry on # if error_handler and error_handler(sys.exc_info()): core.warn("Error %s ignored", sys.exc_info()[0]) continue else: raise
[docs]def entry(filepath, _file_info=core.UNSET): """Return a :class:`File` or :class:`Dir` object representing this filepath. ======================================= ================================================== filepath Result ======================================= ================================================== :const:`None` or "" :const:`None` an :class:`Entry` or subclass object the same object an existing file name a :class:`File` object representing that file an existing directory name a :class:`Dir` object representing that directory a file name which doesn't exist a :class:`Dir` if filepath ends with \\, :class:`File` otherwise ======================================= ================================================== """ def _guess(filepath): """If the path doesn't exist on the filesystem, guess whether it's intended to be a dir or a file by looking for a trailing slash. """ if filepath.endswith(sep): return Dir(filepath) else: return File(filepath) if filepath is None or filepath == "": return None elif isinstance(filepath, Entry): return filepath else: filepath = unicode(filepath) if _file_info is core.UNSET: attributes = wrapped(win32file.GetFileAttributesW, normalised(filepath)) else: attributes = _file_info[0] if attributes == -1: return _guess(filepath) elif attributes & FILE_ATTRIBUTE.DIRECTORY: return Dir(filepath, _file_info) else: return File(filepath, _file_info)
[docs]def file(filepath): """Return a :class:`File` object representing this filepath on the filepath. If filepath is already a :class:`File` object, return it unchanged otherwise ensure that the filepath doesn't point to an existing directory and return a :class:`File` object which represents it. """ f = entry(filepath) if isinstance(f, File): return f elif isinstance(f, Dir) and f: raise x_fs((None, "file", "%s exists but is a directory" % filepath)) else: return File(unicode(filepath))
[docs]def dir(filepath): """Return a :class:`Dir` object representing this filepath on the filepath. If filepath is already a :class:`Dir` object, return it unchanged otherwise ensure that the filepath doesn't point to an existing file and return a :class:`Dir` object which represents it. """ f = entry(filepath) if isinstance(f, Dir): return f elif isinstance(f, File) and f: raise x_fs(None, "dir", "%s exists but is a file" % filepath) else: if re.match(UNC, f.root): return SharedDir(unicode(filepath)) else: return Dir(unicode(filepath))
[docs]def glob(pattern): """Mimic the built-in glob.glob functionality as a generator, optionally ignoring access errors. :param pattern: passed to :func:`files` :returns: yields a :class:`FilePath` object for each matching file """ return files(pattern)
[docs]def listdir(d): """Mimic the built-in os.list functionality as a generator, optionally ignoring access errors. :param d: anything accepted by :func:`dir` :returns: yield the name of each file in directory d """ return (f.name for f in files(dir(d) + "*"))
[docs]def walk(root, depthfirst=False, error_handler=None): """Walk the directory tree starting from root, optionally ignoring access errors. :param root: anything accepted by :func:`dir` :param depthfirst: passed to :meth:`Dir.walk` :param error_handler: passed to :meth:`Dir.walk` :returns: as :meth:`Dir.walk` """ return dir(root).walk(depthfirst=depthfirst, error_handler=error_handler)
[docs]def flat(root, pattern="*", includedirs=False, depthfirst=False, error_handler=None): """Iterate over a flattened version of the directory tree starting from root. Implemented via :meth:`Dir.flat`. :param root: anything accepted by :func:`dir` :param pattern: passed to :meth:`Dir.flat` :param includedirs: passed to :meth:`Dir.flat` :param depthfirst: passed to :meth:`Dir.flat` :param error_handler: passed to :meth:`Dir.flat` :returns: as :meth:`Dir.flat` """ return dir(root).flat( pattern, includedirs=includedirs, depthfirst=depthfirst, error_handler=error_handler )
def progress_wrapper(callback): def _progress_wrapper( TotalFileSize, TotalBytesTransferred, StreamSize, StreamBytesTransferred, StreamNumber, CallbackReason, SourceFile, DestinationFile, data ): if callback(TotalFileSize, TotalBytesTransferred, data): return PROGRESS.CANCEL else: return PROGRESS.CONTINUE if callback: return _progress_wrapper else: return None
[docs]def move(source_filepath, *args, **kwargs): """Move one :class:`Entry` object to another, implemented via :meth:`File.move` or :meth:`Dir.move` :param source_filepath: anything accepted by :func:`entry` """ return entry(source_filepath).move(*args, **kwargs)
[docs]def copy(source_filepath, *args, **kwargs): """Copy one :class:`Entry` object to another, implemented via :meth:`File.copy` or :meth:`Dir.copy` :param source_filepath: anything accepted by :func:`entry` """ return entry(source_filepath).copy(*args, **kwargs)
[docs]def delete(filepath): """Deletes a :class:`Entry` object, implemented via :meth:`File.delete` or :meth:`Dir.delete` :param filepath: anything accepted by :func:`entry` """ return entry(filepath).delete()
[docs]def rmdir(filepath, recursive=False): """Mimic the os.rmdir functionality, optionally recursing :param filepath: anything accepted by :func:`dir` :param recursive: passed to :meth:`Dir.delete` """ return dir(filepath).delete(recursive=recursive)
[docs]def attributes(filepath): """Return an :class:`constants.Attributes` object representing the file attributes of filepath, implemented via :meth:`Entry.attributes` :param filepath: anything accepted by :func:`entry` :returns: an :class:`constants.Attributes` object """ return entry(filepath).attributes
[docs]def exists(filepath): """Mimic os.path.exists, implemented via the :class:`Entry` boolean mechanism :param filepath: anything accepted by :func:`entry` :returns: :const:`True` if filepath exists, :const:`False` otherwise """ return bool(entry(filepath))
[docs]def mkdir(dirpath, *args, **kwargs): """Mimic os.mkdir, implemented via :meth:`Dir.create` """ return dir(dirpath).create(*args, **kwargs)
[docs]def touch(filepath): """Update a file's modification time, creating it if it does not exist, implemented via :meth:`File.create` :param filepath: anything accepted by :func:`file` """ return file(filepath).create()
[docs]def mount(filepath, vol): """Mount vol at filepath, implemented via :meth:`Dir.mount` :param filepath: anything accepted by :func:`dir` :param vol: passed to :meth:`Dir.mount` """ return dir(filepath).mount(vol)
[docs]def dismount(filepath): """Dismount the volume at filepath, implemented via :meth:`Dir.dismount` :param filepath: anything accepted by :func:`dir` """ return dir(filepath).dismount()
[docs]def zip(filepath, *args, **kwargs): """Create and return a zip archive of filepath, implemented via :meth:`File.zip` or :meth:`Dir.zip` :param filepath: anything accepted by :func:`entry` """ return entry(filepath).zip(*args, **kwargs)
[docs]def drive(drive): """Return a :class:`Drive` object representing drive ======================================= ================================================== drive Result ======================================= ================================================== :const:`None` :const:`None` an :class:`Drive` or subclass object the same object a drive letter a :class:`Drive` object ======================================= ================================================== """ if drive is None: return None elif isinstance(drive, Drive): return drive else: return Drive(drive)
[docs]def drives(): """Iterate over all the drive letters in the system, yielding a :class:`Drive` object representing each one. """ for drive in wrapped(win32api.GetLogicalDriveStrings).strip("\x00").split("\x00"): yield Drive(drive)
[docs]def volume(volume): r"""Return a :class:`Volume` object corresponding to volume ======================================= ================================================== volume Result ======================================= ================================================== :const:`None` :const:`None` an :class:`Volume` or subclass object the same object a volume name \\?\Volume... a :class:`Volume` object representing that volume a directory name a :class:`Volume` object representing the volume at that mountpoint ======================================= ================================================== """ if volume is None: return None elif isinstance(volume, Volume): return volume elif volume.startswith(r"\\?\Volume"): return Volume(volume) else: return Volume(wrapped(win32file.GetVolumeNameForVolumeMountPoint, volume.rstrip(sep) + sep))
[docs]def volumes(): """Iterate over all the volumes in the system, yielding a :class:`Volume` object representing each one. """ hSearch, volume_name = _kernel32.FindFirstVolume() yield Volume(volume_name) while True: volume_name = _kernel32.FindNextVolume(hSearch) if volume_name is None: break else: yield Volume(volume_name)
def share(share): """Return a :class:`Share` object corresponding to share """ if share is None: return None elif isinstance(share, Share): return share else: match = re.match(sep * 4 + "(" + LEGAL_FILECHARS + ")" + sep * 2 + "(" + LEGAL_FILECHARS + ")", share) if match: return Share(*match.groups()) else: raise x_fs("Invalid share name: %s" % share) def shares(servername="."): """Iterate over all the shares in a system, yielding a :class:`Share` object representing each one """ infos, total, hResume = wrapped(win32net.NetShareEnum, servername, 0) for info in infos: yield Share(servername, info['netname']) while hResume > 0: infos, total, hResume = wrapped(win32net.NetShareEnum, servername, 0, hResume) for info in infos: yield Share(servername, info['netname'])
[docs]def mounts(): """Iterate over all mounted volume mountpoints in the system, yielding a (:class:`Dir`, :class:`Volume`) pair for each one, eg:: from winsys import fs drive_volumes = dict(fs.mounts()) """ for v in volumes(): for m in v.mounts: yield Dir(m), v
class _DirWatcher(object): WATCH_FOR = reduce(operator.or_, FILE_NOTIFY_CHANGE.values()) BUFFER_SIZE = 8192 TIMEOUT = 500 def __init__(self, root, subdirs=False, watch_for=WATCH_FOR, buffer_size=BUFFER_SIZE): self.root = root self.subdirs = subdirs self.watch_for = watch_for self.overlapped = wrapped(pywintypes.OVERLAPPED) self.overlapped.hEvent = wrapped(win32event.CreateEvent, None, 0, 0, None) self.buffer = wrapped(win32file.AllocateReadBuffer, buffer_size) self.hDir = wrapped( win32file.CreateFile, normalised(root), FILE_ACCESS.LIST_DIRECTORY, # # This must allow RWD otherwises files in # the dir will be constrained. # FILE_SHARE.READ | FILE_SHARE.WRITE | FILE_SHARE.DELETE, None, FILE_CREATION.OPEN_EXISTING, FILE_FLAG.BACKUP_SEMANTICS | FILE_FLAG.OVERLAPPED, None ) self._changes = collections.deque() def __iter__(self): return self def next(self): wrapped( win32file.ReadDirectoryChangesW, self.hDir, self.buffer, self.subdirs, self.watch_for, self.overlapped ) while True: if wrapped( win32event.WaitForSingleObject, self.overlapped.hEvent, self.TIMEOUT ) == win32event.WAIT_OBJECT_0: n_bytes = wrapped(win32file.GetOverlappedResult, self.hDir, self.overlapped, True) if n_bytes == 0: continue last_result = None old_file = new_file = None for action, filename in wrapped(win32file.FILE_NOTIFY_INFORMATION, self.buffer, n_bytes): if action == FILE_ACTION.ADDED: new_file = entry(os.path.join(self.root, filename)) elif action == FILE_ACTION.REMOVED: old_file = entry(os.path.join(self.root, filename)) elif action == FILE_ACTION.MODIFIED: old_file = new_file = entry(os.path.join(self.root, filename)) elif action == FILE_ACTION.RENAMED_OLD_NAME: old_file = entry(os.path.join(self.root, filename)) action = None elif action == FILE_ACTION.RENAMED_NEW_NAME: new_file = entry(os.path.join(self.root, filename)) if action: result =(action, old_file, new_file) if result != last_result: self._changes.append(result) if self._changes: return self._changes.popleft() __next__ = next def stop(self): self.hDir.close()
[docs]def watch( root, subdirs=False, watch_for=_DirWatcher.WATCH_FOR, buffer_size=_DirWatcher.BUFFER_SIZE ): """Return an iterator which returns a file change on every iteration. The file change comes in the form: action, old_filename, new_filename. action is one of the :const:`FILE_ACTION` constants, while the filenames speak for themselves. The filenames will be the same if the file has been updated. If the file is new, old_filename will be None; if it has been deleted, new_filename will be None; if it has been renamed, they will be different:: from winsys import fs watcher = fs.watch("c:/temp", subdirs=True) for action, old_filename, new_filename in watcher: if action == fs.FILE_ACTION.ADDED: print new_filename, "added" elif action == fs.FILE_ACTION.REMOVED: print old_filename, "removed" """ return _DirWatcher(unicode(root), subdirs, watch_for, buffer_size)
if __name__ == '__main__': print("Watching", os.path.abspath(".")) watcher = watch(".", True) try: for action, old_filename, new_filename in watcher: if action in (FILE_ACTION.ADDED, FILE_ACTION.MODIFIED): print("%10s %s %d" % (FILE_ACTION.name_from_value(action), new_filename, entry(new_filename).size)) except KeyboardInterrupt: watcher.stop()