/* Copyright (c) 2006 Tobias Koenig 2006 Marc Mutz 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "job.h" #include "job_p.h" #include "dbusconnectionpool.h" #include #include "imapparser_p.h" #include "session.h" #include "session_p.h" #include #include #include #include #include #include #include using namespace Akonadi; static QDBusAbstractInterface *s_jobtracker = 0; //@cond PRIVATE void JobPrivate::handleResponse(const QByteArray &tag, const QByteArray &data) { Q_Q(Job); if (mCurrentSubJob) { mCurrentSubJob->d_ptr->handleResponse(tag, data); return; } if (tag == mTag) { if (data.startsWith("NO ") || data.startsWith("BAD ")) { //krazy:exclude=strings QString msg = QString::fromUtf8(data); msg.remove(0, msg.startsWith(QLatin1String("NO ")) ? 3 : 4); if (msg.endsWith(QLatin1String("\r\n"))) { msg.chop(2); } q->setError(Job::Unknown); q->setErrorText(msg); q->emitResult(); return; } else if (data.startsWith("OK")) { //krazy:exclude=strings // We can't use emitResult() here: The slot connected to the result signal might exec() // another job, and therefore this method would never return. That causes the session // to deadlock, since it calls this method and does not continue starting new jobs until // this method finishes. Which would also mean the exec()'d job is never started,, and there- // fore everything deadlocks. QTimer::singleShot(0, q, SLOT(delayedEmitResult())); return; } } q->doHandleResponse(tag, data); } void JobPrivate::init(QObject *parent) { Q_Q(Job); mParentJob = dynamic_cast(parent); mSession = dynamic_cast(parent); if (!mSession) { if (!mParentJob) { mSession = Session::defaultSession(); } else { mSession = mParentJob->d_ptr->mSession; } } if (!mParentJob) { mSession->d->addJob(q); } else { mParentJob->addSubjob(q); } // if there's a job tracker running, tell it about the new job if (!s_jobtracker) { // Let's only check for the debugging console every 3 seconds, otherwise every single job // makes a dbus call to the dbus daemon, doesn't help performance. static QTime s_lastTime; if (s_lastTime.isNull() || s_lastTime.elapsed() > 3000) { if (s_lastTime.isNull()) { s_lastTime.start(); } if (DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String("org.kde.akonadiconsole"))) { s_jobtracker = new QDBusInterface(QLatin1String("org.kde.akonadiconsole"), QLatin1String("/jobtracker"), QLatin1String("org.freedesktop.Akonadi.JobTracker"), DBusConnectionPool::threadConnection(), 0); } else { s_lastTime.restart(); } } // Note: we never reset s_jobtracker to 0 when a call fails; but if we did // then we should restart s_lastTime. } QMetaObject::invokeMethod(q, "signalCreationToJobTracker", Qt::QueuedConnection); } void JobPrivate::signalCreationToJobTracker() { Q_Q(Job); if (s_jobtracker) { // We do these dbus calls manually, so as to avoid having to install (or copy) the console's // xml interface document. Since this is purely a debugging aid, that seems preferable to // publishing something not intended for public consumption. // WARNING: for any signature change here, apply it to resourcescheduler.cpp too QList argumentList; argumentList << QLatin1String(mSession->sessionId()) << QString::number(reinterpret_cast(q), 16) << (mParentJob ? QString::number(reinterpret_cast(mParentJob), 16) : QString()) << QString::fromLatin1(q->metaObject()->className()) << jobDebuggingString(); s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String("jobCreated"), argumentList); } } void JobPrivate::signalStartedToJobTracker() { Q_Q(Job); if (s_jobtracker) { // if there's a job tracker running, tell it a job started QList argumentList; argumentList << QString::number(reinterpret_cast(q), 16); s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String("jobStarted"), argumentList); } } void JobPrivate::aboutToFinish() { // Dummy } void JobPrivate::delayedEmitResult() { Q_Q(Job); aboutToFinish(); q->emitResult(); } void JobPrivate::startQueued() { Q_Q(Job); mStarted = true; emit q->aboutToStart(q); q->doStart(); QTimer::singleShot(0, q, SLOT(startNext())); QMetaObject::invokeMethod(q, "signalStartedToJobTracker", Qt::QueuedConnection); } void JobPrivate::lostConnection() { Q_Q(Job); if (mCurrentSubJob) { mCurrentSubJob->d_ptr->lostConnection(); } else { q->setError(Job::ConnectionFailed); q->emitResult(); } } void JobPrivate::slotSubJobAboutToStart(Job *job) { Q_ASSERT(mCurrentSubJob == 0); mCurrentSubJob = job; } void JobPrivate::startNext() { Q_Q(Job); if (mStarted && !mCurrentSubJob && q->hasSubjobs()) { Job *job = dynamic_cast(q->subjobs().first()); Q_ASSERT(job); job->d_ptr->startQueued(); } } QByteArray JobPrivate::newTag() { if (mParentJob) { mTag = mParentJob->d_ptr->newTag(); } else { mTag = QByteArray::number(mSession->d->nextTag()); } return mTag; } QByteArray JobPrivate::tag() const { return mTag; } void JobPrivate::writeData(const QByteArray &data) { Q_ASSERT_X(!mWriteFinished, "Job::writeData()", "Calling writeData() after emitting writeFinished()"); mSession->d->writeData(data); } void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision) { mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision); } void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision) { Q_Q(Job); foreach (KJob *j, q->subjobs()) { Akonadi::Job *job = qobject_cast(j); if (job) { job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision); } } doUpdateItemRevision(itemId, oldRevision, newRevision); } void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision) { Q_UNUSED(itemId); Q_UNUSED(oldRevision); Q_UNUSED(newRevision); } int JobPrivate::protocolVersion() const { return mSession->d->protocolVersion; } //@endcond Job::Job(QObject *parent) : KCompositeJob(parent) , d_ptr(new JobPrivate(this)) { d_ptr->init(parent); } Job::Job(JobPrivate *dd, QObject *parent) : KCompositeJob(parent) , d_ptr(dd) { d_ptr->init(parent); } Job::~Job() { delete d_ptr; // if there is a job tracer listening, tell it the job is done now if (s_jobtracker) { QList argumentList; argumentList << QString::number(reinterpret_cast(this), 16) << errorString(); s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String("jobEnded"), argumentList); } } void Job::start() { } bool Job::doKill() { Q_D(Job); if (d->mStarted) { // the only way to cancel an already started job is reconnecting to the server d->mSession->d->forceReconnect(); } d->mStarted = false; return true; } QString Job::errorString() const { QString str; switch (error()) { case NoError: break; case ConnectionFailed: str = i18n("Cannot connect to the Akonadi service."); break; case ProtocolVersionMismatch: str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed."); break; case UserCanceled: str = i18n("User canceled operation."); break; case Unknown: return errorText(); default: str = i18n("Unknown error."); break; } if (!errorText().isEmpty()) { str += QString::fromLatin1(" (%1)").arg(errorText()); } return str; } bool Job::addSubjob(KJob *job) { bool rv = KCompositeJob::addSubjob(job); if (rv) { connect(job, SIGNAL(aboutToStart(Akonadi::Job*)), SLOT(slotSubJobAboutToStart(Akonadi::Job*))); QTimer::singleShot(0, this, SLOT(startNext())); } return rv; } bool Job::removeSubjob(KJob *job) { bool rv = KCompositeJob::removeSubjob(job); if (job == d_ptr->mCurrentSubJob) { d_ptr->mCurrentSubJob = 0; QTimer::singleShot(0, this, SLOT(startNext())); } return rv; } void Job::doHandleResponse(const QByteArray &tag, const QByteArray &data) { kDebug() << "Unhandled response: " << tag << data; } void Job::slotResult(KJob *job) { if (d_ptr->mCurrentSubJob == job) { // current job finished, start the next one d_ptr->mCurrentSubJob = 0; KCompositeJob::slotResult(job); if (!job->error()) { QTimer::singleShot(0, this, SLOT(startNext())); } } else { // job that was still waiting for execution finished, probably canceled, // so just remove it from the queue and move on without caring about // its error code KCompositeJob::removeSubjob(job); } } void Job::emitWriteFinished() { d_ptr->mWriteFinished = true; emit writeFinished(this); } #include "moc_job.cpp"