Replace stat.ST_xxx usage with os.stat().st_xxx (#116501)

Modernize code to use the new API which avoids the usage of the stat
module just to read os.stat() members.

* Sort logging.handlers imports.
* Rework reopenIfNeeded() code to make it easier to follow.
* Replace "not self.stream" with "self.stream is None".
This commit is contained in:
Victor Stinner 2024-03-08 18:49:09 +01:00 committed by GitHub
parent cca30230d9
commit 61831585b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 41 additions and 28 deletions

View File

@ -23,11 +23,17 @@ Copyright (C) 2001-2021 Vinay Sajip. All Rights Reserved.
To use, simply 'import logging.handlers' and log away!
"""
import io, logging, socket, os, pickle, struct, time, re
from stat import ST_DEV, ST_INO, ST_MTIME
import queue
import threading
import copy
import io
import logging
import os
import pickle
import queue
import re
import socket
import struct
import threading
import time
#
# Some constants...
@ -269,7 +275,7 @@ class TimedRotatingFileHandler(BaseRotatingHandler):
# path object (see Issue #27493), but self.baseFilename will be a string
filename = self.baseFilename
if os.path.exists(filename):
t = os.stat(filename)[ST_MTIME]
t = int(os.stat(filename).st_mtime)
else:
t = int(time.time())
self.rolloverAt = self.computeRollover(t)
@ -459,8 +465,7 @@ class WatchedFileHandler(logging.FileHandler):
This handler is not appropriate for use under Windows, because
under Windows open files cannot be moved or renamed - logging
opens the files with exclusive locks - and so there is no need
for such a handler. Furthermore, ST_INO is not supported under
Windows; stat always returns zero for this value.
for such a handler.
This handler is based on a suggestion and patch by Chad J.
Schroeder.
@ -476,9 +481,11 @@ class WatchedFileHandler(logging.FileHandler):
self._statstream()
def _statstream(self):
if self.stream:
sres = os.fstat(self.stream.fileno())
self.dev, self.ino = sres[ST_DEV], sres[ST_INO]
if self.stream is None:
return
sres = os.fstat(self.stream.fileno())
self.dev = sres.st_dev
self.ino = sres.st_ino
def reopenIfNeeded(self):
"""
@ -488,6 +495,9 @@ class WatchedFileHandler(logging.FileHandler):
has, close the old stream and reopen the file to get the
current stream.
"""
if self.stream is None:
return
# Reduce the chance of race conditions by stat'ing by path only
# once and then fstat'ing our new fd if we opened a new log stream.
# See issue #14632: Thanks to John Mulligan for the problem report
@ -495,18 +505,23 @@ class WatchedFileHandler(logging.FileHandler):
try:
# stat the file by path, checking for existence
sres = os.stat(self.baseFilename)
# compare file system stat with that of our stream file handle
reopen = (sres.st_dev != self.dev or sres.st_ino != self.ino)
except FileNotFoundError:
sres = None
# compare file system stat with that of our stream file handle
if not sres or sres[ST_DEV] != self.dev or sres[ST_INO] != self.ino:
if self.stream is not None:
# we have an open file handle, clean it up
self.stream.flush()
self.stream.close()
self.stream = None # See Issue #21742: _open () might fail.
# open a new file handle and get new stat info from that fd
self.stream = self._open()
self._statstream()
reopen = True
if not reopen:
return
# we have an open file handle, clean it up
self.stream.flush()
self.stream.close()
self.stream = None # See Issue #21742: _open () might fail.
# open a new file handle and get new stat info from that fd
self.stream = self._open()
self._statstream()
def emit(self, record):
"""

View File

@ -2,7 +2,6 @@
"""
import os
import stat
import sys
import unittest
import socket
@ -29,7 +28,7 @@ class LargeFileTest:
mode = 'w+b'
with self.open(TESTFN, mode) as f:
current_size = os.fstat(f.fileno())[stat.ST_SIZE]
current_size = os.fstat(f.fileno()).st_size
if current_size == size+1:
return
@ -40,13 +39,13 @@ class LargeFileTest:
f.seek(size)
f.write(b'a')
f.flush()
self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1)
self.assertEqual(os.fstat(f.fileno()).st_size, size+1)
@classmethod
def tearDownClass(cls):
with cls.open(TESTFN, 'wb'):
pass
if not os.stat(TESTFN)[stat.ST_SIZE] == 0:
if not os.stat(TESTFN).st_size == 0:
raise cls.failureException('File was not truncated by opening '
'with mode "wb"')
unlink(TESTFN2)
@ -67,7 +66,7 @@ class TestFileMethods(LargeFileTest):
self.assertEqual(f.tell(), size + 1)
def test_osstat(self):
self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1)
self.assertEqual(os.stat(TESTFN).st_size, size+1)
def test_seek_read(self):
with self.open(TESTFN, 'rb') as f:

View File

@ -702,8 +702,7 @@ class TestMaildir(TestMailbox, unittest.TestCase):
self.assertEqual(self._box._factory, factory)
for subdir in '', 'tmp', 'new', 'cur':
path = os.path.join(self._path, subdir)
mode = os.stat(path)[stat.ST_MODE]
self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
self.assertTrue(os.path.isdir(path), f"Not a directory: {path!r}")
def test_list_folders(self):
# List folders