diff --git a/block-node/base/src/main/java/module-info.java b/block-node/base/src/main/java/module-info.java index f5be6eda32..b5c4588102 100644 --- a/block-node/base/src/main/java/module-info.java +++ b/block-node/base/src/main/java/module-info.java @@ -7,6 +7,7 @@ requires transitive org.hiero.block.node.spi; requires com.hedera.pbj.runtime; + requires org.hiero.block.common; requires com.github.luben.zstd_jni; requires java.net.http; requires java.xml; diff --git a/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3Client.java b/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3Client.java index abc43b99c0..26bd11967b 100644 --- a/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3Client.java +++ b/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3Client.java @@ -3,12 +3,14 @@ import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; +import java.io.InputStream; import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; import java.net.URLDecoder; import java.net.URLEncoder; import java.net.http.HttpClient; +import java.net.http.HttpHeaders; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandler; @@ -30,10 +32,16 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilderFactory; +import org.hiero.block.common.utils.Preconditions; +import org.hiero.block.common.utils.StringUtilities; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; @@ -42,7 +50,7 @@ * Simple standalone S3 client for uploading, downloading and listing objects from S3. 
*/ @SuppressWarnings("JavadocLinkAsPlainText") -public class S3Client implements AutoCloseable { +public final class S3Client implements AutoCloseable { /* Set the system property to allow restricted headers in HttpClient */ static { System.setProperty("jdk.httpclient.allowRestrictedHeaders", "Host,Content-Length"); @@ -69,6 +77,16 @@ public class S3Client implements AutoCloseable { private static final char QUERY_PARAMETER_SEPARATOR = '&'; /** The query parameter value separator **/ private static final char QUERY_PARAMETER_VALUE_SEPARATOR = '='; + /** The size limit of response body to be read for an exceptional response */ + private static final int ERROR_BODY_MAX_LENGTH = 32_768; + /** The GET HTTP Method canonical name **/ + private static final String GET = "GET"; + /** The POST HTTP Method canonical name **/ + private static final String POST = "POST"; + /** The PUT HTTP Method canonical name **/ + private static final String PUT = "PUT"; + /** The DELETE HTTP Method canonical name **/ + private static final String DELETE = "DELETE"; /** The S3 region name **/ private final String regionName; @@ -82,30 +100,43 @@ public class S3Client implements AutoCloseable { private final String secretKey; /** The HTTP client used for making requests **/ private final HttpClient httpClient; + /** The document builder factory used for response body parsing **/ + private final DocumentBuilderFactory documentBuilderFactory; /** * Constructor for S3Client. * + * @param regionName The S3 region name (e.g. "us-east-1"). * @param endpoint The S3 endpoint URL (e.g. "https://s3.amazonaws.com/"). * @param bucketName The name of the S3 bucket. * @param accessKey The S3 access key. * @param secretKey The S3 secret key. + * @throws S3ClientInitializationException if an error occurs during + * client initialization or preconditions are not met. 
*/ public S3Client( - final String regionName, - final String endpoint, - final String bucketName, - final String accessKey, - final String secretKey) { - this.regionName = regionName; - this.endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/"; - this.bucketName = bucketName; - this.accessKey = accessKey; - this.secretKey = secretKey; - this.httpClient = HttpClient.newBuilder() - .version(HttpClient.Version.HTTP_1_1) - .connectTimeout(Duration.ofSeconds(30)) - .build(); + @NonNull final String regionName, + @NonNull final String endpoint, + @NonNull final String bucketName, + @NonNull final String accessKey, + @NonNull final String secretKey) + throws S3ClientInitializationException { + try { + this.regionName = Preconditions.requireNotBlank(regionName); + this.endpoint = Preconditions.requireNotBlank(endpoint).endsWith("/") ? endpoint : endpoint + "/"; + this.bucketName = Preconditions.requireNotBlank(bucketName); + this.accessKey = Preconditions.requireNotBlank(accessKey); + this.secretKey = Preconditions.requireNotBlank(secretKey); + this.httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(30)) + .build(); + this.documentBuilderFactory = DocumentBuilderFactory.newInstance(); + this.documentBuilderFactory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true); + this.documentBuilderFactory.setNamespaceAware(true); + } catch (final Exception e) { + throw new S3ClientInitializationException(e); + } } /** @@ -122,27 +153,50 @@ public void close() throws Exception { * Lists objects in the S3 bucket with the specified prefix * * @param prefix The prefix to filter the objects. + * Use {@link StringUtilities#EMPTY} for a wildcard prefix (list all). * @param maxResults The maximum number of results to return. * @return A list of object keys. 
+ * @throws S3ResponseException if a non-200 response is received from S3 + * @throws IOException if an error occurs while reading the response body */ - public List listObjects(String prefix, int maxResults) { - String canonicalQueryString = "list-type=2&prefix=" + prefix + "&max-keys=" + maxResults; - HttpResponse response = - requestXML(endpoint + bucketName + "/?" + canonicalQueryString, "GET", Collections.emptyMap(), null); - // extract the object keys from the XML response - List keys = new ArrayList<>(); - // Get all "Contents" elements - NodeList contentsNodes = response.body().getElementsByTagName("Contents"); - for (int i = 0; i < contentsNodes.getLength(); i++) { - Element contentsElement = (Element) contentsNodes.item(i); - - // Get the "Key" element inside each "Contents" - NodeList keyNodes = contentsElement.getElementsByTagName("Key"); - if (keyNodes.getLength() > 0) { - keys.add(keyNodes.item(0).getTextContent()); + public List listObjects(@NonNull final String prefix, final int maxResults) + throws S3ResponseException, IOException { + Objects.requireNonNull(prefix); + Preconditions.requireInRange(maxResults, 1, 1000); + // build a canonical query string with the prefix and max results + final String canonicalQueryString = "list-type=2&prefix=" + prefix + "&max-keys=" + maxResults; + // build the URL for the request + final String url = endpoint + bucketName + "/?" + canonicalQueryString; + // make the request to S3 + final HttpResponse response = + request(url, GET, Collections.emptyMap(), null, BodyHandlers.ofInputStream()); + // get status code + final int responseStatusCode = response.statusCode(); + // parse the response body as XML, we always expect a body here generally + try (final InputStream in = response.body()) { // ensure body stream is always closed + if (responseStatusCode != 200) { + final String formattedPrefix = StringUtilities.isBlank(prefix) ? 
"BLANK_PREFIX" : prefix; + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = "Unsuccessful listing of objects: prefix=%s, maxResults=%s" + .formatted(formattedPrefix, maxResults); + throw new S3ResponseException(responseStatusCode, responseBody, responseHeaders, message); + } else { + // extract the object keys from the XML response + final List keys = new ArrayList<>(); + // Get all "Contents" elements + final NodeList contentsNodes = parseDocument(in).getElementsByTagName("Contents"); + for (int i = 0; i < contentsNodes.getLength(); i++) { + final Element contentsElement = (Element) contentsNodes.item(i); + // Get the "Key" element inside each "Contents" + final NodeList keyNodes = contentsElement.getElementsByTagName("Key"); + if (keyNodes.getLength() > 0) { + keys.add(keyNodes.item(0).getTextContent()); + } + } + return keys; } } - return keys; } /** @@ -151,56 +205,92 @@ public List listObjects(String prefix, int maxResults) { * @param objectKey the key for the object in S3 (e.g., "myfolder/myfile.txt") * @param storageClass the storage class (e.g., "STANDARD", "REDUCED_REDUNDANCY") * @param content the content of the file as a string - * @return true if the upload was successful, false otherwise + * @throws S3ResponseException if a non-200 response is received from S3 during file upload + * @throws IOException if an error occurs while reading the response body in case of non 200 response */ - public boolean uploadTextFile(String objectKey, String storageClass, String content) { + public void uploadTextFile( + @NonNull final String objectKey, @NonNull final String storageClass, @NonNull final String content) + throws S3ResponseException, IOException { + Preconditions.requireNotBlank(objectKey); + Preconditions.requireNotBlank(storageClass); + Preconditions.requireNotBlank(content); + // get content data final byte[] contentData = 
content.getBytes(StandardCharsets.UTF_8); + // initialize headers final Map headers = new HashMap<>(); headers.put("content-length", Integer.toString(contentData.length)); headers.put("content-type", "text/plain"); headers.put("x-amz-storage-class", storageClass); headers.put("x-amz-content-sha256", base64(sha256(contentData))); - HttpResponse response = - requestXML(endpoint + bucketName + "/" + urlEncode(objectKey, true), "PUT", headers, contentData); - if (response.statusCode() != 200) { - throw new RuntimeException("Failed to upload text file: " + response.statusCode() + "\n" + response.body()); + // build the URL for the request + final String url = endpoint + bucketName + "/" + urlEncode(objectKey, true); + // make the request to S3 + final HttpResponse response = + request(url, PUT, headers, contentData, BodyHandlers.ofInputStream()); + // get and check status code + final int responseStatusCode = response.statusCode(); + try (final InputStream in = response.body()) { // ensure body stream is always closed + if (responseStatusCode != 200) { + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = "Failed to upload text file: key=%s".formatted(objectKey); + throw new S3ResponseException( + responseStatusCode, + responseBody, // we expect a body here if the upload fails + responseHeaders, + message); + } } - return true; } /** * Downloads a text file from S3, assumes the file is small enough as uses single part download. 
* - * @param objectKey the key for the object in S3 (e.g., "myfolder/myfile.txt") + * @param key the key for the object in S3 (e.g., "myfolder/myfile.txt"), cannot be blank * @return the content of the file as a string, null if the file doesn't exist + * @throws S3ResponseException if a non-200 response is received from S3 during file download + * @throws IOException if an error occurs while reading the response body in case of non 200 response */ - public String downloadTextFile(String objectKey) { - final Map headers = new HashMap<>(); - HttpResponse response = request( - endpoint + bucketName + "/" + urlEncode(objectKey, true), - "GET", - headers, - new byte[0], - BodyHandlers.ofString(StandardCharsets.UTF_8)); - if (response.statusCode() == 404) { - return null; - } else if (response.statusCode() != 200) { - throw new RuntimeException( - "Failed to download text file: " + response.statusCode() + "\n" + response.body()); + public String downloadTextFile(@NonNull final String key) throws S3ResponseException, IOException { + Preconditions.requireNotBlank(key); + // build the URL for the request + final String url = endpoint + bucketName + "/" + urlEncode(key, true); + // make the request + final HttpResponse response = + request(url, GET, Collections.emptyMap(), null, BodyHandlers.ofInputStream()); + // check status code and return value + final int responseStatusCode = response.statusCode(); + try (final InputStream in = response.body()) { // ensure body stream is always closed + if (responseStatusCode == 404) { + // if not found, return null + return null; + } else if (responseStatusCode != 200) { + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = "Failed to download text file: key=%s".formatted(key); + throw new S3ResponseException(responseStatusCode, responseBody, responseHeaders, message); + } else { + return new String(in.readAllBytes(), 
StandardCharsets.UTF_8); + } } - return response.body(); } /** - * Uploads a file to S3 using multipart upload + * Uploads a file to S3 using multipart upload. * - * @param objectKey the key for the object in S3 (e.g., "myfolder/myfile.txt") - * @param contentIterable an Iterable of byte arrays representing the file content - * @param contentType the content type of the file (e.g., "text/plain") - * @return true if the upload was successful, false otherwise + * @param objectKey the key for the object in S3 (e.g., "myfolder/myfile.txt"), cannot be blank + * @param storageClass the storage class (e.g., "STANDARD", "REDUCED_REDUNDANCY"), cannot be blank + * @param contentIterable an Iterable of byte arrays representing the file content, cannot be null + * @param contentType the content type of the file (e.g., "text/plain"), cannot be blank + * @throws S3ResponseException if a non-200 response is received from S3 during file upload + * @throws IOException if an error occurs while reading the response body in case of non 200 response */ - public boolean uploadFile( - String objectKey, String storageClass, Iterator contentIterable, String contentType) { + public void uploadFile( + @NonNull final String objectKey, + @NonNull final String storageClass, + @NonNull final Iterator contentIterable, + @NonNull final String contentType) + throws S3ResponseException, IOException { // start the multipart upload final String uploadId = createMultipartUpload(objectKey, storageClass, contentType); // create a list to store the ETags of the uploaded parts @@ -242,77 +332,197 @@ public boolean uploadFile( } // Complete the multipart upload completeMultipartUpload(objectKey, uploadId, eTags); - return true; + } + + /** + * This method will list all multipart uploads currently active for the given + * bucket. It will return a map of object keys and upload IDs, or an empty + * map if none found. 
+ * + * @return a map of all multipart uploads in the bucket, where the key is + * the object key and the value is a list of upload IDs, or an empty map if + * none are found. + * @throws S3ResponseException if a non-200 response is received from S3 + * @throws IOException if an error occurs while reading the response body + */ + @NonNull + public Map> listMultipartUploads() throws S3ResponseException, IOException { + // todo could add some query parameters to limit the number of results + // also, we could add query params for prefix or maybe key-marker (to search for a specific key) + // it depends on our needs as to how we will be cleaning up outstanding failed uploads (TBD) + // build the URL for the request + final String canonicalQueryString = "uploads="; + // build the request URL + final String url = endpoint + bucketName + "/" + "?" + canonicalQueryString; + // make the request + final HttpResponse response = + request(url, GET, Collections.emptyMap(), null, BodyHandlers.ofInputStream()); + // check status code + final int responseStatusCode = response.statusCode(); + try (final InputStream in = response.body()) { // ensure body stream is always closed + if (responseStatusCode != 200) { + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = "Failed to list multipart uploads"; + throw new S3ResponseException(responseStatusCode, responseBody, responseHeaders, message); + } else { + // build a map of upload IDs + final Map> uploadIds = new TreeMap<>(); + final Document bodyAsDocument = parseDocument(in); + final NodeList uploads = bodyAsDocument.getElementsByTagName("Upload"); + final int length = uploads.getLength(); + for (int i = 0; i < length; i++) { + final Element uploadElement = (Element) uploads.item(i); + final String key = + uploadElement.getElementsByTagName("Key").item(0).getTextContent(); + final String uploadId = uploadElement + 
.getElementsByTagName("UploadId") + .item(0) + .getTextContent(); + // Add the UploadId to the map under the corresponding Key + uploadIds.computeIfAbsent(key, k -> new ArrayList<>()).add(uploadId); + } + return Collections.unmodifiableMap(uploadIds); + } + } + } + + /** + * This method will abort a multipart upload for the specified object key. + * + * @param key the object key, cannot be blank + * @param uploadId the upload ID for the multipart upload, cannot be blank + * @throws S3ResponseException if a non-204 response is received from S3 + * @throws IOException if an error occurs while reading the response body in case of non-204 response + */ + public void abortMultipartUpload(@NonNull final String key, @NonNull final String uploadId) + throws S3ResponseException, IOException { + Preconditions.requireNotBlank(key); + Preconditions.requireNotBlank(uploadId); + // build the canonical query string + final String canonicalQueryString = "uploadId=" + uploadId; + // build the request URL + final String url = endpoint + bucketName + "/" + key + "?" + canonicalQueryString; + // make the request + final HttpResponse response = + request(url, DELETE, Collections.emptyMap(), null, BodyHandlers.ofInputStream()); + // check status code + final int responseStatusCode = response.statusCode(); + try (final InputStream in = response.body()) { // ensure body stream is always closed + if (responseStatusCode != 204) { + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = "Failed to abort multipart upload: key=%s, uploadId=%s".formatted(key, uploadId); + throw new S3ResponseException(responseStatusCode, responseBody, responseHeaders, message); + } + } } /** * Creates a multipart upload for the specified object key. * - * @param key The object key. - * @param storageClass The storage class (e.g. "STANDARD", "REDUCED_REDUNDANCY"). - * @param contentType The content type of the object. 
- * @return The upload ID for the multipart upload. + * @param key The object key, cannot be null + * @param storageClass The storage class (e.g. "STANDARD", "REDUCED_REDUNDANCY"), cannot be null + * @param contentType The content type of the object, cannot be null + * @return The upload ID for the multipart upload + * @throws S3ResponseException if a non-200 response is received from S3 + * @throws IOException if an error occurs while reading the response body */ - String createMultipartUpload(String key, String storageClass, String contentType) { - String canonicalQueryString = "uploads="; - Map headers = new HashMap<>(); + String createMultipartUpload( + @NonNull final String key, @NonNull final String storageClass, @NonNull final String contentType) + throws S3ResponseException, IOException { + // build the canonical query string + final String canonicalQueryString = "uploads="; + // build the request headers + final Map headers = new HashMap<>(); headers.put("content-type", contentType); - if (storageClass != null) { - headers.put("x-amz-storage-class", storageClass); - } + headers.put("x-amz-storage-class", storageClass); // TODO add checksum algorithm and overall checksum support using x-amz-checksum-algorithm=SHA256 and // x-amz-checksum-type=COMPOSITE - try { - HttpResponse response = - requestXML(endpoint + bucketName + "/" + key + "?" + canonicalQueryString, "POST", headers, null); - if (response.statusCode() != 200) { - throw new RuntimeException("Failed to create multipart upload: " + response.statusCode()); + // build the request URL + final String url = endpoint + bucketName + "/" + key + "?" 
+ canonicalQueryString; + // make the request + final HttpResponse response = request(url, POST, headers, null, BodyHandlers.ofInputStream()); + // parse the response body as XML and check status + final int responseStatusCode = response.statusCode(); + try (final InputStream in = response.body()) { // ensure body stream is always closed + if (responseStatusCode != 200) { + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = "Failed to create multipart upload: key=%s".formatted(key); + throw new S3ResponseException(responseStatusCode, responseBody, responseHeaders, message); + } else { + return parseDocument(in) + .getElementsByTagName("UploadId") + .item(0) + .getTextContent(); } - return response.body().getElementsByTagName("UploadId").item(0).getTextContent(); - } catch (Exception e) { - throw new RuntimeException(e); } } /** * Uploads a part of a multipart upload. * - * @param key The object key. - * @param uploadId The upload ID for the multipart upload. - * @param partNumber The part number (1-based). - * @param partData The data for the part. - * @return The ETag of the uploaded part. 
+ * @param key The object key, cannot be blank + * @param uploadId The upload ID for the multipart upload, cannot be blank + * @param partNumber The part number (1-based) + * @param partData The data for the part, cannot be null + * @return The ETag of the uploaded part + * @throws S3ResponseException if a non-200 response is received from S3 + * @throws IOException if an error occurs while reading the response body */ - String multipartUploadPart(String key, String uploadId, int partNumber, byte[] partData) { - String canonicalQueryString = "uploadId=" + uploadId + "&partNumber=" + partNumber; - Map headers = new HashMap<>(); + String multipartUploadPart( + @NonNull final String key, + @NonNull final String uploadId, + final int partNumber, + @NonNull final byte[] partData) + throws S3ResponseException, IOException { + // build the canonical query string + final String canonicalQueryString = "uploadId=" + uploadId + "&partNumber=" + partNumber; + // build request headers + final Map headers = new HashMap<>(); headers.put("content-length", Integer.toString(partData.length)); headers.put("content-type", "application/octet-stream"); headers.put("x-amz-content-sha256", base64(sha256(partData))); - try { - HttpResponse response = requestXML( - endpoint + bucketName + "/" + key + "?" + canonicalQueryString, "PUT", headers, partData); - if (response.statusCode() != 200) { - throw new RuntimeException("Failed to upload multipart part: " + response.statusCode()); + // build the URL for the request + final String url = endpoint + bucketName + "/" + key + "?" 
+ canonicalQueryString; + // make the request + final HttpResponse response = request(url, PUT, headers, partData, BodyHandlers.ofInputStream()); + // check status code + final int responseStatusCode = response.statusCode(); + try (final InputStream in = response.body()) { // ensure body stream is always closed + if (responseStatusCode != 200) { + // throw if request not successful + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = "Failed to upload multipart part: key=%s, uploadId=%s, partNumber=%d" + .formatted(key, uploadId, partNumber); + throw new S3ResponseException(responseStatusCode, responseBody, responseHeaders, message); + } else { + return response.headers().firstValue("ETag").orElse(null); } - return response.headers().firstValue("ETag").orElse(null); - } catch (Exception e) { - throw new RuntimeException(e); } } /** * Completes a multipart upload. * - * @param key The object key. - * @param uploadId The upload ID for the multipart upload. - * @param eTags The list of ETags for the uploaded parts. 
+ * @param key The object key, cannot be blank + * @param uploadId The upload ID for the multipart upload, cannot be blank + * @param eTags The list of ETags for the uploaded parts, cannot be null + * @throws S3ResponseException if a non-200 response is received from S3 + * @throws IOException if an error occurs while reading the response body in case of non 200 response */ - void completeMultipartUpload(String key, String uploadId, List eTags) { - String canonicalQueryString = "uploadId=" + uploadId; - Map headers = new HashMap<>(); + void completeMultipartUpload( + @NonNull final String key, @NonNull final String uploadId, @NonNull final List eTags) + throws S3ResponseException, IOException { + // build canonical query string + final String canonicalQueryString = "uploadId=" + uploadId; + // build the headers for the request + final Map headers = new HashMap<>(); headers.put("content-type", "application/xml"); - StringBuilder sb = new StringBuilder(); + // build the body of the request + final StringBuilder sb = new StringBuilder(); sb.append(""); for (int i = 0; i < eTags.size(); i++) { sb.append("") @@ -322,83 +532,80 @@ void completeMultipartUpload(String key, String uploadId, List eTags) { .append(""); } sb.append(""); - byte[] partData = sb.toString().getBytes(StandardCharsets.UTF_8); - HttpResponse response = - requestXML(endpoint + bucketName + "/" + key + "?" + canonicalQueryString, "POST", headers, partData); - if (response.statusCode() != 200) { - throw new RuntimeException("Failed to complete multipart upload: " + response.statusCode()); + final byte[] requestBody = sb.toString().getBytes(StandardCharsets.UTF_8); + // build the request URL + final String url = endpoint + bucketName + "/" + key + "?" 
+ canonicalQueryString; + // make the request + final HttpResponse response = + request(url, POST, headers, requestBody, BodyHandlers.ofInputStream()); + // check status code + final int responseStatusCode = response.statusCode(); + try (final InputStream in = response.body()) { // ensure body stream is always closed + if (responseStatusCode != 200) { + // throw if request not successful + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = + "Failed to complete multipart upload: key=%s, uploadId=%s".formatted(key, uploadId); + throw new S3ResponseException(responseStatusCode, responseBody, responseHeaders, message); + } } } /** * Performs an HTTP request to S3 to the specified URL with the given parameters. * - * @param url The URL to send the request to. - * @param httpMethod The HTTP method to use (e.g. "GET", "POST", "PUT"). - * @param headers The request headers to send. - * @param requestBody The request body to send, or null if no request body is needed. - * @return HTTP response and result parsed as an XML document. If there is no response body, the result is empty - * document. - */ - private HttpResponse requestXML( - final String url, final String httpMethod, final Map headers, byte[] requestBody) { - return request(url, httpMethod, headers, requestBody, new XmlBodyHandler()); - } - - /** - * Performs an HTTP request to S3 to the specified URL with the given parameters. - * - * @param url The URL to send the request to. - * @param httpMethod The HTTP method to use (e.g. "GET", "POST", "PUT"). - * @param headers The request headers to send. - * @param requestBody The request body to send, or null if no request body is needed. - * @param bodyHandler The body handler for parsing response. - * @return HTTP response and result parsed using the provided body handler. + * @param url The URL to send the request to + * @param httpMethod The HTTP method to use (e.g. 
GET, POST, PUT) + * @param headers The request headers to send + * @param requestBody The request body to send, or null if no request body is needed + * @param bodyHandler The body handler for parsing response + * @return HTTP response and result parsed using the provided body handler */ private HttpResponse request( final String url, final String httpMethod, final Map headers, byte[] requestBody, - BodyHandler bodyHandler) { + final BodyHandler bodyHandler) { try { // the region-specific endpoint to the target object expressed in path style - URI endpointUrl = new URI(url); - - Map h = new HashMap<>(headers); + final URI endpointUrl = new URI(url); + final Map localHeaders = new TreeMap<>(headers); final String contentHashString; if (requestBody == null || requestBody.length == 0) { contentHashString = EMPTY_BODY_SHA256; requestBody = new byte[0]; } else { contentHashString = UNSIGNED_PAYLOAD; - h.put("content-length", "" + requestBody.length); + localHeaders.put("content-length", Integer.toString(requestBody.length)); } - h.put("x-amz-content-sha256", contentHashString); - - Map q = extractQueryParameters(endpointUrl); - String authorization = computeSignatureForAuthorizationHeader( - endpointUrl, httpMethod, regionName, h, q, contentHashString, accessKey, secretKey); - + localHeaders.put("x-amz-content-sha256", contentHashString); + // extract query parameters from the URL + final Map q = extractQueryParameters(endpointUrl); + // compute the authorization header + final String authorization = computeSignatureForAuthorizationHeader( + endpointUrl, httpMethod, regionName, localHeaders, q, contentHashString, accessKey, secretKey); // place the computed signature into a formatted 'Authorization' header and call S3 - h.put("Authorization", authorization); + localHeaders.put("Authorization", authorization); // build the request HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(endpointUrl); requestBuilder = switch (httpMethod) { - case "POST" -> 
requestBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(requestBody)); - case "PUT" -> requestBuilder.PUT(HttpRequest.BodyPublishers.ofByteArray(requestBody)); - case "GET" -> requestBuilder.GET(); + case POST -> requestBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(requestBody)); + case PUT -> requestBuilder.PUT(HttpRequest.BodyPublishers.ofByteArray(requestBody)); + case GET -> requestBuilder.GET(); + case DELETE -> requestBuilder.DELETE(); default -> throw new IllegalArgumentException("Unsupported HTTP method: " + httpMethod);}; - requestBuilder = requestBuilder.headers(h.entrySet().stream() + requestBuilder = requestBuilder.headers(localHeaders.entrySet().stream() .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())) .toArray(String[]::new)); - - HttpRequest request = requestBuilder.build(); - + final HttpRequest request = requestBuilder.build(); return httpClient.send(request, bodyHandler); - } catch (IOException e) { + } catch (final IOException e) { throw new UncheckedIOException(e); - } catch (InterruptedException | URISyntaxException e) { + } catch (final InterruptedException | URISyntaxException e) { + // todo what would be the correct handling for the InterruptedException? + Thread.currentThread().interrupt(); throw new UncheckedIOException(new IOException(e)); } } @@ -409,13 +616,13 @@ private HttpResponse request( * @param endpointUrl The endpoint URL to extract parameters from. * @return The list of parameters, in the order they were found. 
*/ - private static Map extractQueryParameters(URI endpointUrl) { + private static Map extractQueryParameters(final URI endpointUrl) { final String rawQuery = endpointUrl.getQuery(); if (rawQuery == null) { return Collections.emptyMap(); } else { - Map results = new HashMap<>(); - int endIndex = rawQuery.length() - 1; + final Map results = new HashMap<>(); + final int endIndex = rawQuery.length() - 1; int index = 0; while (index <= endIndex) { // Ideally we should first look for '&', then look for '=' before the '&', but that's not how AWS @@ -429,7 +636,6 @@ private static Map extractQueryParameters(URI endpointUrl) { // No value name = rawQuery.substring(index); value = null; - index = endIndex + 1; } else { int parameterSeparatorIndex = rawQuery.indexOf(QUERY_PARAMETER_SEPARATOR, nameValueSeparatorIndex); @@ -438,14 +644,13 @@ private static Map extractQueryParameters(URI endpointUrl) { } name = rawQuery.substring(index, nameValueSeparatorIndex); value = rawQuery.substring(nameValueSeparatorIndex + 1, parameterSeparatorIndex); - index = parameterSeparatorIndex + 1; } // note that value = null is valid as we can have a parameter without a value in // a query string (legal http) results.put( URLDecoder.decode(name, StandardCharsets.UTF_8), - value == null ? value : URLDecoder.decode(value, StandardCharsets.UTF_8)); + value == null ? null : URLDecoder.decode(value, StandardCharsets.UTF_8)); } return results; } @@ -454,16 +659,16 @@ private static Map extractQueryParameters(URI endpointUrl) { /** * Computes an AWS4 signature for a request, ready for inclusion as an 'Authorization' header. * - * @param endpointUrl the url to which the request is being made - * @param httpMethod the HTTP method (GET, POST, PUT, etc.) - * @param regionName the AWS region name - * @param headers The request headers; 'Host' and 'X-Amz-Date' will be added to this set. 
+ * @param endpointUrl the url to which the request is being made + * @param httpMethod the HTTP method (GET, POST, PUT, etc.) + * @param regionName the AWS region name + * @param headers The request headers; 'Host' and 'X-Amz-Date' will be added to this set * @param queryParameters Any query parameters that will be added to the endpoint. The parameters should be - * specified in canonical format. - * @param bodyHash Precomputed SHA256 hash of the request body content; this value should also be set as the - * header 'X-Amz-Content-SHA256' for non-streaming uploads. - * @param awsAccessKey The user's AWS Access Key. - * @param awsSecretKey The user's AWS Secret Key. + * specified in canonical format + * @param bodyHash Precomputed SHA256 hash of the request body content; this value should also be set as the + * header 'X-Amz-Content-SHA256' for non-streaming uploads + * @param awsAccessKey The user's AWS Access Key + * @param awsSecretKey The user's AWS Secret Key * @return The computed authorization string for the request. This value needs to be set as the header * 'Authorization' on the further HTTP request. 
*/ @@ -481,15 +686,16 @@ private static String computeSignatureForAuthorizationHeader( final ZonedDateTime now = ZonedDateTime.now(java.time.ZoneOffset.UTC); final String dateTimeStamp = DATE_TIME_FORMATTER.format(now); final String dateStamp = DATE_STAMP_FORMATTER.format(now); - // update the headers with required 'x-amz-date' and 'host' values headers.put("x-amz-date", dateTimeStamp); + // determine host header String hostHeader = endpointUrl.getHost(); final int port = endpointUrl.getPort(); if (port > -1) { hostHeader = hostHeader.concat(":" + port); } + // update the host header headers.put("Host", hostHeader); // canonicalize the headers; we need the set of header names as well as the @@ -538,11 +744,11 @@ private static String computeSignatureForAuthorizationHeader( final byte[] kSigning = sign(TERMINATOR, kService); final byte[] signature = sign(stringToSign, kSigning); + // build and return the authorization header final String credentialsAuthorizationHeader = "Credential=" + awsAccessKey + "/" + scope; final String signedHeadersAuthorizationHeader = "SignedHeaders=" + canonicalizedHeaderNames; final String signatureAuthorizationHeader = "Signature=" + HexFormat.of().formatHex(signature); - return SCHEME + "-" + ALGORITHM + " " + credentialsAuthorizationHeader + ", " + signedHeadersAuthorizationHeader + ", " + signatureAuthorizationHeader; } @@ -550,17 +756,17 @@ private static String computeSignatureForAuthorizationHeader( /** * Signs the given data using HMAC SHA256 with the specified key. * - * @param stringData The data to sign. - * @param key The key to use for signing. - * @return The signed data as a byte array. 
+ * @param stringData The data to sign + * @param key The key to use for signing + * @return The signed data as a byte array */ - private static byte[] sign(String stringData, byte[] key) { + private static byte[] sign(final String stringData, final byte[] key) { try { - Mac mac = Mac.getInstance(S3Client.ALGORITHM_HMAC_SHA256); + final Mac mac = Mac.getInstance(S3Client.ALGORITHM_HMAC_SHA256); mac.init(new SecretKeySpec(key, S3Client.ALGORITHM_HMAC_SHA256)); return mac.doFinal(stringData.getBytes(StandardCharsets.UTF_8)); - } catch (NoSuchAlgorithmException | InvalidKeyException e) { - throw new RuntimeException(e); + } catch (final NoSuchAlgorithmException | InvalidKeyException e) { + throw new UncheckedIOException(new IOException(e)); } } @@ -571,9 +777,8 @@ private static byte[] sign(String stringData, byte[] key) { * @param keepPathSlash true, if slashes in the path should be preserved, false * @return the encoded URL */ - private static String urlEncode(String url, boolean keepPathSlash) { - String encoded; - encoded = URLEncoder.encode(url, StandardCharsets.UTF_8).replace("+", "%20"); + private static String urlEncode(final String url, final boolean keepPathSlash) { + final String encoded = URLEncoder.encode(url, StandardCharsets.UTF_8).replace("+", "%20"); if (keepPathSlash) { return encoded.replace("%2F", "/"); } else { @@ -587,12 +792,12 @@ private static String urlEncode(String url, boolean keepPathSlash) { * @param data the data to hash * @return the SHA-256 hash as a byte array */ - private static byte[] sha256(byte[] data) { + private static byte[] sha256(final byte[] data) { try { - MessageDigest md = MessageDigest.getInstance("SHA-256"); + final MessageDigest md = MessageDigest.getInstance("SHA-256"); md.update(data); return md.digest(); - } catch (NoSuchAlgorithmException e) { + } catch (final NoSuchAlgorithmException e) { throw new UncheckedIOException(new IOException(e)); } } @@ -603,7 +808,22 @@ private static byte[] sha256(byte[] data) { * 
@param data the byte array to encode * @return the base64 encoded string */ - private static String base64(byte[] data) { + private static String base64(final byte[] data) { return new String(Base64.getEncoder().encode(data)); } + + /** + * This method parses an XML document from an input stream. Uses the + * configured {@link DocumentBuilderFactory}. + * + * @param is to parse + * @return a {@link Document} parsed from the input stream + */ + private Document parseDocument(final InputStream is) { + try { + return documentBuilderFactory.newDocumentBuilder().parse(is); + } catch (final Exception e) { + throw new UncheckedIOException(new IOException(e)); + } + } } diff --git a/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ClientException.java b/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ClientException.java new file mode 100644 index 0000000000..e7c16328d1 --- /dev/null +++ b/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ClientException.java @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: Apache-2.0 +package org.hiero.block.node.base.s3; + +/** + * A checked exception to act as a base for all S3 client exceptions. 
+ */ +public class S3ClientException extends Exception { + public S3ClientException() { + super(); + } + + public S3ClientException(final String message) { + super(message); + } + + public S3ClientException(final Throwable cause) { + super(cause); + } + + public S3ClientException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ClientInitializationException.java b/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ClientInitializationException.java new file mode 100644 index 0000000000..8b3ad62d61 --- /dev/null +++ b/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ClientInitializationException.java @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: Apache-2.0 +package org.hiero.block.node.base.s3; + +public class S3ClientInitializationException extends S3ClientException { + public S3ClientInitializationException() { + super(); + } + + public S3ClientInitializationException(final String message) { + super(message); + } + + public S3ClientInitializationException(final Throwable cause) { + super(cause); + } + + public S3ClientInitializationException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ResponseException.java b/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ResponseException.java new file mode 100644 index 0000000000..4298d944af --- /dev/null +++ b/block-node/base/src/main/java/org/hiero/block/node/base/s3/S3ResponseException.java @@ -0,0 +1,187 @@ +// SPDX-License-Identifier: Apache-2.0 +package org.hiero.block.node.base.s3; + +import edu.umd.cs.findbugs.annotations.Nullable; +import java.net.http.HttpHeaders; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * A checked exception thrown when an S3 response is not successful. 
+ */ +public final class S3ResponseException extends S3ClientException { + /** The size limit for the toString response body length */ + private static final int MAX_RESPONSE_BODY_STRING_LENGTH = 2048; + /** The size limit for the toString headers length */ + private static final int MAX_HEADER_STRING_LENGTH = 2048; + /** The four-space indent used for formatting the exception string */ + private static final String FOUR_SPACE_INDENT = " "; + /** The response status code */ + private final int responseStatusCode; + /** The response body, nullable */ + private final byte[] responseBody; + /** The response headers, nullable */ + private final HttpHeaders responseHeaders; + + public S3ResponseException( + final int responseStatusCode, + @Nullable final byte[] responseBody, + @Nullable final HttpHeaders responseHeaders) { + super(); + this.responseStatusCode = responseStatusCode; + this.responseBody = responseBody; + this.responseHeaders = responseHeaders; + } + + public S3ResponseException( + final int responseStatusCode, + @Nullable final byte[] responseBody, + @Nullable final HttpHeaders responseHeaders, + final String message) { + super(message); + this.responseStatusCode = responseStatusCode; + this.responseBody = responseBody; + this.responseHeaders = responseHeaders; + } + + public S3ResponseException( + final int responseStatusCode, + @Nullable final byte[] responseBody, + @Nullable final HttpHeaders responseHeaders, + final Throwable cause) { + super(cause); + this.responseStatusCode = responseStatusCode; + this.responseBody = responseBody; + this.responseHeaders = responseHeaders; + } + + public S3ResponseException( + final int responseStatusCode, + @Nullable final byte[] responseBody, + @Nullable final HttpHeaders responseHeaders, + final String message, + final Throwable cause) { + super(message, cause); + this.responseStatusCode = responseStatusCode; + this.responseBody = responseBody; + this.responseHeaders = responseHeaders; + } + + public int 
getResponseStatusCode() { + return responseStatusCode; + } + + public byte[] getResponseBody() { + return responseBody; + } + + public HttpHeaders getResponseHeaders() { + return responseHeaders; + } + + /** + * @return a {@link String} representation of this exception including the + * response code, headers if available and body if available. Header and + * body strings are limited to a maximum length: header limit - {@value MAX_HEADER_STRING_LENGTH} + * and body limit - {@value MAX_RESPONSE_BODY_STRING_LENGTH}. + */ + @Override + public String toString() { + // start with standard exception string building + final String className = getClass().getName(); + final String message = getLocalizedMessage(); + final StringBuilder sb = new StringBuilder(className); + if (message != null) { + // if there is a message, append it + sb.append(": ").append(message); + } + sb.append(System.lineSeparator()); + appendResponseCode(sb); + appendHeaders(sb); + appendBody(sb); + return sb.toString(); + } + + /** + * Append the response code to the StringBuilder. + */ + private void appendResponseCode(final StringBuilder sb) { + // append the response status code + sb.append(FOUR_SPACE_INDENT).append("Response status code: ").append(responseStatusCode); + } + + /** + * Append the response headers to the StringBuilder. + * If there are no headers, this method does nothing. + * Size limit for the headers string is {@value MAX_HEADER_STRING_LENGTH}. 
+ */ + private void appendHeaders(final StringBuilder sb) { + if (responseHeaders != null) { + final Map> headersMap = responseHeaders.map(); + if (headersMap != null && !headersMap.isEmpty()) { + // for each header, append the header name and value(s) + sb.append(System.lineSeparator()); + sb.append(FOUR_SPACE_INDENT).append("Response headers:"); + // we limit the size of the printed headers + int headerSizeCount = 0; + for (final Entry> entry : headersMap.entrySet()) { + // for each header, we get the key and values + final String headerKey = entry.getKey() + ": "; + sb.append(System.lineSeparator()); + if (headerSizeCount + headerKey.length() > MAX_HEADER_STRING_LENGTH) { + // if string limit size is exceeded, break + sb.append(FOUR_SPACE_INDENT.repeat(2)).append("..."); + break; + } else { + // append the header key + sb.append(FOUR_SPACE_INDENT.repeat(2)).append(headerKey); + headerSizeCount += headerKey.length(); + } + // append the header values, usually we expect only one value per header + final List values = entry.getValue(); + boolean isFirstValue = true; + for (final String value : values) { + // if string limit size is exceeded, break + if (headerSizeCount + value.length() > MAX_HEADER_STRING_LENGTH) { + // if the value size exceeds the limit, break + sb.append(" ..."); + break; + } else { + // append the value, separate with a comma for multi-value headers + if (!isFirstValue) { + sb.append(", "); + headerSizeCount += 2; + } else { + isFirstValue = false; + } + sb.append(value); + headerSizeCount += value.length(); + } + } + } + } + } + } + + /** + * Append the response body to the StringBuilder. + * If there is no response body, this method does nothing. + * Size limit for the response body string is {@value MAX_RESPONSE_BODY_STRING_LENGTH}. 
+ */ + private void appendBody(final StringBuilder sb) { + if (responseBody != null && responseBody.length > 0) { + // if there is a response body, append it + sb.append(System.lineSeparator()); + // we limit the size of the printed response body + sb.append(" Response body: "); + if (responseBody.length > MAX_RESPONSE_BODY_STRING_LENGTH) { + sb.append(new String(responseBody, 0, MAX_RESPONSE_BODY_STRING_LENGTH)) + .append(" ..."); + } else { + // if the response body is small enough, append it fully + sb.append(new String(responseBody)); + } + } + } +} diff --git a/block-node/base/src/main/java/org/hiero/block/node/base/s3/XmlBodyHandler.java b/block-node/base/src/main/java/org/hiero/block/node/base/s3/XmlBodyHandler.java deleted file mode 100644 index 6938797c22..0000000000 --- a/block-node/base/src/main/java/org/hiero/block/node/base/s3/XmlBodyHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -package org.hiero.block.node.base.s3; - -import java.io.IOException; -import java.io.InputStream; -import java.io.UncheckedIOException; -import java.net.http.HttpResponse; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import org.w3c.dom.Document; -import org.xml.sax.SAXException; - -/** - * A custom {@link HttpResponse.BodyHandler} that parses the response body as XML. 
- */ -public class XmlBodyHandler implements HttpResponse.BodyHandler { - - /** - * {@inheritDoc} - * - * @throws UncheckedIOException if an error occurs during parsing - */ - @Override - public HttpResponse.BodySubscriber apply(HttpResponse.ResponseInfo responseInfo) { - int contentLength = Integer.parseInt( - responseInfo.headers().firstValue("Content-Length").orElse("0")); - if (contentLength == 0) { - try { - return HttpResponse.BodySubscribers.replacing(DocumentBuilderFactory.newInstance() - .newDocumentBuilder() - .newDocument()); - } catch (ParserConfigurationException e) { - throw new UncheckedIOException(new IOException("Failed to parse XML", e)); - } - } else { - return HttpResponse.BodySubscribers.mapping( - HttpResponse.BodySubscribers.ofInputStream(), XmlBodyHandler::parseXml); - } - } - - /** - * Parses the given InputStream as XML and returns a Document. - * - * @param inputStream the InputStream to parse - * @return parsed XML Document - * @throws UncheckedIOException if an error occurs during parsing - */ - private static Document parseXml(InputStream inputStream) { - try { - DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - factory.setNamespaceAware(true); - DocumentBuilder builder = factory.newDocumentBuilder(); - return builder.parse(inputStream); - } catch (IOException e) { - throw new UncheckedIOException("Failed to parse XML", e); - } catch (ParserConfigurationException | SAXException e) { - throw new UncheckedIOException(new IOException("Failed to parse XML", e)); - } - } -} diff --git a/block-node/base/src/test/java/org/hiero/block/node/base/s3/S3ClientTest.java b/block-node/base/src/test/java/org/hiero/block/node/base/s3/S3ClientTest.java index 91ecce5ff5..ae440d7ba1 100644 --- a/block-node/base/src/test/java/org/hiero/block/node/base/s3/S3ClientTest.java +++ b/block-node/base/src/test/java/org/hiero/block/node/base/s3/S3ClientTest.java @@ -1,20 +1,25 @@ // SPDX-License-Identifier: Apache-2.0 package 
org.hiero.block.node.base.s3; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import io.minio.GetObjectArgs; +import io.minio.ListObjectsArgs; import io.minio.MakeBucketArgs; import io.minio.MinioClient; import io.minio.PutObjectArgs; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.stream.Collectors; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; @@ -63,126 +68,209 @@ void teardown() { } } + /** + * This test aims to verify that the + * {@link S3Client#listObjects(String, int)} method will correctly return + * existing objects in a bucket. 
+ */ @Test + @DisplayName("Test listObjects() correctly returns existing objects in a bucket") void testList() throws Exception { - // Upload a file - String content = "Hello, MinIO!"; - for (int i = 0; i < 5; i++) { - minioClient.putObject(PutObjectArgs.builder().bucket(BUCKET_NAME).object("block-" + i + ".txt").stream( + // Setup + final String content = "Hello, MinIO!"; + final String keyPrefix = "block-"; + final List expected = List.of( + keyPrefix.concat("0.txt"), + keyPrefix.concat("1.txt"), + keyPrefix.concat("2.txt"), + keyPrefix.concat("3.txt"), + keyPrefix.concat("4.txt")); + // verify that the bucket is empty before the test + final boolean preCheck = minioClient + .listObjects(ListObjectsArgs.builder() + .bucket(BUCKET_NAME) + .prefix(keyPrefix) + .maxKeys(100) + .build()) + .iterator() + .hasNext(); + assertThat(preCheck).isFalse(); + // upload objects to the bucket + for (final String object : expected) { + minioClient.putObject(PutObjectArgs.builder().bucket(BUCKET_NAME).object(object).stream( new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), content.length(), -1) .build()); } + try (final S3Client s3Client = client()) { + // Call + final List actual = s3Client.listObjects(keyPrefix, 100); + // Assert + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + // Call filter by max results + final List actualFilterMaxResults = s3Client.listObjects(keyPrefix, 2); + // Assert + assertThat(actualFilterMaxResults) + .containsExactlyInAnyOrderElementsOf(List.of(expected.get(0), expected.get(1))); + } + } - try (S3Client s3Client = - new S3Client(REGION_NAME, endpoint, BUCKET_NAME, MINIO_ROOT_USER, MINIO_ROOT_PASSWORD)) { - final List keys = s3Client.listObjects("", 100); - List expectedKeys = - List.of("block-0.txt", "block-1.txt", "block-2.txt", "block-3.txt", "block-4.txt"); - assertEquals( - expectedKeys, - keys.stream().filter(name -> name.startsWith("block-")).toList(), - "Downloaded content does not match expected 
content"); + /** + * This test aims to verify that the + * {@link S3Client#listObjects(String, int)} method will return an empty + * list when no objects are found. + */ + @Test + @DisplayName("Test listObjects() returns empty when no objects are found") + void testListNonExistentObjects() throws Exception { + try (final S3Client s3Client = client()) { + // Call + final List actual = s3Client.listObjects("non-existent-prefix", 100); + // Assert + assertThat(actual).isNotNull().isEmpty(); } } + /** + * This test aims to verify that the multipart upload functionality of the + * S3Client works correctly. We manually build 3 parts of a file and then + * proceed to upload them using the multipart upload API of the S3Client. + * Then we verify that the data exists in the bucket by downloading it + * via the MinIO client and checking that the content matches what we + * uploaded. + */ @Test @DisplayName("Test multipart upload") void testMultipartUpload() throws Exception { - final String key = "foo.txt"; - // create sample data - Random random = new Random(23131535653443L); - byte[] part1 = new byte[5 * 1024 * 1024]; - byte[] part2 = new byte[5 * 1024 * 1024]; - byte[] part3 = new byte[1024]; + // Setup + final String key = "testMultipartUploadSuccess.txt"; + // check that the object does not exist before the test + final boolean preCheck = minioClient + .listObjects(ListObjectsArgs.builder() + .bucket(BUCKET_NAME) + .prefix(key) + .maxKeys(100) + .build()) + .iterator() + .hasNext(); + assertThat(preCheck).isFalse(); + final Random random = new Random(23131535653443L); + final byte[] part1 = new byte[5 * 1024 * 1024]; + final byte[] part2 = new byte[5 * 1024 * 1024]; + final byte[] part3 = new byte[1024]; random.nextBytes(part1); random.nextBytes(part2); random.nextBytes(part3); - - try (S3Client s3Client = - new S3Client(REGION_NAME, endpoint, BUCKET_NAME, MINIO_ROOT_USER, MINIO_ROOT_PASSWORD)) { + try (final S3Client s3Client = client()) { + // Call final String 
uploadId = s3Client.createMultipartUpload(key, "STANDARD", "plain/text"); - List eTags = new ArrayList<>(); + final List eTags = new ArrayList<>(); eTags.add(s3Client.multipartUploadPart(key, uploadId, 1, part1)); eTags.add(s3Client.multipartUploadPart(key, uploadId, 2, part2)); eTags.add(s3Client.multipartUploadPart(key, uploadId, 3, part3)); s3Client.completeMultipartUpload(key, uploadId, eTags); } - + // Assert // download with a minio client - byte[] downloadedContent = minioClient + byte[] actual = minioClient .getObject( GetObjectArgs.builder().bucket(BUCKET_NAME).object(key).build()) .readAllBytes(); // Verify the content - byte[] expectedContent = new byte[part1.length + part2.length + part3.length]; - System.arraycopy(part1, 0, expectedContent, 0, part1.length); - System.arraycopy(part2, 0, expectedContent, part1.length, part2.length); - System.arraycopy(part3, 0, expectedContent, part1.length + part2.length, part3.length); - assertEquals( - expectedContent.length, - downloadedContent.length, - "Downloaded content length does not match expected content length"); - assertArrayEquals(expectedContent, downloadedContent, "Downloaded content does not match expected content"); + byte[] expected = new byte[part1.length + part2.length + part3.length]; + System.arraycopy(part1, 0, expected, 0, part1.length); + System.arraycopy(part2, 0, expected, part1.length, part2.length); + System.arraycopy(part3, 0, expected, part1.length + part2.length, part3.length); + assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected).containsExactly(expected); } + /** + * This test aims to verify that the + * {@link S3Client#uploadFile(String, String, Iterator, String)} method + * will correctly upload a large file in parts to the S3 bucket. 
+ */ @Test @DisplayName("Test upload of a large file") void testUploadFile() throws Exception { + // Setup final int testContentSize = 8 * 1024 * 1024 + 826; - final String key = "foo.txt"; + final String key = "uploadOfLargeFileSuccessful.txt"; + // check that the object does not exist before the test + final boolean preCheck = minioClient + .listObjects(ListObjectsArgs.builder() + .bucket(BUCKET_NAME) + .prefix(key) + .maxKeys(100) + .build()) + .iterator() + .hasNext(); + assertThat(preCheck).isFalse(); // create sample string data - StringBuilder contentBuilder = new StringBuilder(); + final StringBuilder contentBuilder = new StringBuilder(); while (contentBuilder.length() < testContentSize) { contentBuilder.append("foo bar baz"); } final String content = contentBuilder.toString(); - byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8); + byte[] expected = content.getBytes(StandardCharsets.UTF_8); // split content in random size parts - Random random = new Random(23131535653443L); - List parts = new ArrayList<>(); + final Random random = new Random(23131535653443L); + final List parts = new ArrayList<>(); int offset = 0; - while (offset < contentBytes.length) { + while (offset < expected.length) { int partSize = random.nextInt(1, 1024 * 1024); - if (offset + partSize > contentBytes.length) { - partSize = contentBytes.length - offset; + if (offset + partSize > expected.length) { + partSize = expected.length - offset; } - byte[] part = new byte[partSize]; - System.arraycopy(contentBytes, offset, part, 0, partSize); + final byte[] part = new byte[partSize]; + System.arraycopy(expected, offset, part, 0, partSize); parts.add(part); offset += partSize; } // upload parts - try (S3Client s3Client = - new S3Client(REGION_NAME, endpoint, BUCKET_NAME, MINIO_ROOT_USER, MINIO_ROOT_PASSWORD)) { + try (final S3Client s3Client = client()) { s3Client.uploadFile(key, "STANDARD", parts.iterator(), "plain/text"); } // download with a minio client - byte[] 
downloadedContent = minioClient + final byte[] actual = minioClient .getObject( GetObjectArgs.builder().bucket(BUCKET_NAME).object(key).build()) .readAllBytes(); // Verify the content - assertEquals( - contentBytes.length, - downloadedContent.length, - "Downloaded content length does not match expected content length"); - assertEquals(content, new String(downloadedContent), "Downloaded content does not match expected content"); - assertArrayEquals( - contentBytes, downloadedContent, "Downloaded content bytes does not match expected content bytes"); + assertThat(actual) + .hasSameSizeAs(expected) + .isEqualTo(expected) + .containsExactly(expected) + .asString() + .isEqualTo(content); } + /** + * This test aims to verify that the {@link S3Client#uploadTextFile(String, String, String)} + * method will correctly upload a simple text file to the S3 bucket and + * that the file can be downloaded via {@link S3Client#downloadTextFile(String)} + * successfully. + */ @Test @DisplayName("Test upload and download of a text file") void testTextFileUploadAndDownload() throws Exception { - final String key = "test-text-file.txt"; - final String text = "Hello, MinIO!"; - - try (S3Client s3Client = - new S3Client(REGION_NAME, endpoint, BUCKET_NAME, MINIO_ROOT_USER, MINIO_ROOT_PASSWORD)) { - assertTrue(s3Client.uploadTextFile(key, "STANDARD", text)); + // Setup + final String key = "uploadSimpleTextFile.txt"; + final String expected = "Hello, MinIO!"; + // verify that the file does not exist in the bucket before the test + final boolean preCheck = minioClient + .listObjects(ListObjectsArgs.builder() + .bucket(BUCKET_NAME) + .prefix(key) + .maxKeys(100) + .build()) + .iterator() + .hasNext(); + assertThat(preCheck).isFalse(); + try (final S3Client s3Client = client()) { + // upload text file via the client + assertDoesNotThrow(() -> s3Client.uploadTextFile(key, "STANDARD", expected)); // check download with minio client assertEquals( - text, + expected, new String( minioClient 
.getObject(GetObjectArgs.builder() @@ -193,16 +281,191 @@ void testTextFileUploadAndDownload() throws Exception { StandardCharsets.UTF_8), "Downloaded content does not match expected content"); // check download with s3 client - assertEquals(text, s3Client.downloadTextFile(key), "Downloaded content does not match expected content"); + assertEquals( + expected, s3Client.downloadTextFile(key), "Downloaded content does not match expected content"); } } + /** + * This test aims to verify that the {@link S3Client#listMultipartUploads()} method + * will correctly return existing multipart uploads. + */ + @Test + @DisplayName("Test listMultipartUpload() will correctly return existing multipart uploads") + void testListMultipartUploads() throws Exception { + // Setup + final String key = "testListMultipartUploads.txt"; + try (final S3Client s3Client = client()) { + // verify that there are no multipart uploads before the test + // we need to filter the map by key because MinIO client is reused + final Map> preChek = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + assertThat(preChek).isEmpty(); + // create a multipart upload + final String expected = s3Client.createMultipartUpload(key, "STANDARD", "plain/text"); + // Assert + // we need to filter the map by key because MinIO client is reused + final Map> actual = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + assertThat(actual) + .isNotEmpty() + .hasSize(1) + .containsKey(key) + .extractingByKey(key) + .asInstanceOf(InstanceOfAssertFactories.LIST) + .isNotEmpty() + .hasSize(1) + .containsExactly(expected); + } + } + + /** + * This test aims to verify that the {@link S3Client#listMultipartUploads()} method + * will correctly return existing multipart uploads for multiple keys and values. 
+ */ + @Test + @DisplayName("Test listMultipartUpload() will correctly return existing multipart uploads") + void testListMultipartUploadsMultiKeyValue() throws Exception { + // Setup + final String key1 = "testListMultipartUploads1.txt"; + final String key2 = "testListMultipartUploads2.txt"; + try (final S3Client s3Client = client()) { + // verify that there are no multipart uploads before the test + // we need to filter the map by key because MinIO client is reused + final Map> preCheckList = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key1) || e.getKey().equals(key2)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + final boolean preCheck1 = preCheckList.containsKey(key1); + final boolean preCheck2 = preCheckList.containsKey(key2); + assertThat(preCheck1).isFalse().isEqualTo(preCheck2); + // create a multipart upload + final String key1expected1 = s3Client.createMultipartUpload(key1, "STANDARD", "plain/text"); + final String key1expected2 = s3Client.createMultipartUpload(key1, "STANDARD", "plain/text"); + final String key2expected1 = s3Client.createMultipartUpload(key2, "STANDARD", "plain/text"); + final String key2expected2 = s3Client.createMultipartUpload(key2, "STANDARD", "plain/text"); + // Assert + // we need to filter the map by key because MinIO client is reused + final Map> actual = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key1) || e.getKey().equals(key2)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + assertThat(actual).isNotEmpty().hasSize(2).containsKeys(key1, key2); + assertThat(actual.get(key1)).isNotEmpty().hasSize(2).containsExactly(key1expected1, key1expected2); + assertThat(actual.get(key2)).isNotEmpty().hasSize(2).containsExactly(key2expected1, key2expected2); + } + } + + /** + * This test aims to verify that the {@link S3Client#abortMultipartUpload(String, String)} + * method will correctly abort an existing 
multipart upload. + */ + @Test + @DisplayName("Test abortMultipartUpload() will correctly abort an existing multipart upload") + void testAbortMultipartUpload() throws Exception { + // Setup + final String key = "testAbortMultipartUpload.txt"; + try (final S3Client s3Client = client()) { + // verify that there are no multipart uploads before the test + // we need to filter the map by key because MinIO client is reused + final boolean preCheck = s3Client.listMultipartUploads().entrySet().stream() + .anyMatch(e -> e.getKey().equals(key)); + assertThat(preCheck).isFalse(); + // create a multipart upload + final String uploadId = s3Client.createMultipartUpload(key, "STANDARD", "plain/text"); + // Assert that the upload exists + // we need to filter the map by key because MinIO client is reused + final Map> actualBeforeAbort = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + assertThat(actualBeforeAbort) + .isNotEmpty() + .hasSize(1) + .containsKey(key) + .extractingByKey(key) + .asInstanceOf(InstanceOfAssertFactories.LIST) + .isNotEmpty() + .hasSize(1) + .containsExactly(uploadId); + // Abort the multipart upload + s3Client.abortMultipartUpload(key, uploadId); + // Assert that the upload is removed + // we need to filter the map by key because MinIO client is reused + final Map> actualAfterAbort = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + assertThat(actualAfterAbort).isEmpty(); + } + } + + /** + * This test aims to verify that the {@link S3Client#abortMultipartUpload(String, String)} + * method will correctly abort an existing multipart upload with multiple parts. 
+ */ + @Test + @DisplayName("Test abortMultipartUpload() will correctly abort an existing multipart upload with multiple parts") + void testAbortMultipartUploadMultiKeyValue() throws Exception { + // Setup + final String key1 = "testAbortMultipartUploads1.txt"; + final String key2 = "testAbortMultipartUploads2.txt"; + try (final S3Client s3Client = client()) { + // verify that there are no multipart uploads before the test + // we need to filter the map by key because MinIO client is reused + final Map> listPreCheck = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key1) || e.getKey().equals(key2)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + final boolean preCheck1 = listPreCheck.containsKey(key1); + final boolean preCheck2 = listPreCheck.containsKey(key2); + assertThat(preCheck1).isFalse().isEqualTo(preCheck2); + // create a multipart upload + final String key1expected1 = s3Client.createMultipartUpload(key1, "STANDARD", "plain/text"); + final String key1expected2 = s3Client.createMultipartUpload(key1, "STANDARD", "plain/text"); + final String key2expected1 = s3Client.createMultipartUpload(key2, "STANDARD", "plain/text"); + final String key2expected2 = s3Client.createMultipartUpload(key2, "STANDARD", "plain/text"); + // Assert + // we need to filter the map by key because MinIO client is reused + final Map> actualBeforeAbort = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key1) || e.getKey().equals(key2)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + assertThat(actualBeforeAbort).isNotEmpty().hasSize(2).containsKeys(key1, key2); + assertThat(actualBeforeAbort.get(key1)) + .isNotEmpty() + .hasSize(2) + .containsExactly(key1expected1, key1expected2); + assertThat(actualBeforeAbort.get(key2)) + .isNotEmpty() + .hasSize(2) + .containsExactly(key2expected1, key2expected2); + + // Abort one multipart upload + 
s3Client.abortMultipartUpload(key1, key1expected1); + // Assert that the upload is removed + // we need to filter the map by key because MinIO client is reused + final Map> actual = s3Client.listMultipartUploads().entrySet().stream() + .filter(e -> e.getKey().equals(key1) || e.getKey().equals(key2)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + assertThat(actual).isNotEmpty().hasSize(2).containsKeys(key1, key2); + assertThat(actual.get(key1)).isNotEmpty().hasSize(1).containsExactly(key1expected2); + assertThat(actual.get(key2)).isNotEmpty().hasSize(2).containsExactly(key2expected1, key2expected2); + } + } + + /** + * This test aims to verify that the {@link S3Client#downloadTextFile(String)} + * method will return null when trying to download a non-existent object. + */ @Test @DisplayName("Test fetching a non-existent object") void testFetchNonExistentObject() throws Exception { - try (S3Client s3Client = - new S3Client(REGION_NAME, endpoint, BUCKET_NAME, MINIO_ROOT_USER, MINIO_ROOT_PASSWORD)) { + try (final S3Client s3Client = client()) { assertNull(s3Client.downloadTextFile("non-existent-object.txt")); } } + + /** + * This method will create a new instance of the {@link S3Client} to test. 
+ */ + private S3Client client() throws S3ClientInitializationException { + return new S3Client(REGION_NAME, endpoint, BUCKET_NAME, MINIO_ROOT_USER, MINIO_ROOT_PASSWORD); + } } diff --git a/block-node/s3-archive/src/main/java/org/hiero/block/node/archive/s3/S3ArchivePlugin.java b/block-node/s3-archive/src/main/java/org/hiero/block/node/archive/s3/S3ArchivePlugin.java index d229be0cc7..531bc9e158 100644 --- a/block-node/s3-archive/src/main/java/org/hiero/block/node/archive/s3/S3ArchivePlugin.java +++ b/block-node/s3-archive/src/main/java/org/hiero/block/node/archive/s3/S3ArchivePlugin.java @@ -1,12 +1,15 @@ // SPDX-License-Identifier: Apache-2.0 package org.hiero.block.node.archive.s3; +import static java.lang.System.Logger.Level.ERROR; +import static java.lang.System.Logger.Level.INFO; import static org.hiero.block.node.base.BlockFile.blockNumberFormated; import com.hedera.hapi.block.stream.output.BlockHeader; import com.hedera.pbj.runtime.ParseException; import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -23,12 +26,14 @@ import org.hiero.block.common.utils.StringUtilities; import org.hiero.block.internal.BlockUnparsed; import org.hiero.block.node.base.s3.S3Client; +import org.hiero.block.node.base.s3.S3ResponseException; import org.hiero.block.node.base.tar.TaredBlockIterator; import org.hiero.block.node.spi.BlockNodeContext; import org.hiero.block.node.spi.BlockNodePlugin; import org.hiero.block.node.spi.ServiceBuilder; import org.hiero.block.node.spi.blockmessaging.BlockNotificationHandler; import org.hiero.block.node.spi.blockmessaging.PersistedNotification; +import org.hiero.block.node.spi.historicalblocks.BlockAccessor; import org.hiero.block.node.spi.historicalblocks.BlockAccessor.Format; /** @@ -77,26 +82,25 @@ public List> configDataTypes() { * {@inheritDoc} */ @Override - public void 
init(BlockNodeContext context, ServiceBuilder serviceBuilder) { + public void init(final BlockNodeContext context, final ServiceBuilder serviceBuilder) { this.context = context; this.archiveConfig = context.configuration().getConfigData(S3ArchiveConfig.class); // check if enabled by the "endpointUrl" property being non-empty in config if (StringUtilities.isBlank(archiveConfig.endpointUrl())) { - LOGGER.log(System.Logger.Level.INFO, "S3 Archive plugin is disabled. No endpoint URL provided."); + LOGGER.log(INFO, "S3 Archive plugin is disabled. No endpoint URL provided."); return; } else if (StringUtilities.isBlank(archiveConfig.accessKey())) { - LOGGER.log(System.Logger.Level.INFO, "S3 Archive plugin is disabled. No access key provided."); + LOGGER.log(INFO, "S3 Archive plugin is disabled. No access key provided."); return; } else if (StringUtilities.isBlank(archiveConfig.secretKey())) { - LOGGER.log(System.Logger.Level.INFO, "S3 Archive plugin is disabled. No secret key provided."); + LOGGER.log(INFO, "S3 Archive plugin is disabled. 
No secret key provided."); return; } // set up the executor service this.executorService = context.threadPoolManager() .createSingleThreadExecutor( "S3ArchiveRunner", - (t, e) -> LOGGER.log( - System.Logger.Level.ERROR, "Uncaught exception in thread: " + t.getName(), e)); + (t, e) -> LOGGER.log(ERROR, "Uncaught exception in thread: " + t.getName(), e)); // plugin is enabled enabled.set(true); // register @@ -116,14 +120,13 @@ public void start() { archiveConfig.bucketName(), archiveConfig.accessKey(), archiveConfig.secretKey())) { - String lastArchivedBlockNumberString = s3Client.downloadTextFile(LATEST_ARCHIVED_BLOCK_FILE); + final String lastArchivedBlockNumberString = s3Client.downloadTextFile(LATEST_ARCHIVED_BLOCK_FILE); if (lastArchivedBlockNumberString != null && !lastArchivedBlockNumberString.isEmpty()) { lastArchivedBlockNumber.set(Long.parseLong(lastArchivedBlockNumberString)); } - LOGGER.log(System.Logger.Level.INFO, "Last archived block number: " + lastArchivedBlockNumber); - } catch (Exception e) { - LOGGER.log( - System.Logger.Level.ERROR, "Failed to read latest archived block file: " + e.getMessage(), e); + LOGGER.log(INFO, "Last S3 archived block number: " + lastArchivedBlockNumber); + } catch (final Exception e) { + LOGGER.log(ERROR, "Failed to read latest archived block file: " + e.getMessage(), e); throw new RuntimeException(e); } } @@ -136,7 +139,7 @@ public void start() { public void stop() { BlockNodePlugin.super.stop(); // cancel all pending uploads - for (Future future : pendingUploads) { + for (final Future future : pendingUploads) { future.cancel(true); } } @@ -147,25 +150,21 @@ public void stop() { * {@inheritDoc} */ @Override - public void handlePersisted(PersistedNotification notification) { - // check if there is a new batch of blocks to archive - final long mostRecentPersistedBlockNumber = notification.endBlockNumber(); - long mostRecentArchivedBlockNumber = lastArchivedBlockNumber.get(); - // compute the next batch of blocks to 
archive - long nextBatchStartBlockNumber = - (mostRecentArchivedBlockNumber / archiveConfig.blocksPerFile()) * archiveConfig.blocksPerFile(); - long nextBatchEndBlockNumber = nextBatchStartBlockNumber + archiveConfig.blocksPerFile() - 1; - // find if there is blocksPerFile blocks past the mostRecentArchivedBlockNumber staring from a multiple of - // blocksPerFile - while (nextBatchEndBlockNumber <= mostRecentPersistedBlockNumber) { - // we have a batch of blocks to archive, so schedule it on background thread - scheduleBatchArchiving(nextBatchStartBlockNumber, nextBatchEndBlockNumber); - // compute the next batch of blocks to archive - mostRecentArchivedBlockNumber = mostRecentArchivedBlockNumber + archiveConfig.blocksPerFile(); - nextBatchStartBlockNumber = - (mostRecentArchivedBlockNumber / archiveConfig.blocksPerFile()) * archiveConfig.blocksPerFile() - + archiveConfig.blocksPerFile(); - nextBatchEndBlockNumber = nextBatchStartBlockNumber + archiveConfig.blocksPerFile() - 1; + public void handlePersisted(final PersistedNotification notification) { + // get the latest persisted block number from the notification + final long latestPersisted = notification.endBlockNumber(); + // compute what should be the start of the next batch to archive + long nextStart = lastArchivedBlockNumber.get() + 1; + // compute the end of the next batch to archive + long nextEnd = nextStart + archiveConfig.blocksPerFile() - 1; + // while the next batch end is less than or equal to the latest + // persisted block number, we can schedule a batch + while (nextEnd <= latestPersisted) { + // schedule the batch archiving + scheduleBatchArchiving(nextStart, nextEnd); + // move to the next batch and try again + nextStart = nextEnd + 1; + nextEnd = nextStart + archiveConfig.blocksPerFile() - 1; } } @@ -189,7 +188,7 @@ private void scheduleBatchArchiving(final long nextBatchStartBlockNumber, final if (nextBatchStartBlockNumber > lastArchivedBlockNumber.get()) { // log started LOGGER.log( - 
System.Logger.Level.INFO, + INFO, "Uploading archive block batch: " + nextBatchStartBlockNumber + "-" + nextBatchEndBlockNumber); try (final S3Client s3Client = new S3Client( @@ -200,24 +199,29 @@ private void scheduleBatchArchiving(final long nextBatchStartBlockNumber, final archiveConfig.secretKey())) { // fetch the blocks from the persistence plugins and archive uploadBlocksTar(s3Client, nextBatchStartBlockNumber, nextBatchEndBlockNumber); - // update the last archived block number - lastArchivedBlockNumber.set(nextBatchEndBlockNumber); // write the latest archived block number to the archive s3Client.uploadTextFile( LATEST_ARCHIVED_BLOCK_FILE, LATEST_FILE_STORAGE_CLASS, String.valueOf(nextBatchEndBlockNumber)); + // update the last archived block number + lastArchivedBlockNumber.set(nextBatchEndBlockNumber); // log completed LOGGER.log( - System.Logger.Level.INFO, + INFO, "Uploaded archive block batch: " + nextBatchStartBlockNumber + "-" + nextBatchEndBlockNumber); - } catch (Exception e) { + } catch (final S3ResponseException e) { + // todo we could retry here LOGGER.log( - System.Logger.Level.ERROR, - "Failed to upload archive block batch: " + e.getMessage(), + INFO, + "Failed to upload archive block batch due to an exceptional response: " + + e.getMessage(), e); throw new RuntimeException(e); + } catch (final Exception e) { + LOGGER.log(ERROR, "Failed to upload archive block batch: " + e.getMessage(), e); + throw new RuntimeException(e); } } }, @@ -234,17 +238,15 @@ private void scheduleBatchArchiving(final long nextBatchStartBlockNumber, final * @param startBlockNumber the first block number to upload * @param endBlockNumber the last block number to upload, inclusive */ - private void uploadBlocksTar(S3Client s3Client, long startBlockNumber, long endBlockNumber) - throws IllegalStateException { + private void uploadBlocksTar(final S3Client s3Client, final long startBlockNumber, final long endBlockNumber) + throws IllegalStateException, S3ResponseException, 
IOException { // The HTTP client needs an Iterable of byte arrays, so create one from the blocks - final Iterator tarBlocks = new TaredBlockIterator( - Format.ZSTD_PROTOBUF, - LongStream.range(startBlockNumber, endBlockNumber + 1) - .mapToObj( - blockNumber -> context.historicalBlockProvider().block(blockNumber)) - .iterator()); + final Iterator blocksIterator = LongStream.range(startBlockNumber, endBlockNumber + 1) + .mapToObj(blockNumber -> context.historicalBlockProvider().block(blockNumber)) + .iterator(); + final Iterator tarBlocks = new TaredBlockIterator(Format.ZSTD_PROTOBUF, blocksIterator); // fetch the first blocks consensus time so that we can place the file in a directory based on year and month - BlockUnparsed firstBlock = + final BlockUnparsed firstBlock = context.historicalBlockProvider().block(startBlockNumber).blockUnparsed(); final ZonedDateTime firstBlockConsensusTime; try { @@ -261,7 +263,7 @@ private void uploadBlocksTar(S3Client s3Client, long startBlockNumber, long endB header.blockTimestamp().seconds(), header.blockTimestamp().nanos()), ZoneOffset.UTC); - } catch (ParseException e) { + } catch (final ParseException e) { throw new IllegalStateException("Failed to parse Block Header from first block", e); } diff --git a/common/src/main/java/org/hiero/block/common/utils/StringUtilities.java b/common/src/main/java/org/hiero/block/common/utils/StringUtilities.java index 13c2021d57..35d8c7d4bb 100644 --- a/common/src/main/java/org/hiero/block/common/utils/StringUtilities.java +++ b/common/src/main/java/org/hiero/block/common/utils/StringUtilities.java @@ -5,6 +5,9 @@ /** A utility class that deals with logic related to Strings. */ public final class StringUtilities { + /** Empty {@link String} constant. Non-null, but with no whitespaces. */ + public static final String EMPTY = ""; + /** * This method takes an input {@link String} and checks if it is blank. * A {@link String} is considered blank if it is either {@code null} or