diff options
| author | Francis Giraldeau <francis.giraldeau@gmail.com> | 2018-09-10 17:35:38 -0400 | 
|---|---|---|
| committer | Itay Grudev <itay-grudev@users.noreply.github.com> | 2018-09-10 22:35:38 +0100 | 
| commit | 0db27016b0470b989f4fa114ae9dde5aa2a325fa (patch) | |
| tree | d6ef50e1ee112a5e9110f36c28b58ea627f70859 | |
| parent | v3.0.12a Removed custom signal handling. (diff) | |
| download | singleapplication-0db27016b0470b989f4fa114ae9dde5aa2a325fa.tar.xz | |
Async socket processing (#49)3.0.13a
* Process socket events asynchronously
Avoid blocking the event loop using waitForReadyRead(). Instead, process the
initialization in two phases. It was necessary to add a map to keep track of
the state of the initial message processing
Signed-off-by: Francis Giraldeau <francis.giraldeau@nrc-cnrc.gc.ca>
* Fix undefined variable on Windows
The timout variable does not exists in this scope, we can safely remove the
Q_UNUSED.
Signed-off-by: Francis Giraldeau <francis.giraldeau@nrc-cnrc.gc.ca>
* Fix validation logic of initial message
I modified the logic to a positive value, but the modification was incomplete
and caused the initial message to be incorrectly considered as invalid.
Signed-off-by: Francis Giraldeau <francis.giraldeau@nrc-cnrc.gc.ca>
* Use the proper socket when receiving message
The socket variable is a class member, but we really want to use
nextConnSocket when receiving the message in the lambda.
Signed-off-by: Francis Giraldeau <francis.giraldeau@nrc-cnrc.gc.ca>
| -rw-r--r-- | singleapplication_p.cpp | 176 | ||||
| -rw-r--r-- | singleapplication_p.h | 16 | 
2 files changed, 124 insertions, 68 deletions
diff --git a/singleapplication_p.cpp b/singleapplication_p.cpp index 7e58d2b..1edd2b8 100644 --- a/singleapplication_p.cpp +++ b/singleapplication_p.cpp @@ -101,7 +101,6 @@ void SingleApplicationPrivate::genBlockServerName()      // User level block requires a user specific data in the hash      if( options & SingleApplication::Mode::User ) {  #ifdef Q_OS_WIN -        Q_UNUSED(timeout);          wchar_t username [ UNLEN + 1 ];          // Specifies size of the buffer on input          DWORD usernameLength = UNLEN + 1; @@ -253,85 +252,127 @@ qint64 SingleApplicationPrivate::primaryPid()   */  void SingleApplicationPrivate::slotConnectionEstablished()  { -    Q_Q(SingleApplication); -      QLocalSocket *nextConnSocket = server->nextPendingConnection(); +    connectionMap.insert(nextConnSocket, ConnectionInfo()); -    quint32 instanceId = 0; -    ConnectionType connectionType = InvalidConnection; -    if( nextConnSocket->waitForReadyRead( 100 ) ) { -        // read the fields in same order and format as written -        QDataStream headerStream(nextConnSocket); -        headerStream.setVersion( QDataStream::Qt_5_6 ); - -        // Read the header to know the message length -        quint64 msgLen = 0; -        headerStream >> msgLen; - -        if( msgLen >= sizeof( quint16 ) ) { -           // Read the message body -           QByteArray msgBytes = nextConnSocket->read(msgLen); -           QDataStream readStream(msgBytes); -           readStream.setVersion( QDataStream::Qt_5_6 ); - -           // server name -           QByteArray latin1Name; -           readStream >> latin1Name; - -           // connection type -           quint8 connType = InvalidConnection; -           readStream >> connType; -           connectionType = static_cast <ConnectionType>( connType ); - -           // instance id -           readStream >> instanceId; - -           // checksum -           quint16 msgChecksum = 0; -           readStream >> msgChecksum; - -           const quint16 actualChecksum = qChecksum(msgBytes.constData(), static_cast<quint32>(msgBytes.length() - sizeof(quint16))); - -           if (readStream.status() != QDataStream::Ok || QLatin1String(latin1Name) != blockServerName || msgChecksum != actualChecksum) { -             connectionType = InvalidConnection; -           } +    QObject::connect(nextConnSocket, &QLocalSocket::aboutToClose, +        [nextConnSocket, this]() { +            auto &info = connectionMap[nextConnSocket]; +            Q_EMIT this->slotClientConnectionClosed( nextConnSocket, info.instanceId );          } -    } - -    if( connectionType == InvalidConnection ) { -        nextConnSocket->close(); -        delete nextConnSocket; -        return; -    } +    ); -    QObject::connect( -        nextConnSocket, -        &QLocalSocket::aboutToClose, -        this, -        [nextConnSocket, instanceId, this]() { -            Q_EMIT this->slotClientConnectionClosed( nextConnSocket, instanceId ); +    QObject::connect(nextConnSocket, &QLocalSocket::disconnected, +        [nextConnSocket, this](){ +            connectionMap.remove(nextConnSocket); +            nextConnSocket->deleteLater();          }      ); -    QObject::connect( -        nextConnSocket, -        &QLocalSocket::readyRead, -        this, -        [nextConnSocket, instanceId, this]() { -            Q_EMIT this->slotDataAvailable( nextConnSocket, instanceId ); +    QObject::connect(nextConnSocket, &QLocalSocket::readyRead, +        [nextConnSocket, this]() { +            auto &info = connectionMap[nextConnSocket]; +            switch(info.stage) { +            case StageHeader: +                readInitMessageHeader(nextConnSocket); +                break; +            case StageBody: +                readInitMessageBody(nextConnSocket); +                break; +            case StageConnected: +                Q_EMIT this->slotDataAvailable( nextConnSocket, info.instanceId ); +                break; +            default: +                break; +            };          }      ); +} + +void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock ) +{ +    if (!connectionMap.contains( sock )) { +        return; +    } + +    if( sock->bytesAvailable() < ( qint64 )sizeof( quint64 ) ) { +        return; +    } + +    QDataStream headerStream( sock ); +    headerStream.setVersion( QDataStream::Qt_5_6 ); + +    // Read the header to know the message length +    quint64 msgLen = 0; +    headerStream >> msgLen; +    ConnectionInfo &info = connectionMap[sock]; +    info.stage = StageBody; +    info.msgLen = msgLen; + +    if ( sock->bytesAvailable() >= (qint64) msgLen ) { +        readInitMessageBody( sock ); +    } +} + +void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) +{ +    Q_Q(SingleApplication); + +    if (!connectionMap.contains( sock )) { +        return; +    } + +    ConnectionInfo &info = connectionMap[sock]; +    if( sock->bytesAvailable() < ( qint64 )info.msgLen ) { +        return; +    } + +    // Read the message body +    QByteArray msgBytes = sock->read(info.msgLen); +    QDataStream readStream(msgBytes); +    readStream.setVersion( QDataStream::Qt_5_6 ); + +    // server name +    QByteArray latin1Name; +    readStream >> latin1Name; + +    // connection type +    ConnectionType connectionType = InvalidConnection; +    quint8 connTypeVal = InvalidConnection; +    readStream >> connTypeVal; +    connectionType = static_cast <ConnectionType>( connTypeVal ); + +    // instance id +    quint32 instanceId = 0; +    readStream >> instanceId; + +    // checksum +    quint16 msgChecksum = 0; +    readStream >> msgChecksum; + +    const quint16 actualChecksum = qChecksum( msgBytes.constData(), static_cast<quint32>( msgBytes.length() - sizeof( quint16 ) ) ); + +    bool isValid = readStream.status() == QDataStream::Ok && +                   QLatin1String(latin1Name) == blockServerName && +                   msgChecksum == actualChecksum; + +    if( !isValid ) { +        sock->close(); +        return; +    } + +    info.instanceId = instanceId; +    info.stage = StageConnected; -    if( connectionType == NewInstance || ( -            connectionType == SecondaryInstance && -            options & SingleApplication::Mode::SecondaryNotification -        ) -    ) { +    if( connectionType == NewInstance || +        ( connectionType == SecondaryInstance && +          options & SingleApplication::Mode::SecondaryNotification ) ) +    {          Q_EMIT q->instanceStarted();      } -    if( nextConnSocket->bytesAvailable() > 0 ) { -        Q_EMIT this->slotDataAvailable( nextConnSocket, instanceId ); +    if (sock->bytesAvailable() > 0) { +        Q_EMIT this->slotDataAvailable( sock, instanceId );      }  } @@ -345,5 +386,4 @@ void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedS  {      if( closedSocket->bytesAvailable() > 0 )          Q_EMIT slotDataAvailable( closedSocket, instanceId  ); -    closedSocket->deleteLater();  } diff --git a/singleapplication_p.h b/singleapplication_p.h index bbe6751..e2c361f 100644 --- a/singleapplication_p.h +++ b/singleapplication_p.h @@ -44,6 +44,14 @@ struct InstancesInfo {      quint16 checksum;  }; +struct ConnectionInfo { +    explicit ConnectionInfo() : +        msgLen(0), instanceId(0), stage(0) {} +    qint64 msgLen; +    quint32 instanceId; +    quint8 stage; +}; +  class SingleApplicationPrivate : public QObject {  Q_OBJECT  public: @@ -53,6 +61,11 @@ public:          SecondaryInstance = 2,          Reconnect = 3      }; +    enum ConnectionStage : quint8 { +        StageHeader = 0, +        StageBody = 1, +        StageConnected = 2, +    };      Q_DECLARE_PUBLIC(SingleApplication)      SingleApplicationPrivate( SingleApplication *q_ptr ); @@ -65,6 +78,8 @@ public:      void connectToPrimary(int msecs, ConnectionType connectionType );      quint16 blockChecksum();      qint64 primaryPid(); +    void readInitMessageHeader(QLocalSocket *socket); +    void readInitMessageBody(QLocalSocket *socket);      SingleApplication *q_ptr;      QSharedMemory *memory; @@ -73,6 +88,7 @@ public:      quint32 instanceNumber;      QString blockServerName;      SingleApplication::Options options; +    QMap<QLocalSocket*, ConnectionInfo> connectionMap;  public Q_SLOTS:      void slotConnectionEstablished();  | 
