20
20
import io .vertx .core .http .HttpMethod ;
21
21
import kotlin .Pair ;
22
22
import okhttp3 .ResponseBody ;
23
+ import org .apache .commons .codec .binary .Base64 ;
23
24
import org .apache .commons .lang3 .StringUtils ;
24
25
import org .apache .http .HttpRequest ;
25
26
import org .apache .http .HttpStatus ;
41
42
import javax .ws .rs .core .Response ;
42
43
import java .io .IOException ;
43
44
import java .io .InputStream ;
45
+ import java .io .UnsupportedEncodingException ;
44
46
import java .net .URL ;
47
+ import java .time .Duration ;
45
48
import java .util .List ;
46
49
import java .util .concurrent .TimeUnit ;
47
50
import java .util .function .Predicate ;
55
58
import static org .commonjava .indy .service .httprox .util .MetricsConstants .CONTENT_ENTRY_POINT ;
56
59
import static org .commonjava .indy .service .httprox .util .MetricsConstants .PATH ;
57
60
import static org .commonjava .indy .service .httprox .util .MetricsConstants .METADATA_CONTENT ;
61
+ import static org .commonjava .indy .service .httprox .util .UrlUtils .base64url ;
58
62
59
63
public class ProxyResponseHelper
60
64
{
@@ -66,7 +70,7 @@ public class ProxyResponseHelper
66
70
67
71
private final ProxyConfiguration config ;
68
72
69
- private boolean transferred ;
73
+ private volatile boolean transferred ;
70
74
71
75
private ProxyRepositoryCreator repoCreator ;
72
76
@@ -389,10 +393,10 @@ private void doTransfer( final HttpConduitWrapper http, final ArtifactStore stor
389
393
{
390
394
if ( transferred )
391
395
{
396
+ logger .info ("Transfer already done, store: {}, path: {}" , store .getKey (), path );
392
397
return ;
393
398
}
394
399
395
- transferred = true ;
396
400
if ( !http .isOpen () )
397
401
{
398
402
throw new IOException ( "Sink channel already closed (or null)!" );
@@ -412,7 +416,10 @@ private void doTransfer( final HttpConduitWrapper http, final ArtifactStore stor
412
416
}
413
417
414
418
try {
415
- Uni <okhttp3 .Response > responseUni = contentRetrievalService .doGet (trackingId , store .getType ().name (), store .getName (), path );
419
+ String encodedPath = base64url (path );
420
+ logger .debug ( "Get from content service, store: {}, path: {}" , store .getKey (), encodedPath );
421
+ Uni <okhttp3 .Response > responseUni = contentRetrievalService .doGet (trackingId , store .getType ().name (),
422
+ store .getName (), encodedPath );
416
423
417
424
responseUni .subscribe ().with (
418
425
response ->
@@ -438,7 +445,7 @@ private void doTransfer( final HttpConduitWrapper http, final ArtifactStore stor
438
445
}
439
446
finally
440
447
{
441
- transferred = false ;
448
+ transferred = true ;
442
449
if ( response != null && responseBody != null )
443
450
{
444
451
responseBody .close ();
@@ -457,15 +464,12 @@ private void doTransfer( final HttpConduitWrapper http, final ArtifactStore stor
457
464
}
458
465
finally
459
466
{
460
- transferred = false ;
467
+ transferred = true ;
461
468
}
462
469
}
463
470
);
464
471
465
- while ( transferred )
466
- {
467
- TimeUnit .MILLISECONDS .sleep ( 100 );
468
- }
472
+ responseUni .await ().atMost (Duration .ofMinutes (5 )); // Wait until the item or a failure is emitted
469
473
470
474
if ( meter != null )
471
475
{
0 commit comments