From 33d1d5d3dc9b184751b95d2e855ce610967bd998 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Tue, 23 Jan 2024 15:16:40 -0600 Subject: [PATCH] Results and work in 1 for loop Allows us to bail early if we encounter a failure on downloads --- cmd/plugin.go | 126 +++++++++++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 53 deletions(-) diff --git a/cmd/plugin.go b/cmd/plugin.go index 6b6a28e6a..38103f7fa 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -210,7 +210,7 @@ func stashPluginMain(args []string) { var wg sync.WaitGroup - workChan := make(chan Transfer) // might need to be a Transfer object + workChan := make(chan Transfer) results := make(chan *classads.ClassAd, len(transfers)) // start workers @@ -219,68 +219,40 @@ func stashPluginMain(args []string) { go moveObjects(source, methods, upload, &wg, workChan, results) } + success := true + var resultAds []*classads.ClassAd for _, transfer := range transfers { workChan <- transfer - } - close(workChan) - - // Wait for transfers - wg.Wait() - - var resultAds []*classads.ClassAd - // Every transfer should send a resultAd to the results channel - for i := 0; i < len(transfers); i++ { select { + //case workChan <- transfer: + // We successfully sent work case resultAd := <-results: - resultAds = append(resultAds, resultAd) - default: - // If we did not get a result, something terrible happened - log.Errorln(errors.New("Failed to get resultAds from transfer")) - } - } - - success := true - retryable := false - for _, resultAd := range resultAds { - _, err := outputFile.WriteString(resultAd.String() + "\n") - if err != nil { - log.Errorln("Failed to write to outfile:", err) - os.Exit(1) - } - transferSuccess, err := resultAd.Get("TransferSuccess") - if err != nil { - log.Errorln("Failed to get TransferSuccess:", err) - success = false - } - success = success && transferSuccess.(bool) - // If we do not get a success, check if it is retryable - if !success { - retryableTransfer, err := resultAd.Get("TransferRetryable") + transferSuccess, err := resultAd.Get("TransferSuccess") if err != nil { - log.Errorln("Failed to see if ad is retryable", err) + log.Errorln("Failed to get TransferSuccess:", err) } - retryable = retryableTransfer.(bool) + // If we are not uploading and we fail, we want to abort + if !upload && !transferSuccess.(bool) { + success = false + break + } else { // Otherwise, we add to end result ads + resultAds = append(resultAds, resultAd) + } + default: + // Nothing yet... } } + close(workChan) - if err = outputFile.Sync(); err != nil { - var perr *fs.PathError - var serr syscall.Errno - // Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error - // Error code EINVAL is returned on Linux - // Error code ENODEV is returned on Mac OS X - if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) && (int(serr) == 1 || serr == syscall.EINVAL || serr == syscall.ENODEV) { - log.Debugf("Error when syncing: %s; can be ignored\n", perr) - } else { - if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) { - log.Errorf("Failed to sync output file: %s (errno %d)", serr, int(serr)) - } else { - log.Errorln("Failed to sync output file:", err) - } - os.Exit(1) - } + // Wait for transfers & results if no failure + if success { + wg.Wait() } + close(results) + + success, retryable := writeOutfile(resultAds, outputFile) + if success { os.Exit(0) } else if retryable { @@ -296,7 +268,7 @@ func stashPluginMain(args []string) { func moveObjects(source []string, methods []string, upload bool, wg *sync.WaitGroup, workChan <-chan Transfer, results chan<- *classads.ClassAd) { defer wg.Done() var result error - for transfer := range workChan { //instead range workChan + for transfer := range workChan { var tmpDownloaded int64 if upload { source = append(source, transfer.localFile) @@ -364,6 +336,54 @@ func moveObjects(source []string, methods []string, upload bool, wg *sync.WaitGr } } +// WriteOutfile takes in the result ads from the job and the file to be outputted, it returns a boolean indicating: +// true: all result ads indicate transfer success +// false: at least one result ad has failed +// As well as a boolean letting us know if errors are retryable +func writeOutfile(resultAds []*classads.ClassAd, outputFile *os.File) (bool, bool) { + success := true + retryable := false + for _, resultAd := range resultAds { + _, err := outputFile.WriteString(resultAd.String() + "\n") + if err != nil { + log.Errorln("Failed to write to outfile:", err) + os.Exit(1) + } + transferSuccess, err := resultAd.Get("TransferSuccess") + if err != nil { + log.Errorln("Failed to get TransferSuccess:", err) + success = false + } + success = success && transferSuccess.(bool) + // If we do not get a success, check if it is retryable + if !success { + retryableTransfer, err := resultAd.Get("TransferRetryable") + if err != nil { + log.Errorln("Failed to see if ad is retryable", err) + } + retryable = retryableTransfer.(bool) + } + } + if err := outputFile.Sync(); err != nil { + var perr *fs.PathError + var serr syscall.Errno + // Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error + // Error code EINVAL is returned on Linux + // Error code ENODEV is returned on Mac OS X + if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) && (int(serr) == 1 || serr == syscall.EINVAL || serr == syscall.ENODEV) { + log.Debugf("Error when syncing: %s; can be ignored\n", perr) + } else { + if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) { + log.Errorf("Failed to sync output file: %s (errno %d)", serr, int(serr)) + } else { + log.Errorln("Failed to sync output file:", err) + } + os.Exit(1) + } + } + return success, retryable +} + // readMultiTransfers reads the transfers from a Reader, such as stdin func readMultiTransfers(stdin bufio.Reader) (transfers []Transfer, err error) { // Check stdin for a list of transfers