Commit 0668424b by Konstantin Käfer

destroy statement objects as soon as we can

parent 1888eea2
......@@ -44,20 +44,22 @@ void Database::Init(v8::Handle<Object> target) {
constructor_template->GetFunction());
}
void Database::Process(Database* db) {
if (!db->open && db->locked && !db->queue.empty()) {
void Database::Process() {
if (!open && locked && !queue.empty()) {
EXCEPTION(String::New("Database handle is closed"), SQLITE_MISUSE, exception);
Local<Value> argv[] = { exception };
bool called = false;
// Call all callbacks with the error object.
while (!db->queue.empty()) {
Call* call = db->queue.front();
while (!queue.empty()) {
Call* call = queue.front();
if (!call->baton->callback.IsEmpty()) {
TRY_CATCH_CALL(db->handle_, call->baton->callback, 1, argv);
TRY_CATCH_CALL(handle_, call->baton->callback, 1, argv);
called = true;
}
db->queue.pop();
queue.pop();
// We don't call the actual callback, so we have to make sure that
// the baton gets destroyed.
delete call->baton;
delete call;
}
......@@ -66,41 +68,40 @@ void Database::Process(Database* db) {
// Database object.
if (!called) {
Local<Value> args[] = { String::NewSymbol("error"), exception };
EMIT_EVENT(db->handle_, 2, args);
EMIT_EVENT(handle_, 2, args);
}
return;
}
while (db->open && !db->locked && !db->queue.empty()) {
Call* call = db->queue.front();
while (open && !locked && !queue.empty()) {
Call* call = queue.front();
if (call->exclusive && db->pending > 0) {
if (call->exclusive && pending > 0) {
break;
}
call->callback(call->baton);
db->queue.pop();
queue.pop();
delete call;
}
}
inline void Database::Schedule(Database* db, EIO_Callback callback, Baton* baton,
bool exclusive = false) {
if (!db->open && db->locked) {
void Database::Schedule(EIO_Callback callback, Baton* baton, bool exclusive = false) {
if (!open && locked) {
EXCEPTION(String::New("Database is closed"), SQLITE_MISUSE, exception);
if (!baton->callback.IsEmpty()) {
Local<Value> argv[] = { exception };
TRY_CATCH_CALL(db->handle_, baton->callback, 1, argv);
TRY_CATCH_CALL(handle_, baton->callback, 1, argv);
}
else {
Local<Value> argv[] = { String::NewSymbol("error"), exception };
EMIT_EVENT(db->handle_, 2, argv);
EMIT_EVENT(handle_, 2, argv);
}
return;
}
if (!db->open || db->locked || (exclusive && db->pending > 0)) {
db->queue.push(new Call(callback, baton, exclusive));
if (!open || locked || (exclusive && pending > 0)) {
queue.push(new Call(callback, baton, exclusive));
}
else {
callback(baton);
......@@ -193,11 +194,10 @@ int Database::EIO_AfterOpen(eio_req *req) {
if (db->open) {
Local<Value> args[] = { String::NewSymbol("open") };
EMIT_EVENT(db->handle_, 1, args);
Process(db);
db->Process();
}
delete baton;
return 0;
}
......@@ -207,7 +207,7 @@ Handle<Value> Database::Close(const Arguments& args) {
OPTIONAL_ARGUMENT_FUNCTION(0, callback);
Baton* baton = new Baton(db, callback);
Schedule(db, EIO_BeginClose, baton, true);
db->Schedule(EIO_BeginClose, baton, true);
return args.This();
}
......@@ -264,11 +264,10 @@ int Database::EIO_AfterClose(eio_req *req) {
if (!db->open) {
Local<Value> args[] = { String::NewSymbol("close"), argv[0] };
EMIT_EVENT(db->handle_, 1, args);
Process(db);
db->Process();
}
delete baton;
return 0;
}
......
......@@ -97,9 +97,8 @@ protected:
static int EIO_Open(eio_req *req);
static int EIO_AfterOpen(eio_req *req);
static void Schedule(Database* db, EIO_Callback callback, Baton* baton,
bool exclusive);
static void Process(Database* db);
void Schedule(EIO_Callback callback, Baton* baton, bool exclusive);
void Process();
static Handle<Value> Close(const Arguments& args);
static void EIO_BeginClose(Baton* baton);
......
......@@ -34,10 +34,39 @@ void Statement::Init(v8::Handle<Object> target) {
constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
constructor_template->SetClassName(String::NewSymbol("Statement"));
NODE_SET_PROTOTYPE_METHOD(constructor_template, "finalize", Finalize);
target->Set(v8::String::NewSymbol("Statement"),
constructor_template->GetFunction());
}
void Statement::Process() {
if (finalized && !queue.empty()) {
return CleanQueue();
}
while (prepared && !locked && !queue.empty()) {
Call* call = queue.front();
queue.pop();
call->callback(call->baton);
delete call;
}
}
void Statement::Schedule(EIO_Callback callback, Baton* baton) {
if (finalized) {
queue.push(new Call(callback, baton));
CleanQueue();
}
else if (!prepared || locked) {
queue.push(new Call(callback, baton));
}
else {
callback(baton);
}
}
// { Database db, String sql, Array params, Function callback }
Handle<Value> Statement::New(const Arguments& args) {
HandleScope scope;
......@@ -85,10 +114,9 @@ Handle<Value> Statement::New(const Arguments& args) {
Statement* stmt = new Statement(db);
stmt->Wrap(args.This());
PrepareBaton* baton = new PrepareBaton(db, Local<Function>::Cast(args[3]));
baton->stmt = stmt;
PrepareBaton* baton = new PrepareBaton(db, Local<Function>::Cast(args[3]), stmt);
baton->sql = std::string(*String::Utf8Value(sql));
Database::Schedule(db, EIO_BeginPrepare, baton, false);
db->Schedule(EIO_BeginPrepare, baton, false);
return args.This();
}
......@@ -97,8 +125,6 @@ Handle<Value> Statement::New(const Arguments& args) {
void Statement::EIO_BeginPrepare(Database::Baton* baton) {
assert(baton->db->open);
assert(!baton->db->locked);
static_cast<PrepareBaton*>(baton)->stmt->Ref();
ev_ref(EV_DEFAULT_UC);
fprintf(stderr, "Prepare started\n");
eio_custom(EIO_Prepare, EIO_PRI_DEFAULT, EIO_AfterPrepare, baton);
}
......@@ -108,6 +134,8 @@ int Statement::EIO_Prepare(eio_req *req) {
Database* db = baton->db;
Statement* stmt = baton->stmt;
// In case preparing fails, we use a mutex to make sure we get the associated
// error message.
sqlite3_mutex* mtx = sqlite3_db_mutex(db->handle);
sqlite3_mutex_enter(mtx);
......@@ -135,94 +163,118 @@ int Statement::EIO_AfterPrepare(eio_req *req) {
Database* db = baton->db;
Statement* stmt = baton->stmt;
stmt->Unref();
ev_unref(EV_DEFAULT_UC);
Local<Value> argv[1];
if (baton->status != SQLITE_OK) {
EXCEPTION(String::New(baton->message.c_str()), baton->status, exception);
argv[0] = exception;
}
else {
stmt->prepared = true;
argv[0] = Local<Value>::New(Null());
}
// Local<Value> argv[1];
// if (baton->status != SQLITE_OK) {
// EXCEPTION(String::New(baton->message), baton->status, exception);
// argv[0] = exception;
// }
// else {
// db->open = false;
// // Leave db->locked to indicate that this db object has reached
// // the end of its life.
// argv[0] = Local<Value>::New(Null());
// }
//
// // Fire callbacks.
// if (!baton->callback.IsEmpty()) {
// TRY_CATCH_CALL(db->handle_, baton->callback, 1, argv);
// }
// else if (db->open) {
// Local<Value> args[] = { String::NewSymbol("error"), argv[0] };
// EMIT_EVENT(db->handle_, 2, args);
// }
//
// if (!db->open) {
// Local<Value> args[] = { String::NewSymbol("close"), argv[0] };
// EMIT_EVENT(db->handle_, 1, args);
// Process(db);
// }
// Fire callbacks.
if (!baton->callback.IsEmpty()) {
TRY_CATCH_CALL(stmt->handle_, baton->callback, 1, argv);
}
else {
Local<Value> args[] = { String::NewSymbol("error"), argv[0] };
EMIT_EVENT(stmt->handle_, 2, args);
}
fprintf(stderr, "Prepare completed\n");
db->Process();
// V8::AdjustAmountOfExternalAllocatedMemory(10000000);
if (stmt->prepared) {
stmt->Process();
}
else {
stmt->Finalize();
}
Database::Process(db);
delete baton;
return 0;
}
/**
* Override this so that we can properly finalize the statement when it
* gets garbage collected.
*/
void Statement::Wrap(Handle<Object> handle) {
assert(handle_.IsEmpty());
assert(handle->InternalFieldCount() > 0);
handle_ = Persistent<Object>::New(handle);
handle_->SetPointerInInternalField(0, this);
handle_.MakeWeak(this, Destruct);
}
inline void Statement::MakeWeak (void) {
handle_.MakeWeak(this, Destruct);
}
void Statement::Unref() {
assert(!handle_.IsEmpty());
assert(!handle_.IsWeak());
assert(refs_ > 0);
if (--refs_ == 0) { MakeWeak(); }
Handle<Value> Statement::Finalize(const Arguments& args) {
HandleScope scope;
Statement* stmt = ObjectWrap::Unwrap<Statement>(args.This());
OPTIONAL_ARGUMENT_FUNCTION(0, callback);
Baton* baton = new Baton(stmt, callback);
stmt->Schedule(Finalize, baton);
return scope.Close(stmt->db->handle_);
}
void Statement::Destruct(Persistent<Value> value, void *data) {
Statement* stmt = static_cast<Statement*>(data);
if (stmt->handle) {
eio_custom(EIO_Destruct, EIO_PRI_DEFAULT, EIO_AfterDestruct, stmt);
ev_ref(EV_DEFAULT_UC);
}
else {
delete stmt;
void Statement::Finalize(Baton* baton) {
baton->stmt->Finalize();
// Fire callback in case there was one.
if (!baton->callback.IsEmpty()) {
TRY_CATCH_CALL(baton->stmt->handle_, baton->callback, 0, NULL);
}
delete baton;
}
int Statement::EIO_Destruct(eio_req *req) {
Statement* stmt = static_cast<Statement*>(req->data);
void Statement::Finalize() {
assert(!finalized);
finalized = true;
fprintf(stderr, "Statement destruct\n");
CleanQueue();
// Finalize returns the status code of the last operation. We already fired
// error events in case those failed.
sqlite3_finalize(handle);
handle = NULL;
db->pending--;
db->Process();
db->Unref();
}
fprintf(stderr, "Auto-Finalizing handle\n");
sqlite3_finalize(stmt->handle);
stmt->handle = NULL;
void Statement::CleanQueue() {
if (prepared && !queue.empty()) {
// This statement has already been prepared and is now finalized.
// Fire error for all remaining items in the queue.
EXCEPTION(String::New("Statement is already finalized"), SQLITE_MISUSE, exception);
Local<Value> argv[] = { exception };
bool called = false;
// Clear out the queue so that this object can get GC'ed.
while (!queue.empty()) {
Call* call = queue.front();
queue.pop();
if (prepared && !call->baton->callback.IsEmpty()) {
TRY_CATCH_CALL(handle_, call->baton->callback, 1, argv);
called = true;
}
return 0;
}
// We don't call the actual callback, so we have to make sure that
// the baton gets destroyed.
delete call->baton;
delete call;
}
int Statement::EIO_AfterDestruct(eio_req *req) {
Statement* stmt = static_cast<Statement*>(req->data);
ev_unref(EV_DEFAULT_UC);
delete stmt;
return 0;
// When we couldn't call a callback function, emit an error on the
// Statement object.
if (!called) {
Local<Value> args[] = { String::NewSymbol("error"), exception };
EMIT_EVENT(handle_, 2, args);
}
}
else while (!queue.empty()) {
// Just delete all items in the queue; we already fired an event when
// preparing the statement failed.
Call* call = queue.front();
queue.pop();
// We don't call the actual callback, so we have to make sure that
// the baton gets destroyed.
delete call->baton;
delete call;
}
}
......@@ -29,16 +29,6 @@
using namespace v8;
using namespace node;
class Statement;
static struct PrepareBaton : Database::Baton {
Statement* stmt;
std::string sql;
PrepareBaton(Database* db_, Handle<Function> cb_) : Baton(db_, cb_) {}
};
class Statement : public EventEmitter {
public:
static Persistent<FunctionTemplate> constructor_template;
......@@ -46,18 +36,56 @@ public:
static void Init(Handle<Object> target);
static Handle<Value> New(const Arguments& args);
Statement(Database* db_) : EventEmitter() {
db = db_;
static struct Baton {
Statement* stmt;
Persistent<Function> callback;
int status;
std::string message;
Baton(Statement* stmt_, Handle<Function> cb_) : stmt(stmt_) {
stmt->Ref();
ev_ref(EV_DEFAULT_UC);
callback = Persistent<Function>::New(cb_);
}
~Baton() {
stmt->Unref();
ev_unref(EV_DEFAULT_UC);
callback.Dispose();
}
};
static struct PrepareBaton : Database::Baton {
Statement* stmt;
std::string sql;
PrepareBaton(Database* db_, Handle<Function> cb_, Statement* stmt_) :
Baton(db_, cb_), stmt(stmt_) {
stmt->Ref();
}
~PrepareBaton() {
stmt->Unref();
}
};
typedef void (*EIO_Callback)(Baton* baton);
struct Call {
Call(EIO_Callback cb_, Baton* baton_) : callback(cb_), baton(baton_) {};
EIO_Callback callback;
Baton* baton;
};
Statement(Database* db_) : EventEmitter(),
db(db_),
handle(NULL),
prepared(false),
locked(false),
finalized(false) {
db->pending++;
db->Ref();
}
~Statement() {
fprintf(stderr, "Deleted Statement\n");
assert(handle == NULL);
db->pending--;
Database::Process(db);
db->Unref();
if (!finalized) Finalize();
}
protected:
......@@ -65,17 +93,24 @@ protected:
static int EIO_Prepare(eio_req *req);
static int EIO_AfterPrepare(eio_req *req);
void Wrap (Handle<Object> handle);
inline void MakeWeak();
virtual void Unref();
static void Destruct(Persistent<Value> value, void *data);
static int EIO_Destruct(eio_req *req);
static int EIO_AfterDestruct(eio_req *req);
void Schedule(EIO_Callback callback, Baton* baton);
void Process();
void CleanQueue();
static Handle<Value> Finalize(const Arguments& args);
static void Finalize(Baton* baton);
void Finalize();
protected:
Database* db;
sqlite3_stmt* handle;
bool prepared;
bool locked;
bool finalized;
std::queue<Call*> queue;
};
#endif
......@@ -18,7 +18,7 @@ exports['Blob overflow test'] = function(beforeExit) {
function() {
var next = this;
db.prepare('CREATE TABLE elmos (image BLOB);', function(err, statement) {
assert.isUndefined(err);
assert.ok(!err);
statement.step(next);
});
},
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment