@@ -215,34 +215,48 @@ func (r *Run) exec(ctx context.Context, extraArgs ...string) error {
215
215
func (r * Run ) readEvents (ctx context.Context , events io.Reader ) {
216
216
defer close (r .events )
217
217
218
- scan := bufio .NewScanner (events )
219
- for scan .Scan () {
220
- if ! r .opts .IncludeEvents {
221
- continue
222
- }
218
+ var (
219
+ n int
220
+ err error
221
+ frag []byte
223
222
224
- line := scan .Bytes ()
225
- if len (line ) == 0 {
226
- continue
223
+ b = make ([]byte , 64 * 1024 )
224
+ )
225
+ for ; ; n , err = events .Read (b ) {
226
+ if n == 0 && err != nil {
227
+ break
227
228
}
228
229
229
- var event Event
230
- if err := json .Unmarshal (line , & event ); err != nil {
231
- slog .Debug ("failed to unmarshal event" , "error" , err , "event" , string (line ))
230
+ if ! r .opts .IncludeEvents {
232
231
continue
233
232
}
234
233
235
- select {
236
- case <- ctx .Done ():
237
- go func () {
238
- for scan .Scan () {
239
- // Drain any remaining events
240
- }
241
- }()
242
- return
243
- case r .events <- event :
234
+ for _ , line := range bytes .Split (append (frag , b [:n ]... ), []byte ("\n \n " )) {
235
+ if line = bytes .TrimSpace (line ); len (line ) == 0 {
236
+ frag = frag [:0 ]
237
+ continue
238
+ }
239
+
240
+ var event Event
241
+ if err := json .Unmarshal (line , & event ); err != nil {
242
+ slog .Debug ("failed to unmarshal event" , "error" , err , "event" , string (b ))
243
+ frag = line [:]
244
+ continue
245
+ }
246
+
247
+ select {
248
+ case <- ctx .Done ():
249
+ return
250
+ case r .events <- event :
251
+ frag = frag [:0 ]
252
+ }
244
253
}
245
254
}
255
+
256
+ if err != nil && ! errors .Is (err , io .EOF ) {
257
+ slog .Debug ("failed to read events" , "error" , err )
258
+ r .err = fmt .Errorf ("failed to read events: %w" , err )
259
+ }
246
260
}
247
261
248
262
func (r * Run ) readAllOutput () error {
@@ -367,16 +381,14 @@ func (r *Run) request(ctx context.Context, payload any) (err error) {
367
381
go r .readEvents (cancelCtx , eventsRead )
368
382
369
383
go func () {
384
+ var (
385
+ n int
386
+ frag []byte
387
+ buf = make ([]byte , 64 * 1024 )
388
+ )
370
389
bufferedStdout := bufio .NewWriter (stdoutWriter )
371
390
bufferedStderr := bufio .NewWriter (stderrWriter )
372
- scan := bufio .NewScanner (resp .Body )
373
391
defer func () {
374
- go func () {
375
- for scan .Scan () {
376
- // Drain any remaining events
377
- }
378
- }()
379
-
380
392
eventsWrite .Close ()
381
393
382
394
bufferedStderr .Flush ()
@@ -390,38 +402,59 @@ func (r *Run) request(ctx context.Context, payload any) (err error) {
390
402
resp .Body .Close ()
391
403
}()
392
404
393
- for scan .Scan () {
394
- line := bytes .TrimSpace (bytes .TrimPrefix (scan .Bytes (), []byte ("data: " )))
395
- if len (line ) == 0 {
396
- continue
397
- }
398
- if bytes .Equal (line , []byte ("[DONE]" )) {
399
- return
405
+ for ; ; n , err = resp .Body .Read (buf ) {
406
+ if n == 0 && err != nil {
407
+ break
400
408
}
401
409
402
- if bytes .HasPrefix (line , []byte (`{"stdout":` )) {
403
- _ , err = bufferedStdout .Write (bytes .TrimSuffix (bytes .TrimPrefix (line , []byte (`{"stdout":` )), []byte ("}" )))
404
- if err != nil {
405
- r .state = Error
406
- r .err = fmt .Errorf ("failed to write stdout: %w" , err )
407
- return
410
+ for _ , line := range bytes .Split (bytes .TrimSpace (append (frag , buf [:n ]... )), []byte ("\n \n " )) {
411
+ line = bytes .TrimSpace (bytes .TrimPrefix (line , []byte ("data: " )))
412
+ if len (line ) == 0 {
413
+ frag = frag [:0 ]
414
+ continue
408
415
}
409
- } else if bytes .HasPrefix (line , []byte (`{"stderr":` )) {
410
- _ , err = bufferedStderr .Write (bytes .TrimSuffix (bytes .TrimPrefix (line , []byte (`{"stderr":` )), []byte ("}" )))
411
- if err != nil {
412
- r .state = Error
413
- r .err = fmt .Errorf ("failed to write stderr: %w" , err )
416
+ if bytes .Equal (line , []byte ("[DONE]" )) {
414
417
return
415
418
}
416
- } else {
417
- _ , err = eventsWrite .Write (append (line , '\n' ))
418
- if err != nil {
419
- r .state = Error
420
- r .err = fmt .Errorf ("failed to write events: %w" , err )
421
- return
419
+
420
+ // Is this a JSON object?
421
+ if err := json .Unmarshal (line , & []map [string ]any {make (map [string ]any )}[0 ]); err != nil {
422
+ // If not, then wait until we get the rest of the output.
423
+ frag = line [:]
424
+ continue
425
+ }
426
+
427
+ frag = frag [:0 ]
428
+
429
+ if bytes .HasPrefix (line , []byte (`{"stdout":` )) {
430
+ _ , err = bufferedStdout .Write (bytes .TrimSuffix (bytes .TrimPrefix (line , []byte (`{"stdout":` )), []byte ("}" )))
431
+ if err != nil {
432
+ r .state = Error
433
+ r .err = fmt .Errorf ("failed to write stdout: %w" , err )
434
+ return
435
+ }
436
+ } else if bytes .HasPrefix (line , []byte (`{"stderr":` )) {
437
+ _ , err = bufferedStderr .Write (bytes .TrimSuffix (bytes .TrimPrefix (line , []byte (`{"stderr":` )), []byte ("}" )))
438
+ if err != nil {
439
+ r .state = Error
440
+ r .err = fmt .Errorf ("failed to write stderr: %w" , err )
441
+ return
442
+ }
443
+ } else {
444
+ _ , err = eventsWrite .Write (append (line , '\n' , '\n' ))
445
+ if err != nil {
446
+ r .state = Error
447
+ r .err = fmt .Errorf ("failed to write events: %w" , err )
448
+ return
449
+ }
422
450
}
423
451
}
424
452
}
453
+
454
+ if err != nil && ! errors .Is (err , io .EOF ) {
455
+ slog .Debug ("failed to read events from response" , "error" , err )
456
+ r .err = fmt .Errorf ("failed to read events: %w" , err )
457
+ }
425
458
}()
426
459
427
460
r .wait = func () error {
0 commit comments