-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathjob.go
95 lines (76 loc) · 2.15 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package openlineage
import (
"context"
"time"
"github.com/ThijsKoot/openlineage-go/pkg/facets"
)
// JobEvent represents an OpenLineage JobEvent.
type JobEvent struct {
Job Job
// The set of **input** datasets.
Inputs []InputElement
// The set of **output** datasets.
Outputs []OutputElement
BaseEvent
}
func (e *JobEvent) AsEmittable() Event {
return Event{
EventTime: e.EventTime,
Job: &e.Job,
Inputs: e.Inputs,
Outputs: e.Outputs,
Producer: e.Producer,
SchemaURL: e.SchemaURL,
}
}
// Emit calls [Client.Emit] on [DefaultClient].
func (e *JobEvent) Emit() {
_ = DefaultClient.Emit(context.Background(), e)
}
func NewNamespacedJobEvent(name, namespace string) *JobEvent {
return &JobEvent{
BaseEvent: BaseEvent{
Producer: producer,
SchemaURL: schemaURL,
EventTime: time.Now().Format(time.RFC3339),
},
Job: NewNamespacedJob(name, namespace),
}
}
// NewJobEvent calls [NewNamespacedJobEvent] with [DefaultNamespace].
func NewJobEvent(name string) *JobEvent {
return NewNamespacedJobEvent(name, DefaultNamespace)
}
// WithFacets sets the supplied instances of [facets.JobFacet] for this event.
func (j *JobEvent) WithFacets(facets ...facets.JobFacet) *JobEvent {
for _, f := range facets {
f.Apply(&j.Job.Facets)
}
return j
}
// WithInputs appends the supplied instances of [InputElement] to this event's inputs.
func (j *JobEvent) WithInputs(inputs ...InputElement) *JobEvent {
j.Inputs = append(j.Inputs, inputs...)
return j
}
// WithOutputs appends the supplied instances of [OutputElement] to this event's outputs.
func (j *JobEvent) WithOutputs(inputs ...OutputElement) *JobEvent {
j.Outputs = append(j.Outputs, inputs...)
return j
}
// NewNamespacedJob creates a new [Job].
func NewNamespacedJob(name string, namespace string, jobFacets ...facets.JobFacet) Job {
var job *facets.JobFacets
for _, f := range jobFacets {
f.Apply(&job)
}
return Job{
Name: name,
Namespace: DefaultNamespace,
Facets: job,
}
}
// NewJob calls [NewNamespacedJob] with [DefaultNamespace].
func NewJob(name string, jobFacets ...facets.JobFacet) Job {
return NewNamespacedJob(name, DefaultNamespace, jobFacets...)
}