plasma: use QThreadPool and QRunnable for even faster runners

Signed-off-by: Ivailo Monev <xakepa10@gmail.com>
This commit is contained in:
Ivailo Monev 2016-04-09 07:54:42 +00:00
parent 5f025e2a91
commit e7b9481e80
4 changed files with 40 additions and 122 deletions

View file

@ -33,30 +33,22 @@ namespace Plasma {
//////////////////// ////////////////////
FindMatchesJob::FindMatchesJob(Plasma::AbstractRunner *runner, FindMatchesJob::FindMatchesJob(Plasma::AbstractRunner *runner,
Plasma::RunnerContext *context, QObject *parent) Plasma::RunnerContext *context)
: QThread(parent), : QRunnable(),
m_context(*context, 0), m_context(*context, 0),
m_runner(runner) m_runner(runner),
m_finished(false)
{ {
} }
FindMatchesJob::~FindMatchesJob()
{
wait(3000);
}
void FindMatchesJob::run() void FindMatchesJob::run()
{ {
m_finished = false;
// kDebug() << "Running match for " << m_runner->objectName(); // kDebug() << "Running match for " << m_runner->objectName();
if (m_context.isValid()) { if (m_context.isValid()) {
m_runner->performMatch(m_context); m_runner->performMatch(m_context);
emit done(this);
} }
} m_finished = true;
int FindMatchesJob::priority() const
{
return m_runner->priority();
} }
Plasma::AbstractRunner* FindMatchesJob::runner() const Plasma::AbstractRunner* FindMatchesJob::runner() const
@ -64,6 +56,11 @@ Plasma::AbstractRunner* FindMatchesJob::runner() const
return m_runner; return m_runner;
} }
bool FindMatchesJob::isFinished()
{
return m_finished;
}
} // Plasma namespace } // Plasma namespace
#include "moc_runnerjobs_p.cpp" #include "moc_runnerjobs_p.cpp"

View file

@ -23,7 +23,7 @@
#include <QHash> #include <QHash>
#include <QMutex> #include <QMutex>
#include <QSet> #include <QSet>
#include <QThread> #include <QRunnable>
#include "abstractrunner.h" #include "abstractrunner.h"
@ -34,19 +34,14 @@ namespace Plasma {
* FindMatchesJob class * FindMatchesJob class
* Class to run queries in different threads * Class to run queries in different threads
*/ */
class FindMatchesJob : public QThread class FindMatchesJob : public QRunnable
{ {
Q_OBJECT
public: public:
FindMatchesJob(Plasma::AbstractRunner *runner, FindMatchesJob(Plasma::AbstractRunner *runner,
Plasma::RunnerContext *context, QObject *parent = 0); Plasma::RunnerContext *context);
~FindMatchesJob();
int priority() const;
Plasma::AbstractRunner* runner() const; Plasma::AbstractRunner* runner() const;
bool isFinished();
signals:
void done(QThread *thread);
protected: protected:
void run(); void run();
@ -54,6 +49,7 @@ protected:
private: private:
Plasma::RunnerContext m_context; Plasma::RunnerContext m_context;
Plasma::AbstractRunner *m_runner; Plasma::AbstractRunner *m_runner;
bool m_finished;
}; };
} }

View file

@ -26,6 +26,7 @@
#include <QMutex> #include <QMutex>
#include <QTimer> #include <QTimer>
#include <QCoreApplication> #include <QCoreApplication>
#include <QThreadPool>
#include <kdebug.h> #include <kdebug.h>
#include <kplugininfo.h> #include <kplugininfo.h>
@ -51,8 +52,8 @@ public:
RunnerManagerPrivate(RunnerManager *parent) RunnerManagerPrivate(RunnerManager *parent)
: q(parent), : q(parent),
deferredRun(0),
currentSingleRunner(0), currentSingleRunner(0),
threadPool(0),
prepped(false), prepped(false),
allRunnersPrepped(false), allRunnersPrepped(false),
singleRunnerPrepped(false), singleRunnerPrepped(false),
@ -60,6 +61,8 @@ public:
singleMode(false), singleMode(false),
singleRunnerWasLoaded(false) singleRunnerWasLoaded(false)
{ {
threadPool = new QThreadPool();
matchChangeTimer.setSingleShot(true); matchChangeTimer.setSingleShot(true);
QObject::connect(&matchChangeTimer, SIGNAL(timeout()), q, SLOT(matchesChanged())); QObject::connect(&matchChangeTimer, SIGNAL(timeout()), q, SLOT(matchesChanged()));
@ -70,6 +73,10 @@ public:
{ {
KConfigGroup config = configGroup(); KConfigGroup config = configGroup();
context.save(config); context.save(config);
kDebug() << "waiting for runner jobs";
threadPool->waitForDone();
delete threadPool;
} }
void scheduleMatchesChanged() void scheduleMatchesChanged()
@ -86,9 +93,14 @@ public:
{ {
KConfigGroup config = configGroup(); KConfigGroup config = configGroup();
// TODO: instead of that hard-limit use QThreadPool and QRunnable int idealThreads = QThread::idealThreadCount();
if (idealThreads < 0) {
idealThreads = 4;
}
const int maxThreads = config.readEntry("maxThreads", idealThreads);
kDebug() << "limiting runner threads to" << maxThreads;
//This entry allows to define a hard upper limit independent of the number of processors. //This entry allows to define a hard upper limit independent of the number of processors.
maxThreads = config.readEntry("maxThreads", 32); threadPool->setMaxThreadCount(maxThreads);
context.restore(config); context.restore(config);
} }
@ -199,31 +211,7 @@ public:
} }
if (!deadRunners.isEmpty()) { if (!deadRunners.isEmpty()) {
QSet<FindMatchesJob *> deadJobs;
foreach (FindMatchesJob *job, searchJobs) {
if (deadRunners.contains(job->runner())) {
QObject::disconnect(job, SIGNAL(done(QThread*)), q, SLOT(jobDone(QThread*)));
searchJobs.remove(job);
deadJobs.insert(job);
}
}
foreach (FindMatchesJob *job, oldSearchJobs) {
if (deadRunners.contains(job->runner())) {
oldSearchJobs.remove(job);
deadJobs.insert(job);
}
}
if (deadJobs.isEmpty()) {
qDeleteAll(deadRunners); qDeleteAll(deadRunners);
} else {
// cleaner
foreach (FindMatchesJob *job, deadJobs) {
job->quit();
}
deadJobs.clear();
}
} }
if (!singleRunnerWasLoaded) { if (!singleRunnerWasLoaded) {
@ -275,25 +263,8 @@ public:
return runner; return runner;
} }
void jobDone(QThread *job) void jobDone()
{ {
FindMatchesJob *runJob = dynamic_cast<FindMatchesJob *>(job);
if (!runJob) {
return;
}
if (deferredRun.isEnabled() && runJob->runner() == deferredRun.runner()) {
//kDebug() << "job actually done, running now **************";
QueryMatch tmpRun = deferredRun;
deferredRun = QueryMatch(0);
tmpRun.run(context);
}
searchJobs.remove(runJob);
oldSearchJobs.remove(runJob);
runJob->deleteLater();
if (searchJobs.isEmpty() && context.matches().isEmpty()) { if (searchJobs.isEmpty() && context.matches().isEmpty()) {
// we finished our run, and there are no valid matches, and so no // we finished our run, and there are no valid matches, and so no
// signal will have been sent out. so we need to emit the signal // signal will have been sent out. so we need to emit the signal
@ -306,13 +277,13 @@ public:
void checkTearDown() void checkTearDown()
{ {
//kDebug() << prepped << teardownRequested << searchJobs.count() << oldSearchJobs.count(); //kDebug() << prepped << teardownRequested << searchJobs.count();
if (!prepped || !teardownRequested) { if (!prepped || !teardownRequested) {
return; return;
} }
if (searchJobs.isEmpty() && oldSearchJobs.isEmpty()) { if (searchJobs.isEmpty()) {
if (allRunnersPrepped) { if (allRunnersPrepped) {
foreach (AbstractRunner *runner, runners) { foreach (AbstractRunner *runner, runners) {
emit runner->teardown(); emit runner->teardown();
@ -352,26 +323,20 @@ public:
void startJob(AbstractRunner *runner) void startJob(AbstractRunner *runner)
{ {
if ((runner->ignoredTypes() & context.type()) == 0) { if ((runner->ignoredTypes() & context.type()) == 0) {
if (searchJobs.count() >= maxThreads) { FindMatchesJob *job = new FindMatchesJob(runner, &context);
// kWarning() << "not starting a runner due to hard limit of" << maxThreads;
return;
}
FindMatchesJob *job = new FindMatchesJob(runner, &context, q);
QObject::connect(job, SIGNAL(done(QThread*)), q, SLOT(jobDone(QThread*)));
job->start();
searchJobs.insert(job); searchJobs.insert(job);
threadPool->start(job);
} }
} }
RunnerManager *q; RunnerManager *q;
QueryMatch deferredRun;
RunnerContext context; RunnerContext context;
QTimer matchChangeTimer; QTimer matchChangeTimer;
QHash<QString, AbstractRunner*> runners; QHash<QString, AbstractRunner*> runners;
QHash<QString, QString> advertiseSingleRunnerIds; QHash<QString, QString> advertiseSingleRunnerIds;
AbstractRunner* currentSingleRunner; AbstractRunner* currentSingleRunner;
QSet<FindMatchesJob*> searchJobs; QSet<FindMatchesJob*> searchJobs;
QSet<FindMatchesJob*> oldSearchJobs; QThreadPool *threadPool;
KConfigGroup conf; KConfigGroup conf;
QString singleModeRunnerId; QString singleModeRunnerId;
bool loadAll : 1; bool loadAll : 1;
@ -381,7 +346,6 @@ public:
bool teardownRequested : 1; bool teardownRequested : 1;
bool singleMode : 1; bool singleMode : 1;
bool singleRunnerWasLoaded : 1; bool singleRunnerWasLoaded : 1;
int maxThreads;
}; };
/***************************************************** /*****************************************************
@ -407,18 +371,6 @@ RunnerManager::RunnerManager(KConfigGroup &c, QObject *parent)
RunnerManager::~RunnerManager() RunnerManager::~RunnerManager()
{ {
if (!qApp->closingDown() && (!d->searchJobs.isEmpty() || !d->oldSearchJobs.isEmpty())) {
// cleaner
foreach (FindMatchesJob *job, d->searchJobs) {
job->quit();
}
d->searchJobs.clear();
foreach (FindMatchesJob *job, d->oldSearchJobs) {
job->quit();
}
d->searchJobs.clear();
}
delete d; delete d;
} }
@ -559,22 +511,6 @@ void RunnerManager::run(const QueryMatch &match)
if (!match.isEnabled()) { if (!match.isEnabled()) {
return; return;
} }
//TODO: this function is not const as it may be used for learning
AbstractRunner *runner = match.runner();
foreach (FindMatchesJob *job, d->searchJobs) {
if (job->runner() == runner && !job->isFinished()) {
kDebug() << "deferred run";
d->deferredRun = match;
return;
}
}
if (d->deferredRun.isValid()) {
d->deferredRun = QueryMatch(0);
}
d->context.run(match); d->context.run(match);
} }
@ -766,20 +702,9 @@ QString RunnerManager::query() const
void RunnerManager::reset() void RunnerManager::reset()
{ {
Q_FOREACH(FindMatchesJob *job, d->searchJobs) { d->threadPool->waitForDone(3000);
job->terminate();
}
d->oldSearchJobs += d->searchJobs;
d->searchJobs.clear(); d->searchJobs.clear();
if (d->deferredRun.isEnabled()) {
//kDebug() << "job actually done, running now **************";
QueryMatch tmpRun = d->deferredRun;
d->deferredRun = QueryMatch(0);
tmpRun.run(d->context);
}
d->context.reset(); d->context.reset();
} }

View file

@ -288,7 +288,7 @@ class PLASMA_EXPORT RunnerManager : public QObject
private: private:
Q_PRIVATE_SLOT(d, void scheduleMatchesChanged()) Q_PRIVATE_SLOT(d, void scheduleMatchesChanged())
Q_PRIVATE_SLOT(d, void matchesChanged()) Q_PRIVATE_SLOT(d, void matchesChanged())
Q_PRIVATE_SLOT(d, void jobDone(QThread*)) Q_PRIVATE_SLOT(d, void jobDone())
Q_PRIVATE_SLOT(d, void runnerMatchingSuspended(bool)) Q_PRIVATE_SLOT(d, void runnerMatchingSuspended(bool))
RunnerManagerPrivate * const d; RunnerManagerPrivate * const d;