@@ -13,14 +13,12 @@ import (
13
13
"fmt"
14
14
"io/ioutil"
15
15
"log"
16
- "net"
17
- "net/http"
18
- "os"
19
16
"time"
20
17
"unsafe"
21
18
22
19
"github.com/hashicorp/go-hclog"
23
20
"github.com/turbot/steampipe-plugin-sdk/v5/logging"
21
+ "github.com/turbot/steampipe-plugin-sdk/v5/sperr"
24
22
"github.com/turbot/steampipe-postgres-fdw/hub"
25
23
"github.com/turbot/steampipe-postgres-fdw/types"
26
24
"github.com/turbot/steampipe-postgres-fdw/version"
@@ -56,22 +54,14 @@ func init() {
56
54
log .SetOutput (logger .StandardWriter (& hclog.StandardLoggerOptions {InferLevels : true }))
57
55
log .SetPrefix ("" )
58
56
log .SetFlags (0 )
57
+ // create hub
58
+ if err := hub .CreateHub (); err != nil {
59
+ panic (err )
60
+ }
59
61
log .Printf ("[INFO] .\n ******************************************************\n \n \t \t steampipe postgres fdw init\n \n ******************************************************\n " )
60
62
log .Printf ("[INFO] Version: v%s\n " , version .FdwVersion .String ())
61
63
log .Printf ("[INFO] Log level: %s\n " , level )
62
64
63
- if _ , found := os .LookupEnv ("STEAMPIPE_FDW_PPROF" ); found {
64
- log .Printf ("[INFO] PROFILING!!!!" )
65
- go func () {
66
- listener , err := net .Listen ("tcp" , "localhost:0" )
67
- if err != nil {
68
- log .Println (err )
69
- return
70
- }
71
- log .Printf ("[INFO] Check http://localhost:%d/debug/pprof/" , listener .Addr ().(* net.TCPAddr ).Port )
72
- log .Println (http .Serve (listener , nil ))
73
- }()
74
- }
75
65
}
76
66
77
67
//export goFdwGetRelSize
@@ -80,10 +70,17 @@ func goFdwGetRelSize(state *C.FdwPlanState, root *C.PlannerInfo, rows *C.double,
80
70
81
71
log .Printf ("[TRACE] goFdwGetRelSize" )
82
72
83
- pluginHub , err := hub .GetHub ()
73
+ pluginHub := hub .GetHub ()
74
+
75
+ // get connection name
76
+ connName := GetSchemaNameFromForeignTableId (types .Oid (state .foreigntableid ))
77
+
78
+ log .Println ("[TRACE] connection name:" , connName )
79
+
80
+ serverOpts := GetForeignServerOptionsFromFTableId (types .Oid (state .foreigntableid ))
81
+ err := pluginHub .ProcessImportForeignSchemaOptions (serverOpts , connName )
84
82
if err != nil {
85
- FdwError (err )
86
- return
83
+ FdwError (sperr .WrapWithMessage (err , "failed to process options" ))
87
84
}
88
85
89
86
// reload connection config
@@ -95,15 +92,15 @@ func goFdwGetRelSize(state *C.FdwPlanState, root *C.PlannerInfo, rows *C.double,
95
92
return
96
93
}
97
94
98
- opts := GetFTableOptions (types .Oid (state .foreigntableid ))
95
+ tableOpts := GetFTableOptions (types .Oid (state .foreigntableid ))
99
96
100
97
// build columns
101
98
var columns []string
102
99
if state .target_list != nil {
103
100
columns = CStringListToGoArray (state .target_list )
104
101
}
105
102
106
- result , err := pluginHub .GetRelSize (columns , nil , opts )
103
+ result , err := pluginHub .GetRelSize (columns , nil , tableOpts )
107
104
if err != nil {
108
105
log .Println ("[ERROR] pluginHub.GetRelSize" )
109
106
FdwError (err )
@@ -127,17 +124,12 @@ func goFdwGetPathKeys(state *C.FdwPlanState) *C.List {
127
124
}()
128
125
129
126
log .Printf ("[TRACE] goFdwGetPathKeys" )
130
- pluginHub , err := hub .GetHub ()
131
- if err != nil {
132
- FdwError (err )
133
- }
127
+ pluginHub := hub .GetHub ()
128
+
134
129
var result * C.List
135
130
opts := GetFTableOptions (types .Oid (state .foreigntableid ))
136
- ftable := C .GetForeignTable (state .foreigntableid )
137
- rel := C .RelationIdGetRelation (ftable .relid )
138
- defer C .RelationClose (rel )
139
131
// get the connection name - this is the namespace (i.e. the local schema)
140
- opts ["connection" ] = getNamespace ( rel )
132
+ opts ["connection" ] = GetSchemaNameFromForeignTableId ( types . Oid ( state . foreigntableid ) )
141
133
142
134
if opts ["connection" ] == constants .InternalSchema || opts ["connection" ] == constants .LegacyCommandSchema {
143
135
return result
@@ -240,12 +232,8 @@ func goFdwBeginForeignScan(node *C.ForeignScanState, eflags C.int) {
240
232
quals , unhandledRestrictions := restrictionsToQuals (node , cinfos )
241
233
242
234
// start the plugin hub
243
- var err error
244
- pluginHub , err := hub .GetHub ()
245
- if err != nil {
246
- FdwError (err )
247
- }
248
235
236
+ pluginHub := hub .GetHub ()
249
237
s := & ExecState {
250
238
Rel : rel ,
251
239
Opts : opts ,
@@ -282,7 +270,7 @@ func goFdwIterateForeignScan(node *C.ForeignScanState) *C.TupleTableSlot {
282
270
283
271
slot := node .ss .ss_ScanTupleSlot
284
272
C .ExecClearTuple (slot )
285
- pluginHub , _ := hub .GetHub ()
273
+ pluginHub := hub .GetHub ()
286
274
287
275
log .Printf ("[TRACE] goFdwIterateForeignScan, table '%s' (%p)" , s .Opts ["table" ], s .Iter )
288
276
// if the iterator has not started, start
@@ -367,8 +355,8 @@ func goFdwEndForeignScan(node *C.ForeignScanState) {
367
355
}
368
356
}()
369
357
s := GetExecState (node .fdw_state )
370
- pluginHub , _ := hub .GetHub ()
371
- if s != nil && pluginHub != nil {
358
+ pluginHub := hub .GetHub ()
359
+ if s != nil {
372
360
log .Printf ("[INFO] goFdwEndForeignScan, iterator: %p" , s .Iter )
373
361
pluginHub .EndScan (s .Iter , int64 (s .State .limit ))
374
362
}
@@ -386,9 +374,9 @@ func goFdwAbortCallback() {
386
374
}
387
375
}()
388
376
log .Printf ("[INFO] goFdwAbortCallback" )
389
- if pluginHub , err := hub .GetHub (); err == nil {
390
- pluginHub .Abort ()
391
- }
377
+ pluginHub := hub .GetHub ()
378
+ pluginHub .Abort ()
379
+
392
380
}
393
381
394
382
//export goFdwImportForeignSchema
@@ -402,12 +390,7 @@ func goFdwImportForeignSchema(stmt *C.ImportForeignSchemaStmt, serverOid C.Oid)
402
390
403
391
log .Printf ("[INFO] goFdwImportForeignSchema remote '%s' local '%s'\n " , C .GoString (stmt .remote_schema ), C .GoString (stmt .local_schema ))
404
392
// get the plugin hub,
405
- pluginHub , err := hub .GetHub ()
406
- if err != nil {
407
- log .Printf ("[WARN] goFdwImportForeignSchema failed: %s" , err )
408
- FdwError (err )
409
- return nil
410
- }
393
+ pluginHub := hub .GetHub ()
411
394
412
395
remoteSchema := C .GoString (stmt .remote_schema )
413
396
localSchema := C .GoString (stmt .local_schema )
@@ -426,6 +409,16 @@ func goFdwImportForeignSchema(stmt *C.ImportForeignSchemaStmt, serverOid C.Oid)
426
409
return sql
427
410
}
428
411
412
+ fServer := C .GetForeignServer (serverOid )
413
+ serverOptions := GetForeignServerOptions (fServer )
414
+
415
+ log .Println ("[TRACE] goFdwImportForeignSchema serverOptions:" , serverOptions )
416
+
417
+ err := pluginHub .ProcessImportForeignSchemaOptions (serverOptions , localSchema )
418
+ if err != nil {
419
+ FdwError (sperr .WrapWithMessage (err , "failed to process options" ))
420
+ }
421
+
429
422
schema , err := pluginHub .GetSchema (remoteSchema , localSchema )
430
423
if err != nil {
431
424
log .Printf ("[WARN] goFdwImportForeignSchema failed: %s" , err )
@@ -462,31 +455,22 @@ func goFdwExecForeignInsert(estate *C.EState, rinfo *C.ResultRelInfo, slot *C.Tu
462
455
func handleCommandInsert (rinfo * C.ResultRelInfo , slot * C.TupleTableSlot , rel C.Relation ) * C.TupleTableSlot {
463
456
relid := rinfo .ri_RelationDesc .rd_id
464
457
opts := GetFTableOptions (types .Oid (relid ))
458
+ pluginHub := hub .GetHub ()
465
459
466
460
switch opts ["table" ] {
467
461
case constants .LegacyCommandTableCache :
468
462
// we know there is just a single column - operation
469
463
var isNull C.bool
470
464
datum := C .slot_getattr (slot , 1 , & isNull )
471
465
operation := C .GoString (C .fdw_datumGetString (datum ))
472
- hub , err := hub .GetHub ()
473
- if err != nil {
474
- FdwError (err )
475
- return nil
476
- }
477
- if err := hub .HandleLegacyCacheCommand (operation ); err != nil {
466
+ if err := pluginHub .HandleLegacyCacheCommand (operation ); err != nil {
478
467
FdwError (err )
479
468
return nil
480
469
}
481
470
482
471
case constants .ForeignTableSettings :
483
472
tupleDesc := buildTupleDesc (rel .rd_att )
484
473
attributes := tupleDesc .Attrs
485
- hub , err := hub .GetHub ()
486
- if err != nil {
487
- FdwError (err )
488
- return nil
489
- }
490
474
var key * string
491
475
var value * string
492
476
@@ -520,7 +504,7 @@ func handleCommandInsert(rinfo *C.ResultRelInfo, slot *C.TupleTableSlot, rel C.R
520
504
}
521
505
522
506
// apply the setting
523
- if err = hub .ApplySetting (* key , * value ); err != nil {
507
+ if err := pluginHub .ApplySetting (* key , * value ); err != nil {
524
508
FdwError (err )
525
509
}
526
510
return nil
@@ -548,10 +532,7 @@ func goFdwShutdown() {
548
532
}
549
533
}()
550
534
log .Printf ("[INFO] .\n ******************************************************\n \n \t \t steampipe postgres fdw shutdown\n \n ******************************************************\n " )
551
- pluginHub , err := hub .GetHub ()
552
- if err != nil {
553
- FdwError (err )
554
- }
535
+ pluginHub := hub .GetHub ()
555
536
pluginHub .Close ()
556
537
}
557
538
0 commit comments