Commit 4afdedfb by Konstantin Käfer

add capability to trace queries

parent 916dcac8
...@@ -25,6 +25,7 @@ void Database::Init(Handle<Object> target) { ...@@ -25,6 +25,7 @@ void Database::Init(Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(constructor_template, "exec", Exec); NODE_SET_PROTOTYPE_METHOD(constructor_template, "exec", Exec);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "serialize", Serialize); NODE_SET_PROTOTYPE_METHOD(constructor_template, "serialize", Serialize);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "parallelize", Parallelize); NODE_SET_PROTOTYPE_METHOD(constructor_template, "parallelize", Parallelize);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "configure", Configure);
target->Set(String::NewSymbol("Database"), target->Set(String::NewSymbol("Database"),
constructor_template->GetFunction()); constructor_template->GetFunction());
...@@ -301,6 +302,69 @@ Handle<Value> Database::Parallelize(const Arguments& args) { ...@@ -301,6 +302,69 @@ Handle<Value> Database::Parallelize(const Arguments& args) {
return args.This(); return args.This();
} }
Handle<Value> Database::Configure(const Arguments& args) {
HandleScope scope;
Database* db = ObjectWrap::Unwrap<Database>(args.This());
REQUIRE_ARGUMENTS(2);
if (args[0]->Equals(String::NewSymbol("trace"))) {
Local<Function> handle;
Baton* baton = new Baton(db, handle);
db->Schedule(RegisterTraceCallback, baton);
}
else {
ThrowException(Exception::Error(String::Concat(
args[0]->ToString(),
String::NewSymbol(" is not a valid configuration option")
)));
}
return args.This();
}
void Database::RegisterTraceCallback(Baton* baton) {
assert(baton->db->open);
assert(baton->db->handle);
Database* db = baton->db;
if (db->debug_trace == NULL) {
// Add it.
db->debug_trace = new AsyncTrace(db, TraceCallback);
sqlite3_trace(db->handle, TraceCallback, db);
}
else {
// Remove it.
sqlite3_trace(db->handle, NULL, NULL);
delete db->debug_trace;
db->debug_trace = NULL;
}
delete baton;
}
void Database::TraceCallback(void* db, const char* sql) {
// Note: This function is called in the thread pool.
// Note: Some queries, such as "EXPLAIN" queries, are not sent through this.
static_cast<Database*>(db)->debug_trace->send(std::string(sql));
}
void Database::TraceCallback(EV_P_ ev_async *w, int revents) {
// Note: This function is called in the main V8 thread.
HandleScope scope;
AsyncTrace* async = static_cast<AsyncTrace*>(w->data);
std::vector<std::string> queries = async->get();
for (int i = 0; i < queries.size(); i++) {
Local<Value> argv[] = {
String::NewSymbol("trace"),
String::New(queries[i].c_str())
};
EMIT_EVENT(async->parent->handle_, 2, argv);
}
queries.clear();
}
Handle<Value> Database::Exec(const Arguments& args) { Handle<Value> Database::Exec(const Arguments& args) {
HandleScope scope; HandleScope scope;
Database* db = ObjectWrap::Unwrap<Database>(args.This()); Database* db = ObjectWrap::Unwrap<Database>(args.This());
......
...@@ -71,6 +71,57 @@ public: ...@@ -71,6 +71,57 @@ public:
Baton* baton; Baton* baton;
}; };
typedef void (*Async_Callback)(EV_P_ ev_async *w, int revents);
// Generic ev_async handler.
template <class Item, class Parent> class Async {
protected:
ev_async watcher;
pthread_mutex_t mutex;
std::vector<Item> data;
public:
Parent* parent;
public:
Async(Parent* parent_, Async_Callback async_cb) : parent(parent_) {
watcher.data = this;
ev_async_init(&watcher, async_cb);
ev_async_start(EV_DEFAULT_UC_ &watcher);
pthread_mutex_init(&mutex, NULL);
}
inline void add(Item item) {
pthread_mutex_lock(&mutex);
data.push_back(item);
pthread_mutex_unlock(&mutex);
}
inline std::vector<Item> get() {
std::vector<Item> rows;
pthread_mutex_lock(&mutex);
rows.swap(data);
pthread_mutex_unlock(&mutex);
return rows;
}
inline void send() {
ev_async_send(EV_DEFAULT_ &watcher);
}
inline void send(Item item) {
add(item);
send();
}
~Async() {
pthread_mutex_destroy(&mutex);
ev_async_stop(EV_DEFAULT_UC_ &watcher);
}
};
typedef Async<std::string, Database> AsyncTrace;
friend class Statement; friend class Statement;
protected: protected:
...@@ -79,12 +130,17 @@ protected: ...@@ -79,12 +130,17 @@ protected:
open(false), open(false),
locked(false), locked(false),
pending(0), pending(0),
serialize(false) { serialize(false),
debug_trace(NULL) {
} }
~Database() { ~Database() {
assert(handle == NULL); assert(handle == NULL);
if (debug_trace) {
delete debug_trace;
debug_trace = NULL;
}
} }
static Handle<Value> New(const Arguments& args); static Handle<Value> New(const Arguments& args);
...@@ -108,6 +164,11 @@ protected: ...@@ -108,6 +164,11 @@ protected:
static Handle<Value> Serialize(const Arguments& args); static Handle<Value> Serialize(const Arguments& args);
static Handle<Value> Parallelize(const Arguments& args); static Handle<Value> Parallelize(const Arguments& args);
static Handle<Value> Configure(const Arguments& args);
static void RegisterTraceCallback(Baton* baton);
static void TraceCallback(void* db, const char* sql);
static void TraceCallback(EV_P_ ev_async *w, int revents);
void Wrap (Handle<Object> handle); void Wrap (Handle<Object> handle);
inline void MakeWeak(); inline void MakeWeak();
virtual void Unref(); virtual void Unref();
...@@ -125,6 +186,8 @@ protected: ...@@ -125,6 +186,8 @@ protected:
bool serialize; bool serialize;
std::queue<Call*> queue; std::queue<Call*> queue;
AsyncTrace* debug_trace;
}; };
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment