Skip to content

Commit a558ad2

Browse files
committed
Ensure S3InputStream is lazy
1 parent e1895a0 commit a558ad2

File tree

3 files changed

+59
-29
lines changed

3 files changed

+59
-29
lines changed

Diff for: src/main/kotlin/com/lapanthere/signals/S3InputStream.kt

+35-23
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest
1313
import software.amazon.awssdk.services.s3.model.HeadObjectRequest
1414
import java.io.InputStream
1515
import java.io.SequenceInputStream
16+
import java.time.Instant
1617
import java.util.Enumeration
1718

1819
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()
@@ -35,14 +36,13 @@ public class S3InputStream(
3536
mutator: (GetObjectRequest.Builder) -> Unit = {}
3637
) : InputStream() {
3738
private val scope = CoroutineScope(Dispatchers.IO)
38-
private val parts = byteRange(
39-
s3.headObject(
40-
HeadObjectRequest.builder()
41-
.bucket(bucket)
42-
.key(key)
43-
.build()
44-
).get().contentLength()
45-
)
39+
private val s3Object = s3.headObject(
40+
HeadObjectRequest.builder()
41+
.bucket(bucket)
42+
.key(key)
43+
.build()
44+
).get()
45+
private val parts = byteRange(s3Object.contentLength())
4646
private val streams = parts.mapIndexed { i, (begin, end) ->
4747
scope.async(CoroutineName("chunk-${i + 1}"), CoroutineStart.LAZY) {
4848
s3.getObject(
@@ -56,26 +56,38 @@ public class S3InputStream(
5656
).await().asInputStream()
5757
}
5858
}.toMutableList()
59-
private val buffer = SequenceInputStream(
60-
object : Enumeration<InputStream> {
61-
private val iterator = streams.iterator()
59+
private val buffer: SequenceInputStream by lazy {
60+
SequenceInputStream(
61+
object : Enumeration<InputStream> {
62+
private val iterator = streams.iterator()
6263

63-
override fun hasMoreElements(): Boolean {
64-
// Starts downloading the next chunks ahead.
65-
streams.take(parallelism).forEach { it.start() }
66-
return iterator.hasNext()
67-
}
64+
override fun hasMoreElements(): Boolean {
65+
// Starts downloading the next chunks ahead.
66+
streams.take(parallelism).forEach { it.start() }
67+
return iterator.hasNext()
68+
}
6869

69-
override fun nextElement(): InputStream = runBlocking {
70-
iterator.use { it.await() }
70+
override fun nextElement(): InputStream = runBlocking {
71+
iterator.use { it.await() }
72+
}
7173
}
72-
}
73-
)
74-
75-
override fun read(): Int {
76-
return buffer.read()
74+
)
7775
}
7876

77+
public val eTag: String? = s3Object.eTag()
78+
public val contentLength: Long? = s3Object.contentLength()
79+
public val lastModified: Instant? = s3Object.lastModified()
80+
public val metadata: Map<String, String> = s3Object.metadata()
81+
public val contentType: String? = s3Object.contentType()
82+
public val contentEncoding: String? = s3Object.contentEncoding()
83+
public val contentDisposition: String? = s3Object.contentDisposition()
84+
public val contentLanguage: String? = s3Object.contentLanguage()
85+
public val versionId: String? = s3Object.versionId()
86+
public val cacheControl: String? = s3Object.cacheControl()
87+
public val expires: Instant? = s3Object.expires()
88+
89+
override fun read(): Int = buffer.read()
90+
7991
override fun close() {
8092
buffer.close()
8193
}

Diff for: src/test/kotlin/com/lapanthere/signals/S3InputStreamTest.kt

+21-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import java.util.concurrent.CompletableFuture
1616
import kotlin.test.Test
1717
import kotlin.test.assertFailsWith
1818

19-
class S3InputStreamTest {
19+
internal class S3InputStreamTest {
2020
private val bucket = "bucket"
2121
private val key = "key"
2222
private val s3: S3AsyncClient = mockk {
@@ -30,6 +30,8 @@ class S3InputStreamTest {
3030
} returns CompletableFuture.completedFuture(
3131
HeadObjectResponse.builder()
3232
.contentLength(6_291_456)
33+
.contentEncoding("application/json")
34+
.eTag("d41d8cd98f00b204e9800998ecf8427e-2")
3335
.build()
3436
)
3537
every {
@@ -59,7 +61,7 @@ class S3InputStreamTest {
5961
}
6062

6163
@Test
62-
fun testDownload() {
64+
fun `download a file`() {
6365
ByteArrayOutputStream().use { target ->
6466
S3InputStream(bucket = bucket, key = key, s3 = s3).use { stream ->
6567
stream.copyTo(target)
@@ -96,7 +98,23 @@ class S3InputStreamTest {
9698
}
9799

98100
@Test
99-
fun testFailure() {
101+
fun `downloading starts when reading starts`() {
102+
S3InputStream(bucket = bucket, key = key, s3 = s3)
103+
verify(exactly = 1) {
104+
s3.headObject(
105+
HeadObjectRequest.builder()
106+
.bucket(bucket)
107+
.key(key)
108+
.build()
109+
)
110+
}
111+
verify(exactly = 0) {
112+
s3.getObject(any<GetObjectRequest>(), any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>())
113+
}
114+
}
115+
116+
@Test
117+
fun `handle exception on failure`() {
100118
every {
101119
s3.getObject(
102120
GetObjectRequest.builder()

Diff for: src/test/kotlin/com/lapanthere/signals/S3OutputStreamTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.concurrent.CompletableFuture
2222
import kotlin.test.Test
2323
import kotlin.test.assertFailsWith
2424

25-
class S3OutputStreamTest {
25+
internal class S3OutputStreamTest {
2626
private val uploadID = "upload-id"
2727
private val bucket = "bucket"
2828
private val key = "key"
@@ -96,7 +96,7 @@ class S3OutputStreamTest {
9696
}
9797

9898
@Test
99-
fun testUpload() {
99+
fun `uploads a file`() {
100100
ByteArrayInputStream(ByteArray(32)).use { target ->
101101
S3OutputStream(bucket = bucket, key = key, s3 = s3).use { stream ->
102102
target.copyTo(stream)
@@ -154,7 +154,7 @@ class S3OutputStreamTest {
154154
}
155155

156156
@Test
157-
fun testFailure() {
157+
fun `handle exception on failure`() {
158158
every {
159159
s3.uploadPart(
160160
UploadPartRequest.builder()

0 commit comments

Comments
 (0)