Commit c1b8cf36 by Konstantin Käfer

convert eio_custom to uv_work_requests

parent 8303094e
......@@ -77,7 +77,7 @@ void Database::Process() {
}
}
void Database::Schedule(EIO_Callback callback, Baton* baton, bool exclusive) {
void Database::Schedule(Work_Callback callback, Baton* baton, bool exclusive) {
if (!open && locked) {
EXCEPTION(String::New("Database is closed"), SQLITE_MISUSE, exception);
if (!baton->callback.IsEmpty() && baton->callback->IsFunction()) {
......@@ -130,16 +130,18 @@ Handle<Value> Database::New(const Arguments& args) {
// Start opening the database.
OpenBaton* baton = new OpenBaton(db, callback, *filename, SQLITE_OPEN_FULLMUTEX | mode);
EIO_BeginOpen(baton);
Work_BeginOpen(baton);
return args.This();
}
void Database::EIO_BeginOpen(Baton* baton) {
eio_custom(EIO_Open, EIO_PRI_DEFAULT, EIO_AfterOpen, baton);
void Database::Work_BeginOpen(Baton* baton) {
int status = uv_queue_work(uv_default_loop(),
&baton->request, Work_Open, Work_AfterOpen);
assert(status == 0);
}
void Database::EIO_Open(eio_req *req) {
void Database::Work_Open(uv_work_t* req) {
OpenBaton* baton = static_cast<OpenBaton*>(req->data);
Database* db = baton->db;
......@@ -161,7 +163,7 @@ void Database::EIO_Open(eio_req *req) {
}
}
int Database::EIO_AfterOpen(eio_req *req) {
void Database::Work_AfterOpen(uv_work_t* req) {
HandleScope scope;
OpenBaton* baton = static_cast<OpenBaton*>(req->data);
Database* db = baton->db;
......@@ -191,7 +193,6 @@ int Database::EIO_AfterOpen(eio_req *req) {
}
delete baton;
return 0;
}
Handle<Value> Database::OpenGetter(Local<String> str, const AccessorInfo& accessor) {
......@@ -206,22 +207,24 @@ Handle<Value> Database::Close(const Arguments& args) {
OPTIONAL_ARGUMENT_FUNCTION(0, callback);
Baton* baton = new Baton(db, callback);
db->Schedule(EIO_BeginClose, baton, true);
db->Schedule(Work_BeginClose, baton, true);
return args.This();
}
void Database::EIO_BeginClose(Baton* baton) {
void Database::Work_BeginClose(Baton* baton) {
assert(baton->db->locked);
assert(baton->db->open);
assert(baton->db->handle);
assert(baton->db->pending == 0);
baton->db->RemoveCallbacks();
eio_custom(EIO_Close, EIO_PRI_DEFAULT, EIO_AfterClose, baton);
int status = uv_queue_work(uv_default_loop(),
&baton->request, Work_Close, Work_AfterClose);
assert(status == 0);
}
void Database::EIO_Close(eio_req *req) {
void Database::Work_Close(uv_work_t* req) {
Baton* baton = static_cast<Baton*>(req->data);
Database* db = baton->db;
......@@ -235,7 +238,7 @@ void Database::EIO_Close(eio_req *req) {
}
}
int Database::EIO_AfterClose(eio_req *req) {
void Database::Work_AfterClose(uv_work_t* req) {
HandleScope scope;
Baton* baton = static_cast<Baton*>(req->data);
Database* db = baton->db;
......@@ -268,7 +271,6 @@ int Database::EIO_AfterClose(eio_req *req) {
}
delete baton;
return 0;
}
Handle<Value> Database::Serialize(const Arguments& args) {
......@@ -486,20 +488,22 @@ Handle<Value> Database::Exec(const Arguments& args) {
OPTIONAL_ARGUMENT_FUNCTION(1, callback);
Baton* baton = new ExecBaton(db, callback, *sql);
db->Schedule(EIO_BeginExec, baton, true);
db->Schedule(Work_BeginExec, baton, true);
return args.This();
}
void Database::EIO_BeginExec(Baton* baton) {
void Database::Work_BeginExec(Baton* baton) {
assert(baton->db->locked);
assert(baton->db->open);
assert(baton->db->handle);
assert(baton->db->pending == 0);
eio_custom(EIO_Exec, EIO_PRI_DEFAULT, EIO_AfterExec, baton);
int status = uv_queue_work(uv_default_loop(),
&baton->request, Work_Exec, Work_AfterExec);
assert(status == 0);
}
void Database::EIO_Exec(eio_req *req) {
void Database::Work_Exec(uv_work_t* req) {
ExecBaton* baton = static_cast<ExecBaton*>(req->data);
char* message = NULL;
......@@ -517,7 +521,7 @@ void Database::EIO_Exec(eio_req *req) {
}
}
int Database::EIO_AfterExec(eio_req *req) {
void Database::Work_AfterExec(uv_work_t* req) {
HandleScope scope;
ExecBaton* baton = static_cast<ExecBaton*>(req->data);
Database* db = baton->db;
......@@ -543,7 +547,6 @@ int Database::EIO_AfterExec(eio_req *req) {
db->Process();
delete baton;
return 0;
}
Handle<Value> Database::LoadExtension(const Arguments& args) {
......@@ -554,20 +557,22 @@ Handle<Value> Database::LoadExtension(const Arguments& args) {
OPTIONAL_ARGUMENT_FUNCTION(1, callback);
Baton* baton = new LoadExtensionBaton(db, callback, *filename);
db->Schedule(EIO_BeginLoadExtension, baton, true);
db->Schedule(Work_BeginLoadExtension, baton, true);
return args.This();
}
void Database::EIO_BeginLoadExtension(Baton* baton) {
void Database::Work_BeginLoadExtension(Baton* baton) {
assert(baton->db->locked);
assert(baton->db->open);
assert(baton->db->handle);
assert(baton->db->pending == 0);
eio_custom(EIO_LoadExtension, EIO_PRI_DEFAULT, EIO_AfterLoadExtension, baton);
int status = uv_queue_work(uv_default_loop(),
&baton->request, Work_LoadExtension, Work_AfterLoadExtension);
assert(status == 0);
}
void Database::EIO_LoadExtension(eio_req *req) {
void Database::Work_LoadExtension(uv_work_t* req) {
LoadExtensionBaton* baton = static_cast<LoadExtensionBaton*>(req->data);
sqlite3_enable_load_extension(baton->db->handle, 1);
......@@ -588,7 +593,7 @@ void Database::EIO_LoadExtension(eio_req *req) {
}
}
int Database::EIO_AfterLoadExtension(eio_req *req) {
void Database::Work_AfterLoadExtension(uv_work_t* req) {
HandleScope scope;
LoadExtensionBaton* baton = static_cast<LoadExtensionBaton*>(req->data);
Database* db = baton->db;
......@@ -613,7 +618,6 @@ int Database::EIO_AfterLoadExtension(eio_req *req) {
db->Process();
delete baton;
return 0;
}
void Database::RemoveCallbacks() {
......@@ -626,54 +630,3 @@ void Database::RemoveCallbacks() {
debug_profile = NULL;
}
}
/**
* Override this so that we can properly close the database when this object
* gets garbage collected.
*/
void Database::Wrap(Handle<Object> handle) {
assert(handle_.IsEmpty());
assert(handle->InternalFieldCount() > 0);
handle_ = Persistent<Object>::New(handle);
handle_->SetPointerInInternalField(0, this);
handle_.MakeWeak(this, Destruct);
}
inline void Database::MakeWeak (void) {
handle_.MakeWeak(this, Destruct);
}
void Database::Unref() {
assert(!handle_.IsEmpty());
assert(!handle_.IsWeak());
assert(refs_ > 0);
if (--refs_ == 0) { MakeWeak(); }
}
void Database::Destruct(Persistent<Value> value, void *data) {
Database* db = static_cast<Database*>(data);
db->RemoveCallbacks();
if (db->handle) {
eio_custom(EIO_Destruct, EIO_PRI_DEFAULT, EIO_AfterDestruct, db);
uv_ref(uv_default_loop());
}
else {
delete db;
}
}
void Database::EIO_Destruct(eio_req *req) {
Database* db = static_cast<Database*>(req->data);
sqlite3_close(db->handle);
db->handle = NULL;
}
int Database::EIO_AfterDestruct(eio_req *req) {
Database* db = static_cast<Database*>(req->data);
uv_unref(uv_default_loop());
delete db;
return 0;
}
......@@ -30,6 +30,7 @@ public:
}
struct Baton {
uv_work_t request;
Database* db;
Persistent<Function> callback;
int status;
......@@ -39,6 +40,7 @@ public:
db(db_), status(SQLITE_OK) {
db->Ref();
uv_ref(uv_default_loop());
request.data = this;
callback = Persistent<Function>::New(cb_);
}
virtual ~Baton() {
......@@ -67,12 +69,12 @@ public:
Baton(db_, cb_), filename(filename_) {}
};
typedef void (*EIO_Callback)(Baton* baton);
typedef void (*Work_Callback)(Baton* baton);
struct Call {
Call(EIO_Callback cb_, Baton* baton_, bool exclusive_ = false) :
Call(Work_Callback cb_, Baton* baton_, bool exclusive_ = false) :
callback(cb_), exclusive(exclusive_), baton(baton_) {};
EIO_Callback callback;
Work_Callback callback;
bool exclusive;
Baton* baton;
};
......@@ -88,7 +90,7 @@ public:
std::string table;
sqlite3_int64 rowid;
};
bool IsOpen() { return open; }
bool IsLocked() { return locked; }
......@@ -111,37 +113,36 @@ protected:
}
~Database() {
assert(handle == NULL);
if (debug_trace) {
delete debug_trace;
debug_trace = NULL;
}
RemoveCallbacks();
sqlite3_close(handle);
handle = NULL;
open = false;
}
static Handle<Value> New(const Arguments& args);
static void EIO_BeginOpen(Baton* baton);
static void EIO_Open(eio_req *req);
static int EIO_AfterOpen(eio_req *req);
static void Work_BeginOpen(Baton* baton);
static void Work_Open(uv_work_t* req);
static void Work_AfterOpen(uv_work_t* req);
static Handle<Value> OpenGetter(Local<String> str, const AccessorInfo& accessor);
void Schedule(EIO_Callback callback, Baton* baton, bool exclusive = false);
void Schedule(Work_Callback callback, Baton* baton, bool exclusive = false);
void Process();
static Handle<Value> Exec(const Arguments& args);
static void EIO_BeginExec(Baton* baton);
static void EIO_Exec(eio_req *req);
static int EIO_AfterExec(eio_req *req);
static void Work_BeginExec(Baton* baton);
static void Work_Exec(uv_work_t* req);
static void Work_AfterExec(uv_work_t* req);
static Handle<Value> Close(const Arguments& args);
static void EIO_BeginClose(Baton* baton);
static void EIO_Close(eio_req *req);
static int EIO_AfterClose(eio_req *req);
static void Work_BeginClose(Baton* baton);
static void Work_Close(uv_work_t* req);
static void Work_AfterClose(uv_work_t* req);
static Handle<Value> LoadExtension(const Arguments& args);
static void EIO_BeginLoadExtension(Baton* baton);
static void EIO_LoadExtension(eio_req *req);
static int EIO_AfterLoadExtension(eio_req *req);
static void Work_BeginLoadExtension(Baton* baton);
static void Work_LoadExtension(uv_work_t* req);
static void Work_AfterLoadExtension(uv_work_t* req);
static Handle<Value> Serialize(const Arguments& args);
static Handle<Value> Parallelize(const Arguments& args);
......@@ -163,12 +164,6 @@ protected:
static void UpdateCallback(Database* db, UpdateInfo* info);
void RemoveCallbacks();
void Wrap (Handle<Object> handle);
inline void MakeWeak();
virtual void Unref();
static void Destruct (Persistent<Value> value, void *data);
static void EIO_Destruct(eio_req *req);
static int EIO_AfterDestruct(eio_req *req);
protected:
sqlite3* handle;
......
......@@ -121,11 +121,11 @@ const char* sqlite_authorizer_string(int type);
FatalException(try_catch); \
} }
#define EIO_DEFINITION(name) \
#define WORK_DEFINITION(name) \
static Handle<Value> name(const Arguments& args); \
static void EIO_Begin##name(Baton* baton); \
static void EIO_##name(eio_req *req); \
static int EIO_After##name(eio_req *req);
static void Work_Begin##name(Baton* baton); \
static void Work_##name(uv_work_t* req); \
static void Work_After##name(uv_work_t* req);
#define STATEMENT_BEGIN(type) \
assert(baton); \
......@@ -135,7 +135,9 @@ const char* sqlite_authorizer_string(int type);
assert(baton->stmt->prepared); \
baton->stmt->locked = true; \
baton->stmt->db->pending++; \
eio_custom(EIO_##type, EIO_PRI_DEFAULT, EIO_After##type, baton);
int status = uv_queue_work(uv_default_loop(), \
&baton->request, Work_##type, Work_After##type); \
assert(status == 0);
#define STATEMENT_INIT(type) \
type* baton = static_cast<type*>(req->data); \
......
......@@ -46,7 +46,7 @@ void Statement::Process() {
}
}
void Statement::Schedule(EIO_Callback callback, Baton* baton) {
void Statement::Schedule(Work_Callback callback, Baton* baton) {
if (finalized) {
queue.push(new Call(callback, baton));
CleanQueue();
......@@ -110,18 +110,20 @@ Handle<Value> Statement::New(const Arguments& args) {
PrepareBaton* baton = new PrepareBaton(db, Local<Function>::Cast(args[2]), stmt);
baton->sql = std::string(*String::Utf8Value(sql));
db->Schedule(EIO_BeginPrepare, baton);
db->Schedule(Work_BeginPrepare, baton);
return args.This();
}
void Statement::EIO_BeginPrepare(Database::Baton* baton) {
void Statement::Work_BeginPrepare(Database::Baton* baton) {
assert(baton->db->open);
baton->db->pending++;
eio_custom(EIO_Prepare, EIO_PRI_DEFAULT, EIO_AfterPrepare, baton);
int status = uv_queue_work(uv_default_loop(),
&baton->request, Work_Prepare, Work_AfterPrepare);
assert(status == 0);
}
void Statement::EIO_Prepare(eio_req *req) {
void Statement::Work_Prepare(uv_work_t* req) {
STATEMENT_INIT(PrepareBaton);
// In case preparing fails, we use a mutex to make sure we get the associated
......@@ -145,7 +147,7 @@ void Statement::EIO_Prepare(eio_req *req) {
sqlite3_mutex_leave(mtx);
}
int Statement::EIO_AfterPrepare(eio_req *req) {
void Statement::Work_AfterPrepare(uv_work_t* req) {
HandleScope scope;
STATEMENT_INIT(PrepareBaton);
......@@ -162,7 +164,6 @@ int Statement::EIO_AfterPrepare(eio_req *req) {
}
STATEMENT_END();
return 0;
}
template <class T> Values::Field*
......@@ -315,16 +316,16 @@ Handle<Value> Statement::Bind(const Arguments& args) {
return ThrowException(Exception::Error(String::New("Data type is not supported")));
}
else {
stmt->Schedule(EIO_BeginBind, baton);
stmt->Schedule(Work_BeginBind, baton);
return args.This();
}
}
void Statement::EIO_BeginBind(Baton* baton) {
void Statement::Work_BeginBind(Baton* baton) {
STATEMENT_BEGIN(Bind);
}
void Statement::EIO_Bind(eio_req *req) {
void Statement::Work_Bind(uv_work_t* req) {
STATEMENT_INIT(Baton);
sqlite3_mutex* mtx = sqlite3_db_mutex(stmt->db->handle);
......@@ -333,7 +334,7 @@ void Statement::EIO_Bind(eio_req *req) {
sqlite3_mutex_leave(mtx);
}
int Statement::EIO_AfterBind(eio_req *req) {
void Statement::Work_AfterBind(uv_work_t* req) {
HandleScope scope;
STATEMENT_INIT(Baton);
......@@ -349,7 +350,6 @@ int Statement::EIO_AfterBind(eio_req *req) {
}
STATEMENT_END();
return 0;
}
......@@ -363,16 +363,16 @@ Handle<Value> Statement::Get(const Arguments& args) {
return ThrowException(Exception::Error(String::New("Data type is not supported")));
}
else {
stmt->Schedule(EIO_BeginGet, baton);
stmt->Schedule(Work_BeginGet, baton);
return args.This();
}
}
void Statement::EIO_BeginGet(Baton* baton) {
void Statement::Work_BeginGet(Baton* baton) {
STATEMENT_BEGIN(Get);
}
void Statement::EIO_Get(eio_req *req) {
void Statement::Work_Get(uv_work_t* req) {
STATEMENT_INIT(RowBaton);
if (stmt->status != SQLITE_DONE || baton->parameters.size()) {
......@@ -396,7 +396,7 @@ void Statement::EIO_Get(eio_req *req) {
}
}
int Statement::EIO_AfterGet(eio_req *req) {
void Statement::Work_AfterGet(uv_work_t* req) {
HandleScope scope;
STATEMENT_INIT(RowBaton);
......@@ -419,7 +419,6 @@ int Statement::EIO_AfterGet(eio_req *req) {
}
STATEMENT_END();
return 0;
}
Handle<Value> Statement::Run(const Arguments& args) {
......@@ -431,16 +430,16 @@ Handle<Value> Statement::Run(const Arguments& args) {
return ThrowException(Exception::Error(String::New("Data type is not supported")));
}
else {
stmt->Schedule(EIO_BeginRun, baton);
stmt->Schedule(Work_BeginRun, baton);
return args.This();
}
}
void Statement::EIO_BeginRun(Baton* baton) {
void Statement::Work_BeginRun(Baton* baton) {
STATEMENT_BEGIN(Run);
}
void Statement::EIO_Run(eio_req *req) {
void Statement::Work_Run(uv_work_t* req) {
STATEMENT_INIT(RunBaton);
sqlite3_mutex* mtx = sqlite3_db_mutex(stmt->db->handle);
......@@ -466,7 +465,7 @@ void Statement::EIO_Run(eio_req *req) {
sqlite3_mutex_leave(mtx);
}
int Statement::EIO_AfterRun(eio_req *req) {
void Statement::Work_AfterRun(uv_work_t* req) {
HandleScope scope;
STATEMENT_INIT(RunBaton);
......@@ -485,7 +484,6 @@ int Statement::EIO_AfterRun(eio_req *req) {
}
STATEMENT_END();
return 0;
}
Handle<Value> Statement::All(const Arguments& args) {
......@@ -497,16 +495,16 @@ Handle<Value> Statement::All(const Arguments& args) {
return ThrowException(Exception::Error(String::New("Data type is not supported")));
}
else {
stmt->Schedule(EIO_BeginAll, baton);
stmt->Schedule(Work_BeginAll, baton);
return args.This();
}
}
void Statement::EIO_BeginAll(Baton* baton) {
void Statement::Work_BeginAll(Baton* baton) {
STATEMENT_BEGIN(All);
}
void Statement::EIO_All(eio_req *req) {
void Statement::Work_All(uv_work_t* req) {
STATEMENT_INIT(RowsBaton);
sqlite3_mutex* mtx = sqlite3_db_mutex(stmt->db->handle);
......@@ -532,7 +530,7 @@ void Statement::EIO_All(eio_req *req) {
sqlite3_mutex_leave(mtx);
}
int Statement::EIO_AfterAll(eio_req *req) {
void Statement::Work_AfterAll(uv_work_t* req) {
HandleScope scope;
STATEMENT_INIT(RowsBaton);
......@@ -567,7 +565,6 @@ int Statement::EIO_AfterAll(eio_req *req) {
}
STATEMENT_END();
return 0;
}
Handle<Value> Statement::Each(const Arguments& args) {
......@@ -587,12 +584,12 @@ Handle<Value> Statement::Each(const Arguments& args) {
}
else {
baton->completed = Persistent<Function>::New(completed);
stmt->Schedule(EIO_BeginEach, baton);
stmt->Schedule(Work_BeginEach, baton);
return args.This();
}
}
void Statement::EIO_BeginEach(Baton* baton) {
void Statement::Work_BeginEach(Baton* baton) {
// Only create the Async object when we're actually going into
// the event loop. This prevents dangling events.
EachBaton* each_baton = static_cast<EachBaton*>(baton);
......@@ -603,7 +600,7 @@ void Statement::EIO_BeginEach(Baton* baton) {
STATEMENT_BEGIN(Each);
}
void Statement::EIO_Each(eio_req *req) {
void Statement::Work_Each(uv_work_t* req) {
STATEMENT_INIT(EachBaton);
Async* async = baton->async;
......@@ -698,7 +695,7 @@ void Statement::AsyncEach(uv_async_t* handle, int status) {
}
}
int Statement::EIO_AfterEach(eio_req *req) {
void Statement::Work_AfterEach(uv_work_t* req) {
HandleScope scope;
STATEMENT_INIT(EachBaton);
......@@ -707,7 +704,6 @@ int Statement::EIO_AfterEach(eio_req *req) {
}
STATEMENT_END();
return 0;
}
Handle<Value> Statement::Reset(const Arguments& args) {
......@@ -717,23 +713,23 @@ Handle<Value> Statement::Reset(const Arguments& args) {
OPTIONAL_ARGUMENT_FUNCTION(0, callback);
Baton* baton = new Baton(stmt, callback);
stmt->Schedule(EIO_BeginReset, baton);
stmt->Schedule(Work_BeginReset, baton);
return args.This();
}
void Statement::EIO_BeginReset(Baton* baton) {
void Statement::Work_BeginReset(Baton* baton) {
STATEMENT_BEGIN(Reset);
}
void Statement::EIO_Reset(eio_req *req) {
void Statement::Work_Reset(uv_work_t* req) {
STATEMENT_INIT(Baton);
sqlite3_reset(stmt->handle);
stmt->status = SQLITE_OK;
}
int Statement::EIO_AfterReset(eio_req *req) {
void Statement::Work_AfterReset(uv_work_t* req) {
HandleScope scope;
STATEMENT_INIT(Baton);
......@@ -744,7 +740,6 @@ int Statement::EIO_AfterReset(eio_req *req) {
}
STATEMENT_END();
return 0;
}
Local<Object> Statement::RowToJS(Row* row) {
......
......@@ -79,6 +79,7 @@ public:
static Handle<Value> New(const Arguments& args);
struct Baton {
uv_work_t request;
Statement* stmt;
Persistent<Function> callback;
Parameters parameters;
......@@ -86,6 +87,7 @@ public:
Baton(Statement* stmt_, Handle<Function> cb_) : stmt(stmt_) {
stmt->Ref();
uv_ref(uv_default_loop());
request.data = this;
callback = Persistent<Function>::New(cb_);
}
virtual ~Baton() {
......@@ -144,11 +146,11 @@ public:
}
};
typedef void (*EIO_Callback)(Baton* baton);
typedef void (*Work_Callback)(Baton* baton);
struct Call {
Call(EIO_Callback cb_, Baton* baton_) : callback(cb_), baton(baton_) {};
EIO_Callback callback;
Call(Work_Callback cb_, Baton* baton_) : callback(cb_), baton(baton_) {};
Work_Callback callback;
Baton* baton;
};
......@@ -196,16 +198,16 @@ public:
}
protected:
static void EIO_BeginPrepare(Database::Baton* baton);
static void EIO_Prepare(eio_req *req);
static int EIO_AfterPrepare(eio_req *req);
static void Work_BeginPrepare(Database::Baton* baton);
static void Work_Prepare(uv_work_t* req);
static void Work_AfterPrepare(uv_work_t* req);
EIO_DEFINITION(Bind);
EIO_DEFINITION(Get);
EIO_DEFINITION(Run);
EIO_DEFINITION(All);
EIO_DEFINITION(Each);
EIO_DEFINITION(Reset);
WORK_DEFINITION(Bind);
WORK_DEFINITION(Get);
WORK_DEFINITION(Run);
WORK_DEFINITION(All);
WORK_DEFINITION(Each);
WORK_DEFINITION(Reset);
static void AsyncEach(uv_async_t* handle, int status);
static void CloseCallback(uv_handle_t* handle);
......@@ -220,7 +222,7 @@ protected:
static void GetRow(Row* row, sqlite3_stmt* stmt);
static Local<Object> RowToJS(Row* row);
void Schedule(EIO_Callback callback, Baton* baton);
void Schedule(Work_Callback callback, Baton* baton);
void Process();
void CleanQueue();
template <class T> static void Error(T* baton);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment