gh-84570: Factor Out _channel_send_wait() (gh-110949)

This makes several subsequent changes cleaner.
This commit is contained in:
Eric Snow 2023-10-16 15:42:57 -06:00 committed by GitHub
parent f46333b9f5
commit 06f844eaa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 43 additions and 45 deletions

View File

@ -1640,6 +1640,34 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj,
return 0;
}
static int
_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
PyErr_NoMemory();
return -1;
}
PyThread_acquire_lock(mutex, NOWAIT_LOCK);
/* Queue up the object. */
int res = _channel_send(channels, cid, obj, mutex);
if (res < 0) {
PyThread_release_lock(mutex);
goto finally;
}
/* Wait until the object is received. */
wait_for_lock(mutex);
/* success! */
res = 0;
finally:
// XXX Delete the lock.
return res;
}
static int
_channel_recv(_channels *channels, int64_t id, PyObject **res)
{
@ -2526,30 +2554,16 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds)
}
cid = cid_data.cid;
/* Queue up the object. */
int err = 0;
if (blocking) {
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
PyErr_NoMemory();
return NULL;
}
PyThread_acquire_lock(mutex, WAIT_LOCK);
/* Queue up the object. */
int err = _channel_send(&_globals.channels, cid, obj, mutex);
if (handle_channel_error(err, self, cid)) {
PyThread_release_lock(mutex);
return NULL;
}
/* Wait until the object is received. */
wait_for_lock(mutex);
err = _channel_send_wait(&_globals.channels, cid, obj);
}
else {
/* Queue up the object. */
int err = _channel_send(&_globals.channels, cid, obj, NULL);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
err = _channel_send(&_globals.channels, cid, obj, NULL);
}
if (handle_channel_error(err, self, cid)) {
return NULL;
}
Py_RETURN_NONE;
@ -2584,33 +2598,17 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
return NULL;
}
/* Queue up the object. */
int err = 0;
if (blocking) {
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
Py_DECREF(tempobj);
PyErr_NoMemory();
return NULL;
}
PyThread_acquire_lock(mutex, WAIT_LOCK);
/* Queue up the buffer. */
int err = _channel_send(&_globals.channels, cid, tempobj, mutex);
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
PyThread_acquire_lock(mutex, WAIT_LOCK);
return NULL;
}
/* Wait until the buffer is received. */
wait_for_lock(mutex);
err = _channel_send_wait(&_globals.channels, cid, tempobj);
}
else {
/* Queue up the buffer. */
int err = _channel_send(&_globals.channels, cid, tempobj, NULL);
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
err = _channel_send(&_globals.channels, cid, tempobj, NULL);
}
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
Py_RETURN_NONE;