@@ -82,14 +82,14 @@ public void startConsumption(
8282 int partition ,
8383 Optional <PubSubPosition > pubSubPosition ) {
8484 String storeVersion = storeConfig .getStoreVersionName ();
85- LOGGER .info ("Retrieving storage engine for store {} partition {}" , storeVersion , partition );
85+ String replicaId = Utils .getReplicaId (storeVersion , partition );
86+ LOGGER .info ("Retrieving storage engine for replica {}" , replicaId );
87+
8688 StoreVersionInfo storeAndVersion =
8789 Utils .waitStoreVersionOrThrow (storeVersion , getStoreIngestionService ().getMetadataRepo ());
8890 Supplier <StoreVersionState > svsSupplier = () -> storageMetadataService .getStoreVersionState (storeVersion );
8991 syncStoreVersionConfig (storeAndVersion .getStore (), storeConfig );
9092
91- String replicaId = Utils .getReplicaId (storeVersion , partition );
92-
9393 if (!isIsolatedIngestion ) {
9494 ReplicaConsumptionContext replicaContext = getOrCreateReplicaContext (replicaId );
9595
@@ -120,15 +120,9 @@ public void startConsumption(
120120 }
121121 return storageEngineAtomicReference ;
122122 });
123- LOGGER .info (
124- "Retrieved storage engine for store {} partition {}. Starting consumption in ingestion service" ,
125- storeVersion ,
126- partition );
123+ LOGGER .info ("Retrieved storage engine for replica {}. Starting consumption in ingestion service" , replicaId );
127124 getStoreIngestionService ().startConsumption (storeConfig , partition , pubSubPosition );
128- LOGGER .info (
129- "Completed starting consumption in ingestion service for store {} partition {}" ,
130- storeVersion ,
131- partition );
125+ LOGGER .info ("Completed starting consumption in ingestion service for replica {}" , replicaId );
132126 };
133127
134128 boolean blobTransferActiveInReceiver = shouldEnableBlobTransfer (storeAndVersion .getStore ());
@@ -586,6 +580,11 @@ private void stopBlobTransferAndWait(VeniceStoreVersionConfig storeConfig, int p
586580 // Poll until the blob transfer reaches final state (null or TRANSFER_COMPLETED or TRANSFER_CANCELLED)
587581 final int waitIntervalInSecond = 1 ;
588582 final int maxRetry = timeoutInSeconds / waitIntervalInSecond ;
583+
584+ BlobTransferStatus initialStatus =
585+ blobTransferManager .getTransferStatusTrackingManager ().getTransferStatus (replicaId );
586+
587+ BlobTransferStatus previousStatus = initialStatus ;
589588 int retries = 0 ;
590589 while (!blobTransferManager .getTransferStatusTrackingManager ().isTransferInFinalState (replicaId )
591590 && retries < maxRetry ) {
@@ -594,23 +593,32 @@ private void stopBlobTransferAndWait(VeniceStoreVersionConfig storeConfig, int p
594593 retries ++;
595594 BlobTransferStatus currentStatus =
596595 blobTransferManager .getTransferStatusTrackingManager ().getTransferStatus (replicaId );
597- LOGGER .info (
598- "Waiting for blob transfer to complete for replica {} (attempt {}/{}). Current status: {}" ,
599- replicaId ,
600- retries ,
601- maxRetry ,
602- currentStatus );
596+
597+ if (currentStatus != previousStatus ) {
598+ LOGGER .info (
599+ "Blob transfer status changed for replica {} (attempt {}/{}): {} -> {}" ,
600+ replicaId ,
601+ retries ,
602+ maxRetry ,
603+ previousStatus ,
604+ currentStatus );
605+ previousStatus = currentStatus ;
606+ }
603607 } catch (InterruptedException e ) {
604608 LOGGER .warn ("Interrupted while waiting for blob transfer to complete for replica {}" , replicaId , e );
605609 Thread .currentThread ().interrupt ();
606610 break ;
607611 }
608612 }
609613
614+ BlobTransferStatus finalStatus =
615+ blobTransferManager .getTransferStatusTrackingManager ().getTransferStatus (replicaId );
610616 LOGGER .info (
611- "Blob transfer for replica {} (final status : {}) , proceeding with partition drop" ,
617+ "Blob transfer wait completed for replica {} after {} attempts. Status transition : {} -> {} , proceeding with partition drop" ,
612618 replicaId ,
613- blobTransferManager .getTransferStatusTrackingManager ().getTransferStatus (replicaId ));
619+ retries ,
620+ initialStatus ,
621+ finalStatus );
614622 } finally {
615623 blobTransferManager .getTransferStatusTrackingManager ().clearTransferStatusEnum (replicaId );
616624 consumptionLocks .remove (replicaId );
0 commit comments