Source code for environment

# -*- coding: utf-8 -*-
"""Each process has an environment block (which may be empty). It
consists of a set of key-value pairs, each of which is a string.
The value string may be formed partly or wholly from other environment
variables using the %envvar% notation. By default, this module will
reinterpret those embedded variables but this can be overriden.

The process environment is derived on startup from a combination
of the system environment variables and the user's environment
variable, some of which are generated automatically by the
system to reflect the user's profile location and home drive etc.

All three environments are available as a dictalike class whose
interface matches the :class:`Env` base class. Each environment
object quacks like a dict in respect of item access, :meth:`Env.get`,
:meth:`Env.keys`, :meth:`Env.items` and :meth:`Env.update` methods
and the system and user objects supply an additional :meth:`Persistent.broadcast`
method which sends a message to top-level windows, such as the shell, to
indicate that the environment has changed.
"""
from __future__ import unicode_literals

import os, sys
import win32api
import win32profile
import win32gui
import win32con
import winerror

from winsys import core, exc, utils, registry

class x_environment(exc.x_winsys):
    "Base exception for all env exceptions"

WINERROR_MAP = {
    winerror.ERROR_ENVVAR_NOT_FOUND : exc.x_not_found,
}
wrapped = exc.wrapper(WINERROR_MAP, x_environment)

class _DelimitedText(list):
    """Helper class for values such as PATH and PATHEXT which are
    consistently semicolon-delimited text but which can helpfully
    be treated as a list of individual values. Subclasseed from
    list, it keeps track of the delimited list while exposing
    the more familiar Pythonesque list interface.
    """

    def __init__(self, env, key, delimiter=";", initialiser=None):
        super(_DelimitedText, self).__init__(env[key].split(delimiter) if initialiser is None else initialiser)
        self.env = env
        self.key = key
        self.delimiter = unicode(delimiter)

    def _update(self):
        self.env[self.key] = self.delimiter.join(self)

    def __delitem__(self, *args):
        super(_DelimitedText, self).__delitem__(*args)
        self._update()

    def __delslice__(self, *args):
        super(_DelimitedText, self).__delslice__(*args)
        self._update()

    def __iadd__(self, iterator):
        super(_DelimitedText, self).__iadd__(self.munge_item(unicode(i)) for i in iterator)
        self._update()
        return self

    def __setitem__(self, index, item):
        super(_DelimitedText, self).__setitem__(index, self.munge_item(unicode(item)))
        self._update()

    def __setslice__(self, index0, index1, iterator):
        super(_DelimitedText, self).__setitem__(index0, index1,(self.munge_item(unicode(item)) for item in iterator))
        self._update()

    def append(self, item):
        super(_DelimitedText, self).append(self.munge_item(unicode(item)))
        self._update()

    def extend(self, item):
        super(_DelimitedText, self).extend(self.munge_item(unicode(item)))
        self._update()

    def insert(self, index, item):
        super(_DelimitedText, self).insert(index, self.munge_item(unicode(object)))
        self._update()

    def pop(self, index=-1):
        result = super(_DelimitedText, self).pop(index)
        self._update()
        return result

    def remove(self, item):
        super(_DelimitedText, self).remove(self.munge_item(unicode(item)))
        self._update()

    def reverse(self):
        super(_DelimitedText, self).reverse()
        self._update()

    def sort(self):
        super(_DelimitedText, self).sort()
        self._update()

    def munge_item(self, item):
        return item

class _DelimitedPath(_DelimitedText):
    """Subclass of delimited text to ensure that valid filesystem paths
    are stored in the env var
    """

    def munge_item(self, item):
        return os.path.normpath(item).rstrip("\\")

[docs]class Env(core._WinSysObject): """Semi-abstract base class for all environment classes. Outlines a dict-like interface which relies on subclasses to implement simple :meth:`_get` and :meth:`_items` methods. """ def __getitem__(self, item): """Get environment strings like dictionary items:: from winsys import environment print environment.system()['windir'] """ raise NotImplementedError def __setitem__(self, item, value): """Set environment strings like dictionary items:: from winsys import environment environment.user()['winsys'] = 'TEST' """ raise NotImplementedError def __delitem__(self, item): """Remove an item from the environment:: from winsys import environment del environment.process()['winsys'] """ raise NotImplementedError def __repr__(self): return repr(dict(self).items())
[docs] def dumped(self, level): return utils.dumped_dict(dict(self).items(), level)
[docs] def keys(self): """Yield environment variable names """ raise NotImplementedError
[docs] def items(self, expand=True): """Yield key-value pairs of environment variables :param expand: whether to expand embedded environment variables [True] """ return( (k, self.expand(v) if expand else v) for k, v in self._items() )
iteritems = items def _get_path(self): if self.get("PATH"): return _DelimitedPath(self, "PATH") else: return _DelimitedPath(self, "PATH", initialiser=[]) def _set_path(self, iterator): self['PATH'] = ";".join(_DelimitedPath(self, "PATH", initialiser=iterator)) def _del_path(self): del self['PATH'] path = property(_get_path, _set_path, _del_path)
[docs] def get(self, item, default=None, expand=True): """Return an environment value if it exists, otherwise `[default]`. This is the only way to get an unexpanded environment value by setting `expand` to False. :param item: name of an environment variable :param default: value to return if no such environment variable exists. This default is expanded if `expand` is True. :param expand: whether to expand embedded environment variables [True] """ try: v = self._get(item) except KeyError: return default else: return self.expand(v) if expand else v
[docs] def update(self, dict_initialiser): """Update this environment from a dict-like object, typically another environment:: from winsys import environment penv = environment.process() penv.update(environment.system()) """ for k, v in dict(dict_initialiser).iteritems(): self[k] = v
@staticmethod
[docs] def expand(item): """Return a version of `item` with internal environment variables expanded to their corresponding value. This is done automatically by the functions in this class unless you specify `expand=False`. """ return wrapped(win32api.ExpandEnvironmentStrings, unicode(item))
class Process(Env): """The environment corresponding to the current process. This is visible only to the current process and its children (assuming the environment block is passed). Any changes you make here apply only for the lifetime of this process and do not affect the permanent user or system environment. See the :func:`system` and :func:`user` functions for ways to update the environment permanently. """ def __init__(self): super(Process, self).__init__() def keys(self): return (k for k in wrapped(win32profile.GetEnvironmentStrings).iterkeys()) def _items(self): return (item for item in wrapped(win32profile.GetEnvironmentStrings).iteritems()) def _get(self, item): return wrapped(win32api.GetEnvironmentVariable, item) def __getitem__(self, item): value = self._get(item) if value is None: raise KeyError else: return unicode(value) def __setitem__(self, item, value): if value is None: wrapped(win32api.SetEnvironmentVariable, item, None) else: wrapped(win32api.SetEnvironmentVariable, item, unicode(value)) def __delitem__(self, item): wrapped(win32api.SetEnvironmentVariable, item, None)
[docs]class Persistent(Env): """Represent persistent (registry-based) environment variables. These are held at system and at user level, the latter overriding the former when an process environment is put together. Don't instantiate this class directly: use the :func:`user` and :func:`system` functions. """ @staticmethod
[docs] def broadcast(timeout_ms=2000): """Broadcast a message to all top-level windows informing them that an environment change has occurred. The message must be sent, not posted, and times out after `timeout_ms` ms since some top-level windows handle this badly. NB This is a static method. """ win32gui.SendMessageTimeout( win32con.HWND_BROADCAST, win32con.WM_SETTINGCHANGE, 0, "Environment", win32con.SMTO_ABORTIFHUNG, timeout_ms )
def __init__(self, root): super(Persistent, self).__init__() self.registry = registry.registry(root) def _get(self, item): try: return unicode(self.registry.get_value(item)) except exc.x_not_found: raise KeyError
[docs] def keys(self): return (name for name, value in self.registry.itervalues())
def _items(self): return list(self.registry.itervalues()) def __getitem__(self, item): value = self._get(item) if value is None: raise KeyError else: return value def __setitem__(self, item, value): self.registry.set_value(item, unicode(value)) def __delitem__(self, item): del self.registry[item]
[docs]def process(): """Return a dict-like object representing the environment block of the current process. """ return Process()
[docs]def system(machine=None): """Return a dict-like object representing the system-level persistent environment variables, optionally selecting a different machine. :param machine: name or address of a different machine whose system environment is to be represented. """ ROOT = r"HKLM\System\CurrentControlSet\Control\Session Manager\Environment" if machine: root = r"\\%s\%s" % (machine, ROOT) else: root = ROOT return Persistent(root)
[docs]def user(): """Return a dict-like object representing the user-level persistent environment for the logged-on user. TODO: include alternate user functionality via logon token """ return Persistent("HKCU\Environment")
def broadcast(timeout_ms=2000): return Persistent.broadcast(timeout_ms=timeout_ms)