diff --git a/cosmos-server/src/it/scala/com/mesosphere/cosmos/repository/PackageRepositorySpec.scala b/cosmos-server/src/it/scala/com/mesosphere/cosmos/repository/PackageRepositorySpec.scala index 6812edfa..256daac2 100644 --- a/cosmos-server/src/it/scala/com/mesosphere/cosmos/repository/PackageRepositorySpec.scala +++ b/cosmos-server/src/it/scala/com/mesosphere/cosmos/repository/PackageRepositorySpec.scala @@ -157,16 +157,10 @@ final class PackageRepositorySpec val bogusRepository = PackageRepository("bogus", Uri.parse(uriText)) assertAdd(defaultRepos :+ bogusRepository, bogusRepository) - val expectedMsg = s"URI for repository [${bogusRepository.name}] is invalid: ${bogusRepository.uri}" - eventually { - assertResult(expectedMsg) { - val response = searchPackages(SearchRequest(None)) - assertResult(Status.BadRequest)(response.status) - val Xor.Right(err) = decode[ErrorResponse](response.contentString) - val repo = err.message - repo - } + val response = searchPackages(SearchRequest(None)) + assertResult(Status.BadRequest)(response.status) + assert(decode[ErrorResponse](response.contentString).isRight) } } } diff --git a/cosmos-server/src/main/scala/com/mesosphere/cosmos/CosmosError.scala b/cosmos-server/src/main/scala/com/mesosphere/cosmos/CosmosError.scala index e72a8c93..cb6b796a 100644 --- a/cosmos-server/src/main/scala/com/mesosphere/cosmos/CosmosError.scala +++ b/cosmos-server/src/main/scala/com/mesosphere/cosmos/CosmosError.scala @@ -96,8 +96,23 @@ case class RepositoryAddIndexOutOfBounds(attempted: Int, max: Int) extends Cosmo case class UnsupportedRepositoryVersion(version: UniverseVersion) extends CosmosError case class UnsupportedRepositoryUri(uri: Uri) extends CosmosError -case class InvalidRepositoryUri(repository: PackageRepository, causedBy: Throwable) - extends CosmosError(causedBy) +case class RepositoryUriSyntax( + repository: PackageRepository, + causedBy: Throwable +) extends CosmosError(causedBy) { + override def getData: Option[JsonObject] = { + Some(JsonObject.singleton("cause", causedBy.getMessage.asJson)) + } +} + +case class RepositoryUriConnection( + repository: PackageRepository, + causedBy: Throwable +) extends CosmosError(causedBy) { + override def getData: Option[JsonObject] = { + Some(JsonObject.singleton("cause", causedBy.getMessage.asJson)) + } +} case class RepositoryNotPresent(nameOrUri: Ior[String, Uri]) extends CosmosError { override def getData: Option[JsonObject] = { diff --git a/cosmos-server/src/main/scala/com/mesosphere/cosmos/UniversePackageCache.scala b/cosmos-server/src/main/scala/com/mesosphere/cosmos/UniversePackageCache.scala index 3d0ae6a5..75d7ade2 100644 --- a/cosmos-server/src/main/scala/com/mesosphere/cosmos/UniversePackageCache.scala +++ b/cosmos-server/src/main/scala/com/mesosphere/cosmos/UniversePackageCache.scala @@ -1,13 +1,13 @@ package com.mesosphere.cosmos -import java.net.{MalformedURLException, UnknownHostException} +import java.io.{IOException, InputStream} +import java.net.{MalformedURLException, URISyntaxException, URL} import java.nio.file._ import java.time.LocalDateTime import java.util.Base64 import java.util.concurrent.atomic.AtomicReference import java.util.zip.ZipInputStream -import cats.data.Validated.{Invalid, Valid} import cats.data.Xor.{Left, Right} import cats.data._ import cats.std.list._ // allows for traversU in verifySchema @@ -118,7 +118,7 @@ final class UniversePackageCache private ( updateMutex.acquireAndRun { // TODO: How often we check should be configurable if (lastModified.get().plusMinutes(1).isBefore(LocalDateTime.now())) { - updateUniverseCache() + updateUniverseCache(repository, universeDir)(_.openStream()) .onSuccess { _ => lastModified.set(LocalDateTime.now()) } } else { /* We don't need to fetch the latest package information; just return the current @@ -129,17 +129,23 @@ final class UniversePackageCache private ( } } - private[this] def updateUniverseCache(): Future[Path] = { - Future(universeBundle.toURI.toURL.openStream()) - .handle { - case e @ (_: IllegalArgumentException | _: MalformedURLException | _: UnknownHostException) => - throw new InvalidRepositoryUri(repository, e) - } +} + +object UniversePackageCache { + + def apply(repository: PackageRepository, dataDir: Path): UniversePackageCache = { + new UniversePackageCache(repository, dataDir) + } + + private[cosmos] def updateUniverseCache(repository: PackageRepository, universeDir: Path)( + streamUrl: URL => InputStream + ): Future[Path] = { + streamBundle(repository, streamUrl) .map { bundleStream => try { extractBundle( new ZipInputStream(bundleStream), - universeBundle, + repository.uri, universeDir ) } finally { @@ -148,10 +154,20 @@ final class UniversePackageCache private ( } } - private[this] def base64(universeBundle: Uri): String = { - Base64.getUrlEncoder().encodeToString( - universeBundle.toString.getBytes(Charsets.Utf8) - ) + private[cosmos] def streamBundle( + repository: PackageRepository, + streamUrl: URL => InputStream + ): Future[InputStream] = { + Future(Xor.Right(repository.uri.toURI.toURL)) + .handle { + case t @ (_: IllegalArgumentException | _: MalformedURLException | _: URISyntaxException) => + Xor.Left(RepositoryUriSyntax(repository, t)) + } + .map(_.map(streamUrl)) + .handle { case t: IOException => + Xor.Left(RepositoryUriConnection(repository, t)) + } + .map(_.valueOr(throw _)) } private[this] def extractBundle( @@ -209,13 +225,19 @@ final class UniversePackageCache private ( path } catch { case e: Exception => - // Only delete directory on failures because we want to keep it around on success. - TwitterFiles.delete(tempDir.toFile) - throw e + // Only delete directory on failures because we want to keep it around on success. + TwitterFiles.delete(tempDir.toFile) + throw e } } - private[this] def readSymbolicLink(path: Path): Option[Path] = { + private def base64(universeBundle: Uri): String = { + Base64.getUrlEncoder().encodeToString( + universeBundle.toString.getBytes(Charsets.Utf8) + ) + } + + private def readSymbolicLink(path: Path): Option[Path] = { Try(Files.readSymbolicLink(path)) .map { path => Some(path) @@ -228,14 +250,6 @@ final class UniversePackageCache private ( .get } -} - -object UniversePackageCache { - - def apply(repository: PackageRepository, dataDir: Path): UniversePackageCache = { - new UniversePackageCache(repository, dataDir) - } - private[cosmos] def search( bundleDir: Path, universeIndex: UniverseIndex, diff --git a/cosmos-server/src/main/scala/com/mesosphere/cosmos/circe/Encoders.scala b/cosmos-server/src/main/scala/com/mesosphere/cosmos/circe/Encoders.scala index f24e870e..f5385be1 100644 --- a/cosmos-server/src/main/scala/com/mesosphere/cosmos/circe/Encoders.scala +++ b/cosmos-server/src/main/scala/com/mesosphere/cosmos/circe/Encoders.scala @@ -242,8 +242,10 @@ object Encoders { case UnsupportedRepositoryVersion(version) => s"Repository version [$version] is not supported" case UnsupportedRepositoryUri(uri) => s"Repository URI [$uri] uses an unsupported scheme. " + "Only http and https are supported" - case InvalidRepositoryUri(repository, _) => - s"URI for repository [${repository.name}] is invalid: ${repository.uri}" + case RepositoryUriSyntax(repository, _) => + s"URI for repository [${repository.name}] has invalid syntax: ${repository.uri}" + case RepositoryUriConnection(repository, _) => + s"Could not access data at URI for repository [${repository.name}]: ${repository.uri}" case RepositoryNotPresent(nameOrUri) => nameOrUri match { case Ior.Both(n, u) => s"Neither repository name [$n] nor URI [$u] are present in the list" diff --git a/cosmos-server/src/test/scala/com/mesosphere/cosmos/UniversePackageCacheSpec.scala b/cosmos-server/src/test/scala/com/mesosphere/cosmos/UniversePackageCacheSpec.scala index 23328dff..07c838e0 100644 --- a/cosmos-server/src/test/scala/com/mesosphere/cosmos/UniversePackageCacheSpec.scala +++ b/cosmos-server/src/test/scala/com/mesosphere/cosmos/UniversePackageCacheSpec.scala @@ -1,44 +1,46 @@ package com.mesosphere.cosmos +import java.io.{ByteArrayInputStream, IOException, InputStream} +import java.net.{MalformedURLException, URL} import java.nio.file.{Files, Path, Paths} import com.mesosphere.cosmos.circe.Encoders._ -import com.mesosphere.cosmos.model.SearchResult +import com.mesosphere.cosmos.model.{PackageRepository, SearchResult} +import com.mesosphere.cosmos.test.TestUtil import com.mesosphere.universe._ -import com.twitter.io.Charsets +import com.netaporter.uri.Uri +import com.twitter.io.{Charsets, StreamIO} +import com.twitter.util.{Await, Throw} import io.circe.syntax._ -import org.scalatest.{BeforeAndAfterAll, FreeSpec} +import org.scalatest.FreeSpec -final class UniversePackageCacheSpec extends FreeSpec with BeforeAndAfterAll { +final class UniversePackageCacheSpec extends FreeSpec { import UniversePackageCacheSpec._ // TODO: This is not really a unit test because we have to use a temporary directory to // store inputs for the test. Refactor UniversePackageCache so that it uses a trait to access // package files, allowing this test to mock the trait. See issue #275. - var bundleDir: Path = _ - - override def beforeAll(): Unit = { - bundleDir = Files.createTempDirectory(getClass.getSimpleName) - initializePackageCache(bundleDir, List((SomeIndexEntry, SomeResource))) - } - - override def afterAll(): Unit = { - assert(com.twitter.io.Files.delete(bundleDir.toFile)) - } - "Issue #270: include package resources in search results" - { "packageResources()" in { + val bundleDir = Files.createTempDirectory(getClass.getSimpleName) + initializePackageCache(bundleDir, List((SomeIndexEntry, SomeResource))) + val repoDir = UniversePackageCache.repoDirectory(bundleDir) val universeIndex = UniverseIndex(SomeUniverseVersion, List(SomeIndexEntry)) assertResult(SomeResource) { UniversePackageCache.packageResources(repoDir, universeIndex, SomeIndexEntry.name) } + + TestUtil.deleteRecursively(bundleDir) } "search()" in { + val bundleDir = Files.createTempDirectory(getClass.getSimpleName) + initializePackageCache(bundleDir, List((SomeIndexEntry, SomeResource))) + val universeIndex = UniverseIndex(SomeUniverseVersion, List(SomeIndexEntry)) val searchResult = SearchResult( @@ -54,10 +56,83 @@ final class UniversePackageCacheSpec extends FreeSpec with BeforeAndAfterAll { assertResult(List(searchResult)) { UniversePackageCache.search(bundleDir, universeIndex, queryOpt = None) } + + TestUtil.deleteRecursively(bundleDir) } } + "streamBundle()" - { + "URI/URL syntax" - { + "relative URI" in { + val expectedRepo = PackageRepository(name = "FooBar", uri = Uri.parse("foo/bar")) + val Throw(RepositoryUriSyntax(actualRepo, causedBy)) = + Await.result(UniversePackageCache.streamBundle(expectedRepo, _.openStream()).liftToTry) + assertResult(expectedRepo)(actualRepo) + assert(causedBy.isInstanceOf[IllegalArgumentException]) + } + + "unknown protocol" in { + val expectedRepo = PackageRepository(name = "FooBar", uri = Uri.parse("foo://bar.com")) + val Throw(RepositoryUriSyntax(actualRepo, causedBy)) = + Await.result(UniversePackageCache.streamBundle(expectedRepo, _.openStream()).liftToTry) + assertResult(expectedRepo)(actualRepo) + assert(causedBy.isInstanceOf[MalformedURLException]) + } + } + + "Connection failure" in { + val expectedRepo = PackageRepository(name = "BadRepo", uri = Uri.parse("http://example.com")) + val errorMessage = "No one's home" + val streamUrl: URL => InputStream = _ => throw new IOException(errorMessage) + val Throw(RepositoryUriConnection(actualRepo, causedBy)) = + Await.result(UniversePackageCache.streamBundle(expectedRepo, streamUrl).liftToTry) + assertResult(expectedRepo)(actualRepo) + assert(causedBy.isInstanceOf[IOException]) + assertResult(errorMessage)(causedBy.getMessage) + } + + "Connection success" in { + val repository = PackageRepository(name = "GoodRepo", uri = Uri.parse("http://example.com")) + val bundleContent = "Pretend this is a package repository zip file" + val bundleStream = new ByteArrayInputStream(bundleContent.getBytes(Charsets.Utf8)) + val streamFuture = UniversePackageCache.streamBundle(repository, _ => bundleStream) + val downloadStream = Await.result(streamFuture) + val downloadContent = StreamIO.buffer(downloadStream).toString(Charsets.Utf8.name) + assertResult(bundleContent)(downloadContent) + } + } + + "updateUniverseCache()" - { + "uses streamBundle()" - { + "error" in { + val universeDir = Files.createTempDirectory(getClass.getSimpleName) + + val repository = PackageRepository(name = "BadRepo", uri = Uri.parse("http://example.com")) + val bundleDirFuture = UniversePackageCache.updateUniverseCache(repository, universeDir) { + _ => throw new IOException("No one's home") + } + val Throw(t) = Await.result(bundleDirFuture.liftToTry) + assert(t.isInstanceOf[RepositoryUriConnection]) + + TestUtil.deleteRecursively(universeDir) + } + + "success" in { + val universeDir = Files.createTempDirectory(getClass.getSimpleName) + + val repository = PackageRepository(name = "GoodRepo", uri = Uri.parse("http://example.com")) + val bundleContent = "Not a zip file, but good enough to pass the test" + val bundleStream = new ByteArrayInputStream(bundleContent.getBytes(Charsets.Utf8)) + val bundleDirFuture = + UniversePackageCache.updateUniverseCache(repository, universeDir)(_ => bundleStream) + assert(Await.result(bundleDirFuture.liftToTry).isReturn) + + TestUtil.deleteRecursively(universeDir) + } + } + } + } object UniversePackageCacheSpec { diff --git a/cosmos-server/src/test/scala/com/mesosphere/cosmos/circe/EncodersDecodersSpec.scala b/cosmos-server/src/test/scala/com/mesosphere/cosmos/circe/EncodersDecodersSpec.scala index 57032006..2dc57271 100644 --- a/cosmos-server/src/test/scala/com/mesosphere/cosmos/circe/EncodersDecodersSpec.scala +++ b/cosmos-server/src/test/scala/com/mesosphere/cosmos/circe/EncodersDecodersSpec.scala @@ -1,11 +1,13 @@ package com.mesosphere.cosmos.circe import cats.data.Xor -import com.mesosphere.cosmos.model.AppId +import com.mesosphere.cosmos.model.{AppId, PackageRepository} +import com.mesosphere.cosmos.{ErrorResponse, RepositoryUriConnection, RepositoryUriSyntax} import com.mesosphere.universe.Images +import com.netaporter.uri.Uri import io.circe.parse._ import io.circe.syntax._ -import io.circe.{Decoder, Json} +import io.circe.{Decoder, Json, JsonObject} import org.scalatest.FreeSpec class EncodersDecodersSpec extends FreeSpec { @@ -53,6 +55,30 @@ class EncodersDecodersSpec extends FreeSpec { } } + "CosmosError" - { + "RepositoryUriSyntax" in { + assertRoundTrip("RepositoryUriSyntax", RepositoryUriSyntax.apply) + } + + + "RepositoryUriConnection" in { + assertRoundTrip("RepositoryUriConnection", RepositoryUriConnection.apply) + } + + def assertRoundTrip( + errorType: String, + errorConstructor: (PackageRepository, Throwable) => Exception + ): Unit = { + val repo = PackageRepository("repo", Uri.parse("http://example.com")) + val cause = "original failure message" + val error = errorConstructor(repo, new Throwable(cause)) + + val Xor.Right(roundTripError) = error.asJson.as[ErrorResponse] + assertResult(errorType)(roundTripError.`type`) + assertResult(Some(JsonObject.singleton("cause", cause.asJson)))(roundTripError.data) + } + } + private[this] def decodeJson[A: Decoder](json: Json)(implicit decoder: Decoder[A]): A = { decoder.decodeJson(json).getOrElse(throw new AssertionError("Unable to decode")) } diff --git a/cosmos-server/src/test/scala/com/mesosphere/cosmos/test/TestUtil.scala b/cosmos-server/src/test/scala/com/mesosphere/cosmos/test/TestUtil.scala new file mode 100644 index 00000000..df54e2c0 --- /dev/null +++ b/cosmos-server/src/test/scala/com/mesosphere/cosmos/test/TestUtil.scala @@ -0,0 +1,29 @@ +package com.mesosphere.cosmos.test + +import java.io.IOException +import java.nio.file._ +import java.nio.file.attribute.BasicFileAttributes + +object TestUtil { + + def deleteRecursively(path: Path): Unit = { + val visitor = new SimpleFileVisitor[Path] { + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + Files.delete(file) + FileVisitResult.CONTINUE + } + + override def postVisitDirectory(dir: Path, e: IOException): FileVisitResult = { + Option(e) match { + case Some(failure) => throw failure + case _ => + Files.delete(dir) + FileVisitResult.CONTINUE + } + } + } + + val _ = Files.walkFileTree(path, visitor) + } + +}