plasma: make runners use QThread instead of ThreadWeaver

Signed-off-by: Ivailo Monev <xakepa10@gmail.com>
This commit is contained in:
Ivailo Monev 2016-04-07 10:55:05 +00:00
parent f97ffe1036
commit 8993202acd
5 changed files with 40 additions and 329 deletions

View file

@ -202,7 +202,6 @@ target_link_libraries(plasma
${QT_QTDECLARATIVE_LIBRARY} ${QT_QTDECLARATIVE_LIBRARY}
${KDE4_KDECORE_LIBS} ${KDE4_KDECORE_LIBS}
${KDE4_KDEUI_LIBS} ${KDE4_KDEUI_LIBS}
${KDE4_THREADWEAVER_LIBS}
${KDE4_KDECLARATIVE_LIBS} ${KDE4_KDECLARATIVE_LIBS}
${PLASMA_EXTRA_LIBS} ${PLASMA_EXTRA_LIBS}
) )

View file

@ -23,140 +23,34 @@
#include <kdebug.h> #include <kdebug.h>
//#include <Weaver/DebuggingAids.h>
#include <Weaver/ThreadWeaver.h>
#include "runnermanager.h" #include "runnermanager.h"
#include "plasma/querymatch.h" #include "plasma/querymatch.h"
using ThreadWeaver::Job;
using ThreadWeaver::Weaver;
namespace Plasma { namespace Plasma {
DelayedRunnerPolicy::DelayedRunnerPolicy()
: QueuePolicy()
{}
DelayedRunnerPolicy::~DelayedRunnerPolicy()
{}
DelayedRunnerPolicy& DelayedRunnerPolicy::instance()
{
static DelayedRunnerPolicy policy;
return policy;
}
bool DelayedRunnerPolicy::canRun(Job *job)
{
FindMatchesJob *aJob = static_cast<FindMatchesJob*>(job);
if (QTimer *t = aJob->delayTimer()) {
// If the timer is active, the required delay has not been reached
//kDebug() << "delayed timer" << aJob->runner()->name() << !t->isActive();
return !t->isActive(); // DATA RACE! (with QTimer start/stop from runnermanager.cpp)
}
return true;
}
void DelayedRunnerPolicy::free(Job *job)
{
Q_UNUSED(job)
}
void DelayedRunnerPolicy::release(Job *job)
{
free(job);
}
void DelayedRunnerPolicy::destructed(Job *job)
{
Q_UNUSED(job)
}
DefaultRunnerPolicy::DefaultRunnerPolicy()
: QueuePolicy(),
m_cap(2)
{}
DefaultRunnerPolicy::~DefaultRunnerPolicy()
{}
DefaultRunnerPolicy& DefaultRunnerPolicy::instance()
{
static DefaultRunnerPolicy policy;
return policy;
}
bool DefaultRunnerPolicy::canRun(Job *job)
{
Plasma::AbstractRunner *runner = static_cast<FindMatchesJob*>(job)->runner();
QMutexLocker l(&m_mutex);
if (m_runCounts[runner->name()] > m_cap) {
return false;
} else {
++m_runCounts[runner->name()];
return true;
}
}
void DefaultRunnerPolicy::free(Job *job)
{
Plasma::AbstractRunner *runner = static_cast<FindMatchesJob*>(job)->runner();
QMutexLocker l(&m_mutex);
--m_runCounts[runner->name()];
}
void DefaultRunnerPolicy::release(Job *job)
{
free(job);
}
void DefaultRunnerPolicy::destructed(Job *job)
{
Q_UNUSED(job)
}
//////////////////// ////////////////////
// Jobs // Jobs
//////////////////// ////////////////////
FindMatchesJob::FindMatchesJob(Plasma::AbstractRunner *runner, FindMatchesJob::FindMatchesJob(Plasma::AbstractRunner *runner,
Plasma::RunnerContext *context, QObject *parent) Plasma::RunnerContext *context, QObject *parent)
: ThreadWeaver::Job(parent), : QThread(parent),
m_context(*context, 0), m_context(*context, 0),
m_runner(runner), m_runner(runner)
m_timer(0)
{ {
if (runner->speed() == Plasma::AbstractRunner::SlowSpeed) {
assignQueuePolicy(&DelayedRunnerPolicy::instance());
} else {
assignQueuePolicy(&DefaultRunnerPolicy::instance());
}
} }
FindMatchesJob::~FindMatchesJob() FindMatchesJob::~FindMatchesJob()
{ {
} wait(3000);
QTimer* FindMatchesJob::delayTimer() const
{
return m_timer;
}
void FindMatchesJob::setDelayTimer(QTimer *timer)
{
m_timer = timer;
} }
void FindMatchesJob::run() void FindMatchesJob::run()
{ {
// kDebug() << "Running match for " << m_runner->objectName() // kDebug() << "Running match for " << m_runner->objectName();
// << " in Thread " << thread()->id() << endl;
if (m_context.isValid()) { if (m_context.isValid()) {
m_runner->performMatch(m_context); m_runner->performMatch(m_context);
emit done(this);
} }
} }
@ -170,50 +64,6 @@ Plasma::AbstractRunner* FindMatchesJob::runner() const
return m_runner; return m_runner;
} }
DelayedJobCleaner::DelayedJobCleaner(const QSet<FindMatchesJob *> &jobs, const QSet<AbstractRunner *> &runners)
: QObject(Weaver::instance()),
m_weaver(Weaver::instance()),
m_jobs(jobs),
m_runners(runners)
{
connect(m_weaver, SIGNAL(finished()), this, SLOT(checkIfFinished()));
foreach (FindMatchesJob *job, m_jobs) {
connect(job, SIGNAL(done(ThreadWeaver::Job*)), this, SLOT(jobDone(ThreadWeaver::Job*)));
}
}
DelayedJobCleaner::~DelayedJobCleaner()
{
qDeleteAll(m_runners);
}
void DelayedJobCleaner::jobDone(ThreadWeaver::Job *job)
{
FindMatchesJob *runJob = dynamic_cast<FindMatchesJob *>(job);
if (!runJob) {
return;
}
m_jobs.remove(runJob);
runJob->deleteLater();
if (m_jobs.isEmpty()) {
deleteLater();
}
}
void DelayedJobCleaner::checkIfFinished()
{
if (m_weaver->isIdle()) {
qDeleteAll(m_jobs);
m_jobs.clear();
deleteLater();
}
}
} // Plasma namespace } // Plasma namespace
// #include "runnerjobs.moc" #include "moc_runnerjobs_p.cpp"

View file

@ -23,86 +23,20 @@
#include <QHash> #include <QHash>
#include <QMutex> #include <QMutex>
#include <QSet> #include <QSet>
#include <QThread>
#include <Weaver/Job.h>
#include <Weaver/QueuePolicy.h>
#include "abstractrunner.h" #include "abstractrunner.h"
using ThreadWeaver::Job;
class QTimer;
namespace Plasma { namespace Plasma {
// Queue policies // Queue policies
// QueuePolicy that only allows a job to be executed after
// waiting in the queue for the specified timeout
class DelayedRunnerPolicy : public ThreadWeaver::QueuePolicy
{
public:
~DelayedRunnerPolicy();
static DelayedRunnerPolicy &instance();
bool canRun(Job *job);
void free(Job *job);
void release(Job *job);
void destructed(Job *job);
private:
DelayedRunnerPolicy();
QMutex m_mutex;
};
// QueuePolicy that limits the instances of a particular runner
class DefaultRunnerPolicy : public ThreadWeaver::QueuePolicy
{
public:
~DefaultRunnerPolicy();
static DefaultRunnerPolicy &instance();
void setCap(int cap)
{
m_cap = cap;
}
int cap() const
{
return m_cap;
}
bool canRun(Job *job);
void free(Job *job);
void release(Job *job);
void destructed(Job *job);
private:
DefaultRunnerPolicy();
int m_cap;
QHash<QString, int> m_runCounts;
QMutex m_mutex;
};
/* ThreadWeaver work around:
* There is no method exposed that allows us to inform
* ThreadWeaver that a previously unavailable job is now
* available; thus, we use an empty job to wake up the threads
*/
class DummyJob : public ThreadWeaver::Job
{
public:
DummyJob(QObject *parent) : Job(parent) {}
~DummyJob() {}
private:
void run() {}
};
/* /*
* FindMatchesJob class * FindMatchesJob class
* Class to run queries in different threads * Class to run queries in different threads
*/ */
class FindMatchesJob : public Job class FindMatchesJob : public QThread
{ {
Q_OBJECT
public: public:
FindMatchesJob(Plasma::AbstractRunner *runner, FindMatchesJob(Plasma::AbstractRunner *runner,
Plasma::RunnerContext *context, QObject *parent = 0); Plasma::RunnerContext *context, QObject *parent = 0);
@ -111,8 +45,8 @@ public:
int priority() const; int priority() const;
Plasma::AbstractRunner* runner() const; Plasma::AbstractRunner* runner() const;
QTimer* delayTimer() const; signals:
void setDelayTimer(QTimer *timer); void done(QThread *thread);
protected: protected:
void run(); void run();
@ -120,23 +54,6 @@ protected:
private: private:
Plasma::RunnerContext m_context; Plasma::RunnerContext m_context;
Plasma::AbstractRunner *m_runner; Plasma::AbstractRunner *m_runner;
QTimer *m_timer;
};
class DelayedJobCleaner : public QObject
{
public:
DelayedJobCleaner(const QSet<FindMatchesJob*> &jobs, const QSet<AbstractRunner *> &runners = QSet<AbstractRunner *>());
~DelayedJobCleaner();
private Q_SLOTS:
void jobDone(ThreadWeaver::Job*);
void checkIfFinished();
private:
ThreadWeaver::WeaverInterface *m_weaver;
QSet<FindMatchesJob*> m_jobs;
QSet<AbstractRunner *> m_runners;
}; };
} }

View file

@ -32,23 +32,10 @@
#include <kservicetypetrader.h> #include <kservicetypetrader.h>
#include <kstandarddirs.h> #include <kstandarddirs.h>
#ifndef PLASMA_NO_SOLID
#include <solid/device.h>
#include <solid/deviceinterface.h>
#endif
#include <Weaver/DebuggingAids.h>
#include <Weaver/State.h>
#include <Weaver/Thread.h>
#include <Weaver/ThreadWeaver.h>
#include "private/runnerjobs_p.h" #include "private/runnerjobs_p.h"
#include "pluginloader.h" #include "pluginloader.h"
#include "querymatch.h" #include "querymatch.h"
using ThreadWeaver::Weaver;
using ThreadWeaver::Job;
//#define MEASURE_PREPTIME //#define MEASURE_PREPTIME
namespace Plasma namespace Plasma
@ -74,11 +61,9 @@ public:
singleRunnerWasLoaded(false) singleRunnerWasLoaded(false)
{ {
matchChangeTimer.setSingleShot(true); matchChangeTimer.setSingleShot(true);
delayTimer.setSingleShot(true);
QObject::connect(&matchChangeTimer, SIGNAL(timeout()), q, SLOT(matchesChanged())); QObject::connect(&matchChangeTimer, SIGNAL(timeout()), q, SLOT(matchesChanged()));
QObject::connect(&context, SIGNAL(matchesChanged()), q, SLOT(scheduleMatchesChanged())); QObject::connect(&context, SIGNAL(matchesChanged()), q, SLOT(scheduleMatchesChanged()));
QObject::connect(&delayTimer, SIGNAL(timeout()), q, SLOT(unblockJobs()));
} }
~RunnerManagerPrivate() ~RunnerManagerPrivate()
@ -101,24 +86,9 @@ public:
{ {
KConfigGroup config = configGroup(); KConfigGroup config = configGroup();
//The number of threads used scales with the number of processors. // TODO: instead of that hard-limit use QThreadPool and QRunnable
#ifndef PLASMA_NO_SOLID
const int numProcs =
qMax(Solid::Device::listFromType(Solid::DeviceInterface::Processor).count(), 1);
#else
const int numProcs = 1;
#endif
//This entry allows to define a hard upper limit independent of the number of processors. //This entry allows to define a hard upper limit independent of the number of processors.
const int maxThreads = config.readEntry("maxThreads", 16); maxThreads = config.readEntry("maxThreads", 32);
const int numThreads = qMin(maxThreads, 2 + ((numProcs - 1) * 2));
//kDebug() << "setting up" << numThreads << "threads for" << numProcs << "processors";
if (numThreads > Weaver::instance()->maximumNumberOfThreads()) {
Weaver::instance()->setMaximumNumberOfThreads(numThreads);
}
// Limit the number of instances of a single normal speed runner and all of the slow runners
// to half the number of threads
const int cap = qMax(2, numThreads/2);
DefaultRunnerPolicy::instance().setCap(cap);
context.restore(config); context.restore(config);
} }
@ -232,7 +202,7 @@ public:
QSet<FindMatchesJob *> deadJobs; QSet<FindMatchesJob *> deadJobs;
foreach (FindMatchesJob *job, searchJobs) { foreach (FindMatchesJob *job, searchJobs) {
if (deadRunners.contains(job->runner())) { if (deadRunners.contains(job->runner())) {
QObject::disconnect(job, SIGNAL(done(ThreadWeaver::Job*)), q, SLOT(jobDone(ThreadWeaver::Job*))); QObject::disconnect(job, SIGNAL(done(QThread*)), q, SLOT(jobDone(QThread*)));
searchJobs.remove(job); searchJobs.remove(job);
deadJobs.insert(job); deadJobs.insert(job);
} }
@ -248,7 +218,11 @@ public:
if (deadJobs.isEmpty()) { if (deadJobs.isEmpty()) {
qDeleteAll(deadRunners); qDeleteAll(deadRunners);
} else { } else {
new DelayedJobCleaner(deadJobs, deadRunners); // cleaner
foreach (FindMatchesJob *job, deadJobs) {
job->quit();
}
deadJobs.clear();
} }
} }
@ -301,7 +275,7 @@ public:
return runner; return runner;
} }
void jobDone(ThreadWeaver::Job *job) void jobDone(QThread *job)
{ {
FindMatchesJob *runJob = dynamic_cast<FindMatchesJob *>(job); FindMatchesJob *runJob = dynamic_cast<FindMatchesJob *>(job);
@ -338,13 +312,6 @@ public:
return; return;
} }
if (Weaver::instance()->isIdle()) {
qDeleteAll(searchJobs);
searchJobs.clear();
qDeleteAll(oldSearchJobs);
oldSearchJobs.clear();
}
if (searchJobs.isEmpty() && oldSearchJobs.isEmpty()) { if (searchJobs.isEmpty() && oldSearchJobs.isEmpty()) {
if (allRunnersPrepped) { if (allRunnersPrepped) {
foreach (AbstractRunner *runner, runners) { foreach (AbstractRunner *runner, runners) {
@ -369,21 +336,6 @@ public:
} }
} }
void unblockJobs()
{
// WORKAROUND: Queue an empty job to force ThreadWeaver to awaken threads
if (searchJobs.isEmpty() && Weaver::instance()->isIdle()) {
qDeleteAll(oldSearchJobs);
oldSearchJobs.clear();
checkTearDown();
return;
}
DummyJob *dummy = new DummyJob(q);
Weaver::instance()->enqueue(dummy);
QObject::connect(dummy, SIGNAL(done(ThreadWeaver::Job*)), dummy, SLOT(deleteLater()));
}
void runnerMatchingSuspended(bool suspended) void runnerMatchingSuspended(bool suspended)
{ {
if (suspended || !prepped || teardownRequested) { if (suspended || !prepped || teardownRequested) {
@ -400,24 +352,21 @@ public:
void startJob(AbstractRunner *runner) void startJob(AbstractRunner *runner)
{ {
if ((runner->ignoredTypes() & context.type()) == 0) { if ((runner->ignoredTypes() & context.type()) == 0) {
FindMatchesJob *job = new FindMatchesJob(runner, &context, Weaver::instance()); if (searchJobs.count() >= maxThreads) {
QObject::connect(job, SIGNAL(done(ThreadWeaver::Job*)), q, SLOT(jobDone(ThreadWeaver::Job*))); // kWarning() << "not starting a runner due to hard limit of" << maxThreads;
if (runner->speed() == AbstractRunner::SlowSpeed) { return;
job->setDelayTimer(&delayTimer);
} }
Weaver::instance()->enqueue(job); FindMatchesJob *job = new FindMatchesJob(runner, &context, q);
QObject::connect(job, SIGNAL(done(QThread*)), q, SLOT(jobDone(QThread*)));
job->start();
searchJobs.insert(job); searchJobs.insert(job);
} }
} }
// Delay in ms before slow runners are allowed to run
static const int slowRunDelay = 400;
RunnerManager *q; RunnerManager *q;
QueryMatch deferredRun; QueryMatch deferredRun;
RunnerContext context; RunnerContext context;
QTimer matchChangeTimer; QTimer matchChangeTimer;
QTimer delayTimer; // Timer to control when to run slow runners
QHash<QString, AbstractRunner*> runners; QHash<QString, AbstractRunner*> runners;
QHash<QString, QString> advertiseSingleRunnerIds; QHash<QString, QString> advertiseSingleRunnerIds;
AbstractRunner* currentSingleRunner; AbstractRunner* currentSingleRunner;
@ -432,6 +381,7 @@ public:
bool teardownRequested : 1; bool teardownRequested : 1;
bool singleMode : 1; bool singleMode : 1;
bool singleRunnerWasLoaded : 1; bool singleRunnerWasLoaded : 1;
int maxThreads;
}; };
/***************************************************** /*****************************************************
@ -443,7 +393,6 @@ RunnerManager::RunnerManager(QObject *parent)
d(new RunnerManagerPrivate(this)) d(new RunnerManagerPrivate(this))
{ {
d->loadConfiguration(); d->loadConfiguration();
//ThreadWeaver::setDebugLevel(true, 4);
} }
RunnerManager::RunnerManager(KConfigGroup &c, QObject *parent) RunnerManager::RunnerManager(KConfigGroup &c, QObject *parent)
@ -454,13 +403,20 @@ RunnerManager::RunnerManager(KConfigGroup &c, QObject *parent)
// more sense. // more sense.
d->conf = KConfigGroup(&c, "PlasmaRunnerManager"); d->conf = KConfigGroup(&c, "PlasmaRunnerManager");
d->loadConfiguration(); d->loadConfiguration();
//ThreadWeaver::setDebugLevel(true, 4);
} }
RunnerManager::~RunnerManager() RunnerManager::~RunnerManager()
{ {
if (!qApp->closingDown() && (!d->searchJobs.isEmpty() || !d->oldSearchJobs.isEmpty())) { if (!qApp->closingDown() && (!d->searchJobs.isEmpty() || !d->oldSearchJobs.isEmpty())) {
new DelayedJobCleaner(d->searchJobs + d->oldSearchJobs); // cleaner
foreach (FindMatchesJob *job, d->searchJobs) {
job->quit();
}
d->searchJobs.clear();
foreach (FindMatchesJob *job, d->oldSearchJobs) {
job->quit();
}
d->searchJobs.clear();
} }
delete d; delete d;
@ -756,9 +712,6 @@ void RunnerManager::launchQuery(const QString &untrimmedTerm, const QString &run
d->startJob(r); d->startJob(r);
} }
// Start timer to unblock slow runners
d->delayTimer.start(RunnerManagerPrivate::slowRunDelay);
} }
bool RunnerManager::execQuery(const QString &term) bool RunnerManager::execQuery(const QString &term)
@ -813,17 +766,10 @@ QString RunnerManager::query() const
void RunnerManager::reset() void RunnerManager::reset()
{ {
// If ThreadWeaver is idle, it is safe to clear previous jobs Q_FOREACH(FindMatchesJob *job, d->searchJobs) {
if (Weaver::instance()->isIdle()) { job->terminate();
qDeleteAll(d->searchJobs);
qDeleteAll(d->oldSearchJobs);
d->oldSearchJobs.clear();
} else {
Q_FOREACH(FindMatchesJob *job, d->searchJobs) {
Weaver::instance()->dequeue(job);
}
d->oldSearchJobs += d->searchJobs;
} }
d->oldSearchJobs += d->searchJobs;
d->searchJobs.clear(); d->searchJobs.clear();

View file

@ -288,8 +288,7 @@ class PLASMA_EXPORT RunnerManager : public QObject
private: private:
Q_PRIVATE_SLOT(d, void scheduleMatchesChanged()) Q_PRIVATE_SLOT(d, void scheduleMatchesChanged())
Q_PRIVATE_SLOT(d, void matchesChanged()) Q_PRIVATE_SLOT(d, void matchesChanged())
Q_PRIVATE_SLOT(d, void jobDone(ThreadWeaver::Job*)) Q_PRIVATE_SLOT(d, void jobDone(QThread*))
Q_PRIVATE_SLOT(d, void unblockJobs())
Q_PRIVATE_SLOT(d, void runnerMatchingSuspended(bool)) Q_PRIVATE_SLOT(d, void runnerMatchingSuspended(bool))
RunnerManagerPrivate * const d; RunnerManagerPrivate * const d;