Skip to content

Commit

Permalink
Improve error reporting for repository Zip download (#302)
Browse files Browse the repository at this point in the history
Replace the `CosmosError` subclass `InvalidRepositoryUri` with
`RepositoryUriSyntax` and `RepositoryUriConnection`. The JSON
encodings of these new types include the message from the
`Throwable` cause of the error, to further assist with diagnosis.

To facilitate unit testing, the `UniversePackageCache` method
`updateUniverseCache` was moved to the companion object, allowing
it to be tested without creating an instance of a cache. This allowed
most of the tests to be written without use of the filesystem.

Fixes #299.
  • Loading branch information
cruhland authored and BenWhitehead committed Apr 8, 2016
1 parent 8bf20b1 commit e1da3bf
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,10 @@ final class PackageRepositorySpec
val bogusRepository = PackageRepository("bogus", Uri.parse(uriText))
assertAdd(defaultRepos :+ bogusRepository, bogusRepository)

val expectedMsg = s"URI for repository [${bogusRepository.name}] is invalid: ${bogusRepository.uri}"

eventually {
assertResult(expectedMsg) {
val response = searchPackages(SearchRequest(None))
assertResult(Status.BadRequest)(response.status)
val Xor.Right(err) = decode[ErrorResponse](response.contentString)
val repo = err.message
repo
}
val response = searchPackages(SearchRequest(None))
assertResult(Status.BadRequest)(response.status)
assert(decode[ErrorResponse](response.contentString).isRight)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,23 @@ case class RepositoryAddIndexOutOfBounds(attempted: Int, max: Int) extends Cosmo
case class UnsupportedRepositoryVersion(version: UniverseVersion) extends CosmosError
case class UnsupportedRepositoryUri(uri: Uri) extends CosmosError

case class InvalidRepositoryUri(repository: PackageRepository, causedBy: Throwable)
extends CosmosError(causedBy)
case class RepositoryUriSyntax(
repository: PackageRepository,
causedBy: Throwable
) extends CosmosError(causedBy) {
override def getData: Option[JsonObject] = {
Some(JsonObject.singleton("cause", causedBy.getMessage.asJson))
}
}

case class RepositoryUriConnection(
repository: PackageRepository,
causedBy: Throwable
) extends CosmosError(causedBy) {
override def getData: Option[JsonObject] = {
Some(JsonObject.singleton("cause", causedBy.getMessage.asJson))
}
}

case class RepositoryNotPresent(nameOrUri: Ior[String, Uri]) extends CosmosError {
override def getData: Option[JsonObject] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.mesosphere.cosmos

import java.net.{MalformedURLException, UnknownHostException}
import java.io.{IOException, InputStream}
import java.net.{MalformedURLException, URISyntaxException, URL}
import java.nio.file._
import java.time.LocalDateTime
import java.util.Base64
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.ZipInputStream

import cats.data.Validated.{Invalid, Valid}
import cats.data.Xor.{Left, Right}
import cats.data._
import cats.std.list._ // allows for traversU in verifySchema
Expand Down Expand Up @@ -118,7 +118,7 @@ final class UniversePackageCache private (
updateMutex.acquireAndRun {
// TODO: How often we check should be configurable
if (lastModified.get().plusMinutes(1).isBefore(LocalDateTime.now())) {
updateUniverseCache()
updateUniverseCache(repository, universeDir)(_.openStream())
.onSuccess { _ => lastModified.set(LocalDateTime.now()) }
} else {
/* We don't need to fetch the latest package information; just return the current
Expand All @@ -129,17 +129,23 @@ final class UniversePackageCache private (
}
}

private[this] def updateUniverseCache(): Future[Path] = {
Future(universeBundle.toURI.toURL.openStream())
.handle {
case e @ (_: IllegalArgumentException | _: MalformedURLException | _: UnknownHostException) =>
throw new InvalidRepositoryUri(repository, e)
}
}

object UniversePackageCache {

def apply(repository: PackageRepository, dataDir: Path): UniversePackageCache = {
new UniversePackageCache(repository, dataDir)
}

private[cosmos] def updateUniverseCache(repository: PackageRepository, universeDir: Path)(
streamUrl: URL => InputStream
): Future[Path] = {
streamBundle(repository, streamUrl)
.map { bundleStream =>
try {
extractBundle(
new ZipInputStream(bundleStream),
universeBundle,
repository.uri,
universeDir
)
} finally {
Expand All @@ -148,10 +154,20 @@ final class UniversePackageCache private (
}
}

private[this] def base64(universeBundle: Uri): String = {
Base64.getUrlEncoder().encodeToString(
universeBundle.toString.getBytes(Charsets.Utf8)
)
private[cosmos] def streamBundle(
repository: PackageRepository,
streamUrl: URL => InputStream
): Future[InputStream] = {
Future(Xor.Right(repository.uri.toURI.toURL))
.handle {
case t @ (_: IllegalArgumentException | _: MalformedURLException | _: URISyntaxException) =>
Xor.Left(RepositoryUriSyntax(repository, t))
}
.map(_.map(streamUrl))
.handle { case t: IOException =>
Xor.Left(RepositoryUriConnection(repository, t))
}
.map(_.valueOr(throw _))
}

private[this] def extractBundle(
Expand Down Expand Up @@ -209,13 +225,19 @@ final class UniversePackageCache private (
path
} catch {
case e: Exception =>
// Only delete directory on failures because we want to keep it around on success.
TwitterFiles.delete(tempDir.toFile)
throw e
// Only delete directory on failures because we want to keep it around on success.
TwitterFiles.delete(tempDir.toFile)
throw e
}
}

private[this] def readSymbolicLink(path: Path): Option[Path] = {
private def base64(universeBundle: Uri): String = {
Base64.getUrlEncoder().encodeToString(
universeBundle.toString.getBytes(Charsets.Utf8)
)
}

private def readSymbolicLink(path: Path): Option[Path] = {
Try(Files.readSymbolicLink(path))
.map { path =>
Some(path)
Expand All @@ -228,14 +250,6 @@ final class UniversePackageCache private (
.get
}

}

object UniversePackageCache {

def apply(repository: PackageRepository, dataDir: Path): UniversePackageCache = {
new UniversePackageCache(repository, dataDir)
}

private[cosmos] def search(
bundleDir: Path,
universeIndex: UniverseIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,10 @@ object Encoders {
case UnsupportedRepositoryVersion(version) => s"Repository version [$version] is not supported"
case UnsupportedRepositoryUri(uri) => s"Repository URI [$uri] uses an unsupported scheme. " +
"Only http and https are supported"
case InvalidRepositoryUri(repository, _) =>
s"URI for repository [${repository.name}] is invalid: ${repository.uri}"
case RepositoryUriSyntax(repository, _) =>
s"URI for repository [${repository.name}] has invalid syntax: ${repository.uri}"
case RepositoryUriConnection(repository, _) =>
s"Could not access data at URI for repository [${repository.name}]: ${repository.uri}"
case RepositoryNotPresent(nameOrUri) =>
nameOrUri match {
case Ior.Both(n, u) => s"Neither repository name [$n] nor URI [$u] are present in the list"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,46 @@
package com.mesosphere.cosmos

import java.io.{ByteArrayInputStream, IOException, InputStream}
import java.net.{MalformedURLException, URL}
import java.nio.file.{Files, Path, Paths}

import com.mesosphere.cosmos.circe.Encoders._
import com.mesosphere.cosmos.model.SearchResult
import com.mesosphere.cosmos.model.{PackageRepository, SearchResult}
import com.mesosphere.cosmos.test.TestUtil
import com.mesosphere.universe._
import com.twitter.io.Charsets
import com.netaporter.uri.Uri
import com.twitter.io.{Charsets, StreamIO}
import com.twitter.util.{Await, Throw}
import io.circe.syntax._
import org.scalatest.{BeforeAndAfterAll, FreeSpec}
import org.scalatest.FreeSpec

final class UniversePackageCacheSpec extends FreeSpec with BeforeAndAfterAll {
final class UniversePackageCacheSpec extends FreeSpec {

import UniversePackageCacheSpec._

// TODO: This is not really a unit test because we have to use a temporary directory to
// store inputs for the test. Refactor UniversePackageCache so that it uses a trait to access
// package files, allowing this test to mock the trait. See issue #275.
var bundleDir: Path = _

override def beforeAll(): Unit = {
bundleDir = Files.createTempDirectory(getClass.getSimpleName)
initializePackageCache(bundleDir, List((SomeIndexEntry, SomeResource)))
}

override def afterAll(): Unit = {
assert(com.twitter.io.Files.delete(bundleDir.toFile))
}

"Issue #270: include package resources in search results" - {

"packageResources()" in {
val bundleDir = Files.createTempDirectory(getClass.getSimpleName)
initializePackageCache(bundleDir, List((SomeIndexEntry, SomeResource)))

val repoDir = UniversePackageCache.repoDirectory(bundleDir)
val universeIndex = UniverseIndex(SomeUniverseVersion, List(SomeIndexEntry))

assertResult(SomeResource) {
UniversePackageCache.packageResources(repoDir, universeIndex, SomeIndexEntry.name)
}

TestUtil.deleteRecursively(bundleDir)
}

"search()" in {
val bundleDir = Files.createTempDirectory(getClass.getSimpleName)
initializePackageCache(bundleDir, List((SomeIndexEntry, SomeResource)))

val universeIndex = UniverseIndex(SomeUniverseVersion, List(SomeIndexEntry))

val searchResult = SearchResult(
Expand All @@ -54,10 +56,83 @@ final class UniversePackageCacheSpec extends FreeSpec with BeforeAndAfterAll {
assertResult(List(searchResult)) {
UniversePackageCache.search(bundleDir, universeIndex, queryOpt = None)
}

TestUtil.deleteRecursively(bundleDir)
}

}

"streamBundle()" - {
"URI/URL syntax" - {
"relative URI" in {
val expectedRepo = PackageRepository(name = "FooBar", uri = Uri.parse("foo/bar"))
val Throw(RepositoryUriSyntax(actualRepo, causedBy)) =
Await.result(UniversePackageCache.streamBundle(expectedRepo, _.openStream()).liftToTry)
assertResult(expectedRepo)(actualRepo)
assert(causedBy.isInstanceOf[IllegalArgumentException])
}

"unknown protocol" in {
val expectedRepo = PackageRepository(name = "FooBar", uri = Uri.parse("foo://bar.com"))
val Throw(RepositoryUriSyntax(actualRepo, causedBy)) =
Await.result(UniversePackageCache.streamBundle(expectedRepo, _.openStream()).liftToTry)
assertResult(expectedRepo)(actualRepo)
assert(causedBy.isInstanceOf[MalformedURLException])
}
}

"Connection failure" in {
val expectedRepo = PackageRepository(name = "BadRepo", uri = Uri.parse("http://example.com"))
val errorMessage = "No one's home"
val streamUrl: URL => InputStream = _ => throw new IOException(errorMessage)
val Throw(RepositoryUriConnection(actualRepo, causedBy)) =
Await.result(UniversePackageCache.streamBundle(expectedRepo, streamUrl).liftToTry)
assertResult(expectedRepo)(actualRepo)
assert(causedBy.isInstanceOf[IOException])
assertResult(errorMessage)(causedBy.getMessage)
}

"Connection success" in {
val repository = PackageRepository(name = "GoodRepo", uri = Uri.parse("http://example.com"))
val bundleContent = "Pretend this is a package repository zip file"
val bundleStream = new ByteArrayInputStream(bundleContent.getBytes(Charsets.Utf8))
val streamFuture = UniversePackageCache.streamBundle(repository, _ => bundleStream)
val downloadStream = Await.result(streamFuture)
val downloadContent = StreamIO.buffer(downloadStream).toString(Charsets.Utf8.name)
assertResult(bundleContent)(downloadContent)
}
}

"updateUniverseCache()" - {
"uses streamBundle()" - {
"error" in {
val universeDir = Files.createTempDirectory(getClass.getSimpleName)

val repository = PackageRepository(name = "BadRepo", uri = Uri.parse("http://example.com"))
val bundleDirFuture = UniversePackageCache.updateUniverseCache(repository, universeDir) {
_ => throw new IOException("No one's home")
}
val Throw(t) = Await.result(bundleDirFuture.liftToTry)
assert(t.isInstanceOf[RepositoryUriConnection])

TestUtil.deleteRecursively(universeDir)
}

"success" in {
val universeDir = Files.createTempDirectory(getClass.getSimpleName)

val repository = PackageRepository(name = "GoodRepo", uri = Uri.parse("http://example.com"))
val bundleContent = "Not a zip file, but good enough to pass the test"
val bundleStream = new ByteArrayInputStream(bundleContent.getBytes(Charsets.Utf8))
val bundleDirFuture =
UniversePackageCache.updateUniverseCache(repository, universeDir)(_ => bundleStream)
assert(Await.result(bundleDirFuture.liftToTry).isReturn)

TestUtil.deleteRecursively(universeDir)
}
}
}

}

object UniversePackageCacheSpec {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.mesosphere.cosmos.circe

import cats.data.Xor
import com.mesosphere.cosmos.model.AppId
import com.mesosphere.cosmos.model.{AppId, PackageRepository}
import com.mesosphere.cosmos.{ErrorResponse, RepositoryUriConnection, RepositoryUriSyntax}
import com.mesosphere.universe.Images
import com.netaporter.uri.Uri
import io.circe.parse._
import io.circe.syntax._
import io.circe.{Decoder, Json}
import io.circe.{Decoder, Json, JsonObject}
import org.scalatest.FreeSpec

class EncodersDecodersSpec extends FreeSpec {
Expand Down Expand Up @@ -53,6 +55,30 @@ class EncodersDecodersSpec extends FreeSpec {
}
}

"CosmosError" - {
"RepositoryUriSyntax" in {
assertRoundTrip("RepositoryUriSyntax", RepositoryUriSyntax.apply)
}


"RepositoryUriConnection" in {
assertRoundTrip("RepositoryUriConnection", RepositoryUriConnection.apply)
}

def assertRoundTrip(
errorType: String,
errorConstructor: (PackageRepository, Throwable) => Exception
): Unit = {
val repo = PackageRepository("repo", Uri.parse("http://example.com"))
val cause = "original failure message"
val error = errorConstructor(repo, new Throwable(cause))

val Xor.Right(roundTripError) = error.asJson.as[ErrorResponse]
assertResult(errorType)(roundTripError.`type`)
assertResult(Some(JsonObject.singleton("cause", cause.asJson)))(roundTripError.data)
}
}

private[this] def decodeJson[A: Decoder](json: Json)(implicit decoder: Decoder[A]): A = {
decoder.decodeJson(json).getOrElse(throw new AssertionError("Unable to decode"))
}
Expand Down
Loading

0 comments on commit e1da3bf

Please sign in to comment.