Commit 9d4667f0 by Konstantin Käfer

remove last references to ev_

parent c1b8cf36
...@@ -2,12 +2,12 @@ ...@@ -2,12 +2,12 @@
#define NODE_SQLITE3_SRC_ASYNC_H #define NODE_SQLITE3_SRC_ASYNC_H
// Generic ev_async handler. // Generic uv_async handler.
template <class Item, class Parent> class Async { template <class Item, class Parent> class Async {
typedef void (*Callback)(Parent* parent, Item* item); typedef void (*Callback)(Parent* parent, Item* item);
protected: protected:
ev_async watcher; uv_async_t watcher;
pthread_mutex_t mutex; pthread_mutex_t mutex;
std::vector<Item*> data; std::vector<Item*> data;
Callback callback; Callback callback;
...@@ -15,16 +15,15 @@ public: ...@@ -15,16 +15,15 @@ public:
Parent* parent; Parent* parent;
public: public:
inline Async(Parent* parent_, Callback cb_) Async(Parent* parent_, Callback cb_)
: callback(cb_), parent(parent_) { : callback(cb_), parent(parent_) {
watcher.data = this; watcher.data = this;
ev_async_init(&watcher, listener);
ev_async_start(EV_DEFAULT_UC_ &watcher);
pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&mutex, NULL);
uv_async_init(uv_default_loop(), &watcher, listener);
} }
static void listener(EV_P_ ev_async *w, int revents) { static void listener(uv_async_t* handle, int status) {
Async* async = static_cast<Async*>(w->data); Async* async = static_cast<Async*>(handle->data);
std::vector<Item*> rows; std::vector<Item*> rows;
pthread_mutex_lock(&async->mutex); pthread_mutex_lock(&async->mutex);
rows.swap(async->data); rows.swap(async->data);
...@@ -35,7 +34,23 @@ public: ...@@ -35,7 +34,23 @@ public:
} }
} }
inline void add(Item* item) { static void close(uv_handle_t* handle) {
assert(handle != NULL);
assert(handle->data != NULL);
Async* async = static_cast<Async*>(handle->data);
delete async;
handle->data = NULL;
}
void finish() {
// Need to call the listener again to ensure all items have been
// processed. Is this a bug in uv_async? Feels like uv_close
// should handle that.
listener(&watcher, 0);
uv_close((uv_handle_t*)&watcher, close);
}
void add(Item* item) {
// Make sure node runs long enough to deliver the messages. // Make sure node runs long enough to deliver the messages.
uv_ref(uv_default_loop()); uv_ref(uv_default_loop());
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
...@@ -43,19 +58,17 @@ public: ...@@ -43,19 +58,17 @@ public:
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
} }
inline void send() { void send() {
ev_async_send(EV_DEFAULT_ &watcher); uv_async_send(&watcher);
} }
inline void send(Item* item) { void send(Item* item) {
add(item); add(item);
send(); send();
} }
inline ~Async() { ~Async() {
ev_invoke(EV_DEFAULT_UC_ &watcher, ev_async_pending(&watcher));
pthread_mutex_destroy(&mutex); pthread_mutex_destroy(&mutex);
ev_async_stop(EV_DEFAULT_UC_ &watcher);
} }
}; };
......
...@@ -371,7 +371,7 @@ void Database::RegisterTraceCallback(Baton* baton) { ...@@ -371,7 +371,7 @@ void Database::RegisterTraceCallback(Baton* baton) {
else { else {
// Remove it. // Remove it.
sqlite3_trace(db->handle, NULL, NULL); sqlite3_trace(db->handle, NULL, NULL);
delete db->debug_trace; db->debug_trace->finish();
db->debug_trace = NULL; db->debug_trace = NULL;
} }
...@@ -408,7 +408,7 @@ void Database::RegisterProfileCallback(Baton* baton) { ...@@ -408,7 +408,7 @@ void Database::RegisterProfileCallback(Baton* baton) {
else { else {
// Remove it. // Remove it.
sqlite3_profile(db->handle, NULL, NULL); sqlite3_profile(db->handle, NULL, NULL);
delete db->debug_profile; db->debug_profile->finish();
db->debug_profile = NULL; db->debug_profile = NULL;
} }
...@@ -448,7 +448,7 @@ void Database::RegisterUpdateCallback(Baton* baton) { ...@@ -448,7 +448,7 @@ void Database::RegisterUpdateCallback(Baton* baton) {
else { else {
// Remove it. // Remove it.
sqlite3_update_hook(db->handle, NULL, NULL); sqlite3_update_hook(db->handle, NULL, NULL);
delete db->update_event; db->update_event->finish();
db->update_event = NULL; db->update_event = NULL;
} }
...@@ -622,11 +622,11 @@ void Database::Work_AfterLoadExtension(uv_work_t* req) { ...@@ -622,11 +622,11 @@ void Database::Work_AfterLoadExtension(uv_work_t* req) {
void Database::RemoveCallbacks() { void Database::RemoveCallbacks() {
if (debug_trace) { if (debug_trace) {
delete debug_trace; debug_trace->finish();
debug_trace = NULL; debug_trace = NULL;
} }
if (debug_profile) { if (debug_profile) {
delete debug_profile; debug_profile->finish();
debug_profile = NULL; debug_profile = NULL;
} }
} }
...@@ -43,14 +43,16 @@ exports['test scheduling a query with callback after the database was closed'] = ...@@ -43,14 +43,16 @@ exports['test scheduling a query with callback after the database was closed'] =
exports['test running a query after the database was closed'] = function(beforeExit) { exports['test running a query after the database was closed'] = function(beforeExit) {
var error = false; var error = false;
var db = new sqlite3.Database(':memory:'); var db = new sqlite3.Database(':memory:');
db.on('error', function(err) {
error = true;
assert.equal(err.message, "SQLITE_BUSY: unable to close due to unfinalised statements");
});
var stmt = db.prepare("CREATE TABLE foo (id int)"); var stmt = db.prepare("SELECT * FROM sqlite_master", function(err) {
db.close(); if (err) throw err;
stmt.run(); db.close(function(err) {
assert.ok(err);
error = true;
assert.equal(err.message, "SQLITE_BUSY: unable to close due to unfinalised statements");
stmt.run();
});
});
beforeExit(function() { beforeExit(function() {
assert.ok(error); assert.ok(error);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment