Skip to content

Commit 048124c

Browse files
committed
Merge pull request #130 from Sage/reader
Reader
2 parents 07f03a5 + cf1d7ae commit 048124c

File tree

8 files changed

+102
-73
lines changed

8 files changed

+102
-73
lines changed

lib/oracle.js

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,36 @@ exports.OCCITIMESTAMP = 7;
3535
exports.OCCINUMBER = 8;
3636
exports.OCCIBLOB = 9;
3737

38-
// Reader.prototype.nextRow is implemented in JS rather than C++.
39-
// This is easier and also more efficient because we don't cross the JS/C++ boundary every time
40-
// we read a record.
41-
bindings.Reader.prototype.nextRow = function(cb) {
38+
// Reader is implemented in JS around a C++ handle
39+
// This is easier and also more efficient because we don't cross the JS/C++ boundary
40+
// every time we read a record.
41+
function Reader(handle) {
42+
this._handle = handle;
43+
this._error = null;
44+
this._rows = [];
45+
}
46+
47+
Reader.prototype.nextRows = function() {
48+
this._handle.nextRows.apply(this._handle, arguments);
49+
}
50+
51+
Reader.prototype.nextRow = function(cb) {
4252
var self = this;
43-
if (self._error || (self._rows && self._rows.length > 0)) {
53+
if (!self._handle || self._error || (self._rows && self._rows.length > 0)) {
4454
process.nextTick(function() {
4555
cb(self._error, self._rows && self._rows.shift());
4656
});
4757
} else {
4858
// nextRows willl use the prefetch row count as window size
49-
self.nextRows(function(err, result) {
59+
self._handle.nextRows(function(err, result) {
5060
self._error = err || self._error;
5161
self._rows = result;
62+
if (err || result.length === 0) self._handle = null;
5263
cb(self._error, self._rows && self._rows.shift());
5364
});
5465
}
5566
};
67+
68+
bindings.Connection.prototype.reader = function(sql, args) {
69+
return new Reader(this.readerHandle(sql, args));
70+
}

src/connection.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ void Connection::Init(Handle<Object> target) {
2222

2323
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "execute", Execute);
2424
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "executeSync", ExecuteSync);
25-
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "reader", CreateReader);
25+
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "readerHandle", CreateReader);
2626
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "close", Close);
2727
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "isConnected", IsConnected);
2828
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "setAutoCommit", SetAutoCommit);
@@ -391,7 +391,7 @@ void Connection::EIO_AfterCommit(uv_work_t* req, int status) {
391391
argv[0] = Undefined();
392392
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->callback), 2, argv);
393393
delete baton;
394-
394+
delete req;
395395
}
396396

397397
void Connection::EIO_Rollback(uv_work_t* req) {
@@ -410,7 +410,7 @@ void Connection::EIO_AfterRollback(uv_work_t* req, int status) {
410410
argv[0] = Undefined();
411411
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->callback), 2, argv);
412412
delete baton;
413-
413+
delete req;
414414
}
415415

416416
void Connection::EIO_Execute(uv_work_t* req) {
@@ -705,6 +705,7 @@ void Connection::EIO_AfterExecute(uv_work_t* req, int status) {
705705
}
706706

707707
delete baton;
708+
delete req;
708709
}
709710

710711
void Connection::handleResult(ExecuteBaton* baton, Handle<Value> (&argv)[2]) {

src/connection.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ namespace uni {
3636
Handle<T> Deref(Persistent<T>& handle) {
3737
return Handle<T>::New(Isolate::GetCurrent(), handle);
3838
}
39+
template <class T>
40+
Local<T> HandleToLocal(Handle<T> handle) {
41+
return handle;
42+
}
3943
inline Handle<Value> BufferToHandle(BufferType buf) {
4044
return buf;
4145
}
@@ -57,6 +61,10 @@ namespace uni {
5761
Handle<T> Deref(Persistent<T>& handle) {
5862
return Local<T>::New(handle);
5963
}
64+
template <class T>
65+
Local<T> HandleToLocal(Handle<T> handle) {
66+
return Local<T>::New(handle);
67+
}
6068
inline Handle<Value> BufferToHandle(BufferType buf) {
6169
return buf->handle_;
6270
}

src/executeBaton.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,7 @@ ExecuteBaton::~ExecuteBaton() {
3333
delete val;
3434
}
3535

36-
if(rows) {
37-
for (std::vector<row_t*>::iterator iterator = rows->begin(), end = rows->end(); iterator != end; ++iterator) {
38-
row_t* currentRow = *iterator;
39-
delete currentRow;
40-
}
41-
42-
delete rows;
43-
}
36+
ResetRows();
4437

4538
if(outputs) {
4639
for (std::vector<output_t*>::iterator iterator = outputs->begin(), end = outputs->end(); iterator != end; ++iterator) {
@@ -53,6 +46,18 @@ ExecuteBaton::~ExecuteBaton() {
5346
if(error) delete error;
5447
}
5548

49+
void ExecuteBaton::ResetRows() {
50+
if(rows) {
51+
for (std::vector<row_t*>::iterator iterator = rows->begin(), end = rows->end(); iterator != end; ++iterator) {
52+
row_t* currentRow = *iterator;
53+
delete currentRow;
54+
}
55+
56+
delete rows;
57+
rows = NULL;
58+
}
59+
}
60+
5661
double CallDateMethod(v8::Local<v8::Date> date, const char* methodName) {
5762
Handle<Value> args[1]; // should be zero but on windows the compiler will not allow a zero length array
5863
Local<Value> result = Local<Function>::Cast(date->Get(String::New(methodName)))->Call(date, 0, args);

src/executeBaton.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class ExecuteBaton {
6161
public:
6262
ExecuteBaton(Connection* connection, const char* sql, v8::Local<v8::Array>* values, v8::Handle<v8::Function>* callback);
6363
~ExecuteBaton();
64+
void ResetRows();
6465

6566
Connection *connection;
6667
v8::Persistent<v8::Function> callback;

src/oracle_bindings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ void OracleClient::EIO_AfterConnect(uv_work_t* req, int status) {
148148
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->callback), 2, argv);
149149

150150
delete baton;
151+
delete req;
151152
}
152153

153154
uni::CallbackType OracleClient::ConnectSync(const uni::FunctionCallbackInfo& args) {

src/reader.cpp

Lines changed: 44 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,20 @@ uni::CallbackType Reader::NextRows(const uni::FunctionCallbackInfo& args) {
4848
Local<String> message = String::New(baton->error->c_str());
4949
UNI_THROW(Exception::Error(message));
5050
}
51+
if (baton->busy) {
52+
UNI_THROW(Exception::Error(String::New("invalid state: reader is busy with another nextRows call")));
53+
}
54+
baton->busy = true;
5155

5256
if (args.Length() > 1) {
5357
REQ_INT_ARG(0, count);
5458
REQ_FUN_ARG(1, callback);
5559
baton->count = count;
56-
uni::Reset(baton->nextRowsCallback, callback);
60+
uni::Reset(baton->callback, callback);
5761
} else {
5862
REQ_FUN_ARG(0, callback);
5963
baton->count = baton->connection->getPrefetchRowCount();
60-
uni::Reset(baton->nextRowsCallback, callback);
64+
uni::Reset(baton->callback, callback);
6165
}
6266
if (baton->count <= 0) baton->count = 1;
6367

@@ -72,77 +76,64 @@ uni::CallbackType Reader::NextRows(const uni::FunctionCallbackInfo& args) {
7276
void Reader::EIO_NextRows(uv_work_t* req) {
7377
ReaderBaton* baton = static_cast<ReaderBaton*>(req->data);
7478

75-
baton->rows = NULL;
76-
baton->error = NULL;
79+
baton->rows = new vector<row_t*>();
80+
if (baton->done) return;
7781

78-
try {
79-
if(! baton->connection->getConnection()) {
80-
baton->error = new std::string("Connection already closed");
81-
return;
82-
}
83-
if (!baton->rs) {
82+
if (!baton->connection->getConnection()) {
83+
baton->error = new std::string("Connection already closed");
84+
return;
85+
}
86+
if (!baton->rs) {
87+
try {
8488
baton->stmt = baton->connection->getConnection()->createStatement(baton->sql);
8589
baton->stmt->setAutoCommit(baton->connection->getAutoCommit());
8690
baton->stmt->setPrefetchRowCount(baton->count);
8791
Connection::SetValuesOnStatement(baton->stmt, baton);
88-
if (baton->error) goto cleanup;
92+
if (baton->error) return;
8993

9094
int status = baton->stmt->execute();
91-
if(status != oracle::occi::Statement::RESULT_SET_AVAILABLE) {
95+
if (status != oracle::occi::Statement::RESULT_SET_AVAILABLE) {
9296
baton->error = new std::string("Connection already closed");
9397
return;
9498
}
9599
baton->rs = baton->stmt->getResultSet();
96-
Connection::CreateColumnsFromResultSet(baton->rs, baton, baton->columns);
97-
if (baton->error) goto cleanup;
98-
}
99-
baton->rows = new vector<row_t*>();
100-
101-
for (int i = 0; i < baton->count && baton->rs->next(); i++) {
102-
row_t* row = Connection::CreateRowFromCurrentResultSetRow(baton->rs, baton, baton->columns);
103-
if (baton->error) goto cleanup;
104-
baton->rows->push_back(row);
100+
} catch (oracle::occi::SQLException &ex) {
101+
baton->error = new string(ex.getMessage());
102+
return;
105103
}
106-
} catch(oracle::occi::SQLException &ex) {
107-
baton->error = new string(ex.getMessage());
108-
} catch (const exception& ex) {
109-
baton->error = new string(ex.what());
110-
} catch (...) {
111-
baton->error = new string("Unknown Error");
104+
Connection::CreateColumnsFromResultSet(baton->rs, baton, baton->columns);
105+
if (baton->error) return;
112106
}
113-
cleanup:
114-
// nothing for now, cleanup happens in destructor
115-
;
116-
}
117-
118-
#if NODE_MODULE_VERSION >= 0x000D
119-
void ReaderWeakReferenceCallback(Isolate* isolate, v8::Persistent<v8::Function>* callback, void* dummy)
120-
{
121-
callback->Dispose();
122-
}
123-
#else
124-
void ReaderWeakReferenceCallback(v8::Persistent<v8::Value> callback, void* dummy)
125-
{
126-
(Persistent<Function>(Function::Cast(*callback))).Dispose();
107+
for (int i = 0; i < baton->count && baton->rs->next(); i++) {
108+
row_t* row = Connection::CreateRowFromCurrentResultSetRow(baton->rs, baton, baton->columns);
109+
if (baton->error) return;
110+
baton->rows->push_back(row);
111+
}
112+
if (baton->rows->size() < (size_t)baton->count) baton->done = true;
127113
}
128-
#endif
129114

130115
void Reader::EIO_AfterNextRows(uv_work_t* req, int status) {
131116
UNI_SCOPE(scope);
132117
ReaderBaton* baton = static_cast<ReaderBaton*>(req->data);
133118

119+
baton->busy = false;
134120
baton->connection->Unref();
135-
136-
try {
137-
Handle<Value> argv[2];
138-
Connection::handleResult(baton, argv);
139-
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->nextRowsCallback), 2, argv);
140-
} catch(const exception &ex) {
141-
Handle<Value> argv[2];
142-
argv[0] = Exception::Error(String::New(ex.what()));
143-
argv[1] = Undefined();
144-
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->nextRowsCallback), 2, argv);
121+
// transfer callback to local and dispose persistent handle
122+
// must be done before invoking callback because callback may set another callback into baton->callback
123+
Local<Function> cb = uni::HandleToLocal(uni::Deref(baton->callback));
124+
baton->callback.Dispose();
125+
baton->callback.Clear();
126+
127+
Handle<Value> argv[2];
128+
Connection::handleResult(baton, argv);
129+
node::MakeCallback(Context::GetCurrent()->Global(), cb, 2, argv);
130+
131+
baton->ResetRows();
132+
if (baton->done || baton->error) {
133+
// free occi resources so that we don't run out of cursors if gc is not fast enough
134+
// reader destructor will delete the baton and everything else.
135+
baton->ResetStatement();
145136
}
146-
baton->nextRowsCallback.MakeWeak((void*)NULL, ReaderWeakReferenceCallback);
137+
delete req;
147138
}
148139

src/readerBaton.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,16 @@ class ReaderBaton : public ExecuteBaton {
1010
ReaderBaton(Connection* connection, const char* sql, v8::Local<v8::Array>* values) : ExecuteBaton(connection, sql, values, NULL) {
1111
stmt = NULL;
1212
rs = NULL;
13+
done = false;
14+
busy = false;
1315
}
1416
~ReaderBaton() {
15-
if(stmt && rs) {
16-
stmt->closeResultSet(rs);
17+
ResetStatement();
18+
}
19+
20+
void ResetStatement() {
21+
if(stmt && rs) {
22+
stmt->closeResultSet(rs);
1723
rs = NULL;
1824
}
1925
if(stmt) {
@@ -24,10 +30,11 @@ class ReaderBaton : public ExecuteBaton {
2430
}
2531
}
2632

27-
v8::Persistent<v8::Function> nextRowsCallback;
2833
oracle::occi::Statement* stmt;
2934
oracle::occi::ResultSet* rs;
3035
int count;
36+
bool done;
37+
bool busy;
3138
};
3239

3340
#endif

0 commit comments

Comments
 (0)