@@ -7,6 +7,7 @@ class ImpalaQuery implements \ThriftSQLQuery {
7
7
private $ _handle ;
8
8
private $ _client ;
9
9
private $ _ready ;
10
+ private $ _lastResponse ;
10
11
11
12
public function __construct ( $ queryStr , $ client ) {
12
13
$ queryCleaner = new \ThriftSQL \Utils \QueryCleaner ();
@@ -19,14 +20,15 @@ public function __construct( $queryStr, $client ) {
19
20
}
20
21
21
22
public function wait () {
22
- $ sleeper = new \ThriftSQL \Utils \Sleeper ();
23
+ if ( $ this ->_ready ) {
24
+ return $ this ;
25
+ }
23
26
24
- // Wait for results
27
+ // Wait for query to be ready
28
+ $ sleeper = new \ThriftSQL \Utils \Sleeper ();
25
29
$ sleeper ->reset ();
26
30
do {
27
-
28
31
$ slept = $ sleeper ->sleep ()->getSleptSecs ();
29
-
30
32
if ( $ slept > 18000 ) { // 5 Hours
31
33
// TODO: Actually kill the query then throw exception.
32
34
throw new \ThriftSQL \Exception ( 'Impala Query Killed! ' );
@@ -51,35 +53,47 @@ public function wait() {
51
53
52
54
} while (true );
53
55
56
+ // Wait for results by fetching some rows -- triggers query to run
57
+ $ this ->_fetchResponse ( 2 );
58
+
54
59
$ this ->_ready = true ;
55
60
return $ this ;
56
61
}
57
62
58
63
public function fetch ( $ maxRows ) {
64
+ $ result = array ();
59
65
if ( !$ this ->_ready ) {
60
66
throw new \ThriftSQL \Exception ( "Query is not ready. Call `->wait()` before `->fetch()` " );
61
67
}
62
68
try {
63
- $ sleeper = new \ThriftSQL \Utils \Sleeper ();
64
- $ sleeper ->reset ();
65
-
66
- do {
67
- $ response = $ this ->_client ->fetch ( $ this ->_handle , false , $ maxRows );
68
- if ( $ response ->ready ) {
69
- break ;
70
- }
71
- $ slept = $ sleeper ->sleep ()->getSleptSecs ();
72
-
73
- if ( $ slept > 60 ) { // 1 minute
74
- throw new \ThriftSQL \Exception ( 'Impala Query took too long to fetch! ' );
75
- }
76
-
77
- } while ( true );
69
+ if ( !( $ this ->_lastResponse instanceof \ThriftSQL \Results ) ) {
70
+ $ this ->_fetchResponse ( $ maxRows );
71
+ }
78
72
79
- return $ this ->_parseResponse ( $ response );
73
+ $ result = $ this ->_parseResponse ( $ this ->_lastResponse );
74
+ $ this ->_lastResponse = null ;
80
75
} catch ( Exception $ e ) {
81
76
throw new \ThriftSQL \Exception ( $ e ->getMessage () );
82
77
}
78
+
79
+ return $ result ;
80
+ }
81
+
82
+ private function _fetchResponse ( $ maxRows ) {
83
+ $ sleeper = new \ThriftSQL \Utils \Sleeper ();
84
+ $ sleeper ->reset ();
85
+
86
+ do {
87
+ $ this ->_lastResponse = $ this ->_client ->fetch ( $ this ->_handle , false , $ maxRows );
88
+ if ( $ this ->_lastResponse ->ready ) {
89
+ return ;
90
+ }
91
+
92
+ if ( $ sleeper ->sleep ()->getSleptSecs () > 60 ) { // 1 minute
93
+ throw new \ThriftSQL \Exception ( 'Impala Query took too long to fetch! ' );
94
+ }
95
+
96
+ } while ( true );
83
97
}
84
98
85
99
private function _parseResponse ( $ response ) {
0 commit comments