aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorItay Grudev <itay+github.com@grudev.com>2021-06-01 23:59:59 +0300
committerGitHub <noreply@github.com>2021-06-01 23:59:59 +0300
commitc557da5d0cb63b8002c1ba99ec18f257620009b1 (patch)
tree49ea9772668632a316eddea0bc7bed11dc3efb9c
parentMerge pull request #132 from itay-grudev/removed_incorrect_coment_in_basic_ex... (diff)
parentEnsure data sent via SingleApplication::sendMessage is received completely (diff)
downloadsingleapplication-c557da5d0cb63b8002c1ba99ec18f257620009b1.tar.xz
Merge pull request #133 from njeisecke/master
Fix protocol to solve #121 and #125
-rw-r--r--singleapplication.cpp5
-rw-r--r--singleapplication_p.cpp93
-rw-r--r--singleapplication_p.h13
3 files changed, 78 insertions, 33 deletions
diff --git a/singleapplication.cpp b/singleapplication.cpp
index 7e153a0..09e264e 100644
--- a/singleapplication.cpp
+++ b/singleapplication.cpp
@@ -248,10 +248,7 @@ bool SingleApplication::sendMessage( const QByteArray &message, int timeout )
if( ! d->connectToPrimary( timeout, SingleApplicationPrivate::Reconnect ) )
return false;
- d->socket->write( message );
- bool dataWritten = d->socket->waitForBytesWritten( timeout );
- d->socket->flush();
- return dataWritten;
+ return d->writeConfirmedMessage( timeout, message );
}
/**
diff --git a/singleapplication_p.cpp b/singleapplication_p.cpp
index 27b688f..1339728 100644
--- a/singleapplication_p.cpp
+++ b/singleapplication_p.cpp
@@ -263,20 +263,46 @@ bool SingleApplicationPrivate::connectToPrimary( int msecs, ConnectionType conne
#endif
writeStream << checksum;
- // The header indicates the message length that follows
+ return writeConfirmedMessage( static_cast<int>(msecs - time.elapsed()), initMsg );
+}
+
+void SingleApplicationPrivate::writeAck( QLocalSocket *sock ) {
+ sock->putChar('\n');
+}
+
+bool SingleApplicationPrivate::writeConfirmedMessage (int msecs, const QByteArray &msg)
+{
+ QElapsedTimer time;
+ time.start();
+
+ // Frame 1: The header indicates the message length that follows
QByteArray header;
QDataStream headerStream(&header, QIODevice::WriteOnly);
#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0))
headerStream.setVersion(QDataStream::Qt_5_6);
#endif
- headerStream << static_cast <quint64>( initMsg.length() );
+ headerStream << static_cast <quint64>( msg.length() );
- socket->write( header );
- socket->write( initMsg );
- bool result = socket->waitForBytesWritten( static_cast<int>(msecs - time.elapsed()) );
+ if( ! writeConfirmedFrame( static_cast<int>(msecs - time.elapsed()), header ))
+ return false;
+
+ // Frame 2: The message
+ return writeConfirmedFrame( static_cast<int>(msecs - time.elapsed()), msg );
+}
+
+bool SingleApplicationPrivate::writeConfirmedFrame( int msecs, const QByteArray &msg )
+{
+ socket->write( msg );
socket->flush();
- return result;
+
+ bool result = socket->waitForReadyRead( msecs ); // await ack byte
+ if (result) {
+ socket->read( 1 );
+ return true;
+ }
+
+ return false;
}
quint16 SingleApplicationPrivate::blockChecksum() const
@@ -340,13 +366,16 @@ void SingleApplicationPrivate::slotConnectionEstablished()
[nextConnSocket, this](){
auto &info = connectionMap[nextConnSocket];
switch(info.stage){
- case StageHeader:
- readInitMessageHeader(nextConnSocket);
+ case StageInitHeader:
+ readMessageHeader( nextConnSocket, StageInitBody );
break;
- case StageBody:
+ case StageInitBody:
readInitMessageBody(nextConnSocket);
break;
- case StageConnected:
+ case StageConnectedHeader:
+ readMessageHeader( nextConnSocket, StageConnectedBody );
+ break;
+ case StageConnectedBody:
this->slotDataAvailable( nextConnSocket, info.instanceId );
break;
default:
@@ -356,7 +385,7 @@ void SingleApplicationPrivate::slotConnectionEstablished()
);
}
-void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock )
+void SingleApplicationPrivate::readMessageHeader( QLocalSocket *sock, SingleApplicationPrivate::ConnectionStage nextStage )
{
if (!connectionMap.contains( sock )){
return;
@@ -376,29 +405,35 @@ void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock )
quint64 msgLen = 0;
headerStream >> msgLen;
ConnectionInfo &info = connectionMap[sock];
- info.stage = StageBody;
+ info.stage = nextStage;
info.msgLen = msgLen;
- if ( sock->bytesAvailable() >= (qint64) msgLen ){
- readInitMessageBody( sock );
- }
+ writeAck( sock );
}
-void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
+bool SingleApplicationPrivate::isFrameComplete( QLocalSocket *sock )
{
- Q_Q(SingleApplication);
-
if (!connectionMap.contains( sock )){
- return;
+ return false;
}
ConnectionInfo &info = connectionMap[sock];
if( sock->bytesAvailable() < ( qint64 )info.msgLen ){
- return;
+ return false;
}
+ return true;
+}
+
+void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
+{
+ Q_Q(SingleApplication);
+
+ if( !isFrameComplete( sock ) )
+ return;
+
// Read the message body
- QByteArray msgBytes = sock->read(info.msgLen);
+ QByteArray msgBytes = sock->readAll();
QDataStream readStream(msgBytes);
#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0))
@@ -438,8 +473,9 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
return;
}
+ ConnectionInfo &info = connectionMap[sock];
info.instanceId = instanceId;
- info.stage = StageConnected;
+ info.stage = StageConnectedHeader;
if( connectionType == NewInstance ||
( connectionType == SecondaryInstance &&
@@ -448,15 +484,22 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
Q_EMIT q->instanceStarted();
}
- if (sock->bytesAvailable() > 0){
- this->slotDataAvailable( sock, instanceId );
- }
+ writeAck( sock );
}
void SingleApplicationPrivate::slotDataAvailable( QLocalSocket *dataSocket, quint32 instanceId )
{
Q_Q(SingleApplication);
+
+ if ( !isFrameComplete( dataSocket ) )
+ return;
+
Q_EMIT q->receivedMessage( instanceId, dataSocket->readAll() );
+
+ writeAck( dataSocket );
+
+ ConnectionInfo &info = connectionMap[dataSocket];
+ info.stage = StageConnectedHeader;
}
void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedSocket, quint32 instanceId )
diff --git a/singleapplication_p.h b/singleapplication_p.h
index c49a46d..58507cf 100644
--- a/singleapplication_p.h
+++ b/singleapplication_p.h
@@ -61,9 +61,10 @@ public:
Reconnect = 3
};
enum ConnectionStage : quint8 {
- StageHeader = 0,
- StageBody = 1,
- StageConnected = 2,
+ StageInitHeader = 0,
+ StageInitBody = 1,
+ StageConnectedHeader = 2,
+ StageConnectedBody = 3,
};
Q_DECLARE_PUBLIC(SingleApplication)
@@ -79,8 +80,12 @@ public:
quint16 blockChecksum() const;
qint64 primaryPid() const;
QString primaryUser() const;
- void readInitMessageHeader(QLocalSocket *socket);
+ bool isFrameComplete(QLocalSocket *sock);
+ void readMessageHeader(QLocalSocket *socket, ConnectionStage nextStage);
void readInitMessageBody(QLocalSocket *socket);
+ void writeAck(QLocalSocket *sock);
+ bool writeConfirmedFrame(int msecs, const QByteArray &msg);
+ bool writeConfirmedMessage(int msecs, const QByteArray &msg);
static void randomSleep();
void addAppData(const QString &data);
QStringList appData() const;