Skip to content

Commit

Permalink
Results and work in 1 for loop
Browse files Browse the repository at this point in the history
Allows us to bail early if we encounter a failure on downloads
  • Loading branch information
joereuss12 committed Jan 23, 2024
1 parent ea45067 commit 33d1d5d
Showing 1 changed file with 73 additions and 53 deletions.
126 changes: 73 additions & 53 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func stashPluginMain(args []string) {

var wg sync.WaitGroup

workChan := make(chan Transfer) // might need to be a Transfer object
workChan := make(chan Transfer)
results := make(chan *classads.ClassAd, len(transfers))

// start workers
Expand All @@ -219,68 +219,40 @@ func stashPluginMain(args []string) {
go moveObjects(source, methods, upload, &wg, workChan, results)
}

success := true
var resultAds []*classads.ClassAd
for _, transfer := range transfers {
workChan <- transfer
}
close(workChan)

// Wait for transfers
wg.Wait()

var resultAds []*classads.ClassAd
// Every transfer should send a resultAd to the results channel
for i := 0; i < len(transfers); i++ {
select {
//case workChan <- transfer:
// We successfully sent work
case resultAd := <-results:
resultAds = append(resultAds, resultAd)
default:
// If we did not get a result, something terrible happened
log.Errorln(errors.New("Failed to get resultAds from transfer"))
}
}

success := true
retryable := false
for _, resultAd := range resultAds {
_, err := outputFile.WriteString(resultAd.String() + "\n")
if err != nil {
log.Errorln("Failed to write to outfile:", err)
os.Exit(1)
}
transferSuccess, err := resultAd.Get("TransferSuccess")
if err != nil {
log.Errorln("Failed to get TransferSuccess:", err)
success = false
}
success = success && transferSuccess.(bool)
// If we do not get a success, check if it is retryable
if !success {
retryableTransfer, err := resultAd.Get("TransferRetryable")
transferSuccess, err := resultAd.Get("TransferSuccess")
if err != nil {
log.Errorln("Failed to see if ad is retryable", err)
log.Errorln("Failed to get TransferSuccess:", err)
}
retryable = retryableTransfer.(bool)
// If we are not uploading and we fail, we want to abort
if !upload && !transferSuccess.(bool) {
success = false
break
} else { // Otherwise, we add to end result ads
resultAds = append(resultAds, resultAd)
}
default:
// Nothing yet...
}
}
close(workChan)

if err = outputFile.Sync(); err != nil {
var perr *fs.PathError
var serr syscall.Errno
// Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error
// Error code EINVAL is returned on Linux
// Error code ENODEV is returned on Mac OS X
if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) && (int(serr) == 1 || serr == syscall.EINVAL || serr == syscall.ENODEV) {
log.Debugf("Error when syncing: %s; can be ignored\n", perr)
} else {
if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) {
log.Errorf("Failed to sync output file: %s (errno %d)", serr, int(serr))
} else {
log.Errorln("Failed to sync output file:", err)
}
os.Exit(1)
}
// Wait for transfers & results if no failure
if success {
wg.Wait()
}

close(results)

success, retryable := writeOutfile(resultAds, outputFile)

if success {
os.Exit(0)
} else if retryable {
Expand All @@ -296,7 +268,7 @@ func stashPluginMain(args []string) {
func moveObjects(source []string, methods []string, upload bool, wg *sync.WaitGroup, workChan <-chan Transfer, results chan<- *classads.ClassAd) {
defer wg.Done()
var result error
for transfer := range workChan { //instead range workChan
for transfer := range workChan {
var tmpDownloaded int64
if upload {
source = append(source, transfer.localFile)
Expand Down Expand Up @@ -364,6 +336,54 @@ func moveObjects(source []string, methods []string, upload bool, wg *sync.WaitGr
}
}

// WriteOutfile takes in the result ads from the job and the file to be outputted, it returns a boolean indicating:
// true: all result ads indicate transfer success
// false: at least one result ad has failed
// As well as a boolean letting us know if errors are retryable
func writeOutfile(resultAds []*classads.ClassAd, outputFile *os.File) (bool, bool) {
success := true
retryable := false
for _, resultAd := range resultAds {
_, err := outputFile.WriteString(resultAd.String() + "\n")
if err != nil {
log.Errorln("Failed to write to outfile:", err)
os.Exit(1)
}
transferSuccess, err := resultAd.Get("TransferSuccess")
if err != nil {
log.Errorln("Failed to get TransferSuccess:", err)
success = false
}
success = success && transferSuccess.(bool)
// If we do not get a success, check if it is retryable
if !success {
retryableTransfer, err := resultAd.Get("TransferRetryable")
if err != nil {
log.Errorln("Failed to see if ad is retryable", err)
}
retryable = retryableTransfer.(bool)
}
}
if err := outputFile.Sync(); err != nil {
var perr *fs.PathError
var serr syscall.Errno
// Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error
// Error code EINVAL is returned on Linux
// Error code ENODEV is returned on Mac OS X
if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) && (int(serr) == 1 || serr == syscall.EINVAL || serr == syscall.ENODEV) {
log.Debugf("Error when syncing: %s; can be ignored\n", perr)
} else {
if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) {
log.Errorf("Failed to sync output file: %s (errno %d)", serr, int(serr))
} else {
log.Errorln("Failed to sync output file:", err)
}
os.Exit(1)
}
}
return success, retryable
}

// readMultiTransfers reads the transfers from a Reader, such as stdin
func readMultiTransfers(stdin bufio.Reader) (transfers []Transfer, err error) {
// Check stdin for a list of transfers
Expand Down

0 comments on commit 33d1d5d

Please sign in to comment.