Commit 357f57bb by Dane Springmeyer

enable usage of boost threads instead of direct posix/pthreads usage - all tests…

enable usage of boost threads instead of direct posix/pthreads usage - all tests are passing on OS X 10.7 with boost 1.47 - todo: test on linux - refs #48
parent 9ee3c909
#ifndef NODE_SQLITE3_SRC_ASYNC_H #ifndef NODE_SQLITE3_SRC_ASYNC_H
#define NODE_SQLITE3_SRC_ASYNC_H #define NODE_SQLITE3_SRC_ASYNC_H
#include "threading.h"
#if defined(NODE_SQLITE3_BOOST_THREADING)
#include <boost/thread/mutex.hpp>
#endif
// Generic ev_async handler. // Generic ev_async handler.
template <class Item, class Parent> class Async { template <class Item, class Parent> class Async {
...@@ -8,7 +14,7 @@ template <class Item, class Parent> class Async { ...@@ -8,7 +14,7 @@ template <class Item, class Parent> class Async {
protected: protected:
ev_async watcher; ev_async watcher;
pthread_mutex_t mutex; NODE_SQLITE3_MUTEX_t
std::vector<Item*> data; std::vector<Item*> data;
Callback callback; Callback callback;
public: public:
...@@ -16,19 +22,19 @@ public: ...@@ -16,19 +22,19 @@ public:
public: public:
inline Async(Parent* parent_, Callback cb_) inline Async(Parent* parent_, Callback cb_)
: callback(cb_), parent(parent_) { : mutex(),callback(cb_), parent(parent_) {
watcher.data = this; watcher.data = this;
ev_async_init(&watcher, listener); ev_async_init(&watcher, listener);
ev_async_start(EV_DEFAULT_UC_ &watcher); ev_async_start(EV_DEFAULT_UC_ &watcher);
pthread_mutex_init(&mutex, NULL); NODE_SQLITE3_MUTEX_INIT
} }
static void listener(EV_P_ ev_async *w, int revents) { static void listener(EV_P_ ev_async *w, int revents) {
Async* async = static_cast<Async*>(w->data); Async* async = static_cast<Async*>(w->data);
std::vector<Item*> rows; std::vector<Item*> rows;
pthread_mutex_lock(&async->mutex); NODE_SQLITE3_MUTEX_LOCK(&async->mutex)
rows.swap(async->data); rows.swap(async->data);
pthread_mutex_unlock(&async->mutex); NODE_SQLITE3_MUTEX_UNLOCK(&async->mutex)
for (unsigned int i = 0, size = rows.size(); i < size; i++) { for (unsigned int i = 0, size = rows.size(); i < size; i++) {
ev_unref(EV_DEFAULT_UC); ev_unref(EV_DEFAULT_UC);
async->callback(async->parent, rows[i]); async->callback(async->parent, rows[i]);
...@@ -38,9 +44,9 @@ public: ...@@ -38,9 +44,9 @@ public:
inline void add(Item* item) { inline void add(Item* item) {
// Make sure node runs long enough to deliver the messages. // Make sure node runs long enough to deliver the messages.
ev_ref(EV_DEFAULT_UC); ev_ref(EV_DEFAULT_UC);
pthread_mutex_lock(&mutex); NODE_SQLITE3_MUTEX_LOCK(&mutex)
data.push_back(item); data.push_back(item);
pthread_mutex_unlock(&mutex); NODE_SQLITE3_MUTEX_UNLOCK(&mutex)
} }
inline void send() { inline void send() {
...@@ -54,7 +60,7 @@ public: ...@@ -54,7 +60,7 @@ public:
inline ~Async() { inline ~Async() {
ev_invoke(EV_DEFAULT_UC_ &watcher, ev_async_pending(&watcher)); ev_invoke(EV_DEFAULT_UC_ &watcher, ev_async_pending(&watcher));
pthread_mutex_destroy(&mutex); NODE_SQLITE3_MUTEX_DESTROY
ev_async_stop(EV_DEFAULT_UC_ &watcher); ev_async_stop(EV_DEFAULT_UC_ &watcher);
} }
}; };
......
...@@ -636,11 +636,10 @@ int Statement::EIO_Each(eio_req *req) { ...@@ -636,11 +636,10 @@ int Statement::EIO_Each(eio_req *req) {
sqlite3_mutex_leave(mtx); sqlite3_mutex_leave(mtx);
Row* row = new Row(); Row* row = new Row();
GetRow(row, stmt->handle); GetRow(row, stmt->handle);
NODE_SQLITE3_MUTEX_LOCK(&async->mutex)
pthread_mutex_lock(&async->mutex);
async->data.push_back(row); async->data.push_back(row);
retrieved++; retrieved++;
pthread_mutex_unlock(&async->mutex); NODE_SQLITE3_MUTEX_UNLOCK(&async->mutex)
ev_async_send(EV_DEFAULT_ &async->watcher); ev_async_send(EV_DEFAULT_ &async->watcher);
} }
...@@ -667,9 +666,9 @@ void Statement::AsyncEach(EV_P_ ev_async *w, int revents) { ...@@ -667,9 +666,9 @@ void Statement::AsyncEach(EV_P_ ev_async *w, int revents) {
while (true) { while (true) {
// Get the contents out of the data cache for us to process in the JS callback. // Get the contents out of the data cache for us to process in the JS callback.
Rows rows; Rows rows;
pthread_mutex_lock(&async->mutex); NODE_SQLITE3_MUTEX_LOCK(&async->mutex)
rows.swap(async->data); rows.swap(async->data);
pthread_mutex_unlock(&async->mutex); NODE_SQLITE3_MUTEX_UNLOCK(&async->mutex)
if (rows.empty()) { if (rows.empty()) {
break; break;
......
...@@ -156,7 +156,7 @@ public: ...@@ -156,7 +156,7 @@ public:
ev_async watcher; ev_async watcher;
Statement* stmt; Statement* stmt;
Rows data; Rows data;
pthread_mutex_t mutex; NODE_SQLITE3_MUTEX_t;
Persistent<Function> callback; Persistent<Function> callback;
bool completed; bool completed;
int retrieved; int retrieved;
...@@ -171,14 +171,14 @@ public: ...@@ -171,14 +171,14 @@ public:
callback = Persistent<Function>::New(cb); callback = Persistent<Function>::New(cb);
completed_callback = Persistent<Function>::New(completed_cb); completed_callback = Persistent<Function>::New(completed_cb);
stmt->Ref(); stmt->Ref();
pthread_mutex_init(&mutex, NULL); NODE_SQLITE3_MUTEX_INIT
} }
~Async() { ~Async() {
callback.Dispose(); callback.Dispose();
completed_callback.Dispose(); completed_callback.Dispose();
stmt->Unref(); stmt->Unref();
pthread_mutex_destroy(&mutex); NODE_SQLITE3_MUTEX_DESTROY
ev_async_stop(EV_DEFAULT_UC_ &watcher); ev_async_stop(EV_DEFAULT_UC_ &watcher);
} }
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment