Commit d9bdb1d0 by Konstantin Käfer

add serialization API

parent e6e64815
......@@ -24,21 +24,21 @@ Database.prototype.prepare = function(sql) {
// Database#run(sql, [bind1, bind2, ...], [callback])
Database.prototype.run = function(sql) {
var statement = new Statement(this, sql);
statement.run.apply(statement, Array.prototype.slice.call(arguments, 1));
statement.run.apply(statement, Array.prototype.slice.call(arguments, 1)).finalize();
return this;
}
// Database#get(sql, [bind1, bind2, ...], [callback])
Database.prototype.get = function(sql) {
var statement = new Statement(this, sql);
statement.get.apply(statement, Array.prototype.slice.call(arguments, 1));
statement.get.apply(statement, Array.prototype.slice.call(arguments, 1)).finalize();
return this;
}
// Database#all(sql, [bind1, bind2, ...], [callback])
Database.prototype.all = function(sql) {
var statement = new Statement(this, sql);
statement.all.apply(statement, Array.prototype.slice.call(arguments, 1));
statement.all.apply(statement, Array.prototype.slice.call(arguments, 1)).finalize();
return this;
}
......
......@@ -12,7 +12,7 @@ using namespace node_sqlite3;
Persistent<FunctionTemplate> Database::constructor_template;
void Database::Init(v8::Handle<Object> target) {
void Database::Init(Handle<Object> target) {
HandleScope scope;
Local<FunctionTemplate> t = FunctionTemplate::New(New);
......@@ -23,9 +23,11 @@ void Database::Init(v8::Handle<Object> target) {
constructor_template->SetClassName(String::NewSymbol("Database"));
NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Close);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "serialize", Serialize);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "parallelize", Parallelize);
target->Set(v8::String::NewSymbol("Database"),
constructor_template->GetFunction());
target->Set(String::NewSymbol("Database"),
constructor_template->GetFunction());
}
void Database::Process() {
......@@ -57,20 +59,22 @@ void Database::Process() {
return;
}
while (open && !locked && !queue.empty()) {
while (open && (!locked || pending == 0) && !queue.empty()) {
Call* call = queue.front();
if (call->exclusive && pending > 0) {
break;
}
locked = call->exclusive;
call->callback(call->baton);
queue.pop();
delete call;
}
}
void Database::Schedule(EIO_Callback callback, Baton* baton, bool exclusive = false) {
void Database::Schedule(EIO_Callback callback, Baton* baton, bool exclusive) {
if (!open && locked) {
EXCEPTION(String::New("Database is closed"), SQLITE_MISUSE, exception);
if (!baton->callback.IsEmpty() && baton->callback->IsFunction()) {
......@@ -84,10 +88,11 @@ void Database::Schedule(EIO_Callback callback, Baton* baton, bool exclusive = fa
return;
}
if (!open || locked || (exclusive && pending > 0)) {
queue.push(new Call(callback, baton, exclusive));
if (!open || ((locked || exclusive || serialize) && pending > 0)) {
queue.push(new Call(callback, baton, exclusive || serialize));
}
else {
locked = exclusive;
callback(baton);
}
}
......@@ -198,9 +203,7 @@ Handle<Value> Database::Close(const Arguments& args) {
void Database::EIO_BeginClose(Baton* baton) {
assert(baton->db->open);
assert(!baton->db->locked);
assert(baton->db->pending == 0);
baton->db->locked = true;
eio_custom(EIO_Close, EIO_PRI_DEFAULT, EIO_AfterClose, baton);
}
......@@ -255,6 +258,42 @@ int Database::EIO_AfterClose(eio_req *req) {
return 0;
}
Handle<Value> Database::Serialize(const Arguments& args) {
HandleScope scope;
Database* db = ObjectWrap::Unwrap<Database>(args.This());
OPTIONAL_ARGUMENT_FUNCTION(0, callback);
bool before = db->serialize;
db->serialize = true;
if (!callback.IsEmpty() && callback->IsFunction()) {
TRY_CATCH_CALL(args.This(), callback, 0, NULL);
db->serialize = before;
}
db->Process();
return args.This();
}
Handle<Value> Database::Parallelize(const Arguments& args) {
HandleScope scope;
Database* db = ObjectWrap::Unwrap<Database>(args.This());
OPTIONAL_ARGUMENT_FUNCTION(0, callback);
bool before = db->serialize;
db->serialize = false;
if (!callback.IsEmpty() && callback->IsFunction()) {
TRY_CATCH_CALL(args.This(), callback, 0, NULL);
db->serialize = before;
}
db->Process();
return args.This();
}
/**
* Override this so that we can properly close the database when this object
* gets garbage collected.
......
......@@ -21,7 +21,7 @@ class Database;
class Database : public EventEmitter {
public:
static Persistent<FunctionTemplate> constructor_template;
static void Init(v8::Handle<Object> target);
static void Init(Handle<Object> target);
static inline bool HasInstance(Handle<Value> val) {
if (!val->IsObject()) return false;
......@@ -71,7 +71,8 @@ protected:
handle(NULL),
open(false),
locked(false),
pending(0) {
pending(0),
serialize(false) {
}
......@@ -84,7 +85,7 @@ protected:
static int EIO_Open(eio_req *req);
static int EIO_AfterOpen(eio_req *req);
void Schedule(EIO_Callback callback, Baton* baton, bool exclusive);
void Schedule(EIO_Callback callback, Baton* baton, bool exclusive = false);
void Process();
static Handle<Value> Close(const Arguments& args);
......@@ -92,6 +93,9 @@ protected:
static int EIO_Close(eio_req *req);
static int EIO_AfterClose(eio_req *req);
static Handle<Value> Serialize(const Arguments& args);
static Handle<Value> Parallelize(const Arguments& args);
void Wrap (Handle<Object> handle);
inline void MakeWeak();
virtual void Unref();
......@@ -106,6 +110,8 @@ protected:
bool locked;
unsigned int pending;
bool serialize;
std::queue<Call*> queue;
};
......
......@@ -135,13 +135,23 @@ const char* sqlite_code_string(int code);
static int EIO_##name(eio_req *req); \
static int EIO_After##name(eio_req *req);
#define STATEMENT_BEGIN(type) \
assert(!baton->stmt->locked); \
assert(!baton->stmt->finalized); \
assert(baton->stmt->prepared); \
baton->stmt->locked = true; \
baton->stmt->db->pending++; \
eio_custom(EIO_##type, EIO_PRI_DEFAULT, EIO_After##type, baton);
#define STATEMENT_INIT(type) \
type* baton = static_cast<type*>(req->data); \
Statement* stmt = baton->stmt;
#define STATEMENT_END() \
stmt->locked = false; \
stmt->db->pending--; \
stmt->Process(); \
stmt->db->Process(); \
delete baton;
#endif
......
......@@ -12,7 +12,7 @@ using namespace node_sqlite3;
Persistent<FunctionTemplate> Statement::constructor_template;
void Statement::Init(v8::Handle<Object> target) {
void Statement::Init(Handle<Object> target) {
HandleScope scope;
Local<FunctionTemplate> t = FunctionTemplate::New(New);
......@@ -29,8 +29,8 @@ void Statement::Init(v8::Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(constructor_template, "reset", Reset);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "finalize", Finalize);
target->Set(v8::String::NewSymbol("Statement"),
constructor_template->GetFunction());
target->Set(String::NewSymbol("Statement"),
constructor_template->GetFunction());
}
void Statement::Process() {
......@@ -102,7 +102,7 @@ Handle<Value> Statement::New(const Arguments& args) {
stmt->Wrap(args.This());
PrepareBaton* baton = new PrepareBaton(db, Local<Function>::Cast(args[2]), stmt);
baton->sql = std::string(*String::Utf8Value(sql));
db->Schedule(EIO_BeginPrepare, baton, false);
db->Schedule(EIO_BeginPrepare, baton);
return args.This();
}
......@@ -110,7 +110,7 @@ Handle<Value> Statement::New(const Arguments& args) {
void Statement::EIO_BeginPrepare(Database::Baton* baton) {
assert(baton->db->open);
assert(!baton->db->locked);
baton->db->pending++;
eio_custom(EIO_Prepare, EIO_PRI_DEFAULT, EIO_AfterPrepare, baton);
}
......@@ -157,7 +157,6 @@ int Statement::EIO_AfterPrepare(eio_req *req) {
}
STATEMENT_END();
baton->db->Process();
return 0;
}
......@@ -263,11 +262,7 @@ Handle<Value> Statement::Bind(const Arguments& args) {
}
void Statement::EIO_BeginBind(Baton* baton) {
assert(!baton->stmt->locked);
assert(!baton->stmt->finalized);
assert(baton->stmt->prepared);
baton->stmt->locked = true;
eio_custom(EIO_Bind, EIO_PRI_DEFAULT, EIO_AfterBind, baton);
STATEMENT_BEGIN(Bind);
}
int Statement::EIO_Bind(eio_req *req) {
......@@ -313,11 +308,7 @@ Handle<Value> Statement::Get(const Arguments& args) {
}
void Statement::EIO_BeginGet(Baton* baton) {
assert(!baton->stmt->locked);
assert(!baton->stmt->finalized);
assert(baton->stmt->prepared);
baton->stmt->locked = true;
eio_custom(EIO_Get, EIO_PRI_DEFAULT, EIO_AfterGet, baton);
STATEMENT_BEGIN(Get);
}
int Statement::EIO_Get(eio_req *req) {
......@@ -383,11 +374,7 @@ Handle<Value> Statement::Run(const Arguments& args) {
}
void Statement::EIO_BeginRun(Baton* baton) {
assert(!baton->stmt->locked);
assert(!baton->stmt->finalized);
assert(baton->stmt->prepared);
baton->stmt->locked = true;
eio_custom(EIO_Run, EIO_PRI_DEFAULT, EIO_AfterRun, baton);
STATEMENT_BEGIN(Run);
}
int Statement::EIO_Run(eio_req *req) {
......@@ -444,11 +431,7 @@ Handle<Value> Statement::All(const Arguments& args) {
}
void Statement::EIO_BeginAll(Baton* baton) {
assert(!baton->stmt->locked);
assert(!baton->stmt->finalized);
assert(baton->stmt->prepared);
baton->stmt->locked = true;
eio_custom(EIO_All, EIO_PRI_DEFAULT, EIO_AfterAll, baton);
STATEMENT_BEGIN(All);
}
int Statement::EIO_All(eio_req *req) {
......@@ -529,11 +512,7 @@ Handle<Value> Statement::Reset(const Arguments& args) {
}
void Statement::EIO_BeginReset(Baton* baton) {
assert(!baton->stmt->locked);
assert(!baton->stmt->finalized);
assert(baton->stmt->prepared);
baton->stmt->locked = true;
eio_custom(EIO_Reset, EIO_PRI_DEFAULT, EIO_AfterReset, baton);
STATEMENT_BEGIN(Reset);
}
int Statement::EIO_Reset(eio_req *req) {
......@@ -649,8 +628,6 @@ void Statement::Finalize() {
// error events in case those failed.
sqlite3_finalize(handle);
handle = NULL;
db->pending--;
db->Process();
db->Unref();
}
......
......@@ -124,7 +124,6 @@ public:
prepared(false),
locked(false),
finalized(false) {
db->pending++;
db->Ref();
}
......
var sqlite3 = require('sqlite3');
var assert = require('assert');
exports['test serialize() and parallelize()'] = function(beforeExit) {
var db = new sqlite3.Database(':memory:');
var inserted1 = 0;
var inserted2 = 0;
var retrieved = 0;
var count = 1000;
db.serialize();
db.run("CREATE TABLE foo (txt text, num int, flt float, blb blob)");
db.parallelize();
var stmt1 = db.prepare("INSERT INTO foo VALUES(?, ?, ?, ?)");
var stmt2 = db.prepare("INSERT INTO foo VALUES(?, ?, ?, ?)");
for (var i = 0; i < count; i++) {
stmt1.run('String ' + i, i, i * Math.PI, function(err) {
if (err) throw err;
inserted1++;
// Might sometimes fail, but should work fine most of the time.
assert.ok(inserted2 >= Math.floor(0.95 * inserted1));
});
i++;
stmt2.run('String ' + i, i, i * Math.PI, function(err) {
if (err) throw err;
inserted2++;
assert.ok(inserted1 >= Math.floor(0.95 * inserted2));
});
}
db.serialize();
db.all("SELECT txt, num, flt, blb FROM foo ORDER BY num", function(err, rows) {
if (err) throw err;
for (var i = 0; i < rows.length; i++) {
assert.equal(rows[i][0], 'String ' + i);
assert.equal(rows[i][1], i);
assert.equal(rows[i][2], i * Math.PI);
assert.equal(rows[i][3], null);
retrieved++;
}
});
beforeExit(function() {
assert.equal(count, inserted1 + inserted2, "Didn't insert all rows");
assert.equal(count, retrieved, "Didn't retrieve all rows");
});
}
exports['test serialize(fn)'] = function(beforeExit) {
var db = new sqlite3.Database(':memory:');
var inserted = 0;
var retrieved = 0;
var count = 1000;
db.serialize(function() {
db.run("CREATE TABLE foo (txt text, num int, flt float, blb blob)");
var stmt = db.prepare("INSERT INTO foo VALUES(?, ?, ?, ?)");
for (var i = 0; i < count; i++) {
stmt.run('String ' + i, i, i * Math.PI, function(err) {
if (err) throw err;
inserted++;
});
}
db.all("SELECT txt, num, flt, blb FROM foo ORDER BY num", function(err, rows) {
if (err) throw err;
for (var i = 0; i < rows.length; i++) {
assert.equal(rows[i][0], 'String ' + i);
assert.equal(rows[i][1], i);
assert.equal(rows[i][2], i * Math.PI);
assert.equal(rows[i][3], null);
retrieved++;
}
});
});
beforeExit(function() {
assert.equal(count, inserted, "Didn't insert all rows");
assert.equal(count, retrieved, "Didn't retrieve all rows");
});
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment