Source code for event_logs

# -*- coding: utf-8 -*-
from __future__ import with_statement
import os, sys
import contextlib
import re
import struct
import warnings

import winerror
import win32api
import win32event
import win32evtlog
import win32evtlogutil
import pywintypes

from winsys._compat import *
from winsys import accounts, constants, core, exc, registry, utils

# Constant sets gathered from the win32evtlog namespace by name pattern.
EVENTLOG_READ = constants.Constants.from_pattern(
    "EVENTLOG_*_READ", namespace=win32evtlog
)
EVENTLOG_READ.doc("Ways of reading event logs")

EVENTLOG_TYPE = constants.Constants.from_pattern(
    "EVENTLOG_*_TYPE", namespace=win32evtlog
)
# These names do not fit the EVENTLOG_*_TYPE pattern, so they are
# added to the set by hand.
EVENTLOG_TYPE.update({
    "EVENTLOG_SUCCESS": 0,
    "AUDIT_FAILURE": win32evtlog.EVENTLOG_AUDIT_FAILURE,
    "AUDIT_SUCCESS": win32evtlog.EVENTLOG_AUDIT_SUCCESS,
})
EVENTLOG_TYPE.doc("Types of records in event logs")

PyHANDLE = pywintypes.HANDLEType

# Log used when a caller does not name one explicitly.
DEFAULT_LOG_NAME = "Application"

class x_event_logs(exc.x_winsys):
    "Base exception for eventlog-specific exceptions"


# Win32 error codes which are mapped onto specific winsys exceptions.
WINERROR_MAP = {
    winerror.ERROR_ACCESS_DENIED: exc.x_access_denied,
}

# Built by exc.wrapper from the error map above; used throughout this
# module as wrapped(function, *args, **kwargs).
wrapped = exc.wrapper(WINERROR_MAP)
class _EventLogEntry(core._WinSysObject):
    """Internal class for convenient access to attributes of an event log
    record. Attributes are available as lowercase_with_underscore equivalents
    of their TitleCase counterparts and are converted to Python data types
    where appropriate, eg time_written is a datetime value and sid is
    an :class:`accounts.Principal` instance.

    Two _EventLogEntry instances compare equal if they have the same
    record number on the same event log on the same computer.
    """

    def __init__(self, event_log_name, event_log_entry):
        """Wrap one raw event log record.

        :param event_log_name: the log the record was read from; it is handed
          to SafeFormatMessage as the log type when the message is rendered
        :param event_log_entry: the raw record object whose TitleCase
          attributes are copied onto this instance
        """
        self._event_log_name = event_log_name
        self._event_log_entry = event_log_entry
        self.record_number = event_log_entry.RecordNumber
        self.time_generated = utils.from_pytime(event_log_entry.TimeGenerated)
        self.time_written = utils.from_pytime(event_log_entry.TimeWritten)
        self.event_id = event_log_entry.EventID
        self.event_type = event_log_entry.EventType
        self.event_category = event_log_entry.EventCategory
        # A Sid which cannot be turned into a principal is recorded as None
        # rather than stopping the record from being read.
        try:
            self.sid = accounts.principal(event_log_entry.Sid)
        except accounts.x_accounts:
            self.sid = None
        self.computer_name = event_log_entry.ComputerName
        self.source_name = event_log_entry.SourceName
        self.data = event_log_entry.Data
        # Rendered lazily by the message property.
        self._message = None

    def as_string(self):
        """Return a one-line summary: record number, source and type name."""
        return "%d - %s (%s)" % (
            self.record_number,
            self.source_name,
            EVENTLOG_TYPE.name_from_value(self.event_type, "<Unknown>")
        )

    def __eq__(self, other):
        """Compare on computer name, event log and record number.

        Objects which are not event log entries are left to Python's
        default handling instead of failing with an AttributeError.
        """
        if not isinstance(other, _EventLogEntry):
            return NotImplemented
        return (
            self.computer_name == other.computer_name
            and self._event_log_name == other._event_log_name
            and self.record_number == other.record_number
        )

    def __hash__(self):
        """Hash on the same three values which __eq__ compares."""
        return hash((self.computer_name, self._event_log_name, self.record_number))

    def dumped(self, level=0):
        """Return a multi-line description of every attribute of the record.

        :param level: passed through to :func:`utils.dumped`
        """
        output = []
        output.append("record_number: %s" % self.record_number)
        output.append("time_generated: %s" % self.time_generated)
        output.append("time_written: %s" % self.time_written)
        output.append("event_id: %s" % self.event_id)
        output.append("source_name: %s" % self.source_name)
        # Use the same fallback as as_string so that an unrecognised event
        # type is reported in the same way by both methods.
        output.append("event_type: %s" % EVENTLOG_TYPE.name_from_value(self.event_type, "<Unknown>"))
        output.append("event_category: %s" % self.event_category)
        output.append("sid: %s" % self.sid)
        output.append("computer_name: %s" % self.computer_name)
        output.append("data: %s" % repr(self.data))
        output.append("message: %s" % self.message)
        return utils.dumped("\n".join(output), level)

    def _get_message(self):
        """Render the record's message on first access and cache it."""
        if self._message is None:
            self._message = wrapped(
                win32evtlogutil.SafeFormatMessage,
                self._event_log_entry,
                self._event_log_name
            )
        return self._message
    message = property(_get_message)
class EventLog(core._WinSysObject):
    """An Event Log is a sequential database managed through API calls
    with a number of different Event Sources, against which events can
    be logged. The log can be read backwards (using the reversed() builtin)
    or forwards but only sequentially. (We simulate random access by reading
    sequentially until a record is hit).

    You can use the builtin len() to determine the current size of this log
    (which may or may not correspond to the maximum record number). Item
    access is possible from either end by subscripting the log in the usual
    way. It should be noted that this uses iteration, forward or reverse as
    needed, so is not going to be that efficient except to find a few records
    at either end.

    Instances of this class are expected to be accessed via
    the :func:`event_log` function.
    """

    REG_ROOT = r"\\%s\HKLM\SYSTEM\CurrentControlSet\Services\Eventlog"

    def __init__(self, computer, name):
        """Read the log's registry settings and open a handle to it.

        :param computer: name of the computer holding the log; a false value
          means "."
        :param name: name of the event log
        :raises: :exc:`exc.x_not_found` if the registry lookup succeeds but
          finds no key for the log
        """
        core._WinSysObject.__init__(self)
        self.computer = computer or "."
        self.name = name
        try:
            key = registry.registry(self.REG_ROOT % self.computer).get_key(self.name)
        except exc.x_winsys as err:
            # Failure to read the registry is only worth a warning: every
            # setting below is then simply None.
            warnings.warn("Registry access failed with error: %s; log access may still be possible" % err.args[-1])
            values = dict()
        else:
            if key:
                values = dict(key.values())
            else:
                raise exc.x_not_found(None, "EventLog", r"\\%s\%s" % (self.computer, self.name))

        self.auto_backup_log_files = values.get("AutoBackupLogFiles")
        self.display_name_file = values.get("DisplayNameFile")
        self.display_name_id = values.get("DisplayNameID")
        self.file = values.get("File")
        self.max_size = values.get("MaxSize")
        self.primary_module = values.get("PrimaryModule")
        self.restrict_guest_access = values.get("RestrictGuestAccess")
        self.retention = values.get("Retention")
        self.sources = values.get("Sources")
        self._handle = wrapped(win32evtlog.OpenEventLog, self.computer, self.name)

    def as_string(self):
        """Return the log's identity as computer\\name."""
        return r"%s\%s" % (self.computer, self.name)

    def dumped(self, level=0):
        """Return a multi-line description of the registry settings which
        were found for this log.

        :param level: passed through to :func:`utils.dumped`
        """
        output = []
        if self.auto_backup_log_files:
            output.append("auto_backup_log_files: %s" % self.auto_backup_log_files)
        if self.display_name_file:
            output.append("display_name_file: %s" % self.display_name_file)
        if self.display_name_id:
            output.append("display_name_id: %s" % self.display_name_id)
        if self.file:
            output.append("file: %s" % self.file)
        if self.max_size is not None:
            output.append("max_size: %s" % utils.size_as_mb(self.max_size))
        if self.primary_module:
            output.append("primary_module: %s" % self.primary_module)
        if self.restrict_guest_access:
            output.append("restrict_guest_access: %s" % self.restrict_guest_access)
        if self.retention:
            output.append("retention: %s" % utils.secs_as_string(self.retention))
        if self.sources:
            output.append("sources: %s" % utils.dumped_list(self.sources, level))
        return utils.dumped("\n".join(output), level)

    @contextlib.contextmanager
    def _temp_handle(self):
        """Internal, context-managed function to provide a working handle
        for the event log. You can't just open one at the beginning and
        work with it.

        The handle is closed even if the managed block raises or, when used
        inside a generator, the generator is abandoned before it finishes.
        """
        handle = wrapped(win32evtlog.OpenEventLog, self.computer, self.name)
        try:
            yield handle
        finally:
            wrapped(win32evtlog.CloseEventLog, handle)

    def clear(self, save_to_filename=None):
        """Clear the event log, optionally saving out to an opaque file first,
        using the built-in functionality.

        :param save_to_filename: if given, passed (as unicode) to
          ClearEventLog as the file to save to
        """
        wrapped(
            win32evtlog.ClearEventLog,
            self._handle,
            unicode(save_to_filename) if save_to_filename else None
        )

    def __len__(self):
        """Allow len() to return the number of records in the event log"""
        return wrapped(win32evtlog.GetNumberOfEventLogRecords, self._handle)

    def __getitem__(self, index):
        """Allow the event log to be accessed by numeric index. An index of
        zero represents the oldest available record; -1 represents the
        latest available record.

        Record numbers are computed relative to the oldest record number
        and the read seeks straight to that record, returning the first
        entry which comes back. If the read returns no entries, None is
        returned.
        """
        with self._temp_handle() as handle:
            if index >= 0:
                record_number = wrapped(win32evtlog.GetOldestEventLogRecord, handle) + index
                direction = EVENTLOG_READ.FORWARDS
            else:
                record_number = wrapped(win32evtlog.GetOldestEventLogRecord, handle) + len(self) + index
                direction = EVENTLOG_READ.BACKWARDS
            for entry in wrapped(
                win32evtlog.ReadEventLog,
                handle,
                EVENTLOG_READ.SEEK | direction,
                record_number
            ):
                # Pass the log *name*: _EventLogEntry forwards it to
                # SafeFormatMessage as the log type.
                return _EventLogEntry(self.name, entry)

    def _iterator(self, flags):
        """Internal function to open a handle over the event log and
        iterate in either direction.

        :param flags: EVENTLOG_READ flags passed to ReadEventLog
        """
        with self._temp_handle() as handle:
            while True:
                entries = wrapped(win32evtlog.ReadEventLog, handle, flags, 0)
                if not entries:
                    # A plain return ends the generator; raising
                    # StopIteration here is turned into a RuntimeError by
                    # PEP 479.
                    return
                for entry in entries:
                    yield _EventLogEntry(self.name, entry)

    def __iter__(self):
        """Return an iterator which traverses this event log oldest record first"""
        return self._iterator(EVENTLOG_READ.FORWARDS | EVENTLOG_READ.SEQUENTIAL)

    def __reversed__(self):
        """Return an iterator which traverses this event log latest record first"""
        return self._iterator(EVENTLOG_READ.BACKWARDS | EVENTLOG_READ.SEQUENTIAL)

    def watcher(self):
        """(EXPERIMENTAL) Unsure if this will be of any use. In principle, you can ask for an
        event to fire when a new record is written to this log. In practice, though,
        there's no way of determining which record was added and you have to do
        some housekeeping and work out what changed.

        Probably quite inefficient since it has to keep iterating backwards over the
        log every time to find the last record to match against. Does work, though.
        """
        TIMEOUT_SECS = 2
        hEvent = win32event.CreateEvent(None, 1, 0, None)

        # Skip everything already in the log so that only records written
        # from now on are yielded.
        iterator = iter(self)
        last_record = self[-1]
        for i in iterator:
            if i == last_record:
                break

        with self._temp_handle() as handle:
            # NOTE(review): the notification is registered against the
            # long-lived self._handle, not the temporary handle opened here;
            # kept exactly as originally written -- confirm which is intended.
            wrapped(win32evtlog.NotifyChangeEventLog, self._handle, hEvent)
            while True:
                if win32event.WaitForSingleObject(hEvent, 1000 * TIMEOUT_SECS) != win32event.WAIT_TIMEOUT:
                    last_record = self[-1]
                    for i in iterator:
                        yield i
                        if i == last_record:
                            break

    def log_event(self, source, *args, **kwargs):
        """Pass-through for :func:`log_event`"""
        log_event(source, *args, **kwargs)
class EventSource(core._WinSysObject):
    """An Event Source is an apparently necessary but in fact slightly
    unnecessary part of the event log mechanism. In principle, it represents
    a name and a DLL with a bunch of message ids in it. In practice, you can
    log an event with an unregistered event source and it will work quite
    happily although the event viewer won't be able to pick up the full
    message, only the inserted strings and the added data.

    Implemented here mostly for internal use in the :func:`log_event`
    function. NB We're using the convenience functions offered by
    win32evtlogutil, which make use of defaults built in to the
    win32event.pyd file. In the future we may implement our own .DLL builder.

    Instances of this class are expected to be accessed via
    the :func:`event_source` module-level function.
    """

    _keys = [
        'CategoryCount',
        'CategoryMessageFile',
        'EventMessageFile',
        'ParameterMessageFile',
        'TypesSupported',
    ]

    def __init__(self, computer, log_name, source_name):
        """Look the source up in the registry and record its settings.

        :param computer: computer name; a false value means "."
        :param log_name: event log name; a false value means the default log
        :param source_name: name of the event source
        :raises: :exc:`exc.x_not_found` if no registry key is found
        """
        core._WinSysObject.__init__(self)
        self.computer = computer or "."
        self.log_name = log_name or DEFAULT_LOG_NAME
        self.name = source_name

        log_root = EventLog.REG_ROOT % self.computer
        source_key = registry.registry(r"%s\%s\%s" % (log_root, self.log_name, self.name))
        if not source_key:
            moniker = r"\\%s\%s\%s" % (self.computer, self.log_name, self.name)
            raise exc.x_not_found(None, "EventSource", moniker)
        self._handle = None

        settings = dict(source_key.values())
        value_types = dict(
            (value_name, value_type)
            for (value_name, unused_value, value_type)
            in source_key.values(_want_types=True)
        )

        self.category_count = settings.get("CategoryCount")
        self.category_message_file = settings.get("CategoryMessageFile")
        self.event_message_file = settings.get("EventMessageFile")
        self.parameter_message_file = settings.get("ParameterMessageFile")

        raw_types = settings.get("TypesSupported") or 0
        #
        # This is messy because, although TypeSupported is specified
        # as a DWORD and constitutes a set of flags, it seems to be
        # implemented in any number of ingenious ways, including
        # binary data representing a number and a string representing
        # the hexadecimal value of the flags.
        #
        try:
            self.types_supported = int(raw_types or 0)
        except ValueError:
            registry_type = value_types.get("TypesSupported")
            if registry_type == registry.REGISTRY_VALUE_TYPE.REG_SZ:
                base = 16 if raw_types.startswith("0x") else 10
                self.types_supported = int(raw_types, base)
            elif registry_type == registry.REGISTRY_VALUE_TYPE.REG_BINARY:
                (self.types_supported,) = struct.unpack("L", raw_types)
            else:
                raise x_event_logs(None, None, "Can't determine types supported")

    def as_string(self):
        """Return the source's identity as computer\\log\\name."""
        return r"%s\%s\%s" % (self.computer, self.log_name, self.name)

    def dumped(self, level=0):
        """Return a multi-line description of the source's settings.

        :param level: passed through to :func:`utils.dumped`
        """
        lines = [
            self.as_string(),
            "category_count: %s" % self.category_count,
            "category_message_file: %r" % self.category_message_file,
            "event_message_file: %r" % self.event_message_file,
            "parameter_message_file: %r" % self.parameter_message_file,
            "types_supported: %s" % EVENTLOG_TYPE.names_from_value(self.types_supported),
        ]
        return utils.dumped("\n".join(lines), level)

    #
    # Context manager to allow a handle for this event source to
    # be passed to the ReportEvent function in log_event (qv).
    #
    def __enter__(self):
        handle = wrapped(win32evtlog.RegisterEventSource, self.computer, self.name)
        self._handle = handle
        return handle

    def __exit__(self, *exc_info):
        wrapped(win32evtlog.DeregisterEventSource, self._handle)
        self._handle = None

    @classmethod
    def create(cls, name, log_name=DEFAULT_LOG_NAME):
        """Call the convenience functions to add a simple event source to the
        registry against a named event log (usually Application). Return the
        event source so you can log against it.

        :param name: name of the new event source
        :param log_name: name of the associated event log
        """
        wrapped(
            win32evtlogutil.AddSourceToRegistry,
            appName=name,
            eventLogType=log_name
        )
        return cls("", log_name, name)

    def delete(self):
        """Remove an event source from the registry. NB There is no particular
        security at work here: it's perfectly possible to remove someone else's
        event source.
        """
        wrapped(
            win32evtlogutil.RemoveSourceFromRegistry,
            appName=self.name,
            eventLogType=self.log_name
        )

    def log_event(self, *args, **kwargs):
        """Pass-through to module-level :func:`log_event`"""
        log_event(self, *args, **kwargs)

#
# Module-level convenience functions
#
def event_logs(computer="."):
    """Yield an :class:`EventLog` for each key found under the event log
    registry root of the given computer.
    """
    root = registry.registry(EventLog.REG_ROOT % computer)
    for log_key in root.keys():
        yield EventLog(computer, log_key.name)
def event_log(log):
    """Convenience function to return an :class:`EventLog` object representing
    one of the existing event logs. Will raise :exc:`x_not_found` if the event
    log does not exist.

    :param log: one of None, an :class:`EventLog` instance, or a [\\\\computer\\]name moniker
    """
    # None and ready-made EventLog instances are handed straight back.
    if log is None or isinstance(log, EventLog):
        return log

    parsed = re.match(r"(?:\\\\([^\\]+)\\)?(.+)$", unicode(log), re.UNICODE)
    if parsed is None:
        raise x_event_logs(errmsg=r"Event log must be of form [\\computer\]event_log")
    computer, log_name = parsed.groups()
    return EventLog(computer or ".", log_name)
def event_sources(log_name=DEFAULT_LOG_NAME, computer="."):
    """Yield an :class:`EventSource` for each subkey of the named log's
    registry key on the given computer.
    """
    log_key = registry.registry(EventLog.REG_ROOT % computer).get_key(log_name)
    for source_key in log_key.keys():
        yield EventSource(computer, log_name, source_key.name)
def event_source(source):
    r"""Convenience function to return an :class:`EventSource` object representing
    one of the existing event sources. Will raise :exc:`exceptions.x_not_found` if the event
    source does not exist.

    :param source: one of None, an :class:`EventSource` instance, or a [[\\\\computer]\\log\\]name moniker
    """
    if isinstance(source, EventSource):
        return source
    if source is None:
        return None

    parsed = re.match(r"(?:\\\\([^\\]+)\\)?(?:([^\\]+)\\)?(.+)$", source, re.UNICODE)
    if parsed is None:
        raise x_event_logs(errmsg=r"Event source must be of form [\\computer\]event_log\event_source")
    computer, log_name, source_name = parsed.groups()
    # Missing parts of the moniker fall back to the local computer and
    # the default log.
    return EventSource(computer or ".", log_name or DEFAULT_LOG_NAME, source_name)
def log_event(source, type="error", message=None, data=None, id=0, category=0, principal=core.UNSET):
    """Convenience function to log an event against an existing source.

    :param source: anything accepted by :func:`event_source`
    :param type: an :data:`EVENTLOG_TYPE`
    :param message: a string or list of strings
    :param data: a bytestring
    :param id: a number corresponding to the event message
    :param category: a number relevant to the event source
    :param principal: anything which :func:`accounts.principal` accepts [logged-on user]
    """
    event_type = EVENTLOG_TYPE.constant(type)

    if principal is core.UNSET:
        who = accounts.me()
    else:
        who = accounts.principal(principal)

    # ReportEvent is given a list of strings: wrap a lone string and turn
    # a missing message into an empty list.
    if isinstance(message, basestring):
        strings = [message]
    else:
        strings = message or []

    with event_source(source) as hLog:
        wrapped(
            win32evtlog.ReportEvent,
            hLog,
            event_type,
            category,
            id,
            who.pyobject(),
            strings,
            data
        )
# Running the module directly does nothing: it is only meant to be imported.
if __name__ == '__main__': pass