diff --git a/plasma/CMakeLists.txt b/plasma/CMakeLists.txt index b375dc36..be9e5bdb 100644 --- a/plasma/CMakeLists.txt +++ b/plasma/CMakeLists.txt @@ -202,7 +202,6 @@ target_link_libraries(plasma ${QT_QTDECLARATIVE_LIBRARY} ${KDE4_KDECORE_LIBS} ${KDE4_KDEUI_LIBS} - ${KDE4_THREADWEAVER_LIBS} ${KDE4_KDECLARATIVE_LIBS} ${PLASMA_EXTRA_LIBS} ) diff --git a/plasma/private/runnerjobs.cpp b/plasma/private/runnerjobs.cpp index 61be2c0e..abbcc292 100644 --- a/plasma/private/runnerjobs.cpp +++ b/plasma/private/runnerjobs.cpp @@ -23,140 +23,34 @@ #include -//#include -#include - #include "runnermanager.h" #include "plasma/querymatch.h" -using ThreadWeaver::Job; -using ThreadWeaver::Weaver; - namespace Plasma { -DelayedRunnerPolicy::DelayedRunnerPolicy() - : QueuePolicy() -{} - -DelayedRunnerPolicy::~DelayedRunnerPolicy() -{} - -DelayedRunnerPolicy& DelayedRunnerPolicy::instance() -{ - static DelayedRunnerPolicy policy; - return policy; -} - -bool DelayedRunnerPolicy::canRun(Job *job) -{ - FindMatchesJob *aJob = static_cast(job); - if (QTimer *t = aJob->delayTimer()) { - // If the timer is active, the required delay has not been reached - //kDebug() << "delayed timer" << aJob->runner()->name() << !t->isActive(); - return !t->isActive(); // DATA RACE! (with QTimer start/stop from runnermanager.cpp) - } - - return true; -} - -void DelayedRunnerPolicy::free(Job *job) -{ - Q_UNUSED(job) -} - -void DelayedRunnerPolicy::release(Job *job) -{ - free(job); -} - -void DelayedRunnerPolicy::destructed(Job *job) -{ - Q_UNUSED(job) -} - -DefaultRunnerPolicy::DefaultRunnerPolicy() - : QueuePolicy(), - m_cap(2) -{} - -DefaultRunnerPolicy::~DefaultRunnerPolicy() -{} - -DefaultRunnerPolicy& DefaultRunnerPolicy::instance() -{ - static DefaultRunnerPolicy policy; - return policy; -} - -bool DefaultRunnerPolicy::canRun(Job *job) -{ - Plasma::AbstractRunner *runner = static_cast(job)->runner(); - QMutexLocker l(&m_mutex); - - if (m_runCounts[runner->name()] > m_cap) { - return false; - } else { - ++m_runCounts[runner->name()]; - return true; - } -} - -void DefaultRunnerPolicy::free(Job *job) -{ - Plasma::AbstractRunner *runner = static_cast(job)->runner(); - QMutexLocker l(&m_mutex); - - --m_runCounts[runner->name()]; -} - -void DefaultRunnerPolicy::release(Job *job) -{ - free(job); -} - -void DefaultRunnerPolicy::destructed(Job *job) -{ - Q_UNUSED(job) -} - //////////////////// // Jobs //////////////////// FindMatchesJob::FindMatchesJob(Plasma::AbstractRunner *runner, Plasma::RunnerContext *context, QObject *parent) - : ThreadWeaver::Job(parent), + : QThread(parent), m_context(*context, 0), - m_runner(runner), - m_timer(0) + m_runner(runner) { - if (runner->speed() == Plasma::AbstractRunner::SlowSpeed) { - assignQueuePolicy(&DelayedRunnerPolicy::instance()); - } else { - assignQueuePolicy(&DefaultRunnerPolicy::instance()); - } } FindMatchesJob::~FindMatchesJob() { -} - -QTimer* FindMatchesJob::delayTimer() const -{ - return m_timer; -} - -void FindMatchesJob::setDelayTimer(QTimer *timer) -{ - m_timer = timer; + wait(3000); } void FindMatchesJob::run() { -// kDebug() << "Running match for " << m_runner->objectName() -// << " in Thread " << thread()->id() << endl; + // kDebug() << "Running match for " << m_runner->objectName(); if (m_context.isValid()) { m_runner->performMatch(m_context); + emit done(this); } } @@ -170,50 +64,6 @@ Plasma::AbstractRunner* FindMatchesJob::runner() const return m_runner; } -DelayedJobCleaner::DelayedJobCleaner(const QSet &jobs, const QSet &runners) - : QObject(Weaver::instance()), - m_weaver(Weaver::instance()), - m_jobs(jobs), - m_runners(runners) -{ - connect(m_weaver, SIGNAL(finished()), this, SLOT(checkIfFinished())); - - foreach (FindMatchesJob *job, m_jobs) { - connect(job, SIGNAL(done(ThreadWeaver::Job*)), this, SLOT(jobDone(ThreadWeaver::Job*))); - } -} - -DelayedJobCleaner::~DelayedJobCleaner() -{ - qDeleteAll(m_runners); -} - -void DelayedJobCleaner::jobDone(ThreadWeaver::Job *job) -{ - FindMatchesJob *runJob = dynamic_cast(job); - - if (!runJob) { - return; - } - - m_jobs.remove(runJob); - runJob->deleteLater(); - - if (m_jobs.isEmpty()) { - deleteLater(); - } -} - -void DelayedJobCleaner::checkIfFinished() -{ - if (m_weaver->isIdle()) { - qDeleteAll(m_jobs); - m_jobs.clear(); - deleteLater(); - } -} - - } // Plasma namespace -// #include "runnerjobs.moc" +#include "moc_runnerjobs_p.cpp" diff --git a/plasma/private/runnerjobs_p.h b/plasma/private/runnerjobs_p.h index 0f400983..12bd8089 100644 --- a/plasma/private/runnerjobs_p.h +++ b/plasma/private/runnerjobs_p.h @@ -23,86 +23,20 @@ #include #include #include - -#include -#include +#include #include "abstractrunner.h" -using ThreadWeaver::Job; - -class QTimer; - namespace Plasma { // Queue policies -// QueuePolicy that only allows a job to be executed after -// waiting in the queue for the specified timeout -class DelayedRunnerPolicy : public ThreadWeaver::QueuePolicy -{ -public: - ~DelayedRunnerPolicy(); - - static DelayedRunnerPolicy &instance(); - - bool canRun(Job *job); - void free(Job *job); - void release(Job *job); - void destructed(Job *job); -private: - DelayedRunnerPolicy(); - QMutex m_mutex; -}; - -// QueuePolicy that limits the instances of a particular runner -class DefaultRunnerPolicy : public ThreadWeaver::QueuePolicy -{ -public: - ~DefaultRunnerPolicy(); - - static DefaultRunnerPolicy &instance(); - - void setCap(int cap) - { - m_cap = cap; - } - int cap() const - { - return m_cap; - } - - bool canRun(Job *job); - void free(Job *job); - void release(Job *job); - void destructed(Job *job); -private: - DefaultRunnerPolicy(); - - int m_cap; - QHash m_runCounts; - QMutex m_mutex; -}; - -/* ThreadWeaver work around: - * There is no method exposed that allows us to inform - * ThreadWeaver that a previously unavailable job is now - * available; thus, we use an empty job to wake up the threads - */ -class DummyJob : public ThreadWeaver::Job -{ - public: - DummyJob(QObject *parent) : Job(parent) {} - ~DummyJob() {} - private: - void run() {} -}; - /* * FindMatchesJob class * Class to run queries in different threads */ -class FindMatchesJob : public Job +class FindMatchesJob : public QThread { + Q_OBJECT public: FindMatchesJob(Plasma::AbstractRunner *runner, Plasma::RunnerContext *context, QObject *parent = 0); @@ -111,8 +45,8 @@ public: int priority() const; Plasma::AbstractRunner* runner() const; - QTimer* delayTimer() const; - void setDelayTimer(QTimer *timer); +signals: + void done(QThread *thread); protected: void run(); @@ -120,23 +54,6 @@ protected: private: Plasma::RunnerContext m_context; Plasma::AbstractRunner *m_runner; - QTimer *m_timer; -}; - -class DelayedJobCleaner : public QObject -{ -public: - DelayedJobCleaner(const QSet &jobs, const QSet &runners = QSet()); - ~DelayedJobCleaner(); - -private Q_SLOTS: - void jobDone(ThreadWeaver::Job*); - void checkIfFinished(); - -private: - ThreadWeaver::WeaverInterface *m_weaver; - QSet m_jobs; - QSet m_runners; }; } diff --git a/plasma/runnermanager.cpp b/plasma/runnermanager.cpp index ea6d76eb..55c4838c 100644 --- a/plasma/runnermanager.cpp +++ b/plasma/runnermanager.cpp @@ -32,23 +32,10 @@ #include #include -#ifndef PLASMA_NO_SOLID -#include -#include -#endif - -#include -#include -#include -#include - #include "private/runnerjobs_p.h" #include "pluginloader.h" #include "querymatch.h" -using ThreadWeaver::Weaver; -using ThreadWeaver::Job; - //#define MEASURE_PREPTIME namespace Plasma @@ -74,11 +61,9 @@ public: singleRunnerWasLoaded(false) { matchChangeTimer.setSingleShot(true); - delayTimer.setSingleShot(true); QObject::connect(&matchChangeTimer, SIGNAL(timeout()), q, SLOT(matchesChanged())); QObject::connect(&context, SIGNAL(matchesChanged()), q, SLOT(scheduleMatchesChanged())); - QObject::connect(&delayTimer, SIGNAL(timeout()), q, SLOT(unblockJobs())); } ~RunnerManagerPrivate() @@ -101,24 +86,9 @@ public: { KConfigGroup config = configGroup(); - //The number of threads used scales with the number of processors. -#ifndef PLASMA_NO_SOLID - const int numProcs = - qMax(Solid::Device::listFromType(Solid::DeviceInterface::Processor).count(), 1); -#else - const int numProcs = 1; -#endif + // TODO: instead of that hard-limit use QThreadPool and QRunnable //This entry allows to define a hard upper limit independent of the number of processors. - const int maxThreads = config.readEntry("maxThreads", 16); - const int numThreads = qMin(maxThreads, 2 + ((numProcs - 1) * 2)); - //kDebug() << "setting up" << numThreads << "threads for" << numProcs << "processors"; - if (numThreads > Weaver::instance()->maximumNumberOfThreads()) { - Weaver::instance()->setMaximumNumberOfThreads(numThreads); - } - // Limit the number of instances of a single normal speed runner and all of the slow runners - // to half the number of threads - const int cap = qMax(2, numThreads/2); - DefaultRunnerPolicy::instance().setCap(cap); + maxThreads = config.readEntry("maxThreads", 32); context.restore(config); } @@ -232,7 +202,7 @@ public: QSet deadJobs; foreach (FindMatchesJob *job, searchJobs) { if (deadRunners.contains(job->runner())) { - QObject::disconnect(job, SIGNAL(done(ThreadWeaver::Job*)), q, SLOT(jobDone(ThreadWeaver::Job*))); + QObject::disconnect(job, SIGNAL(done(QThread*)), q, SLOT(jobDone(QThread*))); searchJobs.remove(job); deadJobs.insert(job); } @@ -248,7 +218,11 @@ public: if (deadJobs.isEmpty()) { qDeleteAll(deadRunners); } else { - new DelayedJobCleaner(deadJobs, deadRunners); + // cleaner + foreach (FindMatchesJob *job, deadJobs) { + job->quit(); + } + deadJobs.clear(); } } @@ -301,7 +275,7 @@ public: return runner; } - void jobDone(ThreadWeaver::Job *job) + void jobDone(QThread *job) { FindMatchesJob *runJob = dynamic_cast(job); @@ -338,13 +312,6 @@ public: return; } - if (Weaver::instance()->isIdle()) { - qDeleteAll(searchJobs); - searchJobs.clear(); - qDeleteAll(oldSearchJobs); - oldSearchJobs.clear(); - } - if (searchJobs.isEmpty() && oldSearchJobs.isEmpty()) { if (allRunnersPrepped) { foreach (AbstractRunner *runner, runners) { @@ -369,21 +336,6 @@ public: } } - void unblockJobs() - { - // WORKAROUND: Queue an empty job to force ThreadWeaver to awaken threads - if (searchJobs.isEmpty() && Weaver::instance()->isIdle()) { - qDeleteAll(oldSearchJobs); - oldSearchJobs.clear(); - checkTearDown(); - return; - } - - DummyJob *dummy = new DummyJob(q); - Weaver::instance()->enqueue(dummy); - QObject::connect(dummy, SIGNAL(done(ThreadWeaver::Job*)), dummy, SLOT(deleteLater())); - } - void runnerMatchingSuspended(bool suspended) { if (suspended || !prepped || teardownRequested) { @@ -400,24 +352,21 @@ public: void startJob(AbstractRunner *runner) { if ((runner->ignoredTypes() & context.type()) == 0) { - FindMatchesJob *job = new FindMatchesJob(runner, &context, Weaver::instance()); - QObject::connect(job, SIGNAL(done(ThreadWeaver::Job*)), q, SLOT(jobDone(ThreadWeaver::Job*))); - if (runner->speed() == AbstractRunner::SlowSpeed) { - job->setDelayTimer(&delayTimer); + if (searchJobs.count() >= maxThreads) { + // kWarning() << "not starting a runner due to hard limit of" << maxThreads; + return; } - Weaver::instance()->enqueue(job); + FindMatchesJob *job = new FindMatchesJob(runner, &context, q); + QObject::connect(job, SIGNAL(done(QThread*)), q, SLOT(jobDone(QThread*))); + job->start(); searchJobs.insert(job); } } - // Delay in ms before slow runners are allowed to run - static const int slowRunDelay = 400; - RunnerManager *q; QueryMatch deferredRun; RunnerContext context; QTimer matchChangeTimer; - QTimer delayTimer; // Timer to control when to run slow runners QHash runners; QHash advertiseSingleRunnerIds; AbstractRunner* currentSingleRunner; @@ -432,6 +381,7 @@ public: bool teardownRequested : 1; bool singleMode : 1; bool singleRunnerWasLoaded : 1; + int maxThreads; }; /***************************************************** @@ -443,7 +393,6 @@ RunnerManager::RunnerManager(QObject *parent) d(new RunnerManagerPrivate(this)) { d->loadConfiguration(); - //ThreadWeaver::setDebugLevel(true, 4); } RunnerManager::RunnerManager(KConfigGroup &c, QObject *parent) @@ -454,13 +403,20 @@ RunnerManager::RunnerManager(KConfigGroup &c, QObject *parent) // more sense. d->conf = KConfigGroup(&c, "PlasmaRunnerManager"); d->loadConfiguration(); - //ThreadWeaver::setDebugLevel(true, 4); } RunnerManager::~RunnerManager() { if (!qApp->closingDown() && (!d->searchJobs.isEmpty() || !d->oldSearchJobs.isEmpty())) { - new DelayedJobCleaner(d->searchJobs + d->oldSearchJobs); + // cleaner + foreach (FindMatchesJob *job, d->searchJobs) { + job->quit(); + } + d->searchJobs.clear(); + foreach (FindMatchesJob *job, d->oldSearchJobs) { + job->quit(); + } + d->searchJobs.clear(); } delete d; @@ -756,9 +712,6 @@ void RunnerManager::launchQuery(const QString &untrimmedTerm, const QString &run d->startJob(r); } - - // Start timer to unblock slow runners - d->delayTimer.start(RunnerManagerPrivate::slowRunDelay); } bool RunnerManager::execQuery(const QString &term) @@ -813,17 +766,10 @@ QString RunnerManager::query() const void RunnerManager::reset() { - // If ThreadWeaver is idle, it is safe to clear previous jobs - if (Weaver::instance()->isIdle()) { - qDeleteAll(d->searchJobs); - qDeleteAll(d->oldSearchJobs); - d->oldSearchJobs.clear(); - } else { - Q_FOREACH(FindMatchesJob *job, d->searchJobs) { - Weaver::instance()->dequeue(job); - } - d->oldSearchJobs += d->searchJobs; + Q_FOREACH(FindMatchesJob *job, d->searchJobs) { + job->terminate(); } + d->oldSearchJobs += d->searchJobs; d->searchJobs.clear(); diff --git a/plasma/runnermanager.h b/plasma/runnermanager.h index 382f76a5..9d46c6c8 100644 --- a/plasma/runnermanager.h +++ b/plasma/runnermanager.h @@ -288,8 +288,7 @@ class PLASMA_EXPORT RunnerManager : public QObject private: Q_PRIVATE_SLOT(d, void scheduleMatchesChanged()) Q_PRIVATE_SLOT(d, void matchesChanged()) - Q_PRIVATE_SLOT(d, void jobDone(ThreadWeaver::Job*)) - Q_PRIVATE_SLOT(d, void unblockJobs()) + Q_PRIVATE_SLOT(d, void jobDone(QThread*)) Q_PRIVATE_SLOT(d, void runnerMatchingSuspended(bool)) RunnerManagerPrivate * const d;