From 06f844eaa0a09b8524ade5734b4f2cc742a0a5c7 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 16 Oct 2023 15:42:57 -0600 Subject: [PATCH] gh-84570: Factor Out _channel_send_wait() (gh-110949) This makes several subsequent changes cleaner. --- Modules/_xxinterpchannelsmodule.c | 88 +++++++++++++++---------------- 1 file changed, 43 insertions(+), 45 deletions(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index bc8cd0e2cff..34efe9d6d1b 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -1640,6 +1640,34 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj, return 0; } +static int +_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj) +{ + PyThread_type_lock mutex = PyThread_allocate_lock(); + if (mutex == NULL) { + PyErr_NoMemory(); + return -1; + } + PyThread_acquire_lock(mutex, NOWAIT_LOCK); + + /* Queue up the object. */ + int res = _channel_send(channels, cid, obj, mutex); + if (res < 0) { + PyThread_release_lock(mutex); + goto finally; + } + + /* Wait until the object is received. */ + wait_for_lock(mutex); + + /* success! */ + res = 0; + +finally: + // XXX Delete the lock. + return res; +} + static int _channel_recv(_channels *channels, int64_t id, PyObject **res) { @@ -2526,30 +2554,16 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) } cid = cid_data.cid; + /* Queue up the object. */ + int err = 0; if (blocking) { - PyThread_type_lock mutex = PyThread_allocate_lock(); - if (mutex == NULL) { - PyErr_NoMemory(); - return NULL; - } - PyThread_acquire_lock(mutex, WAIT_LOCK); - - /* Queue up the object. */ - int err = _channel_send(&_globals.channels, cid, obj, mutex); - if (handle_channel_error(err, self, cid)) { - PyThread_release_lock(mutex); - return NULL; - } - - /* Wait until the object is received. */ - wait_for_lock(mutex); + err = _channel_send_wait(&_globals.channels, cid, obj); } else { - /* Queue up the object. */ - int err = _channel_send(&_globals.channels, cid, obj, NULL); - if (handle_channel_error(err, self, cid)) { - return NULL; - } + err = _channel_send(&_globals.channels, cid, obj, NULL); + } + if (handle_channel_error(err, self, cid)) { + return NULL; } Py_RETURN_NONE; @@ -2584,33 +2598,17 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } + /* Queue up the object. */ + int err = 0; if (blocking) { - PyThread_type_lock mutex = PyThread_allocate_lock(); - if (mutex == NULL) { - Py_DECREF(tempobj); - PyErr_NoMemory(); - return NULL; - } - PyThread_acquire_lock(mutex, WAIT_LOCK); - - /* Queue up the buffer. */ - int err = _channel_send(&_globals.channels, cid, tempobj, mutex); - Py_DECREF(tempobj); - if (handle_channel_error(err, self, cid)) { - PyThread_acquire_lock(mutex, WAIT_LOCK); - return NULL; - } - - /* Wait until the buffer is received. */ - wait_for_lock(mutex); + err = _channel_send_wait(&_globals.channels, cid, tempobj); } else { - /* Queue up the buffer. */ - int err = _channel_send(&_globals.channels, cid, tempobj, NULL); - Py_DECREF(tempobj); - if (handle_channel_error(err, self, cid)) { - return NULL; - } + err = _channel_send(&_globals.channels, cid, tempobj, NULL); + } + Py_DECREF(tempobj); + if (handle_channel_error(err, self, cid)) { + return NULL; } Py_RETURN_NONE;