Commit 1b0cf90f by Konstantin Käfer

fix uv_async usage

parent 229facad
...@@ -3,9 +3,6 @@ var path = require('path'); ...@@ -3,9 +3,6 @@ var path = require('path');
var util = require('util'); var util = require('util');
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var Database = sqlite3.Database;
var Statement = sqlite3.Statement;
function errorCallback(args) { function errorCallback(args) {
if (typeof args[args.length - 1] === 'function') { if (typeof args[args.length - 1] === 'function') {
var callback = args[args.length - 1]; var callback = args[args.length - 1];
...@@ -13,6 +10,11 @@ function errorCallback(args) { ...@@ -13,6 +10,11 @@ function errorCallback(args) {
} }
} }
function inherits(target, source) {
for (var k in source.prototype)
target.prototype[k] = source.prototype[k];
}
sqlite3.cached = { sqlite3.cached = {
Database: function(file, a, b) { Database: function(file, a, b) {
if (file === '' || file === ':memory:') { if (file === '' || file === ':memory:') {
...@@ -43,6 +45,13 @@ sqlite3.cached = { ...@@ -43,6 +45,13 @@ sqlite3.cached = {
objects: {} objects: {}
}; };
var Database = sqlite3.Database;
var Statement = sqlite3.Statement;
inherits(Database, EventEmitter);
inherits(Statement, EventEmitter);
// Database#prepare(sql, [bind1, bind2, ...], [callback]) // Database#prepare(sql, [bind1, bind2, ...], [callback])
Database.prototype.prepare = function(sql) { Database.prototype.prepare = function(sql) {
var params = Array.prototype.slice.call(arguments, 1); var params = Array.prototype.slice.call(arguments, 1);
...@@ -149,12 +158,6 @@ Database.prototype.removeAllListeners = function(type) { ...@@ -149,12 +158,6 @@ Database.prototype.removeAllListeners = function(type) {
return val; return val;
}; };
Database.prototype.emit = EventEmitter.prototype.emit;
Database.prototype.once = EventEmitter.prototype.once;
// util.inherits(Database, EventEmitter);
// util.inherits(Statement, EventEmitter);
// Save the stack trace over EIO callbacks. // Save the stack trace over EIO callbacks.
sqlite3.verbose = function() { sqlite3.verbose = function() {
if (!isVerbose) { if (!isVerbose) {
......
...@@ -607,7 +607,6 @@ void Statement::EIO_Each(eio_req *req) { ...@@ -607,7 +607,6 @@ void Statement::EIO_Each(eio_req *req) {
STATEMENT_INIT(EachBaton); STATEMENT_INIT(EachBaton);
Async* async = baton->async; Async* async = baton->async;
fprintf(stderr, "async:%p\n", async);
sqlite3_mutex* mtx = sqlite3_db_mutex(stmt->db->handle); sqlite3_mutex* mtx = sqlite3_db_mutex(stmt->db->handle);
...@@ -620,7 +619,6 @@ void Statement::EIO_Each(eio_req *req) { ...@@ -620,7 +619,6 @@ void Statement::EIO_Each(eio_req *req) {
if (stmt->Bind(baton->parameters)) { if (stmt->Bind(baton->parameters)) {
while (true) { while (true) {
fprintf(stderr, "before mutex\n");
sqlite3_mutex_enter(mtx); sqlite3_mutex_enter(mtx);
stmt->status = sqlite3_step(stmt->handle); stmt->status = sqlite3_step(stmt->handle);
if (stmt->status == SQLITE_ROW) { if (stmt->status == SQLITE_ROW) {
...@@ -628,33 +626,32 @@ void Statement::EIO_Each(eio_req *req) { ...@@ -628,33 +626,32 @@ void Statement::EIO_Each(eio_req *req) {
Row* row = new Row(); Row* row = new Row();
GetRow(row, stmt->handle); GetRow(row, stmt->handle);
// pthread_mutex_lock(&async->mutex); pthread_mutex_lock(&async->mutex);
async->data.push_back(row); async->data.push_back(row);
retrieved++; retrieved++;
// pthread_mutex_unlock(&async->mutex); pthread_mutex_unlock(&async->mutex);
fprintf(stderr, "retrieved:%d\n", retrieved); uv_async_send(&async->watcher);
// uv_async_send(&async->watcher);
} }
else { else {
if (stmt->status != SQLITE_DONE) { if (stmt->status != SQLITE_DONE) {
stmt->message = std::string(sqlite3_errmsg(stmt->db->handle)); stmt->message = std::string(sqlite3_errmsg(stmt->db->handle));
} }
sqlite3_mutex_leave(mtx); sqlite3_mutex_leave(mtx);
fprintf(stderr, "done\n");
break; break;
} }
} }
} }
fprintf(stderr, "retrieved:%d\n", retrieved);
async->completed = true; async->completed = true;
// uv_async_send(&async->watcher); uv_async_send(&async->watcher);
} }
void Statement::CloseCallback(uv_handle_t* handle) { void Statement::CloseCallback(uv_handle_t* handle) {
assert(handle != NULL); assert(handle != NULL);
fprintf(stderr, "close callback\n"); Async* async = static_cast<Async*>(handle->data);
delete async;
handle->data = NULL;
} }
void Statement::AsyncEach(uv_async_t* handle, int status) { void Statement::AsyncEach(uv_async_t* handle, int status) {
...@@ -680,16 +677,15 @@ void Statement::AsyncEach(uv_async_t* handle, int status) { ...@@ -680,16 +677,15 @@ void Statement::AsyncEach(uv_async_t* handle, int status) {
Rows::const_iterator it = rows.begin(); Rows::const_iterator it = rows.begin();
Rows::const_iterator end = rows.end(); Rows::const_iterator end = rows.end();
for (int i = 0; it < end; it++, i++) { for (int i = 0; it < end; it++, i++) {
// argv[1] = RowToJS(*it); argv[1] = RowToJS(*it);
async->retrieved++; async->retrieved++;
// TRY_CATCH_CALL(async->stmt->handle_, baton->callback, 2, argv); TRY_CATCH_CALL(async->stmt->handle_, baton->callback, 2, argv);
// delete *it; delete *it;
} }
} }
} }
if (async->completed) { if (async->completed) {
fprintf(stderr, "completed\n");
if (!baton->completed.IsEmpty() && if (!baton->completed.IsEmpty() &&
baton->completed->IsFunction()) { baton->completed->IsFunction()) {
Local<Value> argv[] = { Local<Value> argv[] = {
...@@ -698,9 +694,7 @@ void Statement::AsyncEach(uv_async_t* handle, int status) { ...@@ -698,9 +694,7 @@ void Statement::AsyncEach(uv_async_t* handle, int status) {
}; };
TRY_CATCH_CALL(async->stmt->handle_, baton->completed, 2, argv); TRY_CATCH_CALL(async->stmt->handle_, baton->completed, 2, argv);
} }
// uv_close((uv_handle_t*)handle, CloseCallback); uv_close((uv_handle_t*)handle, CloseCallback);
delete async;
handle->data = NULL;
} }
} }
......
...@@ -165,11 +165,8 @@ public: ...@@ -165,11 +165,8 @@ public:
stmt(st), baton(eb), completed(false), retrieved(0) { stmt(st), baton(eb), completed(false), retrieved(0) {
watcher.data = this; watcher.data = this;
pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&mutex, NULL);
fprintf(stderr, "initialized mutex\n");
stmt->Ref(); stmt->Ref();
fprintf(stderr, "referenced stmt\n");
uv_async_init(uv_default_loop(), &watcher, async_cb); uv_async_init(uv_default_loop(), &watcher, async_cb);
fprintf(stderr, "started async\n");
} }
~Async() { ~Async() {
......
...@@ -19,24 +19,24 @@ exports['test Statement#each'] = function(beforeExit) { ...@@ -19,24 +19,24 @@ exports['test Statement#each'] = function(beforeExit) {
assert.equal(retrieved, total, "Only retrieved " + retrieved + " out of " + total + " rows."); assert.equal(retrieved, total, "Only retrieved " + retrieved + " out of " + total + " rows.");
}); });
}; };
//
// exports['test Statement#each with complete callback'] = function(beforeExit) { exports['test Statement#each with complete callback'] = function(beforeExit) {
// var db = new sqlite3.Database('test/support/big.db', sqlite3.OPEN_READONLY); var db = new sqlite3.Database('test/support/big.db', sqlite3.OPEN_READONLY);
//
// var total = 10000; var total = 10000;
// var retrieved = 0; var retrieved = 0;
// var completed = false; var completed = false;
//
// db.each('SELECT id, txt FROM foo LIMIT 0, ?', total, function(err, row) { db.each('SELECT id, txt FROM foo LIMIT 0, ?', total, function(err, row) {
// if (err) throw err; if (err) throw err;
// retrieved++; retrieved++;
// }, function(err, num) { }, function(err, num) {
// assert.equal(retrieved, num); assert.equal(retrieved, num);
// completed = true; completed = true;
// }); });
//
// beforeExit(function() { beforeExit(function() {
// assert.ok(completed); assert.ok(completed);
// assert.equal(retrieved, total, "Only retrieved " + retrieved + " out of " + total + " rows."); assert.equal(retrieved, total, "Only retrieved " + retrieved + " out of " + total + " rows.");
// }); });
// }; };
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment