Commit e7a556fb by Konstantin Käfer

clean up async code

parent 296fdfc3
#ifndef NODE_SQLITE3_SRC_ASYNC_H
#define NODE_SQLITE3_SRC_ASYNC_H
// Generic ev_async handler.
template <class Item, class Parent> class Async {
typedef void (*Callback)(Parent* parent, Item* item);
protected:
ev_async watcher;
pthread_mutex_t mutex;
std::vector<Item*> data;
Callback callback;
public:
Parent* parent;
public:
inline Async(Parent* parent_, Callback cb_)
: callback(cb_), parent(parent_) {
watcher.data = this;
ev_async_init(&watcher, listener);
ev_async_start(EV_DEFAULT_UC_ &watcher);
pthread_mutex_init(&mutex, NULL);
}
static void listener(EV_P_ ev_async *w, int revents) {
Async* async = static_cast<Async*>(w->data);
std::vector<Item*> rows;
pthread_mutex_lock(&async->mutex);
rows.swap(async->data);
pthread_mutex_unlock(&async->mutex);
for (unsigned int i = 0, size = rows.size(); i < size; i++) {
ev_unref(EV_DEFAULT_UC);
async->callback(async->parent, rows[i]);
}
}
inline void add(Item* item) {
// Make sure node runs long enough to deliver the messages.
ev_ref(EV_DEFAULT_UC);
pthread_mutex_lock(&mutex);
data.push_back(item);
pthread_mutex_unlock(&mutex);
}
inline void send() {
ev_async_send(EV_DEFAULT_ &watcher);
}
inline void send(Item* item) {
add(item);
send();
}
inline ~Async() {
ev_invoke(&watcher, ev_async_pending(&watcher));
pthread_mutex_destroy(&mutex);
ev_async_stop(EV_DEFAULT_UC_ &watcher);
}
};
#endif
...@@ -351,23 +351,18 @@ void Database::RegisterTraceCallback(Baton* baton) { ...@@ -351,23 +351,18 @@ void Database::RegisterTraceCallback(Baton* baton) {
void Database::TraceCallback(void* db, const char* sql) { void Database::TraceCallback(void* db, const char* sql) {
// Note: This function is called in the thread pool. // Note: This function is called in the thread pool.
// Note: Some queries, such as "EXPLAIN" queries, are not sent through this. // Note: Some queries, such as "EXPLAIN" queries, are not sent through this.
static_cast<Database*>(db)->debug_trace->send(std::string(sql)); static_cast<Database*>(db)->debug_trace->send(new std::string(sql));
} }
void Database::TraceCallback(EV_P_ ev_async *w, int revents) { void Database::TraceCallback(Database* db, std::string* sql) {
// Note: This function is called in the main V8 thread. // Note: This function is called in the main V8 thread.
HandleScope scope; HandleScope scope;
AsyncTrace* async = static_cast<AsyncTrace*>(w->data);
std::vector<std::string> queries = async->get();
for (unsigned int i = 0; i < queries.size(); i++) {
Local<Value> argv[] = { Local<Value> argv[] = {
String::NewSymbol("trace"), String::NewSymbol("trace"),
String::New(queries[i].c_str()) String::New(sql->c_str())
}; };
EMIT_EVENT(async->parent->handle_, 2, argv); EMIT_EVENT(db->handle_, 2, argv);
} delete sql;
queries.clear();
} }
void Database::RegisterProfileCallback(Baton* baton) { void Database::RegisterProfileCallback(Baton* baton) {
...@@ -393,24 +388,20 @@ void Database::RegisterProfileCallback(Baton* baton) { ...@@ -393,24 +388,20 @@ void Database::RegisterProfileCallback(Baton* baton) {
void Database::ProfileCallback(void* db, const char* sql, sqlite3_uint64 nsecs) { void Database::ProfileCallback(void* db, const char* sql, sqlite3_uint64 nsecs) {
// Note: This function is called in the thread pool. // Note: This function is called in the thread pool.
// Note: Some queries, such as "EXPLAIN" queries, are not sent through this. // Note: Some queries, such as "EXPLAIN" queries, are not sent through this.
static_cast<Database*>(db)->debug_profile->send(ProfileInfo(sql, nsecs)); ProfileInfo* info = new ProfileInfo();
*info = (ProfileInfo){ std::string(sql), nsecs };
static_cast<Database*>(db)->debug_profile->send(info);
} }
void Database::ProfileCallback(EV_P_ ev_async *w, int revents) { void Database::ProfileCallback(Database *db, ProfileInfo* info) {
// Note: This function is called in the main V8 thread.
HandleScope scope; HandleScope scope;
AsyncProfile* async = static_cast<AsyncProfile*>(w->data);
std::vector<ProfileInfo> queries = async->get();
for (unsigned int i = 0; i < queries.size(); i++) {
Local<Value> argv[] = { Local<Value> argv[] = {
String::NewSymbol("profile"), String::NewSymbol("profile"),
String::New(queries[i].first.c_str()), String::New(info->sql.c_str()),
Integer::New((double)queries[i].second / 1000000.0) Integer::New((double)info->nsecs / 1000000.0)
}; };
EMIT_EVENT(async->parent->handle_, 3, argv); EMIT_EVENT(db->handle_, 3, argv);
} delete info;
queries.clear();
} }
Handle<Value> Database::Exec(const Arguments& args) { Handle<Value> Database::Exec(const Arguments& args) {
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <queue> #include <queue>
#include <sqlite3.h> #include <sqlite3.h>
#include "async.h"
using namespace v8; using namespace v8;
using namespace node; using namespace node;
...@@ -77,64 +78,21 @@ public: ...@@ -77,64 +78,21 @@ public:
Baton* baton; Baton* baton;
}; };
typedef void (*Async_Callback)(EV_P_ ev_async *w, int revents); struct ProfileInfo {
std::string sql;
sqlite3_int64 nsecs;
// Generic ev_async handler. };
template <class Item, class Parent> class Async {
protected:
ev_async watcher;
pthread_mutex_t mutex;
std::vector<Item> data;
public:
Parent* parent;
public:
Async(Parent* parent_, Async_Callback async_cb) : parent(parent_) {
watcher.data = this;
ev_async_init(&watcher, async_cb);
ev_async_start(EV_DEFAULT_UC_ &watcher);
pthread_mutex_init(&mutex, NULL);
}
inline void add(Item item) {
// Make sure node runs long enough to deliver the messages.
ev_ref(EV_DEFAULT_UC);
pthread_mutex_lock(&mutex);
data.push_back(item);
pthread_mutex_unlock(&mutex);
}
inline std::vector<Item> get() {
std::vector<Item> rows;
pthread_mutex_lock(&mutex);
rows.swap(data);
pthread_mutex_unlock(&mutex);
for (int i = rows.size(); i > 0; i--) {
ev_unref(EV_DEFAULT_UC);
}
return rows;
}
inline void send() {
ev_async_send(EV_DEFAULT_ &watcher);
}
inline void send(Item item) {
add(item);
send();
}
~Async() { struct UpdateInfo {
ev_invoke(&watcher, ev_async_pending(&watcher)); int type;
pthread_mutex_destroy(&mutex); std::string database;
ev_async_stop(EV_DEFAULT_UC_ &watcher); std::string tablename;
} sqlite3_int64 rowid;
}; };
typedef Async<std::string, Database> AsyncTrace; typedef Async<std::string, Database> AsyncTrace;
typedef std::pair<std::string, sqlite3_uint64> ProfileInfo;
typedef Async<ProfileInfo, Database> AsyncProfile; typedef Async<ProfileInfo, Database> AsyncProfile;
typedef Async<UpdateInfo, Database> AsyncUpdate;
friend class Statement; friend class Statement;
...@@ -185,12 +143,18 @@ protected: ...@@ -185,12 +143,18 @@ protected:
static Handle<Value> Parallelize(const Arguments& args); static Handle<Value> Parallelize(const Arguments& args);
static Handle<Value> Configure(const Arguments& args); static Handle<Value> Configure(const Arguments& args);
static void RegisterTraceCallback(Baton* baton); static void RegisterTraceCallback(Baton* baton);
static void TraceCallback(void* db, const char* sql); static void TraceCallback(void* db, const char* sql);
static void TraceCallback(EV_P_ ev_async *w, int revents); static void TraceCallback(Database* db, std::string* sql);
static void RegisterProfileCallback(Baton* baton); static void RegisterProfileCallback(Baton* baton);
static void ProfileCallback(void* db, const char* sql, sqlite3_uint64 nsecs); static void ProfileCallback(void* db, const char* sql, sqlite3_uint64 nsecs);
static void ProfileCallback(EV_P_ ev_async *w, int revents); static void ProfileCallback(Database* db, ProfileInfo* info);
static void RegisterUpdateCallback(Baton* baton);
static void UpdateCallback(void* db, int type, const char* database, const char* table, sqlite3_uint64 rowid);
static void UpdateCallback(Database* db, UpdateInfo* info);
void RemoveCallbacks(); void RemoveCallbacks();
void Wrap (Handle<Object> handle); void Wrap (Handle<Object> handle);
...@@ -213,6 +177,7 @@ protected: ...@@ -213,6 +177,7 @@ protected:
AsyncTrace* debug_trace; AsyncTrace* debug_trace;
AsyncProfile* debug_profile; AsyncProfile* debug_profile;
AsyncUpdate* update_event;
}; };
} }
......
var sqlite3 = require('sqlite3');
var assert = require('assert');
var util = require('util');
if (process.setMaxListeners) process.setMaxListeners(0);
exports['test Database profiling'] = function(beforeExit) {
var db = new sqlite3.Database(':memory:');
var create = false;
var select = false;
db.on('profile', function(sql, nsecs) {
assert.ok(typeof nsecs === "number");
if (sql.match(/^SELECT/)) {
assert.ok(!select);
assert.equal(sql, "SELECT * FROM foo");
select = true;
}
else if (sql.match(/^CREATE/)) {
assert.ok(!create);
assert.equal(sql, "CREATE TABLE foo (id int)");
create = true;
}
else {
assert.ok(false);
}
});
db.serialize(function() {
db.run("CREATE TABLE foo (id int)");
db.run("SELECT * FROM foo");
});
db.close();
beforeExit(function() {
assert.ok(create);
assert.ok(select);
});
};
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment