Skip to content

Commit f0135e9

Browse files
authored
Adds support for quals, LIMIT, logging and plugin configuration
* Qual * LIMIT * Adds a `{plugin}_configure` function in `SQLite` that can be used to configure the underlying plugin * Adds logging to console with `STEAMPIPE_LOG_LEVEL` environment variable
1 parent c1305c3 commit f0135e9

21 files changed

+894
-744
lines changed

.gitattributes

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
main.go -merge
2+
go.mod -merge
3+
go.sum -merge

.gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@
2121
go.work
2222
.idea/
2323
.vscode/
24-
.DS_Store
24+
.DS_Store
25+
26+
out/**

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
build:
22
go run generate/generator.go templates . $(plugin_alias) $(plugin_github_url)
33
go mod tidy
4-
make -f out/Makefile build
4+
make -f out/Makefile build

configure_fn.go

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"log"
7+
8+
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
9+
"go.riyazali.net/sqlite"
10+
)
11+
12+
// ConfigureFn implements a custom scalar sql function
13+
// that allows the user to configure the plugin connection
14+
type ConfigureFn struct {
15+
api *sqlite.ExtensionApi
16+
}
17+
18+
func NewConfigureFn(api *sqlite.ExtensionApi) *ConfigureFn {
19+
return &ConfigureFn{
20+
api: api,
21+
}
22+
}
23+
24+
func (m *ConfigureFn) Args() int { return 1 }
25+
func (m *ConfigureFn) Deterministic() bool { return true }
26+
func (m *ConfigureFn) Apply(ctx *sqlite.Context, values ...sqlite.Value) {
27+
log.Println("[TRACE] ConfigureFn.Apply start")
28+
defer log.Println("[TRACE] ConfigureFn.Apply end")
29+
30+
var config string
31+
var err error
32+
log.Println("[TRACE] getting config")
33+
if config, err = m.getConfig(values...); err != nil {
34+
ctx.ResultError(err)
35+
return
36+
}
37+
38+
// Set Connection Config
39+
err = m.setConnectionConfig(config)
40+
if err != nil {
41+
ctx.ResultError(err)
42+
return
43+
}
44+
}
45+
46+
// getConfig returns the config string from the first argument
47+
func (m *ConfigureFn) getConfig(values ...sqlite.Value) (config string, err error) {
48+
log.Println("[TRACE] ConfigureFn.getConfig start")
49+
defer log.Println("[TRACE] ConfigureFn.getConfig end")
50+
51+
if len(values) > 1 {
52+
return "", errors.New("expected a single argument")
53+
}
54+
55+
switch {
56+
case values[0].Type() == sqlite.SQLITE_TEXT:
57+
config = values[0].Text()
58+
case values[0].Type() == sqlite.SQLITE_BLOB:
59+
config = string(values[0].Blob())
60+
default:
61+
return "", (errors.New("expected a TEXT or BLOB argument"))
62+
}
63+
return config, nil
64+
}
65+
66+
// setConnectionConfig sets the connection config for the plugin
67+
func (m *ConfigureFn) setConnectionConfig(config string) error {
68+
log.Println("[TRACE] ConfigureFn.setConnectionConfig start")
69+
defer log.Println("[TRACE] ConfigureFn.setConnectionConfig end")
70+
71+
pluginName := fmt.Sprintf("steampipe-plugin-%s", pluginAlias)
72+
73+
c := &proto.ConnectionConfig{
74+
Connection: pluginAlias,
75+
Plugin: pluginName,
76+
PluginShortName: pluginAlias,
77+
Config: config,
78+
PluginInstance: pluginName,
79+
}
80+
81+
if currentSchema != nil {
82+
log.Println("[TRACE] ConfigureFn.setConnectionConfig: updating connection config")
83+
// send an update request to the plugin server
84+
cs := []*proto.ConnectionConfig{c}
85+
req := &proto.UpdateConnectionConfigsRequest{Changed: cs}
86+
_, err := pluginServer.UpdateConnectionConfigs(req)
87+
if err != nil {
88+
return err
89+
}
90+
} else {
91+
log.Println("[TRACE] ConfigureFn.setConnectionConfig: setting connection config")
92+
// set the config in the plugin server
93+
cs := []*proto.ConnectionConfig{c}
94+
req := &proto.SetAllConnectionConfigsRequest{
95+
Configs: cs,
96+
MaxCacheSizeMb: 32,
97+
}
98+
_, err := pluginServer.SetAllConnectionConfigs(req)
99+
if err != nil {
100+
return err
101+
}
102+
}
103+
104+
// fetch the schema
105+
// we cannot use the global currentSchema variable here
106+
// because it may not have been loaded yet at all
107+
schema, err := getSchema()
108+
if err != nil {
109+
return err
110+
}
111+
112+
log.Println("[TRACE] ConfigureFn.setConnectionConfig: schema fetched successfully")
113+
114+
// we should also trigger a schema refresh after this call for dynamic backends
115+
if SCHEMA_MODE_DYNAMIC.Equals(schema.Mode) {
116+
// drop the existing tables - if they have been created
117+
if err := m.dropCurrent(); err != nil {
118+
return err
119+
}
120+
121+
// create the tables for the new dynamic schema
122+
if err := setupTables(schema, m.api); err != nil {
123+
return err
124+
}
125+
currentSchema = schema
126+
}
127+
128+
return err
129+
}
130+
131+
func (m *ConfigureFn) dropCurrent() error {
132+
if currentSchema != nil {
133+
sqlite.Register(func(api *sqlite.ExtensionApi) (sqlite.ErrorCode, error) {
134+
conn := api.Connection()
135+
for tableName := range currentSchema.GetSchema() {
136+
log.Println("[TRACE] ConfigureFn.dropCurrent: dropping table", tableName)
137+
q := fmt.Sprintf("DROP TABLE %s", tableName)
138+
log.Println("[TRACE] ConfigureFn.dropCurrent: executing query", q)
139+
err := conn.Exec(q, nil)
140+
if err != nil {
141+
log.Println("[ERROR] ConfigureFn.dropCurrent: error dropping table", tableName, err)
142+
return sqlite.SQLITE_ERROR, err
143+
}
144+
}
145+
return sqlite.SQLITE_OK, nil
146+
})
147+
}
148+
return nil
149+
}
150+
151+
// getSchema returns the schema for the plugin
152+
func getSchema() (*proto.Schema, error) {
153+
log.Println("[TRACE] getSchema start")
154+
defer log.Println("[TRACE] getSchema end")
155+
156+
// Get Plugin Schema
157+
sRequest := &proto.GetSchemaRequest{Connection: pluginAlias}
158+
s, err := pluginServer.GetSchema(sRequest)
159+
if err != nil {
160+
return nil, err
161+
}
162+
return s.GetSchema(), nil
163+
}
164+
165+
// setupTables sets up the schema tables for the plugin
166+
// it fetched the schema from the plugin and then maps it to SQLite tables
167+
func setupTables(schema *proto.Schema, api *sqlite.ExtensionApi) error {
168+
log.Println("[TRACE] setupSchemaTables start")
169+
defer log.Println("[TRACE] setupSchemaTables end")
170+
171+
// Iterate Tables & Build Modules
172+
for tableName, tableSchema := range schema.GetSchema() {
173+
// Translate Schema
174+
sc := getSQLiteColumnsFromTableSchema(tableSchema)
175+
176+
current := NewModule(tableName, sc, tableSchema)
177+
if err := api.CreateModule(tableName, current, sqlite.ReadOnly(true)); err != nil {
178+
return err
179+
}
180+
}
181+
return nil
182+
}

constants.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package main
2+
3+
const (
4+
SQLITE_INDEX_CONSTRAINT_LIMIT = 73
5+
SQLITE_TIMESTAMP_FORMAT = "2006-01-02 15:04:05.999"
6+
)
7+
8+
type SchemaMode string
9+
10+
func (sm SchemaMode) Equals(s string) bool {
11+
return string(sm) == s
12+
}
13+
14+
const (
15+
SCHEMA_MODE_STATIC SchemaMode = "static"
16+
SCHEMA_MODE_DYNAMIC SchemaMode = "dynamic"
17+
)

0 commit comments

Comments
 (0)