This commit is contained in:
Alexander Belopolsky 2011-05-02 12:31:17 -04:00
commit 09a98a99cd
4 changed files with 116 additions and 113 deletions

View File

@ -27,7 +27,10 @@ To use, simply 'import logging.handlers' and log away!
import logging, socket, os, pickle, struct, time, re import logging, socket, os, pickle, struct, time, re
from stat import ST_DEV, ST_INO, ST_MTIME from stat import ST_DEV, ST_INO, ST_MTIME
import queue import queue
import threading try:
import threading
except ImportError:
threading = None
try: try:
import codecs import codecs
@ -1218,116 +1221,117 @@ class QueueHandler(logging.Handler):
except: except:
self.handleError(record) self.handleError(record)
class QueueListener(object): if threading:
""" class QueueListener(object):
This class implements an internal threaded listener which watches for """
LogRecords being added to a queue, removes them and passes them to a This class implements an internal threaded listener which watches for
list of handlers for processing. LogRecords being added to a queue, removes them and passes them to a
""" list of handlers for processing.
_sentinel = None """
_sentinel = None
def __init__(self, queue, *handlers): def __init__(self, queue, *handlers):
""" """
Initialise an instance with the specified queue and Initialise an instance with the specified queue and
handlers. handlers.
""" """
self.queue = queue self.queue = queue
self.handlers = handlers self.handlers = handlers
self._stop = threading.Event() self._stop = threading.Event()
self._thread = None self._thread = None
def dequeue(self, block): def dequeue(self, block):
""" """
Dequeue a record and return it, optionally blocking. Dequeue a record and return it, optionally blocking.
The base implementation uses get. You may want to override this method The base implementation uses get. You may want to override this method
if you want to use timeouts or work with custom queue implementations. if you want to use timeouts or work with custom queue implementations.
""" """
return self.queue.get(block) return self.queue.get(block)
def start(self): def start(self):
""" """
Start the listener. Start the listener.
This starts up a background thread to monitor the queue for This starts up a background thread to monitor the queue for
LogRecords to process. LogRecords to process.
""" """
self._thread = t = threading.Thread(target=self._monitor) self._thread = t = threading.Thread(target=self._monitor)
t.setDaemon(True) t.setDaemon(True)
t.start() t.start()
def prepare(self , record): def prepare(self , record):
""" """
Prepare a record for handling. Prepare a record for handling.
This method just returns the passed-in record. You may want to This method just returns the passed-in record. You may want to
override this method if you need to do any custom marshalling or override this method if you need to do any custom marshalling or
manipulation of the record before passing it to the handlers. manipulation of the record before passing it to the handlers.
""" """
return record return record
def handle(self, record): def handle(self, record):
""" """
Handle a record. Handle a record.
This just loops through the handlers offering them the record This just loops through the handlers offering them the record
to handle. to handle.
""" """
record = self.prepare(record) record = self.prepare(record)
for handler in self.handlers: for handler in self.handlers:
handler.handle(record) handler.handle(record)
def _monitor(self): def _monitor(self):
""" """
Monitor the queue for records, and ask the handler Monitor the queue for records, and ask the handler
to deal with them. to deal with them.
This method runs on a separate, internal thread. This method runs on a separate, internal thread.
The thread will terminate if it sees a sentinel object in the queue. The thread will terminate if it sees a sentinel object in the queue.
""" """
q = self.queue q = self.queue
has_task_done = hasattr(q, 'task_done') has_task_done = hasattr(q, 'task_done')
while not self._stop.isSet(): while not self._stop.isSet():
try: try:
record = self.dequeue(True) record = self.dequeue(True)
if record is self._sentinel: if record is self._sentinel:
break
self.handle(record)
if has_task_done:
q.task_done()
except queue.Empty:
pass
# There might still be records in the queue.
while True:
try:
record = self.dequeue(False)
if record is self._sentinel:
break
self.handle(record)
if has_task_done:
q.task_done()
except queue.Empty:
break break
self.handle(record)
if has_task_done:
q.task_done()
except queue.Empty:
pass
# There might still be records in the queue.
while True:
try:
record = self.dequeue(False)
if record is self._sentinel:
break
self.handle(record)
if has_task_done:
q.task_done()
except queue.Empty:
break
def enqueue_sentinel(self): def enqueue_sentinel(self):
""" """
This is used to enqueue the sentinel record. This is used to enqueue the sentinel record.
The base implementation uses put_nowait. You may want to override this The base implementation uses put_nowait. You may want to override this
method if you want to use timeouts or work with custom queue method if you want to use timeouts or work with custom queue
implementations. implementations.
""" """
self.queue.put_nowait(self._sentinel) self.queue.put_nowait(self._sentinel)
def stop(self): def stop(self):
""" """
Stop the listener. Stop the listener.
This asks the thread to terminate, and then waits for it to do so. This asks the thread to terminate, and then waits for it to do so.
Note that if you don't call this before your application exits, there Note that if you don't call this before your application exits, there
may be some records still left on the queue, which won't be processed. may be some records still left on the queue, which won't be processed.
""" """
self._stop.set() self._stop.set()
self.enqueue_sentinel() self.enqueue_sentinel()
self._thread.join() self._thread.join()
self._thread = None self._thread = None

View File

@ -1385,7 +1385,10 @@ class DatagramHandlerTest(BaseTest):
logger = logging.getLogger("udp") logger = logging.getLogger("udp")
logger.error("spam") logger.error("spam")
self.handled.wait() self.handled.wait()
self.assertEqual(self.log_output, "spam\n") self.handled.clear()
logger.error("eggs")
self.handled.wait()
self.assertEqual(self.log_output, "spam\neggs\n")
@unittest.skipUnless(threading, 'Threading required for this test.') @unittest.skipUnless(threading, 'Threading required for this test.')
@ -2631,6 +2634,8 @@ class QueueHandlerTest(BaseTest):
self.assertEqual(data.name, self.que_logger.name) self.assertEqual(data.name, self.que_logger.name)
self.assertEqual((data.msg, data.args), (msg, None)) self.assertEqual((data.msg, data.args), (msg, None))
@unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
'logging.handlers.QueueListener required for this test')
def test_queue_listener(self): def test_queue_listener(self):
handler = TestHandler(Matcher()) handler = TestHandler(Matcher())
listener = logging.handlers.QueueListener(self.queue, handler) listener = logging.handlers.QueueListener(self.queue, handler)

View File

@ -132,6 +132,8 @@ Core and Builtins
Library Library
------- -------
- logging: don't define QueueListener if Python has no thread support.
- Issue #11277: mmap.mmap() calls fcntl(fd, F_FULLFSYNC) on Mac OS X to get - Issue #11277: mmap.mmap() calls fcntl(fd, F_FULLFSYNC) on Mac OS X to get
around a mmap bug with sparse files. Patch written by Steffen Daode Nurpmeso. around a mmap bug with sparse files. Patch written by Steffen Daode Nurpmeso.

View File

@ -720,24 +720,16 @@ PyInit_signal(void)
Py_DECREF(x); Py_DECREF(x);
#ifdef SIG_BLOCK #ifdef SIG_BLOCK
x = PyLong_FromLong(SIG_BLOCK); if (PyModule_AddIntMacro(m, SIG_BLOCK))
if (!x || PyDict_SetItemString(d, "SIG_BLOCK", x) < 0) goto finally;
goto finally;
Py_DECREF(x);
#endif #endif
#ifdef SIG_UNBLOCK #ifdef SIG_UNBLOCK
x = PyLong_FromLong(SIG_UNBLOCK); if (PyModule_AddIntMacro(m, SIG_UNBLOCK))
if (!x || PyDict_SetItemString(d, "SIG_UNBLOCK", x) < 0) goto finally;
goto finally;
Py_DECREF(x);
#endif #endif
#ifdef SIG_SETMASK #ifdef SIG_SETMASK
x = PyLong_FromLong(SIG_SETMASK); if (PyModule_AddIntMacro(m, SIG_SETMASK))
if (!x || PyDict_SetItemString(d, "SIG_SETMASK", x) < 0) goto finally;
goto finally;
Py_DECREF(x);
#endif #endif
x = IntHandler = PyDict_GetItemString(d, "default_int_handler"); x = IntHandler = PyDict_GetItemString(d, "default_int_handler");