Commit 25388bcd by Konstantin Käfer

fix race condition when baton was deleted to early

parent 1b0cf90f
...@@ -593,13 +593,19 @@ Handle<Value> Statement::Each(const Arguments& args) { ...@@ -593,13 +593,19 @@ Handle<Value> Statement::Each(const Arguments& args) {
} }
else { else {
baton->completed = Persistent<Function>::New(completed); baton->completed = Persistent<Function>::New(completed);
baton->async = new Async(stmt, baton, AsyncEach);
stmt->Schedule(EIO_BeginEach, baton); stmt->Schedule(EIO_BeginEach, baton);
return args.This(); return args.This();
} }
} }
void Statement::EIO_BeginEach(Baton* baton) { void Statement::EIO_BeginEach(Baton* baton) {
// Only create the Async object when we're actually going into
// the event loop. This prevents dangling events.
EachBaton* each_baton = static_cast<EachBaton*>(baton);
each_baton->async = new Async(each_baton->stmt, AsyncEach);
each_baton->async->item_cb = Persistent<Function>::New(each_baton->callback);
each_baton->async->completed_cb = Persistent<Function>::New(each_baton->completed);
STATEMENT_BEGIN(Each); STATEMENT_BEGIN(Each);
} }
...@@ -649,6 +655,7 @@ void Statement::EIO_Each(eio_req *req) { ...@@ -649,6 +655,7 @@ void Statement::EIO_Each(eio_req *req) {
void Statement::CloseCallback(uv_handle_t* handle) { void Statement::CloseCallback(uv_handle_t* handle) {
assert(handle != NULL); assert(handle != NULL);
assert(handle->data != NULL);
Async* async = static_cast<Async*>(handle->data); Async* async = static_cast<Async*>(handle->data);
delete async; delete async;
handle->data = NULL; handle->data = NULL;
...@@ -657,7 +664,6 @@ void Statement::CloseCallback(uv_handle_t* handle) { ...@@ -657,7 +664,6 @@ void Statement::CloseCallback(uv_handle_t* handle) {
void Statement::AsyncEach(uv_async_t* handle, int status) { void Statement::AsyncEach(uv_async_t* handle, int status) {
HandleScope scope; HandleScope scope;
Async* async = static_cast<Async*>(handle->data); Async* async = static_cast<Async*>(handle->data);
EachBaton* baton = async->baton;
while (true) { while (true) {
// Get the contents out of the data cache for us to process in the JS callback. // Get the contents out of the data cache for us to process in the JS callback.
...@@ -670,7 +676,7 @@ void Statement::AsyncEach(uv_async_t* handle, int status) { ...@@ -670,7 +676,7 @@ void Statement::AsyncEach(uv_async_t* handle, int status) {
break; break;
} }
if (!baton->callback.IsEmpty() && baton->callback->IsFunction()) { if (!async->item_cb.IsEmpty() && async->item_cb->IsFunction()) {
Local<Value> argv[2]; Local<Value> argv[2];
argv[0] = Local<Value>::New(Null()); argv[0] = Local<Value>::New(Null());
...@@ -679,20 +685,20 @@ void Statement::AsyncEach(uv_async_t* handle, int status) { ...@@ -679,20 +685,20 @@ void Statement::AsyncEach(uv_async_t* handle, int status) {
for (int i = 0; it < end; it++, i++) { for (int i = 0; it < end; it++, i++) {
argv[1] = RowToJS(*it); argv[1] = RowToJS(*it);
async->retrieved++; async->retrieved++;
TRY_CATCH_CALL(async->stmt->handle_, baton->callback, 2, argv); TRY_CATCH_CALL(async->stmt->handle_, async->item_cb, 2, argv);
delete *it; delete *it;
} }
} }
} }
if (async->completed) { if (async->completed) {
if (!baton->completed.IsEmpty() && if (!async->completed_cb.IsEmpty() &&
baton->completed->IsFunction()) { async->completed_cb->IsFunction()) {
Local<Value> argv[] = { Local<Value> argv[] = {
Local<Value>::New(Null()), Local<Value>::New(Null()),
Integer::New(async->retrieved) Integer::New(async->retrieved)
}; };
TRY_CATCH_CALL(async->stmt->handle_, baton->completed, 2, argv); TRY_CATCH_CALL(async->stmt->handle_, async->completed_cb, 2, argv);
} }
uv_close((uv_handle_t*)handle, CloseCallback); uv_close((uv_handle_t*)handle, CloseCallback);
} }
......
...@@ -88,7 +88,7 @@ public: ...@@ -88,7 +88,7 @@ public:
ev_ref(EV_DEFAULT_UC); ev_ref(EV_DEFAULT_UC);
callback = Persistent<Function>::New(cb_); callback = Persistent<Function>::New(cb_);
} }
~Baton() { virtual ~Baton() {
for (unsigned int i = 0; i < parameters.size(); i++) { for (unsigned int i = 0; i < parameters.size(); i++) {
Values::Field* field = parameters[i]; Values::Field* field = parameters[i];
DELETE_FIELD(field); DELETE_FIELD(field);
...@@ -124,7 +124,7 @@ public: ...@@ -124,7 +124,7 @@ public:
EachBaton(Statement* stmt_, Handle<Function> cb_) : EachBaton(Statement* stmt_, Handle<Function> cb_) :
Baton(stmt_, cb_) {} Baton(stmt_, cb_) {}
Persistent<Function> completed; Persistent<Function> completed;
Async* async; Async* async; // Isn't deleted when the baton is deleted.
}; };
struct PrepareBaton : Database::Baton { struct PrepareBaton : Database::Baton {
...@@ -155,14 +155,18 @@ public: ...@@ -155,14 +155,18 @@ public:
struct Async { struct Async {
uv_async_t watcher; uv_async_t watcher;
Statement* stmt; Statement* stmt;
EachBaton* baton;
Rows data; Rows data;
pthread_mutex_t mutex; pthread_mutex_t mutex;
bool completed; bool completed;
int retrieved; int retrieved;
Async(Statement* st, EachBaton* eb, uv_async_cb async_cb) : // Store the callbacks here because we don't have
stmt(st), baton(eb), completed(false), retrieved(0) { // access to the baton in the async callback.
Persistent<Function> item_cb;
Persistent<Function> completed_cb;
Async(Statement* st, uv_async_cb async_cb) :
stmt(st), completed(false), retrieved(0) {
watcher.data = this; watcher.data = this;
pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&mutex, NULL);
stmt->Ref(); stmt->Ref();
...@@ -171,6 +175,8 @@ public: ...@@ -171,6 +175,8 @@ public:
~Async() { ~Async() {
stmt->Unref(); stmt->Unref();
item_cb.Dispose();
completed_cb.Dispose();
pthread_mutex_destroy(&mutex); pthread_mutex_destroy(&mutex);
} }
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment