Skip to content

Commit

Permalink
HTTP processor (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
M0rdecay authored Feb 28, 2025
1 parent 4e3dbe4 commit 7a485a0
Show file tree
Hide file tree
Showing 33 changed files with 854 additions and 120 deletions.
45 changes: 45 additions & 0 deletions .pipelines/test.pipeline.httpproc.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[settings]
id = "test.pipeline.http"
lines = 1
run = true
buffer = 1_000

[[inputs]]
[inputs.http]
address = ":9100"
max_connections = 10
wait_for_delivery = true
tls_enable = false
tls_key_file = '.testcerts\server-key.pem'
tls_cert_file = '.testcerts\server-cert.pem'
tls_allowed_cacerts = [ '.testcerts\ca-cert.pem' ]
[inputs.http.labelheaders]
uri = "x-uri"
[inputs.http.parser]
type = "json"
split_array = false

[[processors]]
[processors.http]
host = "https://jsonplaceholder.typicode.com/posts"
method = "POST"
# path_label = "uri"
retry_attempts = 3
retry_after = "1s"
tls_enable = true
response_body_to = "typicode.response"
request_body_from = "payload"
[processors.http.serializer]
type = "json"
data_only = false
[processors.http.parser]
type = "json"
split_array = true


[[outputs]]
[outputs.log]
level = "info"
[outputs.log.serializer]
type = "json"
data_only = false
2 changes: 1 addition & 1 deletion .pipelines/test.pipeline.kafka.1.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[settings]
id = "test.pipeline.kafka.1"
lines = 5
run = true
run = false
buffer = 1_000

[[inputs]]
Expand Down
2 changes: 1 addition & 1 deletion .pipelines/test.pipeline.kafka.2.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[settings]
id = "test.pipeline.kafka.2"
lines = 5
run = true
run = false
buffer = 5

[[inputs]]
Expand Down
1 change: 1 addition & 0 deletions .pipelines/test.pipeline.sql.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

[[inputs]]
[inputs.sql]
enable_metrics = true
driver = "pgx"
dsn = "postgres://postgres:pguser@localhost:5432/postgres"
interval = "2s"
Expand Down
1 change: 1 addition & 0 deletions .pipelines/test.pipeline.sqlout.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

[[outputs]]
[outputs.sql]
enable_metrics = true
driver = "pgx"
dsn = "postgres://postgres:pguser@localhost:5432/postgres"
[outputs.sql.columns]
Expand Down
9 changes: 5 additions & 4 deletions .pipelines/test.pipeline.starlark.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
buffer = 1_000

[[inputs]]
[inputs.httpl]
[inputs.http]
address = ":9100"
max_connections = 10
tls_enable = false
tls_key_file = '.testcerts\server-key.pem'
tls_cert_file = '.testcerts\server-cert.pem'
tls_allowed_cacerts = [ '.testcerts\ca-cert.pem' ]
[inputs.httpl.parser]
[inputs.http.parser]
type = "json"

[[processors]]
Expand All @@ -32,6 +32,8 @@ def process(event):
event.setTimestamp(event.getTimestamp() + time.hour)
event.setField("errors", len(event.getErrors()))
return [event]
'''

Expand All @@ -45,7 +47,7 @@ def process(event):

[[outputs]]
[outputs.log]
level = "debug"
level = "info"
[outputs.log.serializer]
type = "json"
data_only = false
Expand All @@ -67,4 +69,3 @@ def filter(event):
idle_timeout = "10s"
tls_enable = true
tls_insecure_skip_verify = true

50 changes: 50 additions & 0 deletions docs/METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -872,3 +872,53 @@ Labels:
- **partition** - partition number
- **group_id** - consumer group
- **client_id** - kafka client ID

### DB pool

#### Gauge `plugin_db_connections_max`
Pipeline plugin DB pool maximum number of open connections.

Labels:
- **pipeline** - pipeline Id
- **plugin_name** - plugin name (alias)
- **driver** - driver name

#### Gauge `plugin_db_connections_open`
Pipeline plugin DB pool number of established connections both in use and idle.

Labels:
- **pipeline** - pipeline Id
- **plugin_name** - plugin name (alias)
- **driver** - driver name

#### Gauge `plugin_db_connections_in_use`
Pipeline plugin DB pool number of connections currently in use.

Labels:
- **pipeline** - pipeline Id
- **plugin_name** - plugin name (alias)
- **driver** - driver name

#### Gauge `plugin_db_connections_idle`
Pipeline plugin DB pool number of idle connections.

Labels:
- **pipeline** - pipeline Id
- **plugin_name** - plugin name (alias)
- **driver** - driver name

#### Counter `plugin_db_connections_waited_total`
Pipeline plugin DB pool total number of connections waited for.

Labels:
- **pipeline** - pipeline Id
- **plugin_name** - plugin name (alias)
- **driver** - driver name

#### Counter `plugin_db_connections_waited_seconds_total`
Pipeline plugin DB pool total time blocked waiting for a new connection.

Labels:
- **pipeline** - pipeline Id
- **plugin_name** - plugin name (alias)
- **driver** - driver name
7 changes: 6 additions & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (p *Pipeline) Config() *config.Pipeline {
return p.config
}

// Pipeline Close() MUST be called only if the pipeline build failed.
// After a successful Run(), each plugin will be closed by its unit.
func (p *Pipeline) Close() error {
for _, k := range p.keepers {
k.Close()
Expand Down Expand Up @@ -353,7 +355,10 @@ func (p *Pipeline) Run(ctx context.Context) {
}
wg.Wait()

p.Close()
for _, k := range p.keepers {
k.Close()
}

p.log.Info("pipeline stopped")
p.state = StateStopped
}
Expand Down
160 changes: 160 additions & 0 deletions plugins/common/metrics/sql_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package metrics

import (
"sync"

"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
)

var (
	// dbMetricsRegister guards the one-time registration of
	// dbMetricsCollector with the default Prometheus registry
	// (performed lazily on the first RegisterDB call).
	dbMetricsRegister = &sync.Once{}
	// dbMetricsCollector is the single shared collector; pools from all
	// pipelines/plugins are tracked in one map keyed by dbDescriptor.
	dbMetricsCollector = &dbCollector{
		dbs: make(map[dbDescriptor]*sqlx.DB),
		mu:  &sync.Mutex{},
	}

	// Metric descriptors for DB pool statistics; initialized in init().
	// All share the label set {pipeline, plugin_name, driver}.
	dbConnectionsMax                *prometheus.Desc
	dbConnectionsOpen               *prometheus.Desc
	dbConnectionsInUse              *prometheus.Desc
	dbConnectionsIdle               *prometheus.Desc
	dbConnectionsWaitedTotal        *prometheus.Desc
	dbConnectionsWaitedSecondsTotal *prometheus.Desc
)

// init builds the Prometheus descriptors for DB pool statistics. Every
// metric carries the same three labels identifying the pipeline, the
// plugin instance (alias) and the database driver.
func init() {
	labels := []string{"pipeline", "plugin_name", "driver"}
	desc := func(name, help string) *prometheus.Desc {
		return prometheus.NewDesc(name, help, labels, nil)
	}

	dbConnectionsMax = desc(
		"plugin_db_connections_max",
		"Pipeline plugin DB pool maximum number of open connections.",
	)
	dbConnectionsOpen = desc(
		"plugin_db_connections_open",
		"Pipeline plugin DB pool number of established connections both in use and idle.",
	)
	dbConnectionsInUse = desc(
		"plugin_db_connections_in_use",
		"Pipeline plugin DB pool number of connections currently in use.",
	)
	dbConnectionsIdle = desc(
		"plugin_db_connections_idle",
		"Pipeline plugin DB pool number of idle connections.",
	)
	dbConnectionsWaitedTotal = desc(
		"plugin_db_connections_waited_total",
		"Pipeline plugin DB pool total number of connections waited for.",
	)
	dbConnectionsWaitedSecondsTotal = desc(
		"plugin_db_connections_waited_seconds_total",
		"Pipeline plugin DB pool total time blocked waiting for a new connection.",
	)
}

// dbDescriptor identifies one registered DB pool. It is used as the map
// key in dbCollector, so it contains only comparable fields; the same
// three values become the metric label values on collection.
type dbDescriptor struct {
	pipeline   string // pipeline Id
	pluginName string // plugin name (alias)
	driver     string // driver name
}

// dbCollector is a prometheus.Collector that reads sql.DBStats from each
// registered pool at scrape time and emits them as const metrics.
type dbCollector struct {
	dbs map[dbDescriptor]*sqlx.DB // registered pools, keyed by identity
	mu  *sync.Mutex               // guards dbs
}

// append registers (or replaces) the pool tracked under descriptor d.
func (c *dbCollector) append(d dbDescriptor, db *sqlx.DB) {
	c.mu.Lock()
	c.dbs[d] = db
	c.mu.Unlock()
}

// delete stops tracking the pool registered under descriptor d; it is a
// no-op for an unknown descriptor.
func (c *dbCollector) delete(d dbDescriptor) {
	c.mu.Lock()
	delete(c.dbs, d)
	c.mu.Unlock()
}

// Describe implements prometheus.Collector by sending every DB pool
// metric descriptor to ch.
func (c *dbCollector) Describe(ch chan<- *prometheus.Desc) {
	for _, d := range []*prometheus.Desc{
		dbConnectionsMax,
		dbConnectionsOpen,
		dbConnectionsInUse,
		dbConnectionsIdle,
		dbConnectionsWaitedTotal,
		dbConnectionsWaitedSecondsTotal,
	} {
		ch <- d
	}
}

// Collect implements prometheus.Collector. For every registered pool it
// snapshots sql.DBStats once and emits one const metric per descriptor,
// labelled with the pool's pipeline, plugin name and driver.
func (c *dbCollector) Collect(ch chan<- prometheus.Metric) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for d, db := range c.dbs {
		stats := db.Stats()

		// emit wraps MustNewConstMetric with this pool's label values.
		emit := func(desc *prometheus.Desc, t prometheus.ValueType, v float64) {
			ch <- prometheus.MustNewConstMetric(desc, t, v, d.pipeline, d.pluginName, d.driver)
		}

		emit(dbConnectionsMax, prometheus.GaugeValue, float64(stats.MaxOpenConnections))
		emit(dbConnectionsOpen, prometheus.GaugeValue, float64(stats.OpenConnections))
		emit(dbConnectionsInUse, prometheus.GaugeValue, float64(stats.InUse))
		emit(dbConnectionsIdle, prometheus.GaugeValue, float64(stats.Idle))
		emit(dbConnectionsWaitedTotal, prometheus.CounterValue, float64(stats.WaitCount))
		emit(dbConnectionsWaitedSecondsTotal, prometheus.CounterValue, stats.WaitDuration.Seconds())
	}
}

// RegisterDB starts exposing connection pool statistics for db under the
// given pipeline/plugin/driver labels. On the first call it also registers
// the shared collector with the default Prometheus registry (guarded by
// sync.Once, so subsequent calls only add the pool to the collector).
func RegisterDB(pipeline, pluginName, driver string, db *sqlx.DB) {
	dbMetricsRegister.Do(func() {
		prometheus.MustRegister(dbMetricsCollector)
	})

	dbMetricsCollector.append(dbDescriptor{
		pipeline:   pipeline,
		pluginName: pluginName,
		driver:     driver,
	}, db)
}

// UnregisterDB stops exposing statistics for the pool previously registered
// with the same pipeline/plugin/driver labels. Calling it for a descriptor
// that was never registered is a no-op.
func UnregisterDB(pipeline, pluginName, driver string) {
	dbMetricsCollector.delete(dbDescriptor{
		pipeline:   pipeline,
		pluginName: pluginName,
		driver:     driver,
	})
}
1 change: 1 addition & 0 deletions plugins/common/starlark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ This plugin defines new type - `event` - as Neptunus event representation in sta
- `addTag(tag String)` - add tag to event
- `delTag(tag String)` - delete tag from event
- `hasTag(tag String) (ok Bool)` - check if event has tag
- `getErrors() (e List[String])` - get event errors

Also, you can create a new event using `newEvent(key String) (event Event)` builtin function.

Expand Down
Loading

0 comments on commit 7a485a0

Please sign in to comment.