diff options
author | Itay Grudev <itay+github.com@grudev.com> | 2021-06-01 23:59:59 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-01 23:59:59 +0300 |
commit | c557da5d0cb63b8002c1ba99ec18f257620009b1 (patch) | |
tree | 49ea9772668632a316eddea0bc7bed11dc3efb9c | |
parent | Merge pull request #132 from itay-grudev/removed_incorrect_coment_in_basic_ex... (diff) | |
parent | Ensure data sent via SingleApplication::sendMessage is received completely (diff) | |
download | singleapplication-c557da5d0cb63b8002c1ba99ec18f257620009b1.tar.xz |
Merge pull request #133 from njeisecke/master
Fix protocol to solve #121 and #125
-rw-r--r-- | singleapplication.cpp | 5 | ||||
-rw-r--r-- | singleapplication_p.cpp | 93 | ||||
-rw-r--r-- | singleapplication_p.h | 13 |
3 files changed, 78 insertions, 33 deletions
diff --git a/singleapplication.cpp b/singleapplication.cpp index 7e153a0..09e264e 100644 --- a/singleapplication.cpp +++ b/singleapplication.cpp @@ -248,10 +248,7 @@ bool SingleApplication::sendMessage( const QByteArray &message, int timeout ) if( ! d->connectToPrimary( timeout, SingleApplicationPrivate::Reconnect ) ) return false; - d->socket->write( message ); - bool dataWritten = d->socket->waitForBytesWritten( timeout ); - d->socket->flush(); - return dataWritten; + return d->writeConfirmedMessage( timeout, message ); } /** diff --git a/singleapplication_p.cpp b/singleapplication_p.cpp index 27b688f..1339728 100644 --- a/singleapplication_p.cpp +++ b/singleapplication_p.cpp @@ -263,20 +263,46 @@ bool SingleApplicationPrivate::connectToPrimary( int msecs, ConnectionType conne #endif writeStream << checksum; - // The header indicates the message length that follows + return writeConfirmedMessage( static_cast<int>(msecs - time.elapsed()), initMsg ); +} + +void SingleApplicationPrivate::writeAck( QLocalSocket *sock ) { + sock->putChar('\n'); +} + +bool SingleApplicationPrivate::writeConfirmedMessage (int msecs, const QByteArray &msg) +{ + QElapsedTimer time; + time.start(); + + // Frame 1: The header indicates the message length that follows QByteArray header; QDataStream headerStream(&header, QIODevice::WriteOnly); #if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) headerStream.setVersion(QDataStream::Qt_5_6); #endif - headerStream << static_cast <quint64>( initMsg.length() ); + headerStream << static_cast <quint64>( msg.length() ); - socket->write( header ); - socket->write( initMsg ); - bool result = socket->waitForBytesWritten( static_cast<int>(msecs - time.elapsed()) ); + if( ! writeConfirmedFrame( static_cast<int>(msecs - time.elapsed()), header )) + return false; + + // Frame 2: The message + return writeConfirmedFrame( static_cast<int>(msecs - time.elapsed()), msg ); +} + +bool SingleApplicationPrivate::writeConfirmedFrame( int msecs, const QByteArray &msg ) +{ + socket->write( msg ); socket->flush(); - return result; + + bool result = socket->waitForReadyRead( msecs ); // await ack byte + if (result) { + socket->read( 1 ); + return true; + } + + return false; } quint16 SingleApplicationPrivate::blockChecksum() const @@ -340,13 +366,16 @@ void SingleApplicationPrivate::slotConnectionEstablished() [nextConnSocket, this](){ auto &info = connectionMap[nextConnSocket]; switch(info.stage){ - case StageHeader: - readInitMessageHeader(nextConnSocket); + case StageInitHeader: + readMessageHeader( nextConnSocket, StageInitBody ); break; - case StageBody: + case StageInitBody: readInitMessageBody(nextConnSocket); break; - case StageConnected: + case StageConnectedHeader: + readMessageHeader( nextConnSocket, StageConnectedBody ); + break; + case StageConnectedBody: this->slotDataAvailable( nextConnSocket, info.instanceId ); break; default: @@ -356,7 +385,7 @@ void SingleApplicationPrivate::slotConnectionEstablished() ); } -void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock ) +void SingleApplicationPrivate::readMessageHeader( QLocalSocket *sock, SingleApplicationPrivate::ConnectionStage nextStage ) { if (!connectionMap.contains( sock )){ return; @@ -376,29 +405,35 @@ void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock ) quint64 msgLen = 0; headerStream >> msgLen; ConnectionInfo &info = connectionMap[sock]; - info.stage = StageBody; + info.stage = nextStage; info.msgLen = msgLen; - if ( sock->bytesAvailable() >= (qint64) msgLen ){ - readInitMessageBody( sock ); - } + writeAck( sock ); } -void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) +bool SingleApplicationPrivate::isFrameComplete( QLocalSocket *sock ) { - Q_Q(SingleApplication); - if (!connectionMap.contains( sock )){ - return; + return false; } ConnectionInfo &info = connectionMap[sock]; if( sock->bytesAvailable() < ( qint64 )info.msgLen ){ - return; + return false; } + return true; +} + +void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) +{ + Q_Q(SingleApplication); + + if( !isFrameComplete( sock ) ) + return; + // Read the message body - QByteArray msgBytes = sock->read(info.msgLen); + QByteArray msgBytes = sock->readAll(); QDataStream readStream(msgBytes); #if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) @@ -438,8 +473,9 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) return; } + ConnectionInfo &info = connectionMap[sock]; info.instanceId = instanceId; - info.stage = StageConnected; + info.stage = StageConnectedHeader; if( connectionType == NewInstance || ( connectionType == SecondaryInstance && @@ -448,15 +484,22 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) Q_EMIT q->instanceStarted(); } - if (sock->bytesAvailable() > 0){ - this->slotDataAvailable( sock, instanceId ); - } + writeAck( sock ); } void SingleApplicationPrivate::slotDataAvailable( QLocalSocket *dataSocket, quint32 instanceId ) { Q_Q(SingleApplication); + + if ( !isFrameComplete( dataSocket ) ) + return; + Q_EMIT q->receivedMessage( instanceId, dataSocket->readAll() ); + + writeAck( dataSocket ); + + ConnectionInfo &info = connectionMap[dataSocket]; + info.stage = StageConnectedHeader; } void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedSocket, quint32 instanceId ) diff --git a/singleapplication_p.h b/singleapplication_p.h index c49a46d..58507cf 100644 --- a/singleapplication_p.h +++ b/singleapplication_p.h @@ -61,9 +61,10 @@ public: Reconnect = 3 }; enum ConnectionStage : quint8 { - StageHeader = 0, - StageBody = 1, - StageConnected = 2, + StageInitHeader = 0, + StageInitBody = 1, + StageConnectedHeader = 2, + StageConnectedBody = 3, }; Q_DECLARE_PUBLIC(SingleApplication) @@ -79,8 +80,12 @@ public: quint16 blockChecksum() const; qint64 primaryPid() const; QString primaryUser() const; - void readInitMessageHeader(QLocalSocket *socket); + bool isFrameComplete(QLocalSocket *sock); + void readMessageHeader(QLocalSocket *socket, ConnectionStage nextStage); void readInitMessageBody(QLocalSocket *socket); + void writeAck(QLocalSocket *sock); + bool writeConfirmedFrame(int msecs, const QByteArray &msg); + bool writeConfirmedMessage(int msecs, const QByteArray &msg); static void randomSleep(); void addAppData(const QString &data); QStringList appData() const; |