diff --git a/core/src/main/scala/xs4s/package.scala b/core/src/main/scala/xs4s/package.scala index ae763a7..2a1f801 100644 --- a/core/src/main/scala/xs4s/package.scala +++ b/core/src/main/scala/xs4s/package.scala @@ -1,4 +1,4 @@ -import javax.xml.stream.{XMLEventReader, XMLInputFactory} +import javax.xml.stream.{XMLEventReader, XMLInputFactory, XMLOutputFactory} import javax.xml.stream.events.XMLEvent import xs4s.additions.{XMLEventReaderMaker, XMLLoader} @@ -34,6 +34,9 @@ package object xs4s { private[xs4s] def defaultXmlInputFactory: XMLInputFactory = XMLInputFactory.newInstance() + private[xs4s] def defaultXmlOutputFactory: XMLOutputFactory = + XMLOutputFactory.newInstance() + /** * Utilities to replicate [[scala.xml.XML]] */ diff --git a/fs2/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala b/fs2/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala index b9995cf..d3e19ef 100644 --- a/fs2/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala +++ b/fs2/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala @@ -2,7 +2,7 @@ package xs4s.fs2compat import cats.effect.{Blocker, ContextShift, Resource, Sync} import _root_.fs2.{Pipe, Stream} -import javax.xml.stream.XMLEventReader +import javax.xml.stream.{XMLEventReader, XMLEventWriter} import javax.xml.stream.events.XMLEvent import xs4s.XmlElementExtractor import xs4s.generic.Scanner @@ -38,6 +38,25 @@ trait Fs2Syntax { .apply[XMLEvent](blocker, reader.toIterator, chunkSize)) } + implicit class RichFs2XmlEventStream[F[_] : Sync](stream: Stream[F, XMLEvent]) { + + /** Writes an XMLEvent Stream to an XMLEventWriter */ + def writeXmlEventStream( + xmlEventWriter: Resource[F, XMLEventWriter]): F[Unit] = + Stream + .resource(xmlEventWriter) + .flatMap( + stream + .chunks + .fold(_) { (writer, events) => + events.foreach(writer.add) + writer.flush() + writer + }) + .compile + .drain + } + implicit class RichXmlElementExtractor[O]( xmlElementExtractor: XmlElementExtractor[O]) { def toFs2PipeThrowError[F[_]]: Pipe[F, XMLEvent, O] = diff --git a/fs2/src/main/scala/xs4s/fs2compat/package.scala b/fs2/src/main/scala/xs4s/fs2compat/package.scala index a594d14..c871d40 100644 --- a/fs2/src/main/scala/xs4s/fs2compat/package.scala +++ b/fs2/src/main/scala/xs4s/fs2compat/package.scala @@ -2,7 +2,7 @@ package xs4s import fs2._ import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync} -import javax.xml.stream.XMLInputFactory +import javax.xml.stream.{XMLInputFactory, XMLOutputFactory} import javax.xml.stream.events.XMLEvent import syntax.fs2._ @@ -32,4 +32,21 @@ package object fs2compat { F.delay(xmlInputFactory.createXMLEventReader(inputStream)))( xmlEventReader => F.delay(xmlEventReader.close())), chunkSize)) + + /** + * Turns an FS2 XMLEvent Stream into a stream of Bytes. + **/ + def xmlEventStreamToByteStream[F[_]: ConcurrentEffect: ContextShift]( + blocker: Blocker, + xmlOutputFactory: XMLOutputFactory = defaultXmlOutputFactory, + chunkSize: Int)( + implicit F: Sync[F]): Pipe[F, XMLEvent, Byte] = + (xmlEventStream: Stream[F, XMLEvent]) => + io.readOutputStream(blocker, chunkSize)( + outputStream => + xmlEventStream.writeXmlEventStream( + Resource.make( + F.delay(xmlOutputFactory.createXMLEventWriter(outputStream)))( + XMLEventWriter => F.delay(XMLEventWriter.close())) + )) } diff --git a/fs2v3/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala b/fs2v3/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala index dcf4c86..1d49c8d 100644 --- a/fs2v3/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala +++ b/fs2v3/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala @@ -6,7 +6,7 @@ import xs4s.XmlElementExtractor import xs4s.generic.Scanner import xs4s.syntax.core._ -import javax.xml.stream.XMLEventReader +import javax.xml.stream.{XMLEventReader, XMLEventWriter} import javax.xml.stream.events.XMLEvent import scala.language.higherKinds @@ -37,6 +37,25 @@ trait Fs2Syntax { .apply[XMLEvent](reader.toIterator, chunkSize)) } + implicit class RichFs2XmlEventStream[F[_] : Sync](stream: Stream[F, XMLEvent]) { + + /** Writes an XMLEvent Stream to an XMLEventWriter */ + def writeXmlEventStream( + xmlEventWriter: Resource[F, XMLEventWriter]): F[Unit] = + Stream + .resource(xmlEventWriter) + .flatMap( + stream + .chunks + .fold(_) { (writer, events) => + events.foreach(writer.add) + writer.flush() + writer + }) + .compile + .drain + } + implicit class RichXmlElementExtractor[O]( xmlElementExtractor: XmlElementExtractor[O]) { def toFs2PipeThrowError[F[_]]: Pipe[F, XMLEvent, O] = diff --git a/fs2v3/src/main/scala/xs4s/fs2compat/package.scala b/fs2v3/src/main/scala/xs4s/fs2compat/package.scala index f93d8f6..6a39cb9 100644 --- a/fs2v3/src/main/scala/xs4s/fs2compat/package.scala +++ b/fs2v3/src/main/scala/xs4s/fs2compat/package.scala @@ -4,7 +4,7 @@ import cats.effect.{Async, Resource, Sync} import fs2._ import xs4s.syntax.fs2._ -import javax.xml.stream.XMLInputFactory +import javax.xml.stream.{XMLInputFactory, XMLOutputFactory} import javax.xml.stream.events.XMLEvent import scala.language.higherKinds @@ -29,4 +29,19 @@ package object fs2compat { F.delay(xmlInputFactory.createXMLEventReader(inputStream)))( xmlEventReader => F.delay(xmlEventReader.close())), chunkSize)) + + /** + * Turns an FS2 XMLEvent Stream into a stream of Bytes. + **/ + def xmlEventStreamToByteStream[F[_] : Async]( + xmlOutputFactory: XMLOutputFactory = defaultXmlOutputFactory, + chunkSize: Int)(implicit F: Sync[F]): Pipe[F, XMLEvent, Byte] = + (xmlEventStream: Stream[F, XMLEvent]) => + io.readOutputStream(chunkSize)( + outputStream => + xmlEventStream.writeXmlEventStream( + Resource.make( + F.delay(xmlOutputFactory.createXMLEventWriter(outputStream)))( + XMLEventWriter => F.delay(XMLEventWriter.close())) + )) } diff --git a/fs2v3/src/test/scala/xs4s/fs2compat/Fs2CompatSpec.scala b/fs2v3/src/test/scala/xs4s/fs2compat/Fs2CompatSpec.scala index 3d70eb9..8e58293 100644 --- a/fs2v3/src/test/scala/xs4s/fs2compat/Fs2CompatSpec.scala +++ b/fs2v3/src/test/scala/xs4s/fs2compat/Fs2CompatSpec.scala @@ -2,7 +2,7 @@ package xs4s.fs2compat import cats.effect.IO import cats.effect.unsafe.implicits.global -import fs2.Stream +import fs2.{Chunk, Stream} import org.scalatest.freespec.AnyFreeSpec import xs4s.XmlElementExtractor import xs4s.syntax.fs2.RichXmlElementExtractor @@ -10,25 +10,25 @@ import xs4s.syntax.fs2.RichXmlElementExtractor import scala.xml.Elem final class Fs2CompatSpec extends AnyFreeSpec { - "It works" in { - val input = - s""" - | - |Embedded - |General - |Doubly embedded - |Nested - | - | + private val input = + s""" + | + |Embedded + |General + |Doubly embedded + |Nested + | + | """.stripMargin + "It works" in { val anchorElementExtractor: XmlElementExtractor[Elem] = XmlElementExtractor.filterElementsByName("item") val textStream: Stream[IO, String] = fs2.Stream .apply[IO, String](input) .flatMap(str => fs2.Stream.emits(str.getBytes().toList)) - .through(byteStreamToXmlEventStream[IO]()) + .through(byteStreamToXmlEventStream[IO](chunkSize = 10240)) .through(anchorElementExtractor.toFs2PipeThrowError) .map(_.text) @@ -38,4 +38,20 @@ final class Fs2CompatSpec extends AnyFreeSpec { "Doubly embedded", "Nested")) } + + "Roundtrip" in { + val stringStream: Stream[IO, String] = fs2.Stream + .apply[IO, String](input) + .flatMap(str => fs2.Stream.emits(str.getBytes().toList)) + .through(byteStreamToXmlEventStream[IO](chunkSize = 10240)) + .through(xmlEventStreamToByteStream[IO](chunkSize = 10240)) + .through(fs2.text.utf8Decode) + + val string = stringStream.compile.string.unsafeRunSync() + .replaceAll("<\\?xml.*?>", "") + .replaceAll("\r?\n", "\n") + + assert( + string == input.trim.replaceAll("\r?\n", "\n")) + } }