Code:qxrpcpeer.cpp

From qtnode

Jump to: navigation, search
#include "qxrpcpeer.h"
#include <QMap>
#include <QApplication>
#include <QDebug>
#include <QMetaMethod>
#include <QTcpSocket>
#include <QTcpServer>

class QxIntrospector : public QObject {
    // This class MANUALLY implements the necessary parts of QObject.
    // Do NOT add the Q_OBJECT macro. As this class isn't intended
    // for direct use, it doesn't offer any sort of useful meta-object.
public:
    QxIntrospector(QxRPCPeer* parent, QObject* source, const char* signal);

    int qt_metacall(QMetaObject::Call _c, int _id, void **_a);

    QString rpcFunction;

private:
    QxRPCPeer* peer;
    QList<int> argTypes;
};

// Bookkeeping record kept in m_clients for each accepted client connection.
struct QxRPCConnection {
    QTcpSocket* socket;   // the client's socket; assigned by newConnection()
    QByteArray buffer;    // bytes received from this client and not yet consumed by processInput()
    QString lastMethod;   // not referenced by any code visible in this file
};

// Creates a peer that starts out in Peer mode, owning one server and one
// outgoing socket (both children of this object), and wires their signals.
QxRPCPeer::QxRPCPeer(QObject* parent)
    : QObject(parent)
{
    m_rpctype = Peer;
    m_server = new QTcpServer(this);
    m_peer = new QTcpSocket(this);

    // Socket state changes are re-emitted as this object's own signals.
    QObject::connect(m_peer, SIGNAL(connected()),
                     this, SIGNAL(peerConnected()));
    QObject::connect(m_peer, SIGNAL(disconnected()),
                     this, SIGNAL(peerDisconnected()));

    // Internal handling: flush on disconnect, parse on incoming data.
    QObject::connect(m_peer, SIGNAL(disconnected()),
                     this, SLOT(disconnectSender()));
    QObject::connect(m_peer, SIGNAL(readyRead()),
                     this, SLOT(dataAvailable()));
    QObject::connect(m_peer, SIGNAL(error(QAbstractSocket::SocketError)),
                     this, SIGNAL(peerError(QAbstractSocket::SocketError)));

    // Incoming connections are accepted in newConnection().
    QObject::connect(m_server, SIGNAL(newConnection()),
                     this, SLOT(newConnection()));
}

// Switches the RPC mode. The change is refused (with a debug message) unless
// the peer socket is unconnected and the server is not listening.
void QxRPCPeer::setRPCType(RPCTypes type) {
    const bool idle = m_peer->state() == QAbstractSocket::UnconnectedState
                      && !m_server->isListening();
    if(idle) {
        m_rpctype = type;
        return;
    }
    qDebug() << "QxRPCPeer: Cannot change RPC types while connected or listening";
}

// Returns the RPC mode currently in effect (initially Peer, changed through setRPCType()).
QxRPCPeer::RPCTypes QxRPCPeer::rpcType() {
    return m_rpctype;
}

// Starts an outgoing connection to addr:port on the peer socket.
// Refused with a debug message in Server mode, or when the peer socket is
// not in the unconnected state.
void QxRPCPeer::connect(QHostAddress addr, int port) {
    const char* refusal = 0;
    if(m_rpctype == Server)
        refusal = "QxRPCPeer: Cannot connect outward in Server mode";
    else if(m_peer->state() != QAbstractSocket::UnconnectedState)
        refusal = "QxRPCPeer: Already connected";

    if(refusal) {
        qDebug() << refusal;
        return;
    }
    m_peer->connectToHost(addr, port);
}

// Starts listening on iface:port.
// Returns false (after a debug message) in Client mode, in Peer mode while
// the peer socket is not unconnected, or when already listening; otherwise
// returns whatever QTcpServer::listen() reports.
bool QxRPCPeer::listen(QHostAddress iface, int port) {
    const char* refusal = 0;
    if(m_rpctype == Client)
        refusal = "QxRPCPeer: Cannot listen in Client mode";
    else if(m_rpctype == Peer && m_peer->state() != QAbstractSocket::UnconnectedState)
        refusal = "QxRPCPeer: Cannot listen while connected to a peer";
    else if(m_server->isListening())
        refusal = "QxRPCPeer: Already listening";

    if(refusal) {
        qDebug() << refusal;
        return false;
    }
    return m_server->listen(iface, port);
}

void QxRPCPeer::disconnectPeer(int id) {
    if(m_rpctype == Server && id==0) {
        qDebug() << "QxRPCPeer: Server mode does not have a peer";
        return;
    } else if(m_rpctype != Server && id!=0) {
        qDebug() << "QxRPCPeer: Must specify a client ID to disconnect";
        return;
    }
    QxRPCConnection* conn;
    if(id==0) {
        m_peer->disconnectFromHost();
    } else if((conn = m_clients.take((QObject*)(id))) != 0) {
        conn->socket->disconnectFromHost();
        conn->socket->deleteLater();
        delete conn;
    } else {
        qDebug() << "QxRPCPeer: no client with id " << id;
    }
}

void QxRPCPeer::disconnectAll() {
    if(m_rpctype != Server)
        disconnectPeer();
    else {
        for(ConnHash::const_iterator i = m_clients.constBegin(); i != m_clients.constEnd(); i++) {
            (*i)->socket->deleteLater();
            delete *i;
        }
        m_clients.clear();
    }
}

// Closes the listening server; logs a debug message if it was not listening.
void QxRPCPeer::stopListening() {
    if(m_server->isListening()) {
        m_server->close();
    } else {
        qDebug() << "QxRPCPeer: Not listening";
    }
}

// Forwards emissions of `signal` (a SIGNAL() string) on `sender` as calls to
// the RPC function `rpcFunction`.
// When rpcFunction is empty, the name is taken from the signal string: the
// characters after its first character, up to the opening parenthesis.
void QxRPCPeer::attachSignal(QObject* sender, const char* signal, QString rpcFunction) {
    if(rpcFunction == "") {
        const QByteArray raw(signal);
        rpcFunction = raw.mid(1, raw.indexOf('(') - 1);
    }

    QxIntrospector* bridge = new QxIntrospector(this, sender, signal);
    bridge->rpcFunction = rpcFunction;
    // insertMulti: one object may have several attached signals.
    attachedSignals.insertMulti(sender, bridge);
}

// Registers `slot` (a SLOT() string) on `recv` as a handler for incoming RPC
// calls named `rpcFunction`.
//
// A slot that cannot be found on the receiver is reported and not registered.
// The receiver's destroyed() signal is routed to detachSender(), as
// QxIntrospector does for signal sources, so attachedSlots never keeps a
// pointer to a deleted object.
void QxRPCPeer::attachSlot(QString rpcFunction, QObject* recv, const char* slot) {
    const QMetaObject* meta = recv->metaObject();
    // The first character of a SLOT() string is a marker, hence mid(1).
    const int methodIndex = meta->indexOfMethod(meta->normalizedSignature(slot).mid(1));
    if(methodIndex < 0) {
        qDebug() << "QxRPCPeer: Cannot attach unknown slot" << slot;
        return;
    }

    attachedSlots[rpcFunction].append(QPair<QObject*, int>(recv, methodIndex));
    QObject::connect(recv, SIGNAL(destroyed()), this, SLOT(detachSender()));
}

void QxRPCPeer::detachSender() {
    detachObject(sender());
}

// Removes every signal and slot attachment that involves `obj`.
void QxRPCPeer::detachObject(QObject* obj) {
    // Signal side: schedule the introspectors for deletion, then forget them.
    foreach(QxIntrospector* bridge, attachedSignals.values(obj))
        bridge->deleteLater();
    attachedSignals.remove(obj);

    // Slot side: drop obj's entries from every RPC function's handler list.
    typedef QList<QPair<QObject*, int> > HandlerList;
    foreach(QString name, attachedSlots.keys()) {
        HandlerList& handlers = attachedSlots[name];
        HandlerList::iterator it = handlers.begin();
        while(it != handlers.end()) {
            if(it->first != obj)
                it++;
            else
                it = handlers.erase(it);   // erase() yields the next element
        }
    }
}

// Encodes one RPC call as a single newline-terminated frame.
//
// Layout before escaping (QDataStream encoding): function name, argument
// count as one unsigned byte, then that many QVariants. The count is the
// number of leading parameters for which isNull() is false; the first null
// parameter ends the list.
//
// Escaping: '\' becomes "\\" and a newline byte becomes "\n" (backslash, 'n'),
// so the only raw newline in the result is the frame terminator.
//
// Exactly `ct` parameters are written. The earlier `if(ct--)` chain wrapped
// the unsigned counter to 255 once it hit zero and so kept writing every
// parameter after the first null one.
//
// A single NUL pad byte is placed right before the terminator. A receiver
// that trims one byte in front of the terminator (as historical readers of
// this protocol do) then loses only the pad, while a receiver that reads the
// whole frame stops after the declared argument count and ignores it.
QByteArray QxRPCPeer::serialize(QString fn, QVariant p1, QVariant p2, QVariant p3, QVariant p4, QVariant p5, QVariant p6, QVariant p7, QVariant p8, QVariant p9) {
    const QVariant* params[9] = { &p1, &p2, &p3, &p4, &p5, &p6, &p7, &p8, &p9 };

    unsigned char ct = 0;
    while(ct < 9 && !params[ct]->isNull()) ++ct;

    QByteArray rv;
    QDataStream str(&rv, QIODevice::WriteOnly);
    str << fn;
    str << ct;
    for(int i = 0; i < ct; ++i) str << *params[i];

    // Backslashes must be doubled before newlines are rewritten, otherwise
    // the backslash introduced for a newline would itself be doubled.
    rv.replace(QByteArray("\\"), QByteArray("\\\\"));
    rv.replace(QByteArray("\n"), QByteArray("\\n"));
    rv.append('\0');
    rv.append("\n");
    return rv;
}

// Sends an RPC call to the connected peer. Silently does nothing unless the
// peer socket is in the connected state.
void QxRPCPeer::call(QString fn, QVariant p1, QVariant p2, QVariant p3, QVariant p4, QVariant p5, QVariant p6, QVariant p7, QVariant p8, QVariant p9) {
    if(m_peer->state() == QAbstractSocket::ConnectedState) {
        const QByteArray frame = serialize(fn, p1, p2, p3, p4, p5, p6, p7, p8, p9);
        m_peer->write(frame);
    }
}

void QxRPCPeer::callClient(int id, QString fn, QVariant p1, QVariant p2, QVariant p3, QVariant p4, QVariant p5, QVariant p6, QVariant p7, QVariant p8) {
    QxRPCConnection* conn = m_clients.value((QObject*)(id));
    if(!conn) {
        qDebug() << "QxRPCPeer: no client with id" << id;
        return;
    }
    conn->socket->write(serialize(fn, p1, p2, p3, p4, p5, p6, p7, p8, QVariant()));
}

#define QX_ARG(i) ((numParams>i)?QGenericArgument(p ## i .typeName(), p ## i .constData()):QGenericArgument())
// Dispatches an RPC call received from the peer to every slot attached under
// the name `fn`.
// Each slot gets only as many arguments as it declares parameters: QX_ARG(i)
// expands to an empty QGenericArgument once i reaches numParams.
void QxRPCPeer::receivePeerSignal(QString fn, QVariant p0, QVariant p1, QVariant p2, QVariant p3, QVariant p4, QVariant p5, QVariant p6, QVariant p7, QVariant p8) {
    // Work on a snapshot so the table may change while slots are running.
    const QList<QPair<QObject*, int> > targets = attachedSlots.value(fn);
    for(int t = 0; t < targets.size(); ++t) {
        QObject* receiver = targets.at(t).first;
        const QMetaMethod method = receiver->metaObject()->method(targets.at(t).second);

        // invokeMethod() wants the bare member name, without "(args)".
        const QByteArray signature(method.signature());
        const QByteArray member = signature.left(signature.indexOf('('));

        const int numParams = method.parameterTypes().count();
        QMetaObject::invokeMethod(receiver, member, QX_ARG(0), QX_ARG(1), QX_ARG(2), QX_ARG(3), QX_ARG(4), QX_ARG(5), QX_ARG(6), QX_ARG(7), QX_ARG(8));
    }
}

// Dispatches an RPC call received from client `id` to every slot attached
// under the name `fn`. The client id is always passed as the slot's first
// argument, followed by the RPC arguments.
//
// numParams counts only the slot parameters left over for RPC arguments, so
// it is the declared parameter count minus one for the id. Using the full
// count let QX_ARG forward one argument more than the slot accepts, which
// made the invocation fail when the sender supplied a surplus argument.
void QxRPCPeer::receiveClientSignal(int id, QString fn, QVariant p0, QVariant p1, QVariant p2, QVariant p3, QVariant p4, QVariant p5, QVariant p6, QVariant p7) {
    QByteArray sig;
    int numParams;
    foreach(QxRPCPeer::MethodID i, attachedSlots.value(fn)) {
        const QMetaMethod method = i.first->metaObject()->method(i.second);
        // invokeMethod() wants the bare member name, without "(args)".
        sig = method.signature();
        sig = sig.left(sig.indexOf('('));
        numParams = method.parameterTypes().count() - 1;
        QMetaObject::invokeMethod(i.first, sig, Q_ARG(int, id), QX_ARG(0), QX_ARG(1), QX_ARG(2), QX_ARG(3), QX_ARG(4), QX_ARG(5), QX_ARG(6), QX_ARG(7));
    }
}
#undef QX_ARG

void QxRPCPeer::newConnection() {
    QTcpSocket* next = m_server->nextPendingConnection();
    if(m_rpctype == Peer) {
        if(m_peer->state() != QAbstractSocket::UnconnectedState) {
            qDebug() << "QxRPCPeer: Rejected connection from " << next->peerAddress().toString() << "; another peer is connected";
            next->disconnectFromHost();
            next->deleteLater();
        } else {
            m_peer->deleteLater();
            m_peer = next;
            QObject::connect(m_peer, SIGNAL(connected()), this, SIGNAL(peerConnected()));
            QObject::connect(m_peer, SIGNAL(disconnected()), this, SIGNAL(peerDisconnected()));
            QObject::connect(m_peer, SIGNAL(disconnected()), this, SLOT(disconnectSender()));
            QObject::connect(m_peer, SIGNAL(readyRead()), this, SLOT(dataAvailable()));
            QObject::connect(m_peer, SIGNAL(error(QAbstractSocket::SocketError)), this, SIGNAL(peerError(QAbstractSocket::SocketError)));
            emit peerConnected();
        }
    } else {
        QxRPCConnection* conn = new QxRPCConnection;
        conn->socket = next;
        m_clients[next] = conn;
        QObject::connect(next, SIGNAL(disconnected()), this, SLOT(disconnectSender()));
        QObject::connect(next, SIGNAL(readyRead()), this, SLOT(dataAvailable()));
        QObject::connect(next, SIGNAL(error(QAbstractSocket::SocketError)), this, SIGNAL(peerError(QAbstractSocket::SocketError)));
        emit clientConnected((int)(next));
    }
}

void QxRPCPeer::dataAvailable() {
    if(m_rpctype!=Server && m_peer==sender()) {
        m_buffer.append(m_peer->readAll());
        processInput(m_peer, m_buffer);
        return;
    } else {
        QxRPCConnection* conn = m_clients.value(sender());
        if(!conn) {
            qDebug() << "QxRPCPeer: Unrecognized client object connected to dataAvailable";
            return;
        }
        conn->buffer.append(conn->socket->readAll());
        processInput(conn->socket, (conn->buffer));
        return;
    }
    qDebug() << "QxRPCPeer: Unrecognized peer object connected to dataAvailable";
}

void QxRPCPeer::disconnectSender() {
    QxRPCConnection* conn = m_clients.value(sender());
    if(!conn) {
        if(m_peer != qobject_cast<QTcpSocket*>(sender())) {
            qDebug() << "QxRPCPeer: Unrecognized object connected to disconnectSender";
            return;
        }
        m_buffer.append(m_peer->readAll());
        m_buffer.append("\n");
        processInput(m_peer, m_buffer);
        m_buffer.clear();
        emit clientDisconnected((int)(sender()));
        return;
    }
    conn->buffer.append(conn->socket->readAll());
    conn->buffer.append("\n");
    processInput(conn->socket, conn->buffer);
    conn->socket->deleteLater();
    delete conn;
    m_clients.remove(sender());
}

void QxRPCPeer::processInput(QTcpSocket* socket, QByteArray& buffer) {
    int pos;
    QByteArray cmd;
    while((pos = buffer.indexOf('\n')) != -1) {
        cmd = buffer.left(pos-1);
        buffer = buffer.mid(pos+1);
        if(cmd.length()==0) continue;
        cmd.replace(QByteArray("\\n"), QByteArray("\n"));
        cmd.replace(QByteArray("\\\\"), QByteArray("\\"));
        QDataStream str(cmd);
        QString signal;
        unsigned char argCount;
        QVariant v[9];
        str >> signal >> argCount;
        for(int i=0; i<argCount; i++) str >> v[i];
        if(socket == m_peer) {
            receivePeerSignal(signal, v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8]);
        } else {
            receiveClientSignal((unsigned int)(socket), signal, v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]);
        }
    }
}

// Hooks this object up to `signal` (a SIGNAL() string) on `source` and
// records the QMetaType ids of that signal's parameters for qt_metacall().
//
// The signature is normalised before the lookup, as attachSlot() does for
// slots, so spacing differences in the SIGNAL() string do not make the lookup
// fail. If the signal cannot be found, a debug message is printed and no
// signal connection is made: the failed-lookup index must not be handed to
// QMetaObject::connect().
QxIntrospector::QxIntrospector(QxRPCPeer* parent, QObject* source, const char* signal) : QObject(parent) {
    peer = parent;

    const QMetaObject* meta = source->metaObject();
    // The first character of a SIGNAL() string is a marker, hence mid(1).
    const QByteArray normalized = QMetaObject::normalizedSignature(signal).mid(1);
    const int idx = meta->indexOfSignal(normalized.constData());

    if(idx >= 0) {
        // Our "method" will have the first ID not used by the superclass.
        QMetaObject::connect(source, idx, this, QObject::staticMetaObject.methodCount());
    } else {
        qDebug() << "QxRPCPeer: Cannot attach unknown signal" << signal;
    }

    // The peer drops all attachments of `source` once it is destroyed.
    QObject::connect(source, SIGNAL(destroyed()), peer, SLOT(detachSender()));

    if(idx < 0) return;

    const QList<QByteArray> p = meta->method(idx).parameterTypes();
    const int ct = p.count();
    for(int i=0; i<ct; i++) argTypes.append(QMetaType::type(p.value(i).constData()));
}

// Hand-written meta-call dispatcher.
//
// QObject's own methods are handled by the base implementation first. What
// remains is relative to this class: id 0 is the single pseudo-slot that the
// constructor connected the watched signal to. When it fires, the signal's
// arguments are wrapped in QVariants and passed to peer->call().
//
// _a[0] is the return-value slot, so the signal's arguments start at _a[1].
// At most nine arguments are forwarded, which is all call() accepts. The
// clamp also keeps a signal with more parameters from writing past v[].
int QxIntrospector::qt_metacall(QMetaObject::Call _c, int _id, void **_a) {
    _id = QObject::qt_metacall(_c, _id, _a);
    if (_id < 0)
        return _id;
    if (_c == QMetaObject::InvokeMetaMethod) {
        if(_id==0) {
            const int maxArgs = 9;
            QVariant v[maxArgs];
            const int n = qMin(argTypes.size(), maxArgs);
            for(int i=0; i<n; i++) v[i] = QVariant(argTypes[i], _a[i+1]);
            peer->call(rpcFunction, v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8]);
        }
        // One method is declared at this level; shift the id for subclasses.
        _id -= 1;
    }
    return _id;
}
Personal tools