Commit 1362c202 by Konstantin Käfer

fix #4 and provide a complete callback for Statement#each

parent 7eb9536c
......@@ -200,8 +200,8 @@ template <class T> Values::Field*
}
}
template <class T> T* Statement::Bind(const Arguments& args, int start) {
int last = args.Length();
template <class T> T* Statement::Bind(const Arguments& args, int start, int last) {
if (last < 0) last = args.Length();
Local<Function> callback;
if (last > start && args[last - 1]->IsFunction()) {
callback = Local<Function>::Cast(args[last - 1]);
......@@ -586,11 +586,19 @@ Handle<Value> Statement::Each(const Arguments& args) {
HandleScope scope;
Statement* stmt = ObjectWrap::Unwrap<Statement>(args.This());
Baton* baton = stmt->Bind<Baton>(args);
int last = args.Length();
Local<Function> completed;
if (last >= 2 && args[last - 1]->IsFunction() && args[last - 2]->IsFunction()) {
completed = Local<Function>::Cast(args[--last]);
}
EachBaton* baton = stmt->Bind<EachBaton>(args, 0, last);
if (baton == NULL) {
return ThrowException(Exception::Error(String::New("Data type is not supported")));
}
else {
baton->completed = Persistent<Function>::New(completed);
stmt->Schedule(EIO_BeginEach, baton);
return args.This();
}
......@@ -601,9 +609,9 @@ void Statement::EIO_BeginEach(Baton* baton) {
}
int Statement::EIO_Each(eio_req *req) {
STATEMENT_INIT(Baton);
STATEMENT_INIT(EachBaton);
Async* async = new Async(stmt, baton->callback, AsyncEach);
Async* async = new Async(stmt, baton->callback, baton->completed, AsyncEach);
sqlite3_mutex* mtx = sqlite3_db_mutex(stmt->db->handle);
......@@ -669,6 +677,7 @@ void Statement::AsyncEach(EV_P_ ev_async *w, int revents) {
Rows::const_iterator end = rows.end();
for (int i = 0; it < end; it++, i++) {
argv[1] = RowToJS(*it);
async->retrieved++;
TRY_CATCH_CALL(async->stmt->handle_, async->callback, 2, argv);
delete *it;
}
......@@ -676,6 +685,11 @@ void Statement::AsyncEach(EV_P_ ev_async *w, int revents) {
}
if (async->completed) {
if (!async->completed_callback.IsEmpty() &&
async->completed_callback->IsFunction()) {
Local<Value> argv[] = { Integer::New(async->retrieved) };
TRY_CATCH_CALL(async->stmt->handle_, async->completed_callback, 1, argv);
}
delete async;
w->data = NULL;
}
......@@ -683,7 +697,7 @@ void Statement::AsyncEach(EV_P_ ev_async *w, int revents) {
int Statement::EIO_AfterEach(eio_req *req) {
HandleScope scope;
STATEMENT_INIT(Baton);
STATEMENT_INIT(EachBaton);
if (stmt->status != SQLITE_DONE) {
Error(baton);
......
......@@ -115,6 +115,12 @@ public:
Rows rows;
};
struct EachBaton : Baton {
EachBaton(Statement* stmt_, Handle<Function> cb_) :
Baton(stmt_, cb_) {}
Persistent<Function> completed;
};
struct PrepareBaton : Database::Baton {
Statement* stmt;
std::string sql;
......@@ -144,19 +150,24 @@ public:
pthread_mutex_t mutex;
Persistent<Function> callback;
bool completed;
int retrieved;
Persistent<Function> completed_callback;
Async(Statement* st, Handle<Function> cb, Async_Callback async_cb) :
stmt(st), completed(false) {
Async(Statement* st, Handle<Function> cb, Handle<Function>completed_cb,
Async_Callback async_cb) :
stmt(st), completed(false), retrieved(0) {
watcher.data = this;
ev_async_init(&watcher, async_cb);
ev_async_start(EV_DEFAULT_UC_ &watcher);
callback = Persistent<Function>::New(cb);
completed_callback = Persistent<Function>::New(completed_cb);
stmt->Ref();
pthread_mutex_init(&mutex, NULL);
}
~Async() {
callback.Dispose();
completed_callback.Dispose();
stmt->Unref();
pthread_mutex_destroy(&mutex);
ev_async_stop(EV_DEFAULT_UC_ &watcher);
......@@ -196,7 +207,7 @@ protected:
void Finalize();
template <class T> inline Values::Field* BindParameter(const Handle<Value> source, T pos);
template <class T> T* Bind(const Arguments& args, int start = 0);
template <class T> T* Bind(const Arguments& args, int start = 0, int end = -1);
bool Bind(const Parameters parameters);
static void GetRow(Row* row, sqlite3_stmt* stmt);
......
......@@ -18,4 +18,25 @@ exports['test Statement#each'] = function(beforeExit) {
beforeExit(function() {
assert.equal(retrieved, total, "Only retrieved " + retrieved + " out of " + total + " rows.");
});
};
\ No newline at end of file
};
exports['test Statement#each with complete callback'] = function(beforeExit) {
var db = new sqlite3.Database('test/support/big.db', sqlite3.OPEN_READONLY);
var total = 10000;
var retrieved = 0;
var completed = false;
db.each('SELECT id, txt FROM foo LIMIT 0, ?', total, function(err, row) {
if (err) throw err;
retrieved++;
}, function(num) {
assert.equal(retrieved, num);
completed = true;
});
beforeExit(function() {
assert.ok(completed);
assert.equal(retrieved, total, "Only retrieved " + retrieved + " out of " + total + " rows.");
});
};
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment