mirror of
https://bitbucket.org/smil3y/kdelibs.git
synced 2025-02-24 19:02:48 +00:00
415 lines
12 KiB
C++
415 lines
12 KiB
C++
/* -*- C++ -*-
|
|
|
|
This file implements the WeaverImpl class.
|
|
|
|
|
|
$ Author: Mirko Boehm $
|
|
$ Copyright: (C) 2005-2013 Mirko Boehm $
|
|
$ Contact: mirko@kde.org
|
|
http://www.kde.org
|
|
http://creative-destruction.me $
|
|
|
|
This library is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU Library General Public
|
|
License as published by the Free Software Foundation; either
|
|
version 2 of the License, or (at your option) any later version.
|
|
|
|
This library is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Library General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Library General Public License
|
|
along with this library; see the file COPYING.LIB. If not, write to
|
|
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
|
|
Boston, MA 02110-1301, USA.
|
|
|
|
$Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $
|
|
|
|
*/
|
|
|
|
#include "WeaverImpl.h"
|
|
|
|
#include <QtCore/QObject>
|
|
#include <QtCore/QMutex>
|
|
#include <kdebug.h>
|
|
|
|
#include "Job.h"
|
|
#include "State.h"
|
|
#include "Thread.h"
|
|
#include "ThreadWeaver.h"
|
|
#include "WeaverObserver.h"
|
|
#include "SuspendedState.h"
|
|
#include "SuspendingState.h"
|
|
#include "DestructedState.h"
|
|
#include "WorkingHardState.h"
|
|
#include "ShuttingDownState.h"
|
|
#include "InConstructionState.h"
|
|
|
|
using namespace ThreadWeaver;
|
|
|
|
WeaverImpl::WeaverImpl( QObject* parent )
|
|
: WeaverInterface(parent)
|
|
, m_active(0)
|
|
, m_inventoryMax( qMax(4, 2 * QThread::idealThreadCount() ) )
|
|
, m_mutex ( new QMutex( QMutex::Recursive ) )
|
|
, m_finishMutex( new QMutex )
|
|
, m_jobAvailableMutex ( new QMutex )
|
|
, m_state (0)
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
// initialize state objects:
|
|
m_states[InConstruction] = new InConstructionState( this );
|
|
setState ( InConstruction );
|
|
m_states[WorkingHard] = new WorkingHardState( this );
|
|
m_states[Suspending] = new SuspendingState( this );
|
|
m_states[Suspended] = new SuspendedState( this );
|
|
m_states[ShuttingDown] = new ShuttingDownState( this );
|
|
m_states[Destructed] = new DestructedState( this );
|
|
|
|
setState( WorkingHard );
|
|
}
|
|
|
|
WeaverImpl::~WeaverImpl()
|
|
{ // the constructor may only be called from the thread that owns this
|
|
// object (everything else would be what we professionals call "insane")
|
|
Q_ASSERT( QThread::currentThread() == thread() );
|
|
kDebug("destroying inventory.");
|
|
setState ( ShuttingDown );
|
|
|
|
m_jobAvailable.wakeAll();
|
|
|
|
// problem: Some threads might not be asleep yet, just finding
|
|
// out if a job is available. Those threads will suspend
|
|
// waiting for their next job (a rare case, but not impossible).
|
|
// Therefore, if we encounter a thread that has not exited, we
|
|
// have to wake it again (which we do in the following for
|
|
// loop).
|
|
|
|
for (;;) {
|
|
Thread* th = 0;
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
if (m_inventory.isEmpty()) break;
|
|
th = m_inventory.takeFirst();
|
|
}
|
|
if ( !th->isFinished() )
|
|
{
|
|
for ( ;; )
|
|
{
|
|
m_jobAvailable.wakeAll();
|
|
if ( th->wait( 100 ) ) break;
|
|
kDebug() << "thread" << th->id() << "did not exit as expected, retrying.";
|
|
}
|
|
}
|
|
emit ( threadExited ( th ) );
|
|
delete th;
|
|
}
|
|
Q_ASSERT(m_inventory.isEmpty());
|
|
delete m_finishMutex;
|
|
delete m_jobAvailableMutex;
|
|
kDebug ( "done" );
|
|
setState ( Destructed ); // m_state = Halted;
|
|
// FIXME: delete state objects. what sense does DestructedState make then?
|
|
// FIXME: make state objects static, since they are
|
|
delete m_mutex;
|
|
}
|
|
|
|
void WeaverImpl::setState ( StateId id )
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
if ( m_state==0 || m_state->stateId() != id )
|
|
{
|
|
m_state = m_states[id];
|
|
kDebug() << "state changed to" << m_state->stateName();
|
|
if ( id == Suspended )
|
|
{
|
|
emit ( suspended() );
|
|
}
|
|
|
|
m_state->activated();
|
|
|
|
emit ( stateChanged ( m_state ) );
|
|
}
|
|
}
|
|
|
|
const State& WeaverImpl::state() const
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
return *m_state;
|
|
}
|
|
|
|
void WeaverImpl::setMaximumNumberOfThreads( int cap )
|
|
{
|
|
Q_ASSERT_X ( cap > 0, "Weaver Impl", "Thread inventory size has to be larger than zero." );
|
|
QMutexLocker l (m_mutex);
|
|
m_inventoryMax = cap;
|
|
}
|
|
|
|
int WeaverImpl::maximumNumberOfThreads() const
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
return m_inventoryMax;
|
|
}
|
|
|
|
int WeaverImpl::currentNumberOfThreads () const
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
return m_inventory.count ();
|
|
}
|
|
|
|
void WeaverImpl::registerObserver ( WeaverObserver *ext )
|
|
{
|
|
connect ( this, SIGNAL (stateChanged(ThreadWeaver::State*)),
|
|
ext, SIGNAL (weaverStateChanged(ThreadWeaver::State*)) );
|
|
connect ( this, SIGNAL (threadStarted(ThreadWeaver::Thread*)),
|
|
ext, SIGNAL (threadStarted(ThreadWeaver::Thread*)) );
|
|
connect ( this, SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)),
|
|
ext, SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)) );
|
|
connect ( this, SIGNAL (threadSuspended(ThreadWeaver::Thread*)),
|
|
ext, SIGNAL (threadSuspended(ThreadWeaver::Thread*)) );
|
|
connect ( this, SIGNAL (threadExited(ThreadWeaver::Thread*)) ,
|
|
ext, SIGNAL (threadExited(ThreadWeaver::Thread*)) );
|
|
}
|
|
|
|
void WeaverImpl::enqueue(Job* job)
|
|
{
|
|
if (job) {
|
|
adjustInventory ( 1 );
|
|
kDebug() << "queueing job" << job << "of type " << job->metaObject()->className();
|
|
QMutexLocker l (m_mutex); Q_UNUSED(l);
|
|
job->aboutToBeQueued ( this );
|
|
// find position for insertion:;
|
|
// FIXME (after 0.6) optimize: factor out queue management into own class,
|
|
// and use binary search for insertion (not done yet because
|
|
// refactoring already planned):
|
|
int i = m_assignments.size();
|
|
if (i > 0) {
|
|
while ( i > 0 && m_assignments.at(i - 1)->priority() < job->priority() ) --i;
|
|
m_assignments.insert( i, (job) );
|
|
} else {
|
|
m_assignments.append (job);
|
|
}
|
|
assignJobs();
|
|
}
|
|
}
|
|
|
|
void WeaverImpl::adjustInventory ( int numberOfNewJobs )
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
|
|
// no of threads that can be created:
|
|
const int reserve = m_inventoryMax - m_inventory.count();
|
|
|
|
if ( reserve > 0 )
|
|
{
|
|
for ( int i = 0; i < qMin ( reserve, numberOfNewJobs ); ++i )
|
|
{
|
|
Thread *th = createThread();
|
|
th->moveToThread( th ); // be sane from the start
|
|
m_inventory.append(th);
|
|
connect ( th, SIGNAL (jobStarted(ThreadWeaver::Thread*,ThreadWeaver::Job*)),
|
|
SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)) );
|
|
connect ( th, SIGNAL (jobDone(ThreadWeaver::Job*)),
|
|
SIGNAL (jobDone(ThreadWeaver::Job*)) );
|
|
connect ( th, SIGNAL (started(ThreadWeaver::Thread*)),
|
|
SIGNAL (threadStarted(ThreadWeaver::Thread*)) );
|
|
|
|
th->start ();
|
|
kDebug() << "thread created," << currentNumberOfThreads() << "threads in inventory.";
|
|
}
|
|
}
|
|
}
|
|
|
|
Thread* WeaverImpl::createThread()
|
|
{
|
|
return new Thread( this );
|
|
}
|
|
|
|
bool WeaverImpl::dequeue ( Job* job )
|
|
{
|
|
bool result;
|
|
{
|
|
QMutexLocker l (m_mutex);
|
|
|
|
int i = m_assignments.indexOf ( job );
|
|
if ( i != -1 )
|
|
{
|
|
job->aboutToBeDequeued( this );
|
|
|
|
m_assignments.removeAt( i );
|
|
result = true;
|
|
kDebug() << "job" << job << "dequeued," << m_assignments.size() << "jobs left.";
|
|
} else {
|
|
kDebug() << "job" << job << "not found in queue.";
|
|
result = false;
|
|
}
|
|
}
|
|
|
|
// from the queues point of view, a job is just as finished if
|
|
// it gets dequeued:
|
|
m_jobFinished.wakeOne();
|
|
return result;
|
|
}
|
|
|
|
void WeaverImpl::dequeue ()
|
|
{
|
|
kDebug( "dequeueing all jobs." );
|
|
QMutexLocker l (m_mutex);
|
|
for ( int index = 0; index < m_assignments.size(); ++index )
|
|
{
|
|
m_assignments.at( index )->aboutToBeDequeued( this );
|
|
}
|
|
m_assignments.clear();
|
|
|
|
Q_ASSERT ( m_assignments.isEmpty() );
|
|
}
|
|
|
|
void WeaverImpl::suspend ()
|
|
{
|
|
m_state->suspend();
|
|
}
|
|
|
|
void WeaverImpl::resume ( )
|
|
{
|
|
m_state->resume();
|
|
}
|
|
|
|
void WeaverImpl::assignJobs()
|
|
{
|
|
m_jobAvailable.wakeAll();
|
|
}
|
|
|
|
bool WeaverImpl::isEmpty() const
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
return m_assignments.isEmpty();
|
|
}
|
|
|
|
|
|
void WeaverImpl::incActiveThreadCount()
|
|
{
|
|
adjustActiveThreadCount ( 1 );
|
|
}
|
|
|
|
void WeaverImpl::decActiveThreadCount()
|
|
{
|
|
adjustActiveThreadCount ( -1 );
|
|
// the done job could have freed another set of jobs, and we do not know how
|
|
// many - therefore we need to wake all threads:
|
|
m_jobFinished.wakeAll();
|
|
}
|
|
|
|
void WeaverImpl::adjustActiveThreadCount( int diff )
|
|
{
|
|
QMutexLocker l (m_mutex); Q_UNUSED(l);
|
|
m_active += diff;
|
|
kDebug() << m_active << "active threads (" << queueLength() << "in queue).";
|
|
|
|
if ( m_assignments.isEmpty() && m_active == 0)
|
|
{
|
|
Q_ASSERT ( diff < 0 ); // cannot reach Zero otherwise
|
|
emit ( finished() );
|
|
}
|
|
}
|
|
|
|
int WeaverImpl::activeThreadCount()
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
return m_active;
|
|
}
|
|
|
|
Job* WeaverImpl::takeFirstAvailableJob(Job *previous)
|
|
{
|
|
QMutexLocker l (m_mutex); Q_UNUSED(l);
|
|
if (previous) {
|
|
// cleanup and send events:
|
|
decActiveThreadCount();
|
|
}
|
|
Job *next = 0;
|
|
for (int index = 0; index < m_assignments.size(); ++index) {
|
|
if ( m_assignments.at(index)->canBeExecuted() ) {
|
|
next = m_assignments.at(index);
|
|
m_assignments.removeAt (index);
|
|
break;
|
|
}
|
|
}
|
|
if (next) {
|
|
incActiveThreadCount();
|
|
}
|
|
return next;
|
|
}
|
|
|
|
Job* WeaverImpl::applyForWork(Thread *th, Job* previous)
|
|
{
|
|
return m_state->applyForWork ( th, previous );
|
|
}
|
|
|
|
void WeaverImpl::waitForAvailableJob(Thread* th)
|
|
{
|
|
m_state->waitForAvailableJob ( th );
|
|
}
|
|
|
|
void WeaverImpl::blockThreadUntilJobsAreBeingAssigned ( Thread *th )
|
|
{ // th is the thread that calls this method:
|
|
kDebug() << "thread"<< th->id() << "blocked.";
|
|
emit threadSuspended ( th );
|
|
QMutexLocker l( m_jobAvailableMutex );
|
|
m_jobAvailable.wait( m_jobAvailableMutex );
|
|
kDebug() << "thread"<< th->id() << "resumed.";
|
|
}
|
|
|
|
int WeaverImpl::queueLength() const
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
return m_assignments.count();
|
|
}
|
|
|
|
bool WeaverImpl::isIdle () const
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
return isEmpty() && m_active == 0;
|
|
}
|
|
|
|
void WeaverImpl::finish()
|
|
{
|
|
#ifdef QT_NO_DEBUG
|
|
const int MaxWaitMilliSeconds = 50;
|
|
#else
|
|
const int MaxWaitMilliSeconds = 500;
|
|
#endif
|
|
while ( !isIdle() ) {
|
|
Q_ASSERT(state().stateId() == WorkingHard);
|
|
kDebug ( "not done, waiting." );
|
|
QMutexLocker l( m_finishMutex );
|
|
if ( m_jobFinished.wait( l.mutex(), MaxWaitMilliSeconds ) == false ) {
|
|
kDebug() << "wait timed out," << queueLength() << "jobs left, waking threads.";
|
|
m_jobAvailable.wakeAll();
|
|
}
|
|
}
|
|
kDebug ( "done.\n\n" );
|
|
}
|
|
|
|
void WeaverImpl::requestAbort()
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
for ( int i = 0; i<m_inventory.size(); ++i ) {
|
|
m_inventory[i]->requestAbort();
|
|
}
|
|
}
|
|
|
|
void WeaverImpl::dumpJobs()
|
|
{
|
|
QMutexLocker l(m_mutex); Q_UNUSED(l);
|
|
kDebug( "current jobs:" );
|
|
for ( int index = 0; index < m_assignments.size(); ++index ) {
|
|
kDebug() << "-->"
|
|
<< index << ":"
|
|
<< m_assignments.at( index )
|
|
<< m_assignments.at( index )->metaObject()->className()
|
|
<< "(priority" << m_assignments.at(index)->priority()
|
|
<< ", can be executed:" << m_assignments.at(index)->canBeExecuted() << ")";
|
|
}
|
|
}
|
|
|
|
#include "moc_WeaverImpl.cpp"
|