Commit e436fca5 by Konstantin Käfer

implement Statement#each with ev_async

parent 09e68144
...@@ -42,7 +42,19 @@ Database.prototype.all = function(sql) { ...@@ -42,7 +42,19 @@ Database.prototype.all = function(sql) {
return this; return this;
} }
// Database#each(sql, [bind1, bind2, ...], [callback])
Database.prototype.each = function(sql) {
var statement = new Statement(this, sql);
statement.each.apply(statement, Array.prototype.slice.call(arguments, 1)).finalize();
return this;
}
Database.prototype.execute = function() { Database.prototype.execute = function() {
console.warn('Database#execute() is deprecated. Use Database#all() instead.'); console.warn('Database#execute() is deprecated. Use Database#all() instead.');
return this.all.apply(this, arguments); return this.all.apply(this, arguments);
}; };
Database.prototype.query = function() {
console.warn('Database#query() is deprecated. Use Database#each() instead.');
return this.each.apply(this, arguments);
};
...@@ -27,6 +27,7 @@ void Statement::Init(Handle<Object> target) { ...@@ -27,6 +27,7 @@ void Statement::Init(Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(constructor_template, "get", Get); NODE_SET_PROTOTYPE_METHOD(constructor_template, "get", Get);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "run", Run); NODE_SET_PROTOTYPE_METHOD(constructor_template, "run", Run);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "all", All); NODE_SET_PROTOTYPE_METHOD(constructor_template, "all", All);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "each", Each);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "reset", Reset); NODE_SET_PROTOTYPE_METHOD(constructor_template, "reset", Reset);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "finalize", Finalize); NODE_SET_PROTOTYPE_METHOD(constructor_template, "finalize", Finalize);
...@@ -101,6 +102,7 @@ Handle<Value> Statement::New(const Arguments& args) { ...@@ -101,6 +102,7 @@ Handle<Value> Statement::New(const Arguments& args) {
Statement* stmt = new Statement(db); Statement* stmt = new Statement(db);
stmt->Wrap(args.This()); stmt->Wrap(args.This());
PrepareBaton* baton = new PrepareBaton(db, Local<Function>::Cast(args[2]), stmt); PrepareBaton* baton = new PrepareBaton(db, Local<Function>::Cast(args[2]), stmt);
baton->sql = std::string(*String::Utf8Value(sql)); baton->sql = std::string(*String::Utf8Value(sql));
db->Schedule(EIO_BeginPrepare, baton); db->Schedule(EIO_BeginPrepare, baton);
...@@ -108,7 +110,6 @@ Handle<Value> Statement::New(const Arguments& args) { ...@@ -108,7 +110,6 @@ Handle<Value> Statement::New(const Arguments& args) {
return args.This(); return args.This();
} }
void Statement::EIO_BeginPrepare(Database::Baton* baton) { void Statement::EIO_BeginPrepare(Database::Baton* baton) {
assert(baton->db->open); assert(baton->db->open);
baton->db->pending++; baton->db->pending++;
...@@ -520,6 +521,117 @@ int Statement::EIO_AfterAll(eio_req *req) { ...@@ -520,6 +521,117 @@ int Statement::EIO_AfterAll(eio_req *req) {
return 0; return 0;
} }
Handle<Value> Statement::Each(const Arguments& args) {
HandleScope scope;
Statement* stmt = ObjectWrap::Unwrap<Statement>(args.This());
Baton* baton = stmt->Bind<Baton>(args);
if (baton == NULL) {
return ThrowException(Exception::Error(String::New("Data type is not supported")));
}
else {
stmt->Schedule(EIO_BeginEach, baton);
return args.This();
}
}
void Statement::EIO_BeginEach(Baton* baton) {
STATEMENT_BEGIN(Each);
}
int Statement::EIO_Each(eio_req *req) {
STATEMENT_INIT(Baton);
Async* async = new Async(stmt, baton->callback, AsyncEach);
sqlite3_mutex* mtx = sqlite3_db_mutex(stmt->db->handle);
int retrieved = 0;
// Make sure that we also reset when there are no parameters.
if (!baton->parameters.size()) {
sqlite3_reset(stmt->handle);
}
if (stmt->Bind(baton->parameters)) {
while (true) {
sqlite3_mutex_enter(mtx);
stmt->status = sqlite3_step(stmt->handle);
if (stmt->status == SQLITE_ROW) {
sqlite3_mutex_leave(mtx);
Data::Row* row = new Data::Row();
GetRow(row, stmt->handle);
pthread_mutex_lock(&async->mutex);
async->data.push_back(row);
retrieved++;
pthread_mutex_unlock(&async->mutex);
ev_async_send(EV_DEFAULT_ &async->watcher);
}
else {
if (stmt->status != SQLITE_DONE) {
stmt->message = std::string(sqlite3_errmsg(stmt->db->handle));
}
sqlite3_mutex_leave(mtx);
break;
}
}
}
async->completed = true;
ev_async_send(EV_DEFAULT_ &async->watcher);
return 0;
}
void Statement::AsyncEach(EV_P_ ev_async *w, int revents) {
HandleScope scope;
Async* async = static_cast<Async*>(w->data);
while (true) {
// Get the contents out of the data cache for us to process in the JS callback.
Data::Rows rows;
pthread_mutex_lock(&async->mutex);
rows.swap(async->data);
pthread_mutex_unlock(&async->mutex);
if (rows.empty()) {
break;
}
if (!async->callback.IsEmpty() && async->callback->IsFunction()) {
Local<Value> argv[2];
argv[0] = Local<Value>::New(Null());
Data::Rows::const_iterator it = rows.begin();
Data::Rows::const_iterator end = rows.end();
for (int i = 0; it < end; it++, i++) {
argv[1] = RowToJS(*it);
TRY_CATCH_CALL(async->stmt->handle_, async->callback, 2, argv);
delete *it;
}
}
}
if (async->completed) {
delete async;
w->data = NULL;
}
}
int Statement::EIO_AfterEach(eio_req *req) {
HandleScope scope;
STATEMENT_INIT(Baton);
if (stmt->status != SQLITE_DONE) {
Error(baton);
}
STATEMENT_END();
return 0;
}
Handle<Value> Statement::Reset(const Arguments& args) { Handle<Value> Statement::Reset(const Arguments& args) {
HandleScope scope; HandleScope scope;
Statement* stmt = ObjectWrap::Unwrap<Statement>(args.This()); Statement* stmt = ObjectWrap::Unwrap<Statement>(args.This());
......
...@@ -117,13 +117,41 @@ public: ...@@ -117,13 +117,41 @@ public:
Baton* baton; Baton* baton;
}; };
typedef void (*Async_Callback)(EV_P_ ev_async *w, int revents);
struct Async {
ev_async watcher;
Statement* stmt;
Data::Rows data;
pthread_mutex_t mutex;
Persistent<Function> callback;
bool completed;
Async(Statement* st, Handle<Function> cb, Async_Callback async_cb) :
stmt(st), completed(false) {
watcher.data = this;
ev_async_init(&watcher, async_cb);
ev_async_start(EV_DEFAULT_UC_ &watcher);
callback = Persistent<Function>::New(cb);
stmt->Ref();
pthread_mutex_init(&mutex, NULL);
}
~Async() {
callback.Dispose();
stmt->Unref();
pthread_mutex_destroy(&mutex);
ev_async_stop(EV_DEFAULT_UC_ &watcher);
}
};
Statement(Database* db_) : EventEmitter(), Statement(Database* db_) : EventEmitter(),
db(db_), db(db_),
handle(NULL), handle(NULL),
status(SQLITE_OK), status(SQLITE_OK),
prepared(false), prepared(false),
locked(false), locked(false),
finalized(false) { finalized(false) {
db->Ref(); db->Ref();
} }
...@@ -140,8 +168,11 @@ protected: ...@@ -140,8 +168,11 @@ protected:
EIO_DEFINITION(Get); EIO_DEFINITION(Get);
EIO_DEFINITION(Run); EIO_DEFINITION(Run);
EIO_DEFINITION(All); EIO_DEFINITION(All);
EIO_DEFINITION(Each);
EIO_DEFINITION(Reset); EIO_DEFINITION(Reset);
static void AsyncEach(EV_P_ ev_async *w, int revents);
static Handle<Value> Finalize(const Arguments& args); static Handle<Value> Finalize(const Arguments& args);
static void Finalize(Baton* baton); static void Finalize(Baton* baton);
void Finalize(); void Finalize();
......
var sqlite3 = require('sqlite3'); var sqlite3 = require('sqlite3');
var assert = require('assert'); var assert = require('assert');
function randomString() {
var str = '';
for (var i = Math.random() * 300; i > 0; i--) {
str += String.fromCharCode(Math.floor(Math.random() * 65536));
}
return str;
};
exports['test unicode characters'] = function(beforeExit) { exports['test unicode characters'] = function(beforeExit) {
var db = new sqlite3.Database(':memory:'); var db = new sqlite3.Database(':memory:');
function randomString() {
var str = '';
for (var i = Math.random() * 300; i > 0; i--) {
str += String.fromCharCode(Math.floor(Math.random() * 65536));
}
return str;
};
// Generate random data. // Generate random data.
var data = []; var data = [];
var length = Math.floor(Math.random() * 1000) + 200; var length = Math.floor(Math.random() * 1000) + 200;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment