Skip to content

Commit

Permalink
* some omitted files
Browse files Browse the repository at this point in the history
  • Loading branch information
benedeki committed Feb 4, 2025
1 parent 1488d1f commit 96ffa33
Show file tree
Hide file tree
Showing 8 changed files with 609 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

class ReaderException(message: String) extends Exception(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

import sttp.model.{StatusCode, Uri}
import za.co.absa.atum.model.envelopes.ErrorResponse

abstract class RequestException(message: String) extends ReaderException(message)


object RequestException {
type CirceError = io.circe.Error

final case class HttpException(
message: String,
statusCode: StatusCode,
errorResponse: ErrorResponse,
request: Uri
) extends RequestException(message)

final case class ParsingException(
message: String,
body: String
) extends RequestException(message)
object ParsingException {
def fromCirceError(error: CirceError, body: String): ParsingException = {
ParsingException(error.getMessage, body)
}
}


final case class NoDataException(
message: String
) extends RequestException(message)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.result

import sttp.monad.MonadError

abstract class AbstractPage [T <: Iterable[_], F[_]: MonadError] {
def items: T
def hasNext: Boolean
def limit: Int
def pageStart: Long
def pageEnd: Long

def pageSize: Int = (pageEnd - pageStart).toInt + 1
def hasPrior: Boolean = pageStart > 0
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.result

import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.reader.core.RequestResult.{RequestFail, RequestResult}
import za.co.absa.atum.reader.exceptions.RequestException.NoDataException
import za.co.absa.atum.reader.result.GroupedPage.GroupPageRoller

import scala.collection.immutable.ListMap

case class GroupedPage[K, V, F[_]: MonadError](
items: ListMap[K, Vector[V]],
hasNext: Boolean,
limit: Int,
pageStart: Long,
pageEnd: Long,
private[reader] val pageRoller: GroupPageRoller[K, V, F]
) extends AbstractPage[Map[K, Vector[V]], F] {

def apply(key: K): Vector[V] = items(key)
def keys: Iterable[K] = items.keys
def groupCount: Int = items.size

def map[K1, V1](f: ((K, Vector[V])) => (K1, Vector[V1])): GroupedPage[K1, V1, F] = {
val newItems = items.map(f)
val newPageRoller: GroupPageRoller[K1, V1, F] = (limit, offset) => pageRoller(limit, offset).map(_.map(_.map(f)))
this.copy(items = newItems, pageRoller = newPageRoller)
}

def mapValues[B](f: V => B): GroupedPage[K, B, F] = {
def mapper(item: (K, Vector[V])): (K, Vector[B]) = (item._1, item._2.map(f))

val newItems = items.map(mapper)
val newPageRoller: GroupPageRoller[K, B, F] = (limit, offset) => pageRoller(limit, offset).map(_.map(_.mapValues(f)))
this.copy(items = newItems, pageRoller = newPageRoller)

}

def prior(newPageSize: Int): F[RequestResult[GroupedPage[K, V, F]]] = {
if (hasPrior) {
val newOffset = (pageStart - limit).max(0)
pageRoller(newPageSize, newOffset)
} else {
MonadError[F].unit(RequestFail(NoDataException("No prior page")))
}
}

def prior: F[RequestResult[GroupedPage[K, V, F]]] = prior(limit)

def next(newPageSize: Int): F[RequestResult[GroupedPage[K, V, F]]] = {
if (hasNext) {
pageRoller(newPageSize, pageStart + limit)
} else {
MonadError[F].unit(RequestFail(NoDataException("No next page")))
}
}

def next: F[RequestResult[GroupedPage[K, V, F]]] = next(limit)

def +(other: GroupedPage[K, V, F]): GroupedPage[K, V, F] = {
val newItems = other.items.foldLeft(items) { case (acc, (k, v)) =>
if (acc.contains(k)) {
acc.updated(k, acc(k) ++ v)
} else {
acc + (k -> v)
}
}
val newHasNext = hasNext && other.hasNext
val newPageStart = pageStart min other.pageStart
val newPageEnd = pageEnd max other.pageEnd
this.copy(items = newItems, hasNext = newHasNext, pageStart = newPageStart, pageEnd = newPageEnd)
}
}

object GroupedPage {
type GroupPageRoller[K, V, F[_]] = (Int, Long) => F[RequestResult[GroupedPage[K, V, F]]]
}
96 changes: 96 additions & 0 deletions reader/src/main/scala/za/co/absa/atum/reader/result/Page.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.result

import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.reader.core.RequestResult.{RequestFail, RequestPageResultOps, RequestResult}
import za.co.absa.atum.reader.exceptions.RequestException.NoDataException
import za.co.absa.atum.reader.result.GroupedPage.GroupPageRoller
import za.co.absa.atum.reader.result.Page.PageRoller

import scala.collection.immutable.ListMap

case class Page[T, F[_]: MonadError](
items: Vector[T],
hasNext: Boolean,
limit: Int,
pageStart: Long,
pageEnd: Long,
private[reader] val pageRoller: PageRoller[T, F]
) extends AbstractPage[Vector[T], F] {

def apply(index: Int): T = items(index)

def map[B](f: T => B): Page[B, F] = {
val newItems = items.map(f)
val newPageRoller: PageRoller[B, F] = (limit, offset) => pageRoller(limit, offset).map(_.pageMap(f))
this.copy(items = newItems, pageRoller = newPageRoller)
}

def prior(newPageSize: Int): F[RequestResult[Page[T, F]]] = {
if (hasPrior) {
val newOffset = (pageStart - newPageSize).max(0)
pageRoller(newPageSize, newOffset)
} else {
MonadError[F].unit(RequestFail(NoDataException("No prior page")))
}
}

def prior(): F[RequestResult[Page[T, F]]] = prior(limit)

def next(newPageSize: Int): F[RequestResult[Page[T, F]]] = {
if (hasNext) {
pageRoller(newPageSize, pageStart + pageSize)
} else {
MonadError[F].unit(RequestFail(NoDataException("No next page")))
}
}

def next: F[RequestResult[Page[T, F]]] = next(limit)

def +(other: Page[T, F]): Page[T, F] = {
val newItems = items ++ other.items
val newPageStart = pageStart min other.pageStart
val newPageEnd = pageEnd max other.pageEnd
val newHasNext = hasNext && other.hasNext
this.copy(items = newItems, hasNext = newHasNext, pageStart = newPageStart, pageEnd = newPageEnd)
}

def groupBy[K](f: T => K): GroupedPage[K, T, F] = {
val (newItems, itemsCounts) = items.foldLeft(ListMap.empty[K, Vector[T]], 0) { case ((groupsAcc, count), item) =>
val k = f(item)
(groupsAcc.updated(k, groupsAcc.getOrElse(k, Vector.empty) :+ item), count + 1)
}
val newPageRoller: GroupPageRoller[K, T, F] = (limit, offset) =>
pageRoller(limit, offset)
.map(_.map(_.groupBy(f)))

GroupedPage(
newItems,
hasNext,
limit,
pageStart,
pageEnd,
newPageRoller
)
}
}

object Page {
type PageRoller[T, F[_]] = (Int, Long) => F[RequestResult[Page[T, F]]]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.result

import org.scalatest.funsuite.AnyFunSuiteLike
import sttp.client3.Identity
import sttp.client3.monad.IdMonad
import sttp.monad.MonadError


class AbstractPageUnitTests extends AnyFunSuiteLike {
private implicit val monad: MonadError[Identity] = IdMonad

test("Basic test") {
val page = new AbstractPage[Iterable[Int], Identity] {
override def items: Iterable[Int] = Seq(1, 2, 3)
override def hasNext: Boolean = true
override def limit: Int = 3
override def pageStart: Long = 0
override def pageEnd: Long = 2
}

assert(page.items.size == 3)
assert(page.hasNext)
assert(page.limit == 3)
assert(page.pageStart == 0)
assert(page.pageSize == 3)
assert(!page.hasPrior)

val anotherPage = new AbstractPage[Iterable[Int], Identity] {
override def items: Iterable[Int] = Seq(1, 2, 3)
override def hasNext: Boolean = true
override def limit: Int = 3
override def pageStart: Long = 1
override def pageEnd: Long = 2
}
assert(anotherPage.hasPrior)
}
}
Loading

0 comments on commit 96ffa33

Please sign in to comment.