# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import marshal
import re
import time
import warnings
import pywintypes
import winerror
import win32event
import win32file
import win32pipe
import win32security
from winsys import constants, core, exc, fs, security, utils, handles
# Named groups of Win32 constants. Each group is built by
# Constants.from_pattern from a name pattern and the pywin32 module
# given as the namespace (presumably collecting every matching
# attribute of that module -- confirm against winsys.constants).
WAIT = constants.Constants.from_pattern("WAIT_*", namespace=win32event)
# INFINITE does not start with "WAIT_", so it is added to the group explicitly.
WAIT.update(dict(INFINITE=win32event.INFINITE))
PIPE_ACCESS = constants.Constants.from_pattern("PIPE_ACCESS_*", namespace=win32pipe)
PIPE_TYPE = constants.Constants.from_pattern("PIPE_TYPE_*", namespace=win32pipe)
NMPWAIT = constants.Constants.from_pattern("NMPWAIT_*", namespace=win32pipe)
# Alias for the pywin32 handle type.
PyHANDLE = pywintypes.HANDLEType
class x_ipc(exc.x_winsys):
    """Base class for the IPC exceptions defined in this module."""
class x_ipc_timeout(x_ipc):
    """Raised when a wait on a synchronisation object times out."""
class x_mailslot(x_ipc):
    """Base class for mailslot-related exceptions."""
class x_mailslot_invalid_use(x_mailslot):
    """Raised when a mailslot in use for writing is read from, or vice versa."""
class x_mailslot_empty(x_mailslot):
    """Raised when a read finds no message waiting in the mailslot."""
class x_mailslot_message_too_big(x_mailslot):
    """Raised when a serialised message exceeds the mailslot's message size."""
class x_mailslot_message_too_complex(x_mailslot):
    """Mailslot exception for messages which are too complex to send.

    NOTE(review): nothing in this module raises it; presumably reserved
    for serialisers which cannot encode a message -- confirm.
    """
# Win32 error codes which are translated to a specific winsys exception;
# x_ipc is handed to exc.wrapper alongside the map (presumably as the
# fallback exception for unlisted errors -- confirm against winsys.exc).
WINERROR_MAP = {winerror.ERROR_FILE_NOT_FOUND: exc.x_not_found}
wrapped = exc.wrapper(WINERROR_MAP, x_ipc)
def _unserialised(data):
return data
class Mailslot(core._WinSysObject):
    """A mailslot is a mechanism for passing small datasets (up to about
    400 bytes) between machines in the same network. For transport and
    name resolution it uses NetBIOS so you can't, for example, use a
    machine's IP address when specifying the location of a mailslot.
    You can, however, use "*" in order to broadcast your message to
    all listening machines.

    A mailslot is either read-only or write-only. The typical case
    is that one machine starts up a reading mailslot, say for trace
    output, and all other machines write to that mailslot either by
    specifying the machine name directly or by broadcasting. This is
    particularly convenient as the writing machines have no need to
    determine where the trace-collecting mailslot is running or even
    if it is running at all.

    The format for mailslot names is \\\\<computer>\\mailslot\\<path>\\<to>\\<mailslot>
    The computer name can be "." for the local computer, a Windows
    computer name, a domain name, or an asterisk to indicate a broadcast.
    It's not necessary to have a complex path for the mailslot but it is
    supported and could be used to segregate functionally similar
    mailslots on the same or different machines.

    This class deliberately wraps the mailslot API in a Python
    API which is plug-compatible with that of the Python Queue
    mechanism with the following notes:

    * A mailslot is either read-only or write-only. Generally,
      the first action taken on it determines which it is, although
      remote mailslots can only be written to so this is predetermined.

    * A mailslot can be context-managed so that it is opened and
      closed correctly regardless of any errors.

    * A mailslot is its own iterator (strictly: generator)

    * By default the data through a mailslot is expected to be
      text, and is passed through untouched. Alternative
      serialisers can be provided, for example marshal.dumps and
      marshal.loads to allow simple objects to be transmitted via
      the mailslot. Note that the maximum message size still applies
      so it's not possible to send very complex datasets this way.

    Since a mailslot will always return immediately if passed to one
    of the Wait... functions, events or other synchronisation objects
    will be needed to coordinate between mailslots.
    """

    # Size above which a warning is issued for a remote mailslot.
    MAX_MESSAGE_SIZE = 420

    def __init__(
        self,
        name,
        serialiser=(_unserialised, _unserialised),
        message_size=0,
        timeout_ms=-1,
        *args, **kwargs
    ):
        """Set up a mailslot of the given name, which must be valid according to
        the Microsoft docs.

        :param name: the full mailslot name
        :param serialiser: a pair of callable which will be used to
                           encode & decode data respectively. Typical
                           serialisers would be (marshal.dumps, marshal.loads).
        :type serialiser: a pair of callables each taking one param and returning bytes
        :param message_size: the maximum size of a message to this mailslot,
                             up to the system-defined maximum of about 400 bytes
                             if passing between computers.
        :param timeout_ms: how many milliseconds to wait when reading from
                           this mailslot
        """
        core._WinSysObject.__init__(self, *args, **kwargs)
        self.name = name
        self.serialiser = serialiser
        self.message_size = message_size
        self.timeout_ms = timeout_ms
        self._hRead = self._hWrite = None
        #
        # If the name is a local mailslot it could conceivably
        # be used for reading or for writing. If it is a
        # remote (including domain) mailslot, it can only
        # be written to.
        #
        if not name.startswith(r"\\."):
            self._hWrite = self._write_handle()
            if self.message_size > self.MAX_MESSAGE_SIZE:
                warnings.warn(
                    "You have specified a message size of %d for a remote mailslot. "
                    "Messages over %d in size will probably fail" %
                    (self.message_size, self.MAX_MESSAGE_SIZE)
                )

    def _read_handle(self):
        """Return the handle used for reading, creating the mailslot on first use.

        :raises: :exc:`x_mailslot_invalid_use` if the mailslot is already in use for writing
        """
        if self._hWrite is not None:
            raise x_mailslot_invalid_use(None, "Mailslot._read_handle", "Cannot read from this mailslot; it is used for writing")
        if self._hRead is None:
            self._hRead = wrapped(win32file.CreateMailslot, self.name, self.message_size, self.timeout_ms, None)
        return self._hRead

    def _write_handle(self):
        """Return the handle used for writing, opening the mailslot on first use.

        :raises: :exc:`x_mailslot_invalid_use` if the mailslot is already in use for reading
        """
        if self._hRead is not None:
            raise x_mailslot_invalid_use(None, "Mailslot._write_handle", "Cannot write to this mailslot; it is used for reading")
        if self._hWrite is None:
            self._hWrite = wrapped(
                win32file.CreateFile,
                self.name,
                fs.FILE_ACCESS.GENERIC_WRITE,
                fs.FILE_SHARE.READ,
                None,
                fs.FILE_CREATION.OPEN_EXISTING,
                fs.FILE_ATTRIBUTE.NORMAL,
                None
            )
        return self._hWrite

    def _info(self):
        """Return the (maxsize, nextsize, message_count, timeout) tuple which
        GetMailslotInfo reports for the reading handle.
        """
        return wrapped(win32file.GetMailslotInfo, self._read_handle())

    def pyobject(self):
        """
        :returns: the underlying PyHANDLE object
        :raises: :exc:`x_mailslot` if the mailslot has not yet been determined for reading or for writing
        """
        if self._hRead:
            return self._hRead
        elif self._hWrite:
            return self._hWrite
        else:
            raise x_mailslot(None, "Mailslot.pyobject", "Mailslot has not yet been used for reading or writing")

    def __iter__(self):
        # Yields messages until get() raises (eg x_mailslot_empty on timeout).
        while True:
            yield self.get()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def as_string(self):
        return self.name

    def dumped(self, level=0):
        output = []
        output.append("name: %s" % self.name)
        output.append("message_size: %s" % self.message_size)
        output.append("timeout_ms: %s" % self.timeout_ms)
        if self._hRead:
            output.append("in use for reading")
        elif self._hWrite:
            output.append("in use for writing")
        else:
            output.append("not yet used for reading or writing")
        return utils.dumped("\n".join(output), level)

    def qsize(self):
        """
        :returns: the number of messages waiting in the mailslot
        """
        maxsize, nextsize, message_count, timeout = self._info()
        return message_count

    def empty(self):
        """
        :returns: `True` if there is nothing waiting to be read
        """
        maxsize, nextsize, message_count, timeout = self._info()
        return message_count == 0

    def full(self):
        """
        :returns: `True` if the number of messages waiting to be read has reached the maximum size for the mailslot
        """
        # NOTE(review): the first item reported by GetMailslotInfo looks like
        # the maximum *message size* rather than a message-count limit, so
        # this comparison is suspect; behaviour left unchanged -- confirm.
        maxsize, nextsize, message_count, timeout = self._info()
        return message_count == maxsize

    def get(self, block=True, timeout_ms=None):
        """Attempt to read from the mailslot, optionally blocking and timing out if nothing is found.

        :param block: whether to wait `timeout_ms` milliseconds before raising `x_mailslot_empty`
        :param timeout_ms: how many milliseconds to wait before timing out if blocking.
                           None => use the timeout given when the mailslot was created;
                           -1 => wait forever
        :returns: the first message from the mailslot queue serialised according to the class's `serialiser`
        :raises: :exc:`x_mailslot_empty` if timed out or unblocked
        """
        hMailslot = self._read_handle()
        if timeout_ms is None:
            timeout_ms = self.timeout_ms
        if timeout_ms == -1:
            timeout = None
        else:
            timeout = timeout_ms / 1000.0

        t0 = time.time()
        while True:
            maxsize, nextsize, message_count, default_timeout = wrapped(win32file.GetMailslotInfo, hMailslot)
            if message_count:
                # nextsize is the size of the next waiting message
                hr, data = wrapped(win32file.ReadFile, hMailslot, nextsize, None)
                serialiser_in, serialiser_out = self.serialiser
                return serialiser_out(data)

            timed_out = (timeout is not None) and (time.time() - t0) > timeout
            if not block or timed_out:
                raise x_mailslot_empty(None, "Mailslot.get", "No message waiting in the mailslot")
            # Nothing there yet: poll again after a short pause
            time.sleep(0.001)

    def get_nowait(self):
        "Convenience wrapper which calls :meth:`get` without blocking"
        return self.get(False, 0)

    def put(self, data):
        """Attempt to write to the mailslot

        :param data: data to be written to the mailslot via its serialisers
        :raises: :exc:`x_mailslot_message_too_big` if the serialised message
                 exceeds the mailslot's maximum size
        """
        serialiser_in, serialiser_out = self.serialiser
        data = serialiser_in(data)
        if self.message_size and len(data) > self.message_size:
            raise x_mailslot_message_too_big(
                None,
                "%s.put" % self.__class__.__name__,
                "Mailslot messages must be <= %d bytes" % self.message_size
            )
        wrapped(win32file.WriteFile, self._write_handle(), data, None)

    def close(self):
        """Close the mailslot for reading or writing. This will be called automatically
        if the mailslot is being context-managed. Closing a mailslot which has not been
        used (and which therefore has no open handles) will succeed silently, as will
        closing a mailslot more than once.
        """
        #
        # Forget the handles before closing them so that a closed handle
        # is never handed out again by _read_handle / _write_handle.
        #
        hRead, self._hRead = self._hRead, None
        hWrite, self._hWrite = self._hWrite, None
        if hRead is not None:
            wrapped(win32file.CloseHandle, hRead)
        if hWrite is not None:
            wrapped(win32file.CloseHandle, hWrite)
class Event(core._WinSysObject):
    """An event object is an interprocess (but not intermachine) synchronisation
    mechanism which allows one or more threads of control to wait on others.
    The most common configuration is given by the defaults: anonymous,
    not set initially, and automatically reset once set (ie only set for long
    enough to release any waiting threads and then reset). A common
    variant is a named event which can then be referred to easily by other
    processes without having to pass handles around. This is why the :func:`ipc.event`
    function reverses the order of parameters and takes the name first.

    The Event class mimics Python's Event objects which are in any case very
    close to the semantics of the underlying Windows object. For that reason,
    although :meth:`clear` is used to reset the event, :meth:`reset` is also
    provided as an alias, matching the Windows API.

    An event is Truthful if it is currently set
    """

    def __init__(self, security=None, needs_manual_reset=False, initially_set=False, name=None):
        """Record the event's parameters; the Windows event itself is only
        created when its handle is first needed.

        :param security: passed as the security attributes to CreateEvent
        :param needs_manual_reset: whether the event stays set until reset explicitly
        :param initially_set: whether the event is set on creation
        :param name: the event's name or `None` for an anonymous event
        """
        core._WinSysObject.__init__(self)
        self.security = security
        self.needs_manual_reset = needs_manual_reset
        self.initially_set = initially_set
        self.name = name
        self._hEvent = None

    def as_string(self):
        return self.name or str(int(self._handle()))

    def dumped(self, level=0):
        output = []
        # Parenthesised so that the fallback applies to the name and not to
        # the already-formatted (and therefore always true) string
        output.append("name: %s" % (self.name or "anonymous"))
        output.append("needs_manual_reset: %s" % self.needs_manual_reset)
        output.append("initially_set: %s" % self.initially_set)
        return utils.dumped("\n".join(output), level)

    def pyobject(self):
        """
        :returns: the underlying event handle, creating the event if necessary
        """
        return self._handle()

    def _handle(self):
        """Return the event handle, creating the event on first use."""
        if self._hEvent is None:
            self._hEvent = wrapped(
                win32event.CreateEvent,
                self.security,
                self.needs_manual_reset,
                self.initially_set,
                self.name
            )
        return self._hEvent

    def pulse(self):
        "Cause the event to set and reset immediately"
        wrapped(win32event.PulseEvent, self._handle())

    def set(self):
        "Signal the event"
        wrapped(win32event.SetEvent, self._handle())

    def clear(self):
        "Reset the event"
        wrapped(win32event.ResetEvent, self._handle())

    reset = clear

    def wait(self, timeout_s=WAIT.INFINITE):
        """Wait, optionally timing out, for the event to fire. cf also the :func:`any` and
        :func:`all` convenience functions which take an iterable of events or other objects.

        :param timeout_s: how many seconds to wait before timing out.
        :type timeout_s: float
        :returns: `True` if the event fired, `False` otherwise
        """
        if timeout_s == WAIT.INFINITE:
            # The INFINITE marker is passed through as is, not scaled
            timeout_ms = timeout_s
        else:
            timeout_ms = timeout_s * 1000.0
        result = wrapped(win32event.WaitForSingleObject, self._handle(), int(timeout_ms))
        return result != WAIT.TIMEOUT

    def isSet(self):
        "Detect whether the event is currently set (by waiting without blocking)"
        return self.wait(0)

    def __nonzero__(self):
        return self.isSet()
class Mutex(core._WinSysObject):
    """A Mutex is a kernel object which can only be held by one thread or process
    at a time. Its usual application is to protect shared data structures or to
    prevent more than one instance of an application from running simultaneously.
    Mutexes can be named or anonymous. Anonymous mutexes can be used between
    processes by passing their handle from one process to the other on the
    command line.

    This is very similar to a Python threading.Lock object. (In fact the Python
    objects are implemented as Semaphores on Windows, presumably for re-entrancy).
    For this reason, the :meth:`acquire` and :meth:`release` names have been
    used for methods.

    The mutex is its own context manager, so a typical usage would be::

        from winsys import ipc

        with ipc.mutex("ONLYONCE"):
            # do stuff
    """

    def __init__(self, name=None, take_initial_ownership=False):
        """Create the Windows mutex straight away.

        :param name: the mutex's name or `None` for an anonymous mutex
        :param take_initial_ownership: passed to CreateMutex as its initial-owner flag
        """
        core._WinSysObject.__init__(self)
        self.name = name
        self._handle = wrapped(win32event.CreateMutex, None, take_initial_ownership, name)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()

    def pyobject(self):
        """
        :returns: the underlying mutex handle
        """
        return self._handle

    def as_string(self):
        return self.name or str(int(self._handle))

    def acquire(self, timeout_ms=WAIT.INFINITE):
        """Acquire the mutex waiting for `timeout_ms` milliseconds before failing

        :param timeout_ms: how many milliseconds to wait before giving up
        :raises: :exc:`x_ipc_timeout` if timeout expires
        """
        result = wrapped(win32event.WaitForSingleObject, self._handle, timeout_ms)
        if result == WAIT.TIMEOUT:
            raise x_ipc_timeout(None, "Mutex.acquire", "timed out")

    def release(self):
        """Release the mutex. Consider using the object as a context manager
        instead.
        """
        wrapped(win32event.ReleaseMutex, self._handle)
class Pipe(core._WinSysObject):
    """A pipe is a kernel object which allows communication between two parts
    of a process or two separate processes, possibly on separate machines. A
    pipe can be named or anonymous. The former can span processes and machines;
    the latter are typically used within one process although they can cross
    processes with some effort.

    Named pipes have one particular characteristic which makes them especially
    interesting for handing off between a client and a server: the server can
    transparently impersonate the security context of the client. This makes
    them ideal for, eg, a server which accepts requests from a client and
    actions them on the client's behalf.
    """

    DEFAULT_IN_BUFFER_SIZE = 4096
    DEFAULT_OUT_BUFFER_SIZE = 4096

    def __init__(self, name=None, inheritable=False):
        """Hold the pipe's name and the security attributes to create it with.

        :param name: the pipe's name or `None` for an anonymous pipe
        :param inheritable: if true, security attributes with `bInheritHandle`
                            switched on are kept in `self.sa`; otherwise
                            `self.sa` is `None`
        """
        core._WinSysObject.__init__(self)
        self.name = name
        attributes = None
        if inheritable:
            attributes = wrapped(win32security.SECURITY_ATTRIBUTES)
            attributes.bInheritHandle = True
        self.sa = attributes
class AnonymousPipe(Pipe):
    """An unnamed pipe with one handle for reading and one for writing."""

    def __init__(self, inheritable=False, buffer_size=0):
        """Create the pipe and wrap both of its ends as winsys handles.

        :param inheritable: passed to :class:`Pipe` to determine the security attributes
        :param buffer_size: passed to CreatePipe as the pipe's buffer size
        """
        Pipe.__init__(self, None, inheritable)
        r, w = wrapped(win32pipe.CreatePipe, self.sa, buffer_size)
        self._rhandle = handles.handle(r)
        self._whandle = handles.handle(w)

    def reader(self, process=None):
        """Duplicate the reading handle for `process`.

        :param process: anything accepted by `processes.process`
        :returns: whatever the handle's `duplicate` method returns
        """
        # Imported here as the module does not import processes at the top
        from winsys import processes
        return self._rhandle.duplicate(processes.process(process))

    def writer(self, process=None):
        """Duplicate the writing handle for `process`.

        :param process: anything accepted by `processes.process`
        :returns: whatever the handle's `duplicate` method returns
        """
        # Imported here as the module does not import processes at the top
        from winsys import processes
        return self._whandle.duplicate(processes.process(process))

    def read(self):
        """Read bytes from the pipe.

        :returns: any bytes waiting in the pipe. Will block if nothing is ready.
        """
        handle = self._rhandle.pyobject()
        #
        # Collect the chunks as bytes: this module has unicode_literals in
        # force so a bare "" would be text and would not mix with the
        # bytes coming back from ReadFile.
        #
        chunks = []
        while True:
            hr, chunk = wrapped(win32file.ReadFile, handle, Pipe.DEFAULT_IN_BUFFER_SIZE, None)
            chunks.append(chunk)
            # A non-zero result means keep reading
            if hr == 0:
                break
        return b"".join(chunks)

    def write(self, data):
        """Writes `data` to the pipe. Will block if the internal buffer fills up.
        """
        handle = self._whandle.pyobject()
        wrapped(win32file.WriteFile, handle, data)
[docs]class NamedPipe(Pipe):
def __init__(
self,
name,
mode=PIPE_ACCESS.DUPLEX | fs.FILE_FLAG.OVERLAPPED,
type=PIPE_TYPE.BYTE,
max_instances=win32pipe.PIPE_UNLIMITED_INSTANCES,
in_buffer_size=Pipe.DEFAULT_IN_BUFFER_SIZE,
out_buffer_size=Pipe.DEFAULT_OUT_BUFFER_SIZE,
default_timeout=NMPWAIT.USE_DEFAULT_WAIT,
inheritable=False
):
Pipe.__init__(self, name, inheritable)
self._pipe = None
self._pipe = wrapped(
win32pipe.CreateNamedPipe,
name,
mode,
type,
max_instances,
in_buffer_size,
out_buffer_size,
default_timeout,
self.sa
)
[docs] def listen(self, async=False):
if async:
from winsys import asyncio
waiter = asyncio.AsyncIO()
overlapped = waiter.overlapped
else:
overlapped = None
wrapped(win32pipe.ConnectNamedPipe, self._pipe, overlapped)
if async:
return waiter
#
# Module-level convenience functions
#
def mailslot(mailslot, marshalled=True, message_size=0, timeout_ms=-1):
    """Return a :class:`Mailslot` instance based on the name in `mailslot`.
    If the name is not a fully-qualified mailslot name (\\\\.\\mailslot) then
    it is assumed to be on the local machine and is prefixed accordingly.

    :param mailslot: a valid mailslot name, with the convenience that if it
                     is unqualified it is suitably prefixed to form a local
                     mailslot identifier. `None` and existing :class:`Mailslot`
                     instances are handed back unchanged.
    :param marshalled: whether the data is to be marshalled or simply passed
                       along unchanged.
    :param message_size: what message size should be used; 0 to use the system default
    :param timeout_ms: how many milliseconds should a read wait before giving up?
                       -1 to wait forever.
    """
    if mailslot is None or isinstance(mailslot, Mailslot):
        return mailslot

    serialiser = (marshal.dumps, marshal.loads) if marshalled else (_unserialised, _unserialised)
    is_qualified = re.match(r"\\\\[^\\]+\\mailslot\\", unicode(mailslot), re.UNICODE)
    if not is_qualified:
        mailslot = r"\\.\mailslot\%s" % mailslot
    return Mailslot(mailslot, serialiser, message_size, timeout_ms)
def event(name=None, initially_set=False, needs_manual_reset=False, security=None):
    """Return a :class:`Event` instance, named or anonymous, unset by default
    and with automatic reset.

    :param name: a valid event name. If `None` (the default) then an anonymous
                 event is created which may be passed to threads which need to
                 synchronise.
    :param initially_set: whether the event is set on creation. [False]
    :param needs_manual_reset: whether the event needs to be reset manually once it
                               has fired. The alternative is that, once the event has
                               fired, it falls back to an unset state.
    :param security: what security to apply to the new event
    :type security: anything accepted by :func:`security.security`
    :returns: a :class:`Event` instance
    """
    # The class takes the same values in the opposite order; name each one
    return Event(
        security=security,
        needs_manual_reset=needs_manual_reset,
        initially_set=initially_set,
        name=name
    )
def mutex(name=None, take_initial_ownership=False):
    """Return a :class:`Mutex` instance, named or anonymous, not initially owned
    by default.

    :param name: a valid mutex name. If `None` (the default) then an anonymous
                 mutex is created which may be passed to threads which need to
                 synchronise.
    :param take_initial_ownership: whether the mutex just created is to be owned
                                   by the creating thread.
    :returns: a :class:`Mutex` instance
    """
    return Mutex(name=name, take_initial_ownership=take_initial_ownership)
def open_pipe(name, mode="rw", timeout_ms=WAIT.INFINITE):
    """Open the client end of an existing named pipe, first waiting for an
    instance of it to become available.

    :param name: the pipe's name; if it does not start with two backslashes
                 it is taken to be a local pipe and prefixed accordingly
    :param mode: any combination of "r" (read access) and "w" (write access)
    :param timeout_ms: how many milliseconds to wait for the pipe to become available
    :returns: the handle to the opened pipe, as returned by CreateFile
    """
    if not name.startswith("\\\\"):
        name = r"\\.\pipe\%s" % name
    win32pipe.WaitNamedPipe(name, timeout_ms)

    access = 0
    if "r" in mode:
        access |= constants.GENERIC_ACCESS.READ
    if "w" in mode:
        access |= constants.GENERIC_ACCESS.WRITE
    #
    # Hand the handle back to the caller: without it the pipe which
    # has just been opened cannot be used.
    #
    return wrapped(win32file.CreateFile, name, access, 0, None, fs.FILE_CREATION.OPEN_EXISTING, 0, None)
def pipe(name=None):
    """Return a pipe. If name is given a :class:`NamedPipe` is returned, otherwise
    an :class:`AnonymousPipe`. If name is not in the correct form for a pipe
    (\\\\<machine>\\pipe\\<name>) it is assumed to be a local pipe and renamed
    as such.
    """
    if name is None:
        return AnonymousPipe()

    # Anything without a leading double-backslash is treated as a local pipe
    is_qualified = name.startswith("\\\\")
    return NamedPipe(name if is_qualified else r"\\.\pipe\%s" % name)
def wait(object, timeout_ms=WAIT.INFINITE):
    """Wait for one synchronisation object to fire.

    :param object: an object whose `pyobject` method returns a handle to a synchronisation object
    :param timeout_ms: how many milliseconds to wait
    :raises: :exc:`x_ipc_timeout` if `timeout_ms` is exceeded
    """
    handle = object.pyobject()
    outcome = wrapped(win32event.WaitForSingleObject, handle, timeout_ms)
    if outcome == WAIT.TIMEOUT:
        raise x_ipc_timeout(None, "wait", "wait timed out")
def any(objects, timeout_ms=WAIT.INFINITE):
    """Wait for any of the Windows synchronisation objects in the list to fire.
    The objects must be winsys synchronisation objects (or, at least, have
    a pyobject method which returns a PyHANDLE object). The one which
    fires will be returned unless a timeout occurs in which case x_ipc_timeout
    will be raised.

    :param objects: an iterable of winsys objects each of which has a waitable handle
    :param timeout_ms: how many milliseconds to wait
    :returns: the object which fired
    :raises: :exc:`x_ipc_timeout` if `timeout_ms` is exceeded
    """
    #
    # Take a copy of the objects: they are iterated over to find the
    # handles and then indexed to find the one which fired, which would
    # fail for a generator or any other one-shot iterable.
    #
    objects = list(objects)
    waitables = [o.pyobject() for o in objects]
    result = wrapped(win32event.WaitForMultipleObjects, waitables, 0, timeout_ms)
    if result == WAIT.TIMEOUT:
        raise x_ipc_timeout(None, "any", "Wait timed out")
    # The result is offset from WAIT.OBJECT_0 by the position of the handle which fired
    return objects[result - WAIT.OBJECT_0]
def all(objects, timeout_ms=WAIT.INFINITE):
    """Wait for all of the Windows synchronisation objects in the list to fire.
    The objects must be winsys synchronisation objects (or, at least, have
    a pyobject method which returns a PyHANDLE object).

    :param objects: an iterable of winsys objects each of which has a waitable handle
    :param timeout_ms: how many milliseconds to wait
    :raises: :exc:`x_ipc_timeout` if `timeout_ms` is exceeded
    """
    waitables = []
    for o in objects:
        waitables.append(o.pyobject())
    # The third argument asks for every handle to fire, not just one
    outcome = wrapped(win32event.WaitForMultipleObjects, waitables, 1, timeout_ms)
    if outcome == WAIT.TIMEOUT:
        raise x_ipc_timeout(None, "all", "Wait timed out")