kio: port the slave interface and base bits to QSocketDevice and QSocketServer

needs some tweaks and testing but very much functional, the port of
KHTTP turned out to be 3x faster on files but that may be due to the use
of KThreadPool too

Signed-off-by: Ivailo Monev <xakepa10@gmail.com>
This commit is contained in:
Ivailo Monev 2024-04-29 04:03:57 +03:00
parent 6062756a4d
commit 7ae07e0046
11 changed files with 178 additions and 877 deletions

View file

@ -320,7 +320,6 @@ install(
FILES
KIO/AuthInfo
KIO/ChmodJob
KIO/Connection
KIO/CopyInfo
KIO/CopyJob
KIO/DeleteJob

View file

@ -1 +0,0 @@
#include "../../kio/connection.h"

View file

@ -34,7 +34,6 @@ endif()
set(kiocore_STAT_SRCS
kio/authinfo.cpp
kio/chmodjob.cpp
kio/connection.cpp
kio/copyjob.cpp
kio/deletejob.cpp
kio/directorysizejob.cpp
@ -196,7 +195,6 @@ generate_export_header(kio)
install(
FILES
${CMAKE_CURRENT_BINARY_DIR}/kio_export.h
kio/connection.h
kio/slaveinterface.h
kio/slaveconfig.h
kio/global.h

View file

@ -1,551 +0,0 @@
/* This file is part of the KDE libraries
Copyright (C) 2000 Stephan Kulow <coolo@kde.org>
David Faure <faure@kde.org>
Copyright (C) 2007 Thiago Macieira <thiago@kde.org>
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
#include "connection.h"
#include "connection_p.h"
#include <QQueue>
#include <QPointer>
#include <QElapsedTimer>
#include <kdebug.h>
#include <kcomponentdata.h>
#include <kglobal.h>
#include <klocale.h>
#include <kstandarddirs.h>
#include <kurl.h>
using namespace KIO;
class KIO::ConnectionPrivate
{
public:
inline ConnectionPrivate()
: backend(0), suspended(false)
{ }
void dequeue();
void commandReceived(const Task &task);
void disconnected();
void setBackend(SocketConnectionBackend *b);
QQueue<Task> outgoingTasks;
QQueue<Task> incomingTasks;
SocketConnectionBackend *backend;
Connection *q;
bool suspended;
};
class KIO::ConnectionServerPrivate
{
public:
inline ConnectionServerPrivate()
: backend(0)
{ }
ConnectionServer *q;
SocketConnectionBackend *backend;
};
void ConnectionPrivate::dequeue()
{
if (!backend || suspended)
return;
while (!outgoingTasks.isEmpty()) {
const Task task = outgoingTasks.dequeue();
q->sendnow(task.cmd, task.data);
}
if (!incomingTasks.isEmpty())
emit q->readyRead();
}
void ConnectionPrivate::commandReceived(const Task &task)
{
//kDebug() << this << "Command " << task.cmd << " added to the queue";
if (!suspended && incomingTasks.isEmpty())
QMetaObject::invokeMethod(q, "dequeue", Qt::QueuedConnection);
incomingTasks.enqueue(task);
}
void ConnectionPrivate::disconnected()
{
q->close();
QMetaObject::invokeMethod(q, "readyRead", Qt::QueuedConnection);
}
void ConnectionPrivate::setBackend(SocketConnectionBackend *b)
{
backend = b;
if (backend) {
q->connect(backend, SIGNAL(commandReceived(Task)), SLOT(commandReceived(Task)));
q->connect(backend, SIGNAL(disconnected()), SLOT(disconnected()));
backend->setSuspended(suspended);
}
}
SocketConnectionBackend::SocketConnectionBackend(QObject *parent)
: QObject(parent), state(Idle), socket(nullptr), localServer(nullptr), len(-1),
cmd(0), signalEmitted(false)
{
qRegisterMetaType<Task>("Task");
}
SocketConnectionBackend::~SocketConnectionBackend()
{
}
void SocketConnectionBackend::setSuspended(bool enable)
{
if (state != Connected)
return;
Q_ASSERT(socket);
Q_ASSERT(!localServer);
if (enable) {
//kDebug() << this << " suspending";
socket->setReadBufferSize(1);
} else {
//kDebug() << this << " resuming";
socket->setReadBufferSize(StandardBufferSize);
if (socket->bytesAvailable() >= HeaderSize) {
// there are bytes available
QMetaObject::invokeMethod(this, "socketReadyRead", Qt::QueuedConnection);
}
// We read all bytes here, but we don't use readAll() because we need
// to read at least one byte (even if there isn't any) so that the
// socket notifier is reenabled
QByteArray data = socket->read(socket->bytesAvailable() + 1);
for (int i = data.size(); --i >= 0; )
socket->ungetChar(data[i]);
}
}
bool SocketConnectionBackend::connectToRemote(const QString &address)
{
Q_ASSERT(state == Idle || state == Listening);
Q_ASSERT(!socket);
socket = new QLocalSocket(this);
socket->connectToServer(address);
if (!socket->waitForConnected()) {
state = Idle;
kDebug() << "could not connect to " << address;
return false;
}
connect(socket, SIGNAL(readyRead()), SLOT(socketReadyRead()));
connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected()));
state = Connected;
return true;
}
void SocketConnectionBackend::socketDisconnected()
{
state = Idle;
emit disconnected();
}
bool SocketConnectionBackend::listenForRemote()
{
Q_ASSERT(state == Idle);
Q_ASSERT(!socket);
Q_ASSERT(!localServer);
// NOTE: using long/complex server name can cause reconnection issues
const QString serveraddress = QString::fromLatin1("kio_") + QString::number(qrand());
localServer = new QLocalServer(this);
localServer->listen(serveraddress);
if (!localServer->isListening()) {
kWarning(7017) << "Could not listen on" << serveraddress << localServer->errorString();
errorString = localServer->errorString();
delete localServer;
localServer = nullptr;
return false;
}
address = localServer->fullServerName();
connect(localServer, SIGNAL(newConnection()), SIGNAL(newConnection()));
state = Listening;
return true;
}
bool SocketConnectionBackend::waitForIncomingTask(int ms)
{
Q_ASSERT(state == Connected);
Q_ASSERT(socket);
if (socket->state() != QLocalSocket::ConnectedState) {
state = Idle;
return false; // socket has probably closed, what do we do?
}
signalEmitted = false;
if (socket->bytesAvailable())
socketReadyRead();
if (signalEmitted)
return true; // there was enough data in the socket
// not enough data in the socket, so wait for more
QElapsedTimer timer;
timer.start();
while (socket->state() == QLocalSocket::ConnectedState && !signalEmitted &&
(ms == -1 || timer.elapsed() < ms))
if (!socket->waitForReadyRead(ms == -1 ? -1 : ms - timer.elapsed()))
break;
if (signalEmitted)
return true;
if (socket->state() != QLocalSocket::ConnectedState)
state = Idle;
return false;
}
bool SocketConnectionBackend::sendCommand(const Task &task)
{
Q_ASSERT(state == Connected);
Q_ASSERT(socket);
char buffer[HeaderSize + 1];
sprintf(buffer, "%6x_%2x_", task.data.size(), task.cmd);
socket->write(buffer, HeaderSize);
socket->write(task.data);
//kDebug() << this << " Sending command " << hex << task.cmd << " of "
// << task.data.size() << " bytes (" << socket->bytesToWrite()
// << " bytes left to write";
// blocking mode:
while (socket->bytesToWrite() > 0 && socket->state() == QLocalSocket::ConnectedState)
socket->waitForBytesWritten(-1);
return socket->state() == QLocalSocket::ConnectedState;
}
SocketConnectionBackend *SocketConnectionBackend::nextPendingConnection()
{
Q_ASSERT(state == Listening);
Q_ASSERT(localServer);
Q_ASSERT(!socket);
//kDebug() << "Got a new connection";
QLocalSocket *newSocket = localServer->nextPendingConnection();
if (!newSocket)
return 0; // there was no connection...
SocketConnectionBackend *result = new SocketConnectionBackend();
result->state = Connected;
result->socket = newSocket;
newSocket->setParent(result);
connect(newSocket, SIGNAL(readyRead()), result, SLOT(socketReadyRead()));
connect(newSocket, SIGNAL(disconnected()), result, SLOT(socketDisconnected()));
return result;
}
void SocketConnectionBackend::socketReadyRead()
{
bool shouldReadAnother;
do {
if (!socket)
// might happen if the invokeMethods were delivered after we disconnected
return;
// kDebug() << this << "Got " << socket->bytesAvailable() << " bytes";
if (len == -1) {
// We have to read the header
char buffer[HeaderSize];
if (socket->bytesAvailable() < HeaderSize) {
return; // wait for more data
}
socket->read(buffer, sizeof buffer);
buffer[6] = 0;
buffer[9] = 0;
char *p = buffer;
while( *p == ' ' ) p++;
len = strtol( p, 0L, 16 );
p = buffer + 7;
while( *p == ' ' ) p++;
cmd = strtol( p, 0L, 16 );
// kDebug() << this << " Beginning of command " << hex << cmd << " of size "
// << len;
}
QPointer<SocketConnectionBackend> that = this;
// kDebug() << this << "Want to read " << len << " bytes";
if (socket->bytesAvailable() >= len) {
Task task;
task.cmd = cmd;
if (len)
task.data = socket->read(len);
len = -1;
signalEmitted = true;
emit commandReceived(task);
} else if (len > StandardBufferSize) {
kDebug(7017) << this << "Jumbo packet of" << len << "bytes";
socket->setReadBufferSize(len + 1);
}
// If we're dead, better don't try anything.
if (that.isNull())
return;
// Do we have enough for an another read?
if (len == -1)
shouldReadAnother = socket->bytesAvailable() >= HeaderSize;
else
shouldReadAnother = socket->bytesAvailable() >= len;
}
while (shouldReadAnother);
}
Connection::Connection(QObject *parent)
: QObject(parent), d(new ConnectionPrivate)
{
d->q = this;
}
Connection::~Connection()
{
close();
delete d;
}
void Connection::suspend()
{
//kDebug() << this << "Suspended";
d->suspended = true;
if (d->backend)
d->backend->setSuspended(true);
}
void Connection::resume()
{
// send any outgoing or incoming commands that may be in queue
QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection);
//kDebug() << this << "Resumed";
d->suspended = false;
if (d->backend)
d->backend->setSuspended(false);
}
void Connection::close()
{
if (d->backend) {
d->backend->disconnect(this);
d->backend->deleteLater();
d->backend = 0;
}
d->outgoingTasks.clear();
d->incomingTasks.clear();
}
bool Connection::isConnected() const
{
return d->backend && d->backend->state == SocketConnectionBackend::Connected;
}
bool Connection::inited() const
{
return d->backend;
}
bool Connection::suspended() const
{
return d->suspended;
}
void Connection::connectToRemote(const QString &address)
{
d->setBackend(new SocketConnectionBackend(this));
kDebug(7017) << "Connection requested to" << address;
if (!d->backend->connectToRemote(address)) {
// should the process owning QLocalServer crash and its address remain in use attempt to
// connect to new server
kDebug(7017) << "Creating new server since connecting to address failed" << address;
d->backend->listenForRemote();
if (!d->backend->connectToRemote(d->backend->address)) {
kWarning(7017) << "Could not connect to" << address;
delete d->backend;
d->backend = 0;
return;
}
}
d->dequeue();
}
QString Connection::errorString() const
{
if (d->backend)
return d->backend->errorString;
return QString();
}
bool Connection::send(int cmd, const QByteArray& data)
{
if (!inited() || !d->outgoingTasks.isEmpty()) {
Task task;
task.cmd = cmd;
task.data = data;
d->outgoingTasks.enqueue(task);
return true;
} else {
return sendnow(cmd, data);
}
}
bool Connection::sendnow(int _cmd, const QByteArray &data)
{
if (data.size() > 0xffffff)
return false;
if (!isConnected())
return false;
//kDebug() << this << "Sending command " << _cmd << " of size " << data.size();
Task task;
task.cmd = _cmd;
task.data = data;
return d->backend->sendCommand(task);
}
bool Connection::hasTaskAvailable() const
{
return !d->incomingTasks.isEmpty();
}
bool Connection::waitForIncomingTask(int ms)
{
if (!isConnected())
return false;
if (d->backend)
return d->backend->waitForIncomingTask(ms);
return false;
}
int Connection::read( int* _cmd, QByteArray &data )
{
// if it's still empty, then it's an error
if (d->incomingTasks.isEmpty()) {
//kWarning() << this << "Task list is empty!";
return -1;
}
const Task task = d->incomingTasks.dequeue();
//kDebug() << this << "Command " << task.cmd << " removed from the queue (size "
// << task.data.size() << ")";
*_cmd = task.cmd;
data = task.data;
// if we didn't empty our reading queue, emit again
if (!d->suspended && !d->incomingTasks.isEmpty())
QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection);
return data.size();
}
ConnectionServer::ConnectionServer(QObject *parent)
: QObject(parent), d(new ConnectionServerPrivate)
{
d->q = this;
}
ConnectionServer::~ConnectionServer()
{
delete d;
}
void ConnectionServer::listenForRemote()
{
d->backend = new SocketConnectionBackend(this);
if (!d->backend->listenForRemote()) {
delete d->backend;
d->backend = 0;
return;
}
connect(d->backend, SIGNAL(newConnection()), this, SIGNAL(newConnection()));
kDebug(7017) << "Listening on " << d->backend->address;
}
QString ConnectionServer::address() const
{
if (d->backend)
return d->backend->address;
return QString();
}
bool ConnectionServer::isListening() const
{
return d->backend && d->backend->state == SocketConnectionBackend::Listening;
}
void ConnectionServer::close()
{
delete d->backend;
d->backend = 0;
}
Connection *ConnectionServer::nextPendingConnection()
{
if (!isListening())
return 0;
SocketConnectionBackend *newBackend = d->backend->nextPendingConnection();
if (!newBackend)
return 0; // no new backend...
Connection *result = new Connection;
result->d->setBackend(newBackend);
newBackend->setParent(result);
return result;
}
void ConnectionServer::setNextPendingConnection(Connection *conn)
{
SocketConnectionBackend *newBackend = d->backend->nextPendingConnection();
Q_ASSERT(newBackend);
conn->d->backend = newBackend;
conn->d->setBackend(newBackend);
newBackend->setParent(conn);
conn->d->dequeue();
}
#include "moc_connection_p.cpp"
#include "moc_connection.cpp"

View file

@ -1,181 +0,0 @@
// -*- c++ -*-
/* This file is part of the KDE libraries
Copyright (C) 2000 Stephan Kulow <coolo@kde.org>
David Faure <faure@kde.org>
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
#ifndef KIO_CONNECTION_H
#define KIO_CONNECTION_H
#include "kio_export.h"
#include <QtCore/QObject>
namespace KIO {
class ConnectionPrivate;
class ConnectionServer;
/**
* @private
*
* This class provides a simple means for IPC between two applications
* via a pipe.
* It handles a queue of commands to be sent which makes it possible to
* queue data before an actual connection has been established.
*/
class KIO_EXPORT Connection : public QObject
{
Q_OBJECT
public:
/**
* Creates a new connection.
* @see connectToRemote, listenForRemote
*/
explicit Connection(QObject *parent = 0);
virtual ~Connection();
/**
* Connects to the remote address.
*/
void connectToRemote(const QString &address);
/// Closes the connection.
void close();
QString errorString() const;
bool isConnected() const;
/**
* Checks whether the connection has been initialized.
* @return true if the initialized
* @see init()
*/
bool inited() const;
/**
* Sends/queues the given command to be sent.
* @param cmd the command to set
* @param arr the bytes to send
* @return true if successful, false otherwise
*/
bool send(int cmd, const QByteArray &arr = QByteArray());
/**
* Sends the given command immediately.
* @param _cmd the command to set
* @param data the bytes to send
* @return true if successful, false otherwise
*/
bool sendnow( int _cmd, const QByteArray &data );
/**
* Returns true if there are packets to be read immediately,
* false if waitForIncomingTask must be called before more data
* is available.
*/
bool hasTaskAvailable() const;
/**
* Waits for one more command to be handled and ready.
*
* @param ms the time to wait in milliseconds
* @returns true if one command can be read, false if we timed out
*/
bool waitForIncomingTask(int ms = 30000);
/**
* Receive data.
*
* @param _cmd the received command will be written here
* @param data the received data will be written here
* @return >=0 indicates the received data size upon success
* -1 indicates error
*/
int read( int* _cmd, QByteArray &data );
/**
* Don't handle incoming data until resumed.
*/
void suspend();
/**
* Resume handling of incoming data.
*/
void resume();
/**
* Returns status of connection.
* @return true if suspended, false otherwise
*/
bool suspended() const;
Q_SIGNALS:
void readyRead();
private:
Q_PRIVATE_SLOT(d, void dequeue())
Q_PRIVATE_SLOT(d, void commandReceived(Task))
Q_PRIVATE_SLOT(d, void disconnected())
friend class ConnectionPrivate;
friend class ConnectionServer;
class ConnectionPrivate* const d;
};
class ConnectionServerPrivate;
/**
* @private
*
* This class provides a way to obtaining KIO::Connection connections.
*/
class KIO_EXPORT ConnectionServer : public QObject
{
Q_OBJECT
public:
ConnectionServer(QObject *parent = 0);
~ConnectionServer();
/**
* Sets this connection to listen mode. Use address() to obtain the
* address this is listening on.
*/
void listenForRemote();
bool isListening() const;
/// Closes the connection.
void close();
/**
* Returns the address for this connection if it is listening, an empty
* string if not.
*/
QString address() const;
Connection *nextPendingConnection();
void setNextPendingConnection(Connection *conn);
Q_SIGNALS:
void newConnection();
private:
friend class ConnectionServerPrivate;
ConnectionServerPrivate * const d;
};
}
#endif

View file

@ -1,74 +0,0 @@
/* This file is part of the KDE libraries
Copyright (C) 2007 Thiago Macieira <thiago@kde.org>
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
#ifndef KIO_CONNECTION_P_H
#define KIO_CONNECTION_P_H
#include <QLocalSocket>
#include <QLocalServer>
class KUrl;
namespace KIO {
struct Task {
int cmd;
QByteArray data;
};
class SocketConnectionBackend: public QObject
{
Q_OBJECT
public:
QString address;
QString errorString;
enum { Idle, Listening, Connected } state;
private:
enum { HeaderSize = 10, StandardBufferSize = 32*1024 };
QLocalSocket *socket;
QLocalServer *localServer;
long len;
int cmd;
bool signalEmitted;
public:
explicit SocketConnectionBackend(QObject *parent = 0);
~SocketConnectionBackend();
void setSuspended(bool enable);
bool connectToRemote(const QString &address);
bool listenForRemote();
bool waitForIncomingTask(int ms);
bool sendCommand(const Task &task);
SocketConnectionBackend *nextPendingConnection();
public slots:
void socketReadyRead();
void socketDisconnected();
Q_SIGNALS:
void disconnected();
void commandReceived(const Task &task);
void newConnection();
};
}
Q_DECLARE_METATYPE(KIO::Task)
#endif

View file

@ -24,7 +24,6 @@
#include "slaveconfig.h"
#include "authinfo.h"
#include "slaveinterface.h"
#include "connection.h"
#include "job_p.h"
#include <kdebug.h>

View file

@ -37,6 +37,7 @@
#include <QtCore/QList>
#include <QtCore/QElapsedTimer>
#include <QtCore/QCoreApplication>
#include <QtCore/QThread>
#include "kdebug.h"
#include "kcrash.h"
@ -48,9 +49,9 @@
#include "kwindowsystem.h"
#include "kpasswdstore.h"
#include "kremoteencoding.h"
#include "connection.h"
#include "ioslave_defaults.h"
#include "slaveinterface.h"
#include "slaveinterface_p.h"
#include "job_p.h"
#define AUTHINFO_EXTRAFIELD_DOMAIN QLatin1String("domain")
@ -77,6 +78,35 @@ static const int s_quit_signals[] = {
0
};
static bool waitForIncommingTask(QSocketDevice *socket, const int ms)
{
QEventLoop eventloop;
QObject::connect(socket, SIGNAL(readyRead()), &eventloop, SLOT(quit()));
if (ms > 0) {
QTimer::singleShot(ms, &eventloop, SLOT(quit()));
}
eventloop.exec();
return true;
}
#if 0
// alternative variant with more busy time
static bool waitForIncommingTask2(QSocketDevice *socket, const int ms)
{
static const int s_busytime = 10; // ms
QElapsedTimer elapsed;
elapsed.start();
while (socket->isOpen() && (ms == -1 || elapsed.elapsed() < ms)) {
QCoreApplication::processEvents(QEventLoop::AllEvents, s_busytime);
QThread::msleep(s_busytime);
if (socket->bytesAvailable() >= 10) {
return true;
}
}
return false;
}
#endif
// two keys are used to store the auth info, the reason for doing so is to be able to automagically
// fill user and password when none has been specified (e.g. ftp://foo.bar.com has been
// authenticated before). the same goes for the port
@ -133,7 +163,7 @@ public:
UDSEntryList pendingListEntries;
QElapsedTimer m_timeSinceLastBatch;
Connection appConnection;
QSocketDevice appConnection;
bool needSendCanResume;
bool wasKilled;
@ -271,8 +301,8 @@ SlaveBase::SlaveBase(const QByteArray &protocol,
globalSlave = this;
const QString address = QFile::decodeName(app_socket);
d->appConnection.connectToRemote(address);
if (!d->appConnection.inited()) {
d->appConnection.init(address);
if (!d->appConnection.open(QIODevice::ReadWrite)) {
kDebug(7019) << "failed to connect to" << address << '\n'
<< "Reason:" << d->appConnection.errorString();
exit();
@ -297,26 +327,24 @@ void SlaveBase::dispatchLoop()
special(data);
}
Q_ASSERT(d->appConnection.inited());
int ms = -1;
if (d->timeout) {
ms = 1000 * qMax<time_t>(d->timeout - time(0), 1);
}
int ret = -1;
if (d->appConnection.hasTaskAvailable() || d->appConnection.waitForIncomingTask(ms)) {
if (waitForIncommingTask(&d->appConnection, ms)) {
// dispatch application messages
int cmd;
QByteArray data;
ret = d->appConnection.read(&cmd, data);
ret = kReadCommand(&d->appConnection, &cmd, data);
if (ret != -1) {
dispatch(cmd, data);
}
}
if (ret == -1 || !d->appConnection.isConnected()) {
if (ret == -1 || !d->appConnection.isOpen()) {
// some error occurred or not connected to application socket
break;
}
@ -535,8 +563,8 @@ void SlaveBase::mimeType(const QString &_type)
while (true) {
cmd = 0;
int ret = -1;
if (d->appConnection.hasTaskAvailable() || d->appConnection.waitForIncomingTask(-1)) {
ret = d->appConnection.read(&cmd, data);
if (waitForIncommingTask(&d->appConnection, -1)) {
ret = kReadCommand(&d->appConnection, &cmd, data);
}
if (ret == -1) {
kDebug(7019) << "read error";
@ -812,8 +840,8 @@ int SlaveBase::waitForAnswer(int expected1, int expected2, QByteArray &data, int
int cmd = 0;
int result = -1;
for (;;) {
if (d->appConnection.hasTaskAvailable() || d->appConnection.waitForIncomingTask(-1)) {
result = d->appConnection.read(&cmd, data);
if (waitForIncommingTask(&d->appConnection, -1)) {
result = kReadCommand(&d->appConnection, &cmd, data);
}
if (result == -1) {
kDebug(7019) << "read error.";
@ -1131,7 +1159,7 @@ void SlaveBase::setKillFlag()
void SlaveBase::send(int cmd, const QByteArray &arr)
{
slaveWriteError = false;
if (!d->appConnection.send(cmd, arr)) {
if (!kSendCommandNow(&d->appConnection, cmd, arr)) {
// Note that slaveWriteError can also be set by sigpipe_handler
slaveWriteError = true;
}

View file

@ -22,7 +22,6 @@
#include "slaveinterface_p.h"
#include "usernotificationhandler_p.h"
#include "slavebase.h"
#include "connection.h"
#include "job_p.h"
#include <kdebug.h>
@ -56,14 +55,16 @@ using namespace KIO;
Q_GLOBAL_STATIC(UserNotificationHandler, globalUserNotificationHandler)
SlaveInterfacePrivate::SlaveInterfacePrivate(const QString &protocol)
: connection(nullptr),
filesize(0),
: filesize(0),
offset(0),
last_time(0),
nums(0),
slave_calcs_speed(false),
m_protocol(protocol),
slaveconnserver(new KIO::ConnectionServer()),
connection(nullptr),
slaveconnserver(nullptr),
slavenotifier(nullptr),
m_suspended(false),
m_job(nullptr),
m_pid(0),
m_port(0),
@ -74,15 +75,11 @@ SlaveInterfacePrivate::SlaveInterfacePrivate(const QString &protocol)
{
start_time.tv_sec = 0;
start_time.tv_usec = 0;
slaveconnserver->listenForRemote();
if (!slaveconnserver->isListening()) {
kWarning() << "Connection server not listening, could not connect";
}
}
SlaveInterfacePrivate::~SlaveInterfacePrivate()
{
delete slavenotifier;
delete slaveconnserver;
delete connection;
}
@ -93,9 +90,20 @@ SlaveInterface::SlaveInterface(const QString &protocol, QObject *parent)
d_ptr(new SlaveInterfacePrivate(protocol))
{
connect(&d_ptr->speed_timer, SIGNAL(timeout()), SLOT(calcSpeed()));
d_ptr->slaveconnserver->setParent(this);
d_ptr->connection = new Connection(this);
connect(d_ptr->slaveconnserver, SIGNAL(newConnection()), SLOT(accept()));
// NOTE: using long/complex server name can cause reconnection issues
const QString serveraddress = QString::fromLatin1("kio_") + QString::number(qrand());
d_ptr->slaveconnserver = new QSocketServer(this);
d_ptr->slaveconnserver->init(serveraddress);
if (!d_ptr->slaveconnserver->listen()) {
kWarning() << "Connection server not listening, could not connect";
return;
}
d_ptr->slavenotifier = new QSocketNotifier(d_ptr->slaveconnserver->socketDescriptor(), QSocketNotifier::Read, this);
connect(d_ptr->slavenotifier, SIGNAL(activated(int)), this, SLOT(accept()));
d_ptr->connection = new QSocketDevice(this);
}
SlaveInterface::~SlaveInterface()
@ -205,25 +213,25 @@ bool SlaveInterface::isAlive() const
void SlaveInterface::suspend()
{
Q_D(SlaveInterface);
d->connection->suspend();
d->m_suspended = true;
}
void SlaveInterface::resume()
{
Q_D(SlaveInterface);
d->connection->resume();
d->m_suspended = false;
}
bool SlaveInterface::suspended() const
{
Q_D(const SlaveInterface);
return d->connection->suspended();
return d->m_suspended;
}
void SlaveInterface::send(int cmd, const QByteArray &arr)
{
Q_D(SlaveInterface);
d->connection->send(cmd, arr);
kSendCommand(d->connection, cmd, arr, d->m_tasks, d->m_suspended);
}
void SlaveInterface::kill()
@ -250,7 +258,7 @@ void SlaveInterface::setHost( const QString &host, quint16 port,
QByteArray data;
QDataStream stream(&data, QIODevice::WriteOnly);
stream << d->m_host << d->m_port << d->m_user << d->m_passwd;
d->connection->send(CMD_HOST, data);
kSendCommand(d->connection, CMD_HOST, data, d->m_tasks, d->m_suspended);
}
void SlaveInterface::resetHost()
@ -265,14 +273,14 @@ void SlaveInterface::setConfig(const MetaData &config)
QByteArray data;
QDataStream stream(&data, QIODevice::WriteOnly);
stream << config;
d->connection->send(CMD_CONFIG, data);
kSendCommand(d->connection, CMD_CONFIG, data, d->m_tasks, d->m_suspended);
}
SlaveInterface* SlaveInterface::createSlave(const QString &protocol, const KUrl &url, int &error, QString &error_text)
{
kDebug(7002) << "createSlave" << protocol << "for" << url;
SlaveInterface *slave = new SlaveInterface(protocol);
const QString slaveaddress = slave->d_func()->slaveconnserver->address();
const QString slaveaddress = slave->d_func()->slaveconnserver->socketName();
const QString slavename = KProtocolInfo::exec(protocol);
if (slavename.isEmpty()) {
@ -310,18 +318,6 @@ SlaveInterface* SlaveInterface::createSlave(const QString &protocol, const KUrl
return slave;
}
void SlaveInterface::setConnection(Connection* connection)
{
Q_D(SlaveInterface);
d->connection = connection;
}
Connection *SlaveInterface::connection() const
{
Q_D(const SlaveInterface);
return d->connection;
}
bool SlaveInterface::dispatch()
{
Q_D(SlaveInterface);
@ -329,7 +325,7 @@ bool SlaveInterface::dispatch()
int cmd = 0;
QByteArray data;
int ret = d->connection->read(&cmd, data);
int ret = kReadCommand(d->connection, &cmd, data);
if (ret == -1) {
return false;
}
@ -486,8 +482,8 @@ bool SlaveInterface::dispatch(int cmd, const QByteArray &rawdata)
QString str;
stream >> str;
emit mimeType(str);
if (!d->connection->suspended())
d->connection->sendnow(CMD_NONE, QByteArray());
if (!d->m_suspended)
kSendCommandNow(d->connection, CMD_NONE, QByteArray());
break;
}
case INF_WARNING: {
@ -544,7 +540,7 @@ void SlaveInterface::sendResumeAnswer(bool resume)
{
Q_D(SlaveInterface);
kDebug(7007) << "ok for resuming:" << resume;
d->connection->sendnow(resume ? CMD_RESUMEANSWER : CMD_NONE, QByteArray());
kSendCommandNow(d->connection, resume ? CMD_RESUMEANSWER : CMD_NONE, QByteArray());
}
void SlaveInterface::sendMessageBoxAnswer(int result)
@ -554,13 +550,13 @@ void SlaveInterface::sendMessageBoxAnswer(int result)
return;
}
if (d->connection->suspended()) {
d->connection->resume();
if (d->m_suspended) {
d->m_suspended = false;
}
QByteArray packedArgs;
QDataStream stream(&packedArgs, QIODevice::WriteOnly);
stream << result;
d->connection->sendnow(CMD_MESSAGEBOXANSWER, packedArgs);
kSendCommandNow(d->connection, CMD_MESSAGEBOXANSWER, packedArgs);
kDebug(7007) << "message box answer" << result;
}
@ -576,7 +572,7 @@ void SlaveInterface::messageBox(int type, const QString &text, const QString &ca
Q_D(SlaveInterface);
if (d->connection) {
d->connection->suspend();
d->m_suspended = true;
}
QHash<UserNotificationHandler::MessageBoxDataType, QString> data;
@ -602,11 +598,17 @@ void SlaveInterface::messageBox(int type, const QString &text, const QString &ca
void SlaveInterface::accept()
{
Q_D(SlaveInterface);
d->slaveconnserver->setNextPendingConnection(d->connection);
d->slaveconnserver->deleteLater();
d->slaveconnserver = 0;
if (d->connection) {
// kWarning() << "there is already a connection";
delete d->connection;
}
d->connection = d->slaveconnserver->acceptConnection();
d->slavenotifier->setEnabled(false);
connect(d->connection, SIGNAL(readyRead()), SLOT(gotInput()));
while (!d->m_tasks.isEmpty()) {
const KIO::Task task = d->m_tasks.dequeue();
kSendCommandNow(d->connection, task.cmd, task.data);
}
}
void SlaveInterface::gotInput()
@ -641,7 +643,7 @@ void SlaveInterface::timeout()
// already dead? then slaveDied was emitted and we are done
return;
}
if (d->connection->isConnected()) {
if (d->connection->isOpen()) {
return;
}

View file

@ -34,7 +34,6 @@ class KUrl;
namespace KIO {
class SimpleJob;
class Connection;
// better there is one ...
class SlaveInterfacePrivate;
@ -202,9 +201,6 @@ public:
void ref();
void deref();
void setConnection(Connection *connection);
Connection* connection() const;
// Send our answer to the MSG_RESUME (canResume) request
// (to tell the "put" job whether to resume or not)
void sendResumeAnswer(bool resume);

View file

@ -20,21 +20,103 @@
#define KIO_SLAVEINTERFACEPRIVATE_H
#include "global.h"
#include "connection.h"
#include "kdebug.h"
#include <QtCore/QTimer>
#include <QTimer>
#include <QSocketDevice>
#include <QSocketServer>
#include <QSocketNotifier>
#include <QQueue>
#include <sys/time.h>
static const unsigned int max_nums = 8;
namespace KIO {
struct Task {
int cmd;
QByteArray data;
};
};
static bool kSendCommandNow(QSocketDevice *socket, int cmd, const QByteArray &data)
{
if (data.size() > 0xffffff) {
return false;
}
if (!socket->isOpen()) {
return false;
}
char buffer[11];
::memset(buffer, 0, sizeof(buffer));
::sprintf(buffer, "%6x_%2x_", data.size(), cmd);
socket->write(buffer, 10);
socket->write(data);
return socket->waitForBytesWritten(-1);
}
static bool kSendCommand(QSocketDevice *socket, int cmd, const QByteArray &data,
QQueue<KIO::Task> &queue, const bool suspended)
{
if (suspended || !socket->isOpen()) {
KIO::Task task;
task.cmd = cmd;
task.data = data;
queue.enqueue(task);
return true;
}
while (!queue.isEmpty()) {
const KIO::Task task = queue.dequeue();
if (!kSendCommandNow(socket, task.cmd, task.data)) {
return false;
}
}
return kSendCommandNow(socket, cmd, data);
}
static int kReadCommand(QSocketDevice *socket, int* cmd, QByteArray &data)
{
if (!socket->isOpen()) {
return -1;
}
char buffer[10];
::memset(buffer, 0, sizeof(buffer));
int readresult = socket->read(buffer, 10);
if (readresult != 10) {
kWarning() << "kReadCommand read command failed" << readresult;
return -1;
}
char *p = buffer;
while( *p == ' ' ) p++;
const int len = strtol( p, 0L, 16 );
p = buffer + 7;
while( *p == ' ' ) p++;
*cmd = strtol( p, 0L, 16 );
data.resize(len);
if (len) {
int readtotal = 0;
while (readtotal != len && socket->isOpen()) {
readresult = socket->read(data.data() + readtotal, data.size() - readtotal);
if (readresult < 0) {
if (errno == EAGAIN) {
continue;
}
kWarning() << "kReadCommand read data failed" << readtotal << len << socket->errorString();
return readtotal;
}
readtotal += readresult;
}
}
return data.size();
}
class KIO::SlaveInterfacePrivate
{
public:
SlaveInterfacePrivate(const QString &protocol);
~SlaveInterfacePrivate();
Connection *connection;
QTimer speed_timer;
KIO::filesize_t sizes[max_nums];
@ -50,7 +132,11 @@ public:
QString m_host;
QString m_user;
QString m_passwd;
KIO::ConnectionServer *slaveconnserver;
QSocketDevice *connection;
QSocketServer *slaveconnserver;
QSocketNotifier *slavenotifier;
bool m_suspended;
QQueue<Task> m_tasks;
KIO::SimpleJob *m_job;
pid_t m_pid;
quint16 m_port;