Skip to content

Commit

Permalink
Add health endpoints (#122)
Browse files Browse the repository at this point in the history
* feat: add health endpoints for LS app and streamer app

* feat: update logic for health checking in LS app and streamer app

* fix: Add block insert time threshold for batch processing

* chore: add message_desc and message_code in healthcheck response

* chore: use atomic variables in HealthStatusCachingServiceImpl

* fix: change properties in @value in HealthStatusServiceImpl.java

* fix: ensure that the values used for health checks are saved after the data is successfully written to the database

* chore: (streamer) add message code and message description in healthcheck response

* fix: fix unit tests in BlockSyncServiceImplTest

* chore: (streamer) use atomic variable in HealthCheckCachingServiceImpl
  • Loading branch information
Sotatek-HuyLe3a authored Feb 16, 2024
1 parent be0d925 commit d5c9c95
Show file tree
Hide file tree
Showing 23 changed files with 688 additions and 11 deletions.
1 change: 1 addition & 0 deletions application/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-configuration-processor'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-jooq'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.cardanofoundation.ledgersync.dto.healthcheck;

import lombok.*;

import java.time.LocalDateTime;

/**
 * Payload returned by the health-check endpoint.
 *
 * <p>Accessors, constructors and the builder are generated by Lombok. The fields are
 * {@code private} so that state is only reachable through those generated members.
 */
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class HealthStatus {
    /** Overall verdict; the health endpoint answers with HTTP 500 when this is {@code FALSE}. */
    private Boolean isHealthy;

    /** Short machine-readable code for the status (presumably a {@code Message} code - confirm against the service). */
    private String messageCode;

    /** Human-readable description that accompanies {@link #messageCode}. */
    private String messageDesc;

    /** Time at which the most recent block was inserted. */
    private LocalDateTime latestBlockInsertTime;

    /** NOTE(review): looks like a flag telling whether a stop slot is configured for the sync - confirm. */
    private Boolean hasStopSlot;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.cardanofoundation.ledgersync.dto.healthcheck;

import lombok.Getter;

/**
 * Codes and human-readable descriptions reported in the health-check response.
 *
 * <p>Each constant carries a {@code code} (identical to the constant name) and a
 * {@code desc} sentence. Getters are generated by Lombok.
 */
@Getter
public enum Message {
    READY_TO_SERVE("READY_TO_SERVE", "Data is ready to serve"),
    IS_NOT_SYNCING("IS_NOT_SYNCING", "Connection to node is not healthy, data is not being synchronized"),
    SYNCING_BUT_NOT_READY("SYNCING_BUT_NOT_READY", "Data is being synchronized, but it isn't ready to serve yet"),
    CONNECTION_HEALTHY_BUT_BLOCK_CONSUMING_NOT_HEALTHY("CONNECTION_HEALTHY_BUT_BLOCK_CONSUMING_NOT_HEALTHY",
            "Connection to node is healthy, but the latest block insertion time has exceeded the threshold"),
    // The description used to be a verbatim copy of the constant above, which made the two
    // states indistinguishable to anyone reading the response.
    // NOTE(review): confirm the exact wording with the service owners.
    SYNCING_HAS_FINISHED("SYNCING_HAS_FINISHED", "Syncing has finished");

    /** Machine-readable identifier, equal to the constant name. */
    private final String code;

    /** Human-readable explanation of the status. */
    private final String desc;

    Message(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.cardanofoundation.ledgersync.healthcheck;

import lombok.RequiredArgsConstructor;
import org.cardanofoundation.ledgersync.dto.healthcheck.HealthStatus;
import org.cardanofoundation.ledgersync.service.HealthStatusService;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;

/**
 * Actuator endpoint, registered under the id {@code health-status}, that reports the
 * health status computed by {@link HealthStatusService}.
 */
@Endpoint(id = "health-status")
@RequiredArgsConstructor
@Component
public class LedgerSyncHealthEndpoint {

    private final HealthStatusService healthStatusService;

    /**
     * Read operation of the endpoint.
     *
     * <p>This method is deliberately not a {@code @Bean} factory method: with {@code @Bean}
     * the container would invoke it itself and register the resulting
     * {@code ResponseEntity} snapshot as a bean, which is unrelated to serving the endpoint.
     *
     * @return the current {@link HealthStatus} as body; HTTP 500 when
     *         {@code isHealthy} is {@code FALSE}, HTTP 200 otherwise (including when
     *         {@code isHealthy} is {@code null})
     */
    @ReadOperation
    public ResponseEntity<HealthStatus> checkHealthStatus() {
        var healthStatus = healthStatusService.getHealthStatus();

        // Boolean.FALSE.equals(...) is null-safe: only an explicit FALSE is reported as an error.
        if (Boolean.FALSE.equals(healthStatus.getIsHealthy())) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(healthStatus);
        }

        return ResponseEntity.ok().body(healthStatus);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -34,6 +38,7 @@ public class BlockEventListener {

private final BlockRepository blockRepository;
private final MetricCollectorService metricCollectorService;
private final HealthCheckCachingService healthCheckCachingService;
private final AtomicInteger blockCount = new AtomicInteger(0);

@Value("${blocks.batch-size}")
Expand Down Expand Up @@ -90,14 +95,21 @@ public void handleGenesisBlock(GenesisBlockEvent genesisBlockEvent) {
}

genesisDataService.setupData(genesisHash);
healthCheckCachingService.saveLatestBlockSlot(genesisBlockEvent.getSlot());
healthCheckCachingService.saveLatestBlockInsertTime(LocalDateTime.now(ZoneOffset.UTC));
healthCheckCachingService.saveLatestBlockTime(
LocalDateTime.ofInstant(Instant.ofEpochSecond(genesisBlockEvent.getBlockTime()), ZoneId.of("UTC")));
}

@EventListener
@Transactional
public void handleRollback(RollbackEvent rollbackEvent) {
Long rollbackBlockNo = blockRepository.findBySlotNo(rollbackEvent.getRollbackTo().getSlot())
.map(block -> block.getBlockNo())
.orElse(0L);
long rollbackBlockNo = 0;
var rollBackBlock = blockRepository.findBySlotNo(rollbackEvent.getRollbackTo().getSlot());

if (rollBackBlock.isPresent()) {
rollbackBlockNo = rollBackBlock.get().getBlockNo();
}

if (rollbackBlockNo == 0) {
log.warn("Rollback block no {}, hash {} not found", rollbackEvent.getRollbackTo().getSlot(),
Expand All @@ -109,6 +121,10 @@ public void handleRollback(RollbackEvent rollbackEvent) {
rollbackService.rollBackFrom(rollbackBlockNo);
metricCollectorService.collectRollbackMetric();
blockCount.set(0);

healthCheckCachingService.saveLatestBlockSlot(rollbackEvent.getRollbackTo().getSlot());
healthCheckCachingService.saveLatestBlockInsertTime(LocalDateTime.now(ZoneOffset.UTC));
healthCheckCachingService.saveLatestBlockTime(rollBackBlock.get().getTime().toLocalDateTime());
}

private boolean checkIfBlockExists(EventMetadata metadata) {
Expand Down Expand Up @@ -166,6 +182,13 @@ private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock a
int currentBlockCount = blockCount.incrementAndGet();
if (currentBlockCount % batchSize == 0 || lastReceivedTimeElapsed >= commitThreshold || eventMetadata.isSyncMode()) {
blockSyncService.startBlockSyncing();

healthCheckCachingService.saveLatestBlockInsertTime(LocalDateTime.now(ZoneOffset.UTC));
healthCheckCachingService.saveLatestBlockTime(LocalDateTime.ofEpochSecond(
eventMetadata.getBlockTime(), 0, ZoneOffset.ofHours(0)));
healthCheckCachingService.saveIsSyncMode(eventMetadata.isSyncMode());
healthCheckCachingService.saveLatestBlockSlot(eventMetadata.getSlot());

blockCount.set(0);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.cardanofoundation.ledgersync.service;

import java.time.LocalDateTime;

/**
 * Cache for the values the health check relies on: the latest block time, the time of the
 * latest block insertion, the latest block slot and the sync-mode flag.
 */
public interface HealthCheckCachingService {

    /**
     * Cache the latest block time.
     *
     * @param blockTime time of the latest block
     */
    void saveLatestBlockTime(LocalDateTime blockTime);

    /**
     * Get the cached latest block time.
     *
     * @return the value last passed to {@link #saveLatestBlockTime(LocalDateTime)}
     */
    LocalDateTime getLatestBlockTime();

    /**
     * Cache the time when the most recent block was inserted.
     *
     * @param insertTime time of the most recent block insertion
     */
    void saveLatestBlockInsertTime(LocalDateTime insertTime);

    /**
     * Get the cached time when the most recent block was inserted.
     *
     * @return the cached insertion time
     */
    LocalDateTime getLatestBlockInsertTime();

    /**
     * Cache the latest slot number.
     *
     * @param slot slot number of the latest block
     */
    void saveLatestBlockSlot(Long slot);

    /**
     * Get the cached latest slot number.
     *
     * @return the cached slot number
     */
    Long getLatestBlockSlot();

    /**
     * Cache the value that indicates whether yaci is crawling in sync mode
     * (using the Chain-Sync protocol) or not.
     *
     * @param isSyncMode the sync-mode flag to cache
     */
    void saveIsSyncMode(Boolean isSyncMode);

    /**
     * Get the value that indicates whether yaci is crawling in sync mode
     * (using the Chain-Sync protocol) or not.
     *
     * @return the cached sync-mode flag
     */
    Boolean getIsSyncMode();
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.cardanofoundation.ledgersync.service;


import org.cardanofoundation.ledgersync.dto.healthcheck.HealthStatus;

/**
 * Supplies the health status reported by the application's health endpoint.
 */
public interface HealthStatusService {

    /**
     * Get the current health status.
     *
     * @return the {@link HealthStatus} describing the current state
     */
    HealthStatus getHealthStatus();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.cardanofoundation.ledgersync.service.impl;

import jakarta.annotation.PostConstruct;
import org.cardanofoundation.ledgersync.service.HealthCheckCachingService;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * In-memory implementation of {@link HealthCheckCachingService}.
 *
 * <p>All four cached values are held so that a write becomes visible to other threads:
 * the slot and the sync-mode flag live in atomics, and the two timestamps are
 * {@code volatile} references. {@link LocalDateTime} is immutable, so a volatile
 * reference is sufficient for safe publication.
 */
@Service
public class HealthStatusCachingServiceImpl implements HealthCheckCachingService {
    // Stays null until the first saveLatestBlockTime call; init() does not seed it.
    private volatile LocalDateTime latestBlockTime;
    private volatile LocalDateTime latestBlockInsertTime;
    private final AtomicLong latestBlockSlot = new AtomicLong();
    private final AtomicBoolean isSyncMode = new AtomicBoolean();

    /**
     * Seeds the cache after construction: insertion time is "now" in UTC, the slot is a
     * placeholder and sync mode is off.
     */
    @PostConstruct
    void init() {
        latestBlockInsertTime = LocalDateTime.now(ZoneOffset.UTC);
        latestBlockSlot.set(-10L); // dummy value
        isSyncMode.set(Boolean.FALSE);
    }

    /** Stores the given block time, replacing any previous value. */
    @Override
    public void saveLatestBlockTime(LocalDateTime blockTime) {
        latestBlockTime = blockTime;
    }

    /** Returns the last stored block time, or {@code null} if none has been stored yet. */
    @Override
    public LocalDateTime getLatestBlockTime() {
        return latestBlockTime;
    }

    /** Stores the given block insertion time, replacing any previous value. */
    @Override
    public void saveLatestBlockInsertTime(LocalDateTime insertTime) {
        latestBlockInsertTime = insertTime;
    }

    /** Returns the last stored block insertion time. */
    @Override
    public LocalDateTime getLatestBlockInsertTime() {
        return latestBlockInsertTime;
    }

    /**
     * Stores the given slot number.
     *
     * @param slot slot number; it is unboxed, so {@code null} raises a NullPointerException
     */
    @Override
    public void saveLatestBlockSlot(Long slot) {
        latestBlockSlot.set(slot);
    }

    /** Returns the last stored slot number ({@code -10} until a real slot has been stored). */
    @Override
    public Long getLatestBlockSlot() {
        return latestBlockSlot.get();
    }

    /**
     * Stores the sync-mode flag.
     *
     * @param value flag to store; it is unboxed, so {@code null} raises a NullPointerException
     */
    @Override
    public void saveIsSyncMode(Boolean value) {
        isSyncMode.set(value);
    }

    /** Returns the last stored sync-mode flag ({@code false} until one has been stored). */
    @Override
    public Boolean getIsSyncMode() {
        return isSyncMode.get();
    }
}
Loading

0 comments on commit d5c9c95

Please sign in to comment.