11package locker
22
33import (
4- "bufio"
5- "crypto/sha256"
6- "encoding/binary"
7- "errors"
8- "fmt"
9- "io"
104 "net"
11- "os"
12- "path/filepath"
13- "strings"
14-
15- "vikingPingvin/locker/locker/messaging"
16- "vikingPingvin/locker/locker/messaging/protobuf"
5+ "sync"
176
187 "github.com/rs/xid"
198 "github.com/rs/zerolog/log"
20- "google.golang.org/protobuf/proto"
219)
2210
2311// InputArgPath Relative or absolute path of files for Cobra CLI
24- var InputArgPath string
25- var InputArgNamespace string
12+ var (
13+ InputArgPath string
14+ InputArgNamespace string
15+ InputArgConsume string
16+ )
2617
2718// InputData Populated at the start of the program
2819type InputData struct {
@@ -44,204 +35,42 @@ type ArtifactAgent struct {
4435 Port string
4536}
4637
47- func (a ArtifactAgent ) Start (inputData * InputData ) bool {
48- connection , err := net .Dial ("tcp" , "localhost:27001" )
49- if err != nil {
50- panic (err )
38+ func (a ArtifactAgent ) Start (inputDataArray []* InputData ) bool {
39+ // TODO: move sendConcurrent to config file
40+ const sendConcurrent = true
41+
42+ var wg sync.WaitGroup
43+ for _ , singleInputData := range inputDataArray {
44+ if sendConcurrent {
45+ wg .Add (1 )
46+ go sendArtifactToServer (singleInputData , & wg )
47+ } else {
48+ sendArtifactToServer (singleInputData , & wg )
49+ }
5150 }
52- defer connection .Close ()
53- log .Info ().Msg ("Agent connected to Locker Server..." )
54-
55- // Send Metadata message
56- parseAndSendMetaData (connection , inputData )
57-
58- // Send Payload message(s)
59- parseAndSendPayload (connection , inputData )
60-
61- // Listen for ACK from server
62- listenForACK (connection , inputData )
6351
52+ wg .Wait ()
6453 return true
6554}
6655
67- // If --file cli input is not null, parse file
68- func parseAndSendMetaData (connection net.Conn , inputData * InputData ) (fileInfo os.FileInfo , err error ) {
69- fileInfo , err = os .Stat (inputData .FilePath )
70- if os .IsNotExist (err ) {
71- log .Error ().Msgf ("Parsing file Input error: %v" , err )
72- return fileInfo , err
73- }
74- inputData .FileHash = hashFile (inputData .FilePath )
75- inputData .FileName = fileInfo .Name ()
76-
77- log .Info ().
78- Str ("file name" , fileInfo .Name ()).
79- Str ("Namespace" , fmt .Sprintf ("%s/%s/%s" , inputData .NameSpace , inputData .Project , inputData .JobID )).
80- Str ("size" , fmt .Sprintf ("%d" , fileInfo .Size ())).
81- Str ("hash" , fmt .Sprintf ("%v" , inputData .FileHash )).
82- Str ("id" , inputData .ID .String ()).
83- Msg ("Artifact metadata parsing finished" )
84-
85- message , err := messaging .CreateMessage_FileMeta (
86- inputData .ID .Bytes (),
87- protobuf .MessageType_META ,
88- inputData .NameSpace ,
89- inputData .Project ,
90- inputData .JobID ,
91- inputData .FileName ,
92- inputData .FileHash )
56+ func sendArtifactToServer (artifact * InputData , wg * sync.WaitGroup ) {
57+ connection , err := net .Dial ("tcp" , "localhost:27001" )
9358 if err != nil {
9459 panic (err )
9560 }
61+ defer connection .Close ()
62+ log .Info ().Msg ("Agent connected to Locker Server..." )
9663
9764 // Send Metadata message
98- log .Info ().Msg ("Sending MetaData Packet" )
99- messaging .SendProtoBufMessage (connection , message )
100-
101- return fileInfo , err
102- }
103-
104- //func parseAndSendPayload(bytes *[]byte, numBytes int) {
105- func parseAndSendPayload (connection net.Conn , inputData * InputData ) {
106-
107- f , err := os .Open (inputData .FilePath )
108- defer f .Close ()
109- if err != nil {
110- log .Error ().Msgf ("Cannot open file %s" , inputData .FilePath )
111- }
112-
113- log .Info ().Msg ("Started sending Payload Packets..." )
114- reader := bufio .NewReader (f )
115- isPayloadFinal := false
116-
117- buffer := make ([]byte , 1024 )
118- for {
119- n , ioErr := reader .Read (buffer )
120- if ioErr == io .EOF {
121- isPayloadFinal = true
122- // Send terminating payload protobuf message
123- terminalMessage , err := messaging .CreateMessage_FilePackage (
124- inputData .ID .Bytes (),
125- protobuf .MessageType_PACKAGE ,
126- make ([]byte , 1 ),
127- isPayloadFinal ,
128- )
129- if err != nil {
130- log .Fatal ().Msg ("Fatal Error during payload protobuf assembly" )
131- }
132- messaging .SendProtoBufMessage (connection , terminalMessage )
133- break
134- }
135-
136- message , err := messaging .CreateMessage_FilePackage (
137- inputData .ID .Bytes (),
138- protobuf .MessageType_PACKAGE ,
139- (buffer )[:n ],
140- isPayloadFinal )
141-
142- if err != nil {
143- log .Fatal ().Msg ("Fatal Error during payload protobuf assembly" )
144- }
145-
146- messaging .SendProtoBufMessage (connection , message )
147- }
148- log .Info ().Msg ("Finished sending Payload Packets..." )
149- }
150-
151- // Given a valid file path, returns a SHA256 hash
152- func hashFile (path string ) (hash []byte ) {
153- f , err := os .Open (path )
154- defer f .Close ()
155- if err != nil {
156- log .Err (err ).Msgf ("Cannot open file %s" , path )
157- }
158-
159- hasher := sha256 .New ()
160- if _ , err := io .Copy (hasher , f ); err != nil {
161- log .Err (err ).Msg ("Error calculating SHA256 Hash" )
162- }
163- return hasher .Sum (nil )
164- }
165-
166- func listenForACK (connection net.Conn , inputData * InputData ) {
65+ parseAndSendMetaData (connection , artifact )
16766
168- // TODO: Make into const in message_handler (also server.go)
169- // sizePrefix is 4 bytes protobug message size
170- sizePrefix := make ([]byte , 4 )
171-
172- _ , _ = io .ReadFull (connection , sizePrefix )
173- protoLength := int (binary .BigEndian .Uint32 (sizePrefix ))
174-
175- ackPacketRaw := make ([]byte , protoLength )
176- _ , _ = io .ReadFull (connection , ackPacketRaw )
177- genericProto := & protobuf.LockerMessage {}
178- if err := proto .Unmarshal (ackPacketRaw , genericProto ); err != nil {
179- log .Err (err ).Msg ("Error during unmarshalling" )
180- }
181-
182- if genericProto .GetAck ().ProtoReflect ().IsValid () {
183- ackPacket := genericProto .GetAck ()
184-
185- serverResult := ackPacket .GetServerSuccess ()
186- ackID , _ := xid .FromBytes (ackPacket .GetId ())
187- if ackID != inputData .ID {
188- log .Warn ().
189- Str ("respone_id" , ackID .String ()).
190- Str ("original_id" , inputData .ID .String ()).
191- Msg ("Response ID mismatch." )
192- }
193-
194- log .Info ().
195- Str ("id_back" , ackID .String ()).
196- Msgf ("ACK packet recieved from server with success flag: %v" , serverResult )
197- }
198- }
199-
200- // Parse raw CLI input parameters to internal data structures
201- func parseInputArguments () * InputData {
202- var err error
203- var inputPath string
204-
205- if len (InputArgPath ) == 0 {
206- err = errors .New ("--file empty" )
207- log .Err (err ).Str ("agent" , "parseInputArguments" ).Msgf ("No input file was given." )
208- }
209- if len (InputArgNamespace ) == 0 {
210- err = errors .New ("--namespace empty" )
211- log .Err (err ).Str ("agent" , "parseInputArguments" ).Msgf ("No input namespace was given." )
212- }
213- if err != nil {
214- os .Exit (1 )
215- }
216-
217- inputPath = InputArgPath
218- if ! filepath .IsAbs (InputArgPath ) {
219- cwd , err := os .Getwd ()
220- if err != nil {
221- log .Err (err ).Msg ("Error during CWD PATH parsing" )
222- os .Exit (1 )
223- }
224- log .Debug ().Msgf ("Relative path of input: %s" , InputArgPath )
225- inputPath = filepath .Join (cwd , InputArgPath )
226-
227- }
67+ // Send Payload message(s)
68+ parseAndSendPayload (connection , artifact )
22869
229- fullNameSpace := InputArgNamespace
230- namePaths := strings .Split (fullNameSpace , "/" )
231- if len (namePaths ) != 3 {
232- err = errors .New ("Namespace must contain 3 values separated by '/'" )
233- log .Err (err ).Msg ("Namespace values not valid" )
234- os .Exit (1 )
235- }
70+ // Listen for ACK from server
71+ listenForACK (connection , artifact )
23672
237- data := & InputData {
238- FilePath : inputPath ,
239- NameSpace : namePaths [0 ],
240- Project : namePaths [1 ],
241- JobID : namePaths [2 ],
242- ID : xid .New (),
243- }
244- return data
73+ wg .Done ()
24574}
24675
24776// ExecuteAgent : Entrypoint for Locker agent start
0 commit comments