Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ private[compression] trait CompressionCompanionPlatform
crc32 = Some(contentCrc32)
)(s).stream
)
.through(_gunzip_validateTrailer(contentCrc32, inflater))
.through(_gunzip_validateTrailer(inflateParams, contentCrc32, inflater))
)
)
}
Expand Down Expand Up @@ -807,6 +807,7 @@ private[compression] trait CompressionCompanionPlatform
else stream

private def _gunzip_validateTrailer(
inflateParams: InflateParams,
crc32: CRC32,
inflater: Inflater
): Pipe[F, Byte, Byte] =
Expand All @@ -829,30 +830,51 @@ private[compression] trait CompressionCompanionPlatform
Pull.done
} else Pull.raiseError(new ZipException("Failed to read trailer (1)"))

def streamUntilTrailer(last: Chunk[Byte]): Stream[F, Byte] => Pull[F, Byte, Unit] =
// RFC 1952 § 2.2 allows several gzip members back to back, so any bytes left after a
// trailer are decoded as a further member; when no input remains the pull simply ends.
def continueWithNextMember(s: Stream[F, Byte]): Pull[F, Byte, Unit] =
  s.pull.uncons.flatMap {
    case Some((head, tail)) =>
      // Re-attach the chunk that was peeked so the next member sees its full header.
      val remaining = Stream.chunk(head) ++ tail
      val nextMemberContent = remaining.through(gunzip(inflateParams)).flatMap(_.content)
      nextMemberContent.pull.echo
    case None =>
      Pull.done
  }

// Gathers `gzipTrailerBytes` bytes of trailer, pulling more input while `buffered` is
// still short, validates them, and passes whatever follows on as a possible next member.
// If the input ends first, the short buffer is handed to `validateTrailer` unchanged.
def consumeTrailer(
    buffered: Chunk[Byte],
    rest: Stream[F, Byte]
): Pull[F, Byte, Unit] =
  if (buffered.size < gzipTrailerBytes)
    rest.pull.uncons.flatMap {
      case None => validateTrailer(buffered)
      case Some((more, remaining)) => consumeTrailer(buffered ++ more, remaining)
    }
  else {
    val trailer = buffered.take(gzipTrailerBytes)
    val leftover = buffered.drop(gzipTrailerBytes)
    validateTrailer(trailer) >> continueWithNextMember(Stream.chunk(leftover) ++ rest)
  }

// Walks the incoming stream chunk by chunk, emitting content and handing everything that
// follows the content to `consumeTrailer`. `emitted` is the number of bytes this pull has
// already output; it is compared with `inflater.getBytesWritten` to decide how much of
// each chunk is still content.
// NOTE(review): this span appears to interleave the removed and the added side of a diff.
// The lines that use `last` match the older `streamUntilTrailer(last: Chunk[Byte])`
// signature rather than this one — confirm which lines are live before relying on it.
def streamUntilTrailer(emitted: Long): Stream[F, Byte] => Pull[F, Byte, Unit] =
_.pull.uncons
.flatMap {
case Some((next, rest)) =>
if (inflater.finished())
if (next.size >= gzipTrailerBytes)
if (last.nonEmpty) Pull.output(last) >> streamUntilTrailer(next)(rest)
else streamUntilTrailer(next)(rest)
else
streamUntilTrailer(last ++ next)(rest)
else if (last.nonEmpty)
Pull.output(last) >> Pull.output(next) >>
streamUntilTrailer(Chunk.empty[Byte])(rest)
else Pull.output(next) >> streamUntilTrailer(Chunk.empty[Byte])(rest)
// Bytes reported by the inflater that this pull has not emitted yet, floored at zero.
val contentRemaining = (inflater.getBytesWritten - emitted).max(0L).toInt
val chunkIsAllContent = next.size <= contentRemaining
val chunkStraddlesBoundary = !chunkIsAllContent && contentRemaining > 0
if (chunkIsAllContent)
// Whole chunk is content: emit it and advance the running count.
Pull.output(next) >> streamUntilTrailer(emitted + next.size)(rest)
else if (chunkStraddlesBoundary) {
// Chunk holds the tail of the content: emit that prefix, treat the rest as trailer bytes.
val (content, postContent) = next.splitAt(contentRemaining)
Pull.output(content) >> consumeTrailer(postContent, rest)
} else
// No content is outstanding, so the entire chunk goes to trailer handling.
consumeTrailer(next, rest)
case None =>
val preTrailerBytes = last.size - gzipTrailerBytes
if (preTrailerBytes > 0)
Pull.output(last.take(preTrailerBytes)) >>
validateTrailer(last.drop(preTrailerBytes))
else
validateTrailer(last)
// Input ended here without any chunk having been routed to `consumeTrailer`.
Pull.raiseError(new ZipException("Failed to read trailer (1)"))
}

streamUntilTrailer(Chunk.empty[Byte])(stream)
streamUntilTrailer(0L)(stream)
}.stream

/** Like Stream.unconsN, but returns a chunk of elements that do not satisfy the predicate, splitting chunk as necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package fs2

import cats.effect._
import cats.syntax.traverse._
import fs2.compression._
import org.scalacheck.effect.PropF.forAllF

Expand Down Expand Up @@ -325,6 +326,30 @@ class JvmNativeCompressionSuite extends CompressionSuite {
.assertEquals(expectedContent)
}

test("gunzip handles concatenated gzip members (RFC 1952 § 2.2 / block gzip)") {
  val parts = List("first member ", "second member ", "third member")

  // Compresses a single string into one standalone gzip member.
  def gzipMember(s: String): IO[Chunk[Byte]] = {
    val raw = Chunk.array(s.getBytes(StandardCharsets.UTF_8))
    Stream.chunk(raw).through(Compression[IO].gzip(8192)).compile.to(Chunk)
  }

  // Concatenate the members, re-chunk at random boundaries, and decode them in one pass.
  val decoded: IO[String] =
    for {
      members <- parts.traverse(gzipMember)
      bytes <- Stream
        .chunk(Chunk.concat(members))
        .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime())
        .through(Compression[IO].gunzip(8192))
        .flatMap(_.content)
        .compile
        .toVector
    } yield new String(bytes.toArray, StandardCharsets.UTF_8)

  decoded.assertEquals(parts.mkString)
}

def toEncodableFileName(fileName: String): String =
new String(
fileName.replaceAll("\u0000", "_").getBytes(StandardCharsets.ISO_8859_1),
Expand Down
Loading