This repository has been archived by the owner on Oct 2, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathkeryxlib.go
165 lines (136 loc) · 4.68 KB
/
keryxlib.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package keryxlib
// Copyright 2015 MediaMath <http://www.mediamath.com>. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
import (
"context"
"github.com/MediaMath/keryxlib/filters"
"github.com/MediaMath/keryxlib/message"
"github.com/MediaMath/keryxlib/pg"
"github.com/MediaMath/keryxlib/streams"
)
//StartSummaryChannel sets up a keryx symmary stream and schema reader with the provided configuration and returns a channel
func StartSummaryChannel(ctx context.Context, serverVersion string, kc *Config) (<-chan message.TxnSummary, error) {
schemaReader, err := pg.NewSchemaReader(kc.PGConnStrings, "postgres", 255)
if err != nil {
return nil, err
}
bufferWorkingDirectory, err := kc.GetBufferDirectoryOrTemp()
if err != nil {
return nil, err
}
walStream, err := streams.NewWalStream(kc.DataDir)
if err != nil {
return nil, err
}
wal, err := walStream.Start()
if err != nil {
return nil, err
}
go func() {
<-ctx.Done()
walStream.Stop()
}()
f := filters.Exclusive(schemaReader, kc.ExcludeRelations)
if len(kc.IncludeRelations) > 0 {
f = filters.Inclusive(schemaReader, kc.IncludeRelations)
}
txnBuffer := &streams.TxnBuffer{Filters: f, WorkingDirectory: bufferWorkingDirectory, SchemaReader: schemaReader}
buffered, err := txnBuffer.Start(wal)
if err != nil {
walStream.Stop()
return nil, err
}
return streams.SummaryStream{SchemaMetaInformation: schemaReader}.Start(serverVersion, buffered)
}
//TransactionChannel sets up a keryx stream and schema reader with the provided configuration and returns
//it as a channel
func TransactionChannel(serverVersion string, kc *Config) (<-chan *message.Transaction, error) {
return StartTransactionChannel(serverVersion, kc, nil)
}
//StartTransactionChannel sets up a keryx stream and schema reader with the provided configuration and return
//it as a channel. The channel can be stopped with the provided stopper
func StartTransactionChannel(serverVersion string, kc *Config, stopper WaitForStop) (<-chan *message.Transaction, error) {
schemaReader, err := pg.NewSchemaReader(kc.PGConnStrings, "postgres", 255)
if err != nil {
return nil, err
}
bufferWorkingDirectory, err := kc.GetBufferDirectoryOrTemp()
if err != nil {
return nil, err
}
f := filters.Exclusive(schemaReader, kc.ExcludeRelations)
if len(kc.IncludeRelations) > 0 {
f = filters.Inclusive(schemaReader, kc.IncludeRelations)
}
stream := NewKeryxStream(schemaReader, kc.MaxMessagePerTxn)
if stopper != nil {
go func() {
stopper.Wait()
stream.Stop()
}()
}
return stream.StartKeryxStream(serverVersion, f, kc.DataDir, bufferWorkingDirectory)
}
//WaitForStop will wait
type WaitForStop interface {
Wait()
}
//NewTxnChannelStopper creates a TxnChannelStopper
func NewTxnChannelStopper() *TxnChannelStopper {
return &TxnChannelStopper{done: make(chan interface{})}
}
//TxnChannelStopper can be used to stop a Transaction channel
type TxnChannelStopper struct {
done chan interface{}
}
//Stop will initiate a Transaction channel shutdown
func (t *TxnChannelStopper) Stop() {
defer func() { recover() }()
close(t.done)
}
//Wait will block until Stop is called {
func (t *TxnChannelStopper) Wait() {
<-t.done
}
//FullStream is a facade around the full process of taking WAL entries and publishing them as txn messages.
type FullStream struct {
walStream *streams.WalStream
sr *pg.SchemaReader
MaxMessageCount uint
}
//NewKeryxStream takes a schema reader and returns a FullStream
func NewKeryxStream(sr *pg.SchemaReader, maxMessageCount uint) *FullStream {
return &FullStream{nil, sr, maxMessageCount}
}
//Stop will end the reading on the WAL log and subsequent streams will therefore end.
func (fs *FullStream) Stop() {
if fs.walStream != nil {
fs.walStream.Stop()
}
}
//StartKeryxStream will start all the streams necessary to go from WAL entries to txn messages.
func (fs *FullStream) StartKeryxStream(serverVersion string, filters filters.MessageFilter, dataDir string, bufferWorkingDirectory string) (<-chan *message.Transaction, error) {
walStream, err := streams.NewWalStream(dataDir)
if err != nil {
return nil, err
}
fs.walStream = walStream
wal, err := fs.walStream.Start()
if err != nil {
return nil, err
}
txnBuffer := &streams.TxnBuffer{Filters: filters, WorkingDirectory: bufferWorkingDirectory, SchemaReader: fs.sr}
buffered, err := txnBuffer.Start(wal)
if err != nil {
fs.Stop()
return nil, err
}
populated := &streams.PopulatedMessageStream{Filters: filters, SchemaReader: fs.sr, MaxMessageCount: fs.MaxMessageCount}
keryx, err := populated.Start(serverVersion, buffered)
if err != nil {
fs.Stop()
return nil, err
}
return keryx, nil
}