-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy pathworker.go
140 lines (113 loc) · 4.06 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package transfer
import (
"log"
"sync"
"time"
"github.com/Azure/blobporter/internal"
"github.com/Azure/blobporter/pipeline"
)
// Worker represents a worker routine that transfers data to a target and commits the list if applicable.
type Worker struct {
	workerID          int                // identifies this worker in results and events
	workerQueue       chan pipeline.Part // parts read from the source, consumed by startWorker
	workerWG          *sync.WaitGroup    // signaled once when workerQueue is closed
	commitListHandler *commitListHandler // receives per-part results and issues commit requests
	dupeLevel         DupeCheckLevel     // duplicate-chunk detection policy passed to checkForDuplicateChunk
}
// Event names reported to the internal event sink by workers and the committer.
const (
	// CommitEvent counts block-list commits performed against the target.
	CommitEvent internal.EventName = "commit"
	// BufferEvent reports worker-queue utilization as a percentage (0-100).
	BufferEvent internal.EventName = "buffer"
	// DataWrittenEvent accumulates the number of bytes written to the target.
	DataWrittenEvent internal.EventName = "data-written"
	// WrittenPartEvent accumulates the number of parts written to the target.
	WrittenPartEvent internal.EventName = "written-part"
)
// newWorker creates an upload Worker that consumes parts from workerQueue,
// records results with the commit list handler, and signals workerWG when the
// queue is drained. d controls duplicate-chunk detection.
func newWorker(workerID int, workerQueue chan pipeline.Part, commit *commitListHandler,
	workerWG *sync.WaitGroup, d DupeCheckLevel) Worker {
	return Worker{
		workerQueue:       workerQueue,
		workerID:          workerID,
		workerWG:          workerWG,
		commitListHandler: commit,
		dupeLevel:         d,
	}
}
// startWorker reads parts from the worker queue until it is closed, writes each
// part to the target via WritePart, and records the result with the commit list
// handler. Parts flagged as duplicates (per the configured dupe-check level)
// are recorded as successful without being uploaded. Any write error is fatal
// and terminates the process. The worker wait group is signaled exactly once,
// when the queue closes.
func (w *Worker) startWorker(target pipeline.TargetPipeline) {
	// Done fires when the queue is closed and the loop exits. log.Fatal exits
	// the process directly, so the defer cannot fire spuriously on error.
	defer w.workerWG.Done()
	for tb := range w.workerQueue {
		checkForDuplicateChunk(&tb, w.dupeLevel)
		if tb.DuplicateOfBlockOrdinal >= 0 {
			// This block is a duplicate of another, so don't upload it.
			// Instead, just reflect it (with its "duplicate" status)
			// onwards to the commit list handler.
			w.commitListHandler.addWorkerResult(w.workerID, &tb, time.Now(), 0, "Success", 0)
			continue
		}
		duration, startTime, retries, err := target.WritePart(&tb)
		if err != nil {
			log.Fatal(err)
		}
		tb.ReturnBuffer()
		w.commitListHandler.addWorkerResult(w.workerID, &tb, startTime, duration, "Success", retries)
		internal.EventSink.AddSumEvent(internal.Worker, DataWrittenEvent, "", float64(tb.BytesToRead))
		internal.EventSink.AddSumEvent(internal.Worker, WrittenPartEvent, "", float64(1))
		// Report queue utilization (0-100%) after each written part.
		bufferSize := int(float64(len(w.workerQueue)) / float64(cap(w.workerQueue)) * 100)
		internal.EventSink.AddEvent(internal.Worker, BufferEvent, "", internal.EventData{Value: bufferSize})
	}
}
// Committer consumes commit requests produced by the commit list handler and
// commits block lists to the target.
type Committer struct {
	committerWG       *sync.WaitGroup    // signaled once when the commit-request channel is closed
	commitListHandler *commitListHandler // source of commit requests and committed-state tracking
}
// newCommitter creates a Committer backed by the given commit list handler.
// committerWG is signaled when the handler's commit-request channel closes.
func newCommitter(commit *commitListHandler, committerWG *sync.WaitGroup) *Committer {
	return &Committer{
		committerWG:       committerWG,
		commitListHandler: commit,
	}
}
// startCommitter processes commit requests until the commit-request channel is
// closed. For each request it commits the block list to the target and marks
// the target name as committed in the commit list handler. Any commit or
// tracking error is fatal and terminates the process. The committer wait group
// is signaled when the channel closes.
func (c *Committer) startCommitter(target pipeline.TargetPipeline) {
	// Done fires on channel close; log.Fatal exits the process directly, so
	// the defer cannot fire on the error paths.
	defer c.committerWG.Done()
	for {
		commitReq, ok := <-c.commitListHandler.commitRequests()
		if !ok {
			return
		}
		if _, err := target.CommitList(commitReq.list, int(commitReq.numOfBlocks), commitReq.targetName); err != nil {
			log.Fatal(err)
		}
		internal.EventSink.AddSumEvent(internal.Worker, CommitEvent, "", float64(1))
		if err := c.commitListHandler.trackCommitted(commitReq.targetName); err != nil {
			log.Fatal(err)
		}
	}
}