diff --git a/Lib/test/audit-tests.py b/Lib/test/audit-tests.py index 66c08f7f2ff..4abf33d7cad 100644 --- a/Lib/test/audit-tests.py +++ b/Lib/test/audit-tests.py @@ -429,6 +429,26 @@ def test_wmi_exec_query(): sys.addaudithook(hook) _wmi.exec_query("SELECT * FROM Win32_OperatingSystem") +def test_syslog(): + import syslog + + def hook(event, args): + if event.startswith("syslog."): + print(event, *args) + + sys.addaudithook(hook) + syslog.openlog('python') + syslog.syslog('test') + syslog.setlogmask(syslog.LOG_DEBUG) + syslog.closelog() + # implicit open + syslog.syslog('test2') + # open with default ident + syslog.openlog(logoption=syslog.LOG_NDELAY, facility=syslog.LOG_LOCAL0) + sys.argv = None + syslog.openlog() + syslog.closelog() + if __name__ == "__main__": from test.support import suppress_msvcrt_asserts diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py index 09b3333afe1..50f78f2a6b8 100644 --- a/Lib/test/test_audit.py +++ b/Lib/test/test_audit.py @@ -16,6 +16,7 @@ AUDIT_TESTS_PY = support.findfile("audit-tests.py") class AuditTest(unittest.TestCase): + maxDiff = None @support.requires_subprocess() def do_test(self, *args): @@ -185,7 +186,6 @@ class AuditTest(unittest.TestCase): self.assertEqual(actual, expected) - def test_wmi_exec_query(self): import_helper.import_module("_wmi") returncode, events, stderr = self.run_python("test_wmi_exec_query") @@ -199,6 +199,29 @@ class AuditTest(unittest.TestCase): self.assertEqual(actual, expected) + def test_syslog(self): + syslog = import_helper.import_module("syslog") + + returncode, events, stderr = self.run_python("test_syslog") + if returncode: + self.fail(stderr) + + if support.verbose: + print('Events:', *events, sep='\n ') + + self.assertSequenceEqual( + events, + [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'), + ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'), + ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'), + ('syslog.closelog', '', ''), + ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'), + ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'), + ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'), + ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'), + ('syslog.closelog', '', '')] + ) + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_syslog.py b/Lib/test/test_syslog.py index fe09bd39f8b..2125ec58d87 100644 --- a/Lib/test/test_syslog.py +++ b/Lib/test/test_syslog.py @@ -1,5 +1,9 @@ -from test.support import import_helper +from test.support import import_helper, threading_helper syslog = import_helper.import_module("syslog") #skip if not supported +from test import support +import sys +import threading +import time import unittest # XXX(nnorwitz): This test sucks. I don't know of a platform independent way @@ -8,6 +12,9 @@ import unittest class Test(unittest.TestCase): + def tearDown(self): + syslog.closelog() + def test_openlog(self): syslog.openlog('python') # Issue #6697. @@ -18,22 +25,59 @@ class Test(unittest.TestCase): syslog.syslog('test message from python test_syslog') syslog.syslog(syslog.LOG_ERR, 'test error from python test_syslog') + def test_syslog_implicit_open(self): + syslog.closelog() # Make sure log is closed + syslog.syslog('test message from python test_syslog') + syslog.syslog(syslog.LOG_ERR, 'test error from python test_syslog') + def test_closelog(self): syslog.openlog('python') syslog.closelog() + syslog.closelog() # idempotent operation def test_setlogmask(self): - syslog.setlogmask(syslog.LOG_DEBUG) + mask = syslog.LOG_UPTO(syslog.LOG_WARNING) + oldmask = syslog.setlogmask(mask) + self.assertEqual(syslog.setlogmask(0), mask) + self.assertEqual(syslog.setlogmask(oldmask), mask) def test_log_mask(self): - syslog.LOG_MASK(syslog.LOG_INFO) - - def test_log_upto(self): - syslog.LOG_UPTO(syslog.LOG_INFO) + mask = syslog.LOG_UPTO(syslog.LOG_WARNING) + self.assertTrue(mask & syslog.LOG_MASK(syslog.LOG_WARNING)) + self.assertTrue(mask & syslog.LOG_MASK(syslog.LOG_ERR)) + self.assertFalse(mask & syslog.LOG_MASK(syslog.LOG_INFO)) def test_openlog_noargs(self): syslog.openlog() syslog.syslog('test message from python test_syslog') + @threading_helper.requires_working_threading() + def test_syslog_threaded(self): + start = threading.Event() + stop = False + def opener(): + start.wait(10) + i = 1 + while not stop: + syslog.openlog(f'python-test-{i}') # new string object + i += 1 + def logger(): + start.wait(10) + while not stop: + syslog.syslog('test message from python test_syslog') + + orig_si = sys.getswitchinterval() + support.setswitchinterval(1e-9) + try: + threads = [threading.Thread(target=opener)] + threads += [threading.Thread(target=logger) for k in range(10)] + with threading_helper.start_threads(threads): + start.set() + time.sleep(0.1) + stop = True + finally: + sys.setswitchinterval(orig_si) + + if __name__ == "__main__": unittest.main() diff --git a/Modules/syslogmodule.c b/Modules/syslogmodule.c index c409fe968f8..1593eea94a6 100644 --- a/Modules/syslogmodule.c +++ b/Modules/syslogmodule.c @@ -235,7 +235,7 @@ syslog_setlogmask(PyObject *self, PyObject *args) if (!PyArg_ParseTuple(args, "l;mask for priority", &maskpri)) return NULL; - if (PySys_Audit("syslog.setlogmask", "(O)", args ? args : Py_None) < 0) { + if (PySys_Audit("syslog.setlogmask", "l", maskpri) < 0) { return NULL; } omaskpri = setlogmask(maskpri);