From 3b5e0516e4bc94db6e3033a65cfba427b54dfd78 Mon Sep 17 00:00:00 2001 From: APTX Date: Fri, 21 Aug 2009 19:58:31 +0200 Subject: [PATCH] - Make hashing threads work. --- circularbuffer.h | 19 ++++++---- hash.cpp | 50 +++++++++++++++++--------- hash.h | 14 +++++++- hashconsumer.cpp | 74 +++++++++++++++++++++++++-------------- hashconsumer.h | 16 ++++----- hashproducer.cpp | 83 ++++++++++++++++++++++++++++---------------- hashproducer.h | 20 +++++------ mylistaddcommand.cpp | 4 ++- mylistaddcommand.h | 3 ++ 9 files changed, 185 insertions(+), 98 deletions(-) diff --git a/circularbuffer.h b/circularbuffer.h index dd9a9da..1be3bc0 100644 --- a/circularbuffer.h +++ b/circularbuffer.h @@ -17,26 +17,29 @@ public: m_end = false; } - void put(T data, bool last = false) + bool put(T data, bool last = false, int timeout = SEMAPHORE_ACQUIRE_TIMEOUT) { - if (m_end) return; + if (m_end) return false; - free.acquire(); + if (!free.tryAcquire(1, timeout)) + return false; buffer[w] = data; m_end = last; used.release(); w++; w %= SIZE; + return true; } - T get() + bool get(T *data, int timeout = SEMAPHORE_ACQUIRE_TIMEOUT) { - used.acquire(); - T data = buffer[r]; + if (!used.tryAcquire(1, timeout)) + return false; + *data = buffer[r]; free.release(); r++; r %= SIZE; - return data; + return true; } bool end() const @@ -62,6 +65,8 @@ private: int r; int w; bool m_end; + + static const int SEMAPHORE_ACQUIRE_TIMEOUT = 100; }; typedef CircularBuffer Buffer; diff --git a/hash.cpp b/hash.cpp index bd5dc0b..9141bbb 100644 --- a/hash.cpp +++ b/hash.cpp @@ -22,11 +22,13 @@ void Hash::hashFile(const QFileInfo &file) { qDebug() << "Hash::hashFile"; fileQueue.enqueue(file); + totalFileSize += file.size(); if (hashing) return; - emit startHashing(fileQueue.first().absoluteFilePath()); + totalTime.start(); + startHashing(); } void Hash::endHashing(const QByteArray &hash) @@ -34,33 +36,54 @@ void Hash::endHashing(const QByteArray &hash) qDebug() << "Hash::endHashing"; QFileInfo f = fileQueue.dequeue(); + int fileElapsed = fileTime.elapsed(); + + emit fileHashed(f, hash); +qDebug() << "File:" << f.fileName() << "Hash:" << hash << "Time:" << fileElapsed; + + if (!fileQueue.isEmpty()) { - emit startHashing(fileQueue.first().absoluteFilePath()); + startHashing(); } else { hashing = false; + int totalElapsed = totalTime.elapsed(); + emit finished(); +qDebug() << "Total time:" << totalElapsed; + hashedFileSize = totalFileSize = 0; } - emit fileHashed(f, hash); -qDebug() << "FILE" << f.fileName() << "HASH" << hash; + +} + +void Hash::reportProgress(qint64 read, qint64 total) +{ + emit fileProgress((read * 100) / total); + hashedFileSize += fileQueue.first().size() - read; + emit progress((hashedFileSize * 100) / totalFileSize); } +void Hash::startHashing() +{ + QString file = fileQueue.first().absoluteFilePath(); + + fileTime.start(); + + producer->readFile(file); + consumer->hashFile(file); +} void Hash::setUp() { if (producer || consumer || buffer) return; - +qDebug() << "MAIN thread id is: " << QThread::currentThreadId(); buffer = new HashPrivate::Buffer; producer = new HashPrivate::HashProducer(buffer, this); consumer = new HashPrivate::HashConsumer(buffer, this); - connect(this, SIGNAL(startHashing(QString)), consumer, SLOT(hashFile(QString)), Qt::QueuedConnection); - connect(this, SIGNAL(startHashing(QString)), producer, SLOT(readFile(QString)), Qt::QueuedConnection); - connect(consumer, SIGNAL(finishedHashing(QByteArray)), this, SLOT(endHashing(QByteArray)), Qt::QueuedConnection); - - producer->start(); - consumer->start(); + connect(consumer, SIGNAL(finishedHashing(QByteArray)), this, SLOT(endHashing(QByteArray))); + connect(consumer, SIGNAL(progress(qint64,qint64)), this, SLOT(reportProgress(qint64,qint64))); } void Hash::tearDown() @@ -68,11 +91,6 @@ void Hash::tearDown() if (!producer || !consumer || !buffer) return; - producer->stop(); - consumer->stop(); - producer->wait(); - consumer->wait(); - delete producer; delete consumer; delete buffer; diff --git a/hash.h b/hash.h index 38eaf32..b950162 100644 --- a/hash.h +++ b/hash.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "hashproducer.h" #include "hashconsumer.h" @@ -24,13 +25,18 @@ public: void hashFile(const QFileInfo &file); signals: - void startHashing(const QString &file); void fileHashed(const QFileInfo &file, const QByteArray &hash); + void fileProgress(int percent); + void progress(int percent); + void finished(); private slots: void endHashing(const QByteArray &hash); + void reportProgress(qint64 read, qint64 total); + private: + void startHashing(); void setUp(); void tearDown(); @@ -42,6 +48,12 @@ private: QMap hashedFiles; bool hashing; + + qint64 hashedFileSize; + qint64 totalFileSize; + + QTime fileTime; + QTime totalTime; }; } // namesapce AniDBUdpClient diff --git a/hashconsumer.cpp b/hashconsumer.cpp index f5dc40f..b3b1cf2 100644 --- a/hashconsumer.cpp +++ b/hashconsumer.cpp @@ -9,54 +9,76 @@ HashConsumer::HashConsumer(Buffer *buffer, QObject *parent) : QThread(parent) { this->buffer = buffer; hash = new QCryptographicHash(QCryptographicHash::Md4); - connect(this, SIGNAL(startHashing()), this, SLOT(doHash())); + + restart = false; + abort = false; } HashConsumer::~HashConsumer() { + mutex.lock(); + abort = true; + condition.wakeOne(); + mutex.unlock(); + + wait(); delete hash; } void HashConsumer::hashFile(const QString &file) { + QMutexLocker locker(&mutex); qDebug() << "hashFile()"; fileSize = QFileInfo(file).size(); - emit startHashing(); -} - -void HashConsumer::stop() -{ - m_stop = true; - quit(); + if (!isRunning()) + start(); + else + condition.wakeOne(); } void HashConsumer::run() { - exec(); -} +qDebug() << "Starting thread consumer"; +qDebug() << "Thread consumer id is: " << QThread::currentThreadId(); -void HashConsumer::doHash() -{ - while (!buffer->end()) + forever { -qDebug() << "doHash()->while(" << buffer->end() << ")"; - hashSome(); - } - buffer->reset(); - hash->reset(); -} + mutex.lock(); + qint64 totalSize = fileSize; + qint64 read = 0; + mutex.unlock(); -void HashConsumer::hashSome() -{ - QByteArray data = buffer->get(); + while (!(buffer->end() || abort)) + { +// qDebug() << "hash->while(" << buffer->end() << ")"; + QByteArray data; + + while (!(buffer->get(&data) || abort)); - hash->addData(QCryptographicHash::hash(data, QCryptographicHash::Md4)); + hash->addData(QCryptographicHash::hash(data, QCryptographicHash::Md4)); - if (buffer->end()) - emit finishedHashing(hash->result()); + read += data.size(); + emit progress(read, totalSize); + } + bool r = buffer->reset(); +qDebug() << "buffer reset" << r; -qDebug() << "hashSome()"; + if (abort) + return; + + mutex.lock(); + if (!restart) + { + emit finishedHashing(hash->result().toHex()); + condition.wait(&mutex); + } + restart = false; + mutex.unlock(); + + hash->reset(); + } +qDebug() << "Thread consumer is stopping"; } } // namespace HashPrivate diff --git a/hashconsumer.h b/hashconsumer.h index 69c6c0f..c167fc1 100644 --- a/hashconsumer.h +++ b/hashconsumer.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "circularbuffer.h" @@ -22,26 +24,24 @@ public: public slots: void hashFile(const QString &file); - void stop(); - protected: void run(); signals: - void startHashing(); + void progress(qint64 done, qint64 total); void finishedHashing(QByteArray hash); -private slots: - void doHash(); - private: - void hashSome(); - Buffer *buffer; QCryptographicHash *hash; qint64 fileSize; bool m_stop; + bool restart; + bool abort; + + QMutex mutex; + QWaitCondition condition; }; } // namespace HashPrivate diff --git a/hashproducer.cpp b/hashproducer.cpp index d4c4358..a9376a9 100644 --- a/hashproducer.cpp +++ b/hashproducer.cpp @@ -1,5 +1,7 @@ #include "hashproducer.h" +#include + #include namespace AniDBUdpClient { @@ -8,51 +10,74 @@ namespace HashPrivate { HashProducer::HashProducer(Buffer *buffer, QObject *parent) : QThread(parent) { this->buffer = buffer; - connect(this, SIGNAL(startReading()), this, SLOT(doRead())); + restart = false; + abort = false; +} + +HashProducer::~HashProducer() +{ + mutex.lock(); + abort = true; + condition.wakeOne(); + mutex.unlock(); + + wait(); } void HashProducer::readFile(const QString &file) { qDebug() << "readFile"; - this->file.setFileName(file); +qDebug() << "Thread id is: " << QThread::currentThreadId(); - fileSize = file.size(); + QMutexLocker locker(&mutex); - if (!this->file.open(QIODevice::ReadOnly)) - { -qDebug() << "Failed toopen file" << this->file.fileName(); - return; - } + fileName = file; - emit startReading(); -} + if (!isRunning()) + start(); + else + condition.wakeOne(); -void HashProducer::stop() -{ - m_stop = true; - quit(); } void HashProducer::run() { - exec(); -} +qDebug() << "Starting thread producer"; +qDebug() << "Thread producer id is: " << QThread::currentThreadId(); -void HashProducer::doRead() -{ - while (!this->file.atEnd()) + forever { -qDebug() << "doRead->while(" << (!this->file.atEnd()) << ")"; - readSome(); - } - this->file.close(); -} + mutex.lock(); +qDebug() << "Obtaining new file name"; + QFile file(fileName); + mutex.unlock(); -void HashProducer::readSome() -{ - QByteArray data = file.read(ED2K_PART_SIZE); -qDebug() << "readSome"; - buffer->put(data, file.atEnd()); + if (file.exists()) + { + qDebug() << "File exists, opening"; + if (file.open(QIODevice::ReadOnly)) + { + while (!file.atEnd()) + { + if (abort) + return; +// qDebug() << "read->while(" << (!file.atEnd()) << ")"; + QByteArray data = file.read(ED2K_PART_SIZE); + while (!(buffer->put(data, file.atEnd()) || abort)); + } + } + } + + if (abort) + return; + + mutex.lock(); + if (!restart) + condition.wait(&mutex); + restart = false; + mutex.unlock(); + } +qDebug() << "Thread producer is stopping"; } } // namespace HashPrivate diff --git a/hashproducer.h b/hashproducer.h index 0bde203..3b0ddab 100644 --- a/hashproducer.h +++ b/hashproducer.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include "circularbuffer.h" @@ -17,29 +19,27 @@ class HashProducer : public QThread public: HashProducer(Buffer *buffer, QObject *parent = 0); + ~HashProducer(); public slots: void readFile(const QString &file); - void stop(); protected: void run(); -signals: - void startReading(); - void finishedReading(); - -private slots: - void doRead(); - private: - void readSome(); - Buffer *buffer; + + QString fileName; QFile file; qint64 fileSize; bool m_stop; + bool restart; + bool abort; + + QMutex mutex; + QWaitCondition condition; }; } // namespace HashPrivate diff --git a/mylistaddcommand.cpp b/mylistaddcommand.cpp index e11a953..972e017 100644 --- a/mylistaddcommand.cpp +++ b/mylistaddcommand.cpp @@ -128,6 +128,7 @@ qDebug() << "FAILED to read Mylist ID"; void MylistAddCommand::hash() { + t.start(); future = QtConcurrent::run(this, &MylistAddCommand::doHash, m_file); futureWatcher.setFuture(future); } @@ -141,6 +142,7 @@ qDebug() << "WTF?"; } m_ed2k = QByteArray(future); emit hashComplete(); + qDebug() << "Time:" << t.elapsed(); } QByteArray MylistAddCommand::doHash(QString file) @@ -159,7 +161,7 @@ qDebug() << "hash thread init"; { size = f.read(data, ED2K_PART_SIZE); ed2k.addData(QCryptographicHash::hash(QByteArray(data, size), QCryptographicHash::Md4)); -qDebug() << "hashing..."; +//qDebug() << "hashing..."; } f.close(); delete[] data; diff --git a/mylistaddcommand.h b/mylistaddcommand.h index 18556cb..169f202 100644 --- a/mylistaddcommand.h +++ b/mylistaddcommand.h @@ -5,6 +5,7 @@ #include #include +#include namespace AniDBUdpClient { @@ -51,6 +52,8 @@ private: int mylistId; static const qint64 ED2K_PART_SIZE = 9728000; + + QTime t; }; } // namespace AniDBUdpClient -- 2.52.0