From 2f485d5376a3984597591ab74ed5867747fc82c1 Mon Sep 17 00:00:00 2001 From: Ivailo Monev Date: Thu, 18 Apr 2024 02:47:48 +0300 Subject: [PATCH] generic: replace QThreadPool and QRunnable with custom class on a side note it does not solve any thread-safety issues, such are out of the scope of the class Signed-off-by: Ivailo Monev --- includes/CMakeLists.txt | 1 + includes/KThreadPool | 1 + kdecore/CMakeLists.txt | 2 + kdecore/kernel/kthreadpool.cpp | 162 +++++++++++++++++++++++++++++++++ kdecore/kernel/kthreadpool.h | 59 ++++++++++++ kio/kio/khttp.cpp | 20 ++-- plasma/private/runnerjobs.cpp | 8 +- plasma/private/runnerjobs_p.h | 5 +- plasma/runnermanager.cpp | 39 ++++++-- 9 files changed, 271 insertions(+), 26 deletions(-) create mode 100644 includes/KThreadPool create mode 100644 kdecore/kernel/kthreadpool.cpp create mode 100644 kdecore/kernel/kthreadpool.h diff --git a/includes/CMakeLists.txt b/includes/CMakeLists.txt index 37329004..ea117aa5 100644 --- a/includes/CMakeLists.txt +++ b/includes/CMakeLists.txt @@ -251,6 +251,7 @@ install( KTempDir KTemporaryFile KTextEdit + KThreadPool KTimeEdit KTimeZone KTimerDialog diff --git a/includes/KThreadPool b/includes/KThreadPool new file mode 100644 index 00000000..e04e3da1 --- /dev/null +++ b/includes/KThreadPool @@ -0,0 +1 @@ +#include "../kthreadpool.h" diff --git a/kdecore/CMakeLists.txt b/kdecore/CMakeLists.txt index 79b8a5b5..5fd03a4e 100644 --- a/kdecore/CMakeLists.txt +++ b/kdecore/CMakeLists.txt @@ -72,6 +72,7 @@ set(kdecore_LIB_SRCS kernel/kglobal.cpp kernel/kcomponentdata.cpp kernel/kstandarddirs.cpp + kernel/kthreadpool.cpp services/kmimetypefactory.cpp services/kmimemagicrule.cpp services/kmimetypetrader.cpp @@ -218,6 +219,7 @@ install( kernel/kglobal.h kernel/kcomponentdata.h kernel/kstandarddirs.h + kernel/kthreadpool.h services/kmimetype.h services/kmimetypetrader.h services/kservice.h diff --git a/kdecore/kernel/kthreadpool.cpp b/kdecore/kernel/kthreadpool.cpp new file mode 100644 index 00000000..5702ff86 --- /dev/null +++ b/kdecore/kernel/kthreadpool.cpp @@ -0,0 +1,162 @@ +/* + This file is part of the KDE libraries + Copyright (C) 2024 Ivailo Monev + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Library General Public + License version 2, as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Library General Public License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to + the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ + +#include "kthreadpool.h" +#include "kdebug.h" + +#include +#include +#include +#include + +static const int s_waittimeout = 100; +static const int s_terminatetimeout = 5000; + +class KThreadPoolPrivate +{ +public: + KThreadPoolPrivate(KThreadPool *parent); + + void appendThread(QThread *thread); + + void _k_slotFinished(); + + QMutex mutex; + KThreadPool* parent; + int maxthreads; + QAtomicInt activethreadcount; + QList activethreads; + QList queuedthreads; +}; + +KThreadPoolPrivate::KThreadPoolPrivate(KThreadPool *_parent) + : parent(_parent), + maxthreads(QThread::idealThreadCount()), + activethreadcount(0) +{ +} + +void KThreadPoolPrivate::appendThread(QThread *thread) +{ + activethreadcount.ref(); + activethreads.append(thread); + parent->connect( + thread, SIGNAL(finished()), + parent, SLOT(_k_slotFinished()) + ); +} + +void KThreadPoolPrivate::_k_slotFinished() +{ + QMutexLocker locker(&mutex); + QMutableListIterator iter(activethreads); + while (iter.hasNext()) { + QThread* thread = iter.next(); + if (thread->isFinished()) { + kDebug() << "thread finished" << thread; + iter.remove(); + activethreadcount.deref(); + } + } + if (!queuedthreads.isEmpty()) { + QThread* thread = queuedthreads.takeFirst(); + kDebug() << "starting thread from queue" << thread; + appendThread(thread); + locker.unlock(); + thread->start(); + } +} + + +KThreadPool::KThreadPool(QObject *parent) + : QObject(parent ? parent : qApp), + d(new KThreadPoolPrivate(this)) +{ +} + +KThreadPool::~KThreadPool() +{ + waitForDone(); + delete d; +} + +void KThreadPool::start(QThread *thread, const QThread::Priority priority) +{ + QMutexLocker locker(&d->mutex); + if (d->activethreadcount >= d->maxthreads) { + kDebug() << "too many threads active, putting thread in queue" << thread; + d->queuedthreads.append(thread); + } else { + kDebug() << "starting thread" << thread; + d->appendThread(thread); + locker.unlock(); + thread->start(priority); + } +} + +void KThreadPool::waitForDone(const int timeout) +{ + kDebug() << "waiting for threads" << timeout; + QMutexLocker locker(&d->mutex); + QElapsedTimer elapsedtimer; + elapsedtimer.start(); + QMutableListIterator iter(d->activethreads); + kDebug() << "currently" << d->activethreads << "active threads"; + while ((timeout < 1 || elapsedtimer.elapsed() < timeout) && d->activethreadcount > 0) { + iter.toFront(); + while (iter.hasNext()) { + QThread* thread = iter.next(); + // kDebug() << "waiting for" << thread; + disconnect(thread, 0, this, 0); + thread->wait(s_waittimeout); + if (thread->isFinished()) { + kDebug() << "thread finished" << thread; + iter.remove(); + d->activethreadcount.deref(); + } + } + } + if (d->activethreadcount > 0) { + kWarning() << "still there are active threads" << d->activethreadcount; + iter.toFront(); + while (iter.hasNext()) { + QThread* thread = iter.next(); + kWarning() << "terminating" << thread; + thread->terminate(); + thread->wait(s_terminatetimeout); + } + } +} + +int KThreadPool::maxThreadCount() const +{ + return d->maxthreads; +} + +void KThreadPool::setMaxThreadCount(int count) +{ + d->maxthreads = count; +} + +int KThreadPool::activeThreadCount() const +{ + return d->activethreadcount; +} + +#include "moc_kthreadpool.cpp" diff --git a/kdecore/kernel/kthreadpool.h b/kdecore/kernel/kthreadpool.h new file mode 100644 index 00000000..c9fe9b3f --- /dev/null +++ b/kdecore/kernel/kthreadpool.h @@ -0,0 +1,59 @@ +/* + This file is part of the KDE libraries + Copyright (C) 2024 Ivailo Monev + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Library General Public + License version 2, as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Library General Public License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to + the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ + +#ifndef KTHREADPOOL_H +#define KTHREADPOOL_H + +#include + +#include +#include + +class KThreadPoolPrivate; + +/*! + Thread pool class + + @since 4.24 + @warning the API is subject to change +*/ +class KDECORE_EXPORT KThreadPool : public QObject +{ + Q_OBJECT + +public: + KThreadPool(QObject *parent = nullptr); + ~KThreadPool(); + + void start(QThread *thread, const QThread::Priority priority = QThread::InheritPriority); + void waitForDone(const int timeout = 30000); + + int maxThreadCount() const; + void setMaxThreadCount(int count); + + int activeThreadCount() const; + +private: + Q_DISABLE_COPY(KThreadPool); + KThreadPoolPrivate *const d; + + Q_PRIVATE_SLOT(d, void _k_slotFinished()); +}; + +#endif // KTHREADPOOL_H diff --git a/kio/kio/khttp.cpp b/kio/kio/khttp.cpp index 24bcb820..2d85de59 100644 --- a/kio/kio/khttp.cpp +++ b/kio/kio/khttp.cpp @@ -18,10 +18,9 @@ #include "khttp.h" #include "klocale.h" +#include "kthreadpool.h" #include "kdebug.h" -#include -#include #include #include #include @@ -380,10 +379,11 @@ void KHTTPHeadersParser::parseHeaders(const QByteArray &header, const bool authe // qDebug() << Q_FUNC_INFO << m_method << m_path << m_version << m_authuser << m_authpass; } -class KHTTPRunnable : public QRunnable +class KHTTPThread : public QThread { + Q_OBJECT public: - KHTTPRunnable(QFile *file, QTcpSocket *client, QAtomicInt *ref); + KHTTPThread(QObject *parent, QFile *file, QTcpSocket *client, QAtomicInt *ref); protected: void run() final; @@ -394,15 +394,15 @@ private: QAtomicInt* m_ref; }; -KHTTPRunnable::KHTTPRunnable(QFile *file, QTcpSocket *client, QAtomicInt *ref) - : QRunnable(), +KHTTPThread::KHTTPThread(QObject *parent, QFile *file, QTcpSocket *client, QAtomicInt *ref) + : QThread(parent), m_file(file), m_client(client), m_ref(ref) { } -void KHTTPRunnable::run() +void KHTTPThread::run() { QByteArray httpbuffer(KHTTP_BUFFSIZE, '\0'); qint64 httpfileresult = m_file->read(httpbuffer.data(), httpbuffer.size()); @@ -460,7 +460,7 @@ private: void writeResponse(const ushort httpstatus, const bool authenticate, QTcpSocket *client, const bool get); QAtomicInt m_ref; - QThreadPool* m_filepool; + KThreadPool* m_filepool; }; KHTTPPrivate::KHTTPPrivate(QObject *parent) @@ -472,7 +472,7 @@ KHTTPPrivate::KHTTPPrivate(QObject *parent) serverid = QCoreApplication::applicationName(); // NOTE: the default thread limit is number of CPU cores online - m_filepool = new QThreadPool(this); + m_filepool = new KThreadPool(this); // NOTE: the default maximum for pending connections is 30 tcpserver = new QTcpServer(this); @@ -567,7 +567,7 @@ void KHTTPPrivate::slotNewConnection() client->flush(); if (get) { - m_filepool->start(new KHTTPRunnable(httpfile, client, &m_ref)); + m_filepool->start(new KHTTPThread(m_filepool, httpfile, client, &m_ref)); } else { kDebug(s_khttpdebugarea) << "done with client" << client->peerAddress() << client->peerPort(); client->disconnectFromHost(); diff --git a/plasma/private/runnerjobs.cpp b/plasma/private/runnerjobs.cpp index 73938ce3..8ad83444 100644 --- a/plasma/private/runnerjobs.cpp +++ b/plasma/private/runnerjobs.cpp @@ -18,13 +18,9 @@ */ #include "runnerjobs_p.h" - -#include - -#include - #include "runnermanager.h" #include "plasma/querymatch.h" +#include "kdebug.h" namespace Plasma { @@ -34,7 +30,7 @@ namespace Plasma { FindMatchesJob::FindMatchesJob(Plasma::AbstractRunner *runner, Plasma::RunnerContext *context) - : QRunnable(), + : QThread(runner), m_context(*context, 0), m_runner(runner) { diff --git a/plasma/private/runnerjobs_p.h b/plasma/private/runnerjobs_p.h index 5e3df3e7..b37451d0 100644 --- a/plasma/private/runnerjobs_p.h +++ b/plasma/private/runnerjobs_p.h @@ -22,7 +22,7 @@ #include #include -#include +#include #include "abstractrunner.h" @@ -33,8 +33,9 @@ namespace Plasma { * FindMatchesJob class * Class to run queries in different threads */ -class FindMatchesJob : public QRunnable +class FindMatchesJob : public QThread { + Q_OBJECT public: FindMatchesJob(Plasma::AbstractRunner *runner, Plasma::RunnerContext *context); diff --git a/plasma/runnermanager.cpp b/plasma/runnermanager.cpp index 9a2250a8..62c46633 100644 --- a/plasma/runnermanager.cpp +++ b/plasma/runnermanager.cpp @@ -25,12 +25,12 @@ #include #include -#include -#include -#include -#include -#include +#include "kplugininfo.h" +#include "kservicetypetrader.h" +#include "kstandarddirs.h" +#include "kthreadpool.h" +#include "kdebug.h" // #define MEASURE_PREPTIME @@ -51,7 +51,7 @@ public: allRunnersPrepped(false), teardownRequested(false) { - threadPool = new QThreadPool(); + threadPool = new KThreadPool(q); matchChangeTimer.setSingleShot(true); @@ -165,11 +165,34 @@ public: } } + static QThread::Priority threadPriority(AbstractRunner::Priority priority) + { + switch (priority) { + case AbstractRunner::LowestPriority: { + return QThread::LowestPriority; + } + case AbstractRunner::LowPriority: { + return QThread::LowPriority; + } + case AbstractRunner::NormalPriority: { + return QThread::NormalPriority; + } + case AbstractRunner::HighPriority: { + return QThread::HighPriority; + } + case AbstractRunner::HighestPriority: { + return QThread::HighestPriority; + } + } + kWarning() << "unhandled runner priority" << priority; + return QThread::InheritPriority; + } + RunnerManager *q; RunnerContext context; QTimer matchChangeTimer; QHash runners; - QThreadPool *threadPool; + KThreadPool *threadPool; QStringList allowedRunners; bool prepped; bool allRunnersPrepped; @@ -367,7 +390,7 @@ void RunnerManager::launchQuery(const QString &untrimmedTerm) foreach (Plasma::AbstractRunner *runner, d->runners) { if ((runner->ignoredTypes() & d->context.type()) == 0) { FindMatchesJob *job = new FindMatchesJob(runner, &d->context); - d->threadPool->start(job, static_cast(runner->priority())); + d->threadPool->start(job, RunnerManagerPrivate::threadPriority(runner->priority())); } } }