Skip to content

Commit

Permalink
Introduce document routing for default index (#5275)
Browse files Browse the repository at this point in the history
* Introduce document routing for default index

* scalafmt

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Feb 11, 2025
1 parent e6f736a commit 18dd61f
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ sealed trait ElasticSearchAction extends Product with Serializable {
*/
def index: IndexLabel

/**
* @return
* To route the document to a particular shard
* @see
* https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-routing-field.html
*/
def routing: Option[String]

/**
* @return
* the id of the document for the current bulk operation
Expand All @@ -27,25 +35,30 @@ sealed trait ElasticSearchAction extends Product with Serializable {
def payload: String

protected def json: Json =
Json.obj("_index" -> index.value.asJson, "_id" -> id.asJson)
Json.fromFields(
List("_index" := index.value, "_id" := id) ++ routing.map { r => "routing" := r }
)
}

object ElasticSearchAction {

private val newLine = System.lineSeparator()

final case class Index(index: IndexLabel, id: String, content: Json) extends ElasticSearchAction {
final case class Index(index: IndexLabel, id: String, routing: Option[String], content: Json)
extends ElasticSearchAction {
def payload: String = Json.obj("index" -> json).noSpaces + newLine + content.noSpaces
}
final case class Create(index: IndexLabel, id: String, content: Json) extends ElasticSearchAction {
final case class Create(index: IndexLabel, id: String, routing: Option[String], content: Json)
extends ElasticSearchAction {
def payload: String = Json.obj("create" -> json).noSpaces + newLine + content.noSpaces
}
final case class Update(index: IndexLabel, id: String, content: Json, retry: Int = 0) extends ElasticSearchAction {
final case class Update(index: IndexLabel, id: String, routing: Option[String], content: Json, retry: Int = 0)
extends ElasticSearchAction {
val modified = if (retry > 0) json deepMerge Json.obj("retry_on_conflict" -> retry.asJson) else json

def payload: String = Json.obj("update" -> modified).noSpaces + newLine + content.asJson.noSpaces
}
final case class Delete(index: IndexLabel, id: String) extends ElasticSearchAction {
final case class Delete(index: IndexLabel, id: String, routing: Option[String]) extends ElasticSearchAction {
def payload: String = Json.obj("delete" -> json).noSpaces + newLine
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import scala.concurrent.duration.FiniteDuration
* the index to push into
* @param documentId
* a function that maps an elem to a documentId
* @param routing
* a function that maps an elem to a routing value
* @see
* https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-routing-field.html
* @param refresh
* the value for the `refresh` Elasticsearch parameter
*/
Expand All @@ -40,6 +44,7 @@ final class ElasticSearchSink private (
override val maxWindow: FiniteDuration,
index: IndexLabel,
documentId: Elem[Json] => String,
routing: Elem[Json] => Option[String],
refresh: Refresh
) extends Sink {
override type In = Json
Expand All @@ -53,11 +58,11 @@ final class ElasticSearchSink private (
val actions = elements.foldLeft(Vector.empty[ElasticSearchAction]) {
case (actions, successElem @ Elem.SuccessElem(_, _, _, _, _, json, _)) =>
if (json.isEmpty()) {
actions :+ Delete(index, documentId(successElem))
actions :+ Delete(index, documentId(successElem), routing(successElem))
} else
actions :+ Index(index, documentId(successElem), json)
actions :+ Index(index, documentId(successElem), routing(successElem), json)
case (actions, droppedElem: Elem.DroppedElem) =>
actions :+ Delete(index, documentId(droppedElem))
actions :+ Delete(index, documentId(droppedElem), routing(droppedElem))
case (actions, _: Elem.FailedElem) => actions
}

Expand Down Expand Up @@ -131,6 +136,7 @@ object ElasticSearchSink {
maxWindow,
index,
eventDocumentId,
_ => None,
refresh
)

Expand Down Expand Up @@ -161,6 +167,7 @@ object ElasticSearchSink {
maxWindow,
index,
elem => elem.id.toString,
_ => None,
refresh
)

Expand Down Expand Up @@ -189,6 +196,7 @@ object ElasticSearchSink {
maxWindow,
index,
elem => s"${elem.project}_${elem.id}",
elem => Some(elem.project.toString),
refresh
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class ElasticSearchViewsQuerySuite
bulk <- allResources.traverse { r =>
r.asDocument(ref).map { d =>
// We create a unique id across all indices
ElasticSearchAction.Index(view.index, genString(), d)
ElasticSearchAction.Index(view.index, genString(), None, d)
}
}
_ <- client.bulk(bulk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ class ElasticSearchClientSpec
"run bulk operation" in {
val index = IndexLabel(genString()).rightValue
val operations = List(
ElasticSearchAction.Index(index, "1", json"""{ "field1" : "value1" }"""),
ElasticSearchAction.Delete(index, "2"),
ElasticSearchAction.Index(index, "2", json"""{ "field1" : "value1" }"""),
ElasticSearchAction.Delete(index, "2"),
ElasticSearchAction.Create(index, "3", json"""{ "field1" : "value3" }"""),
ElasticSearchAction.Update(index, "1", json"""{ "doc" : {"field2" : "value2"} }""")
ElasticSearchAction.Index(index, "1", Some("routing"), json"""{ "field1" : "value1" }"""),
ElasticSearchAction.Delete(index, "2", Some("routing2")),
ElasticSearchAction.Index(index, "2", Some("routing2"), json"""{ "field1" : "value1" }"""),
ElasticSearchAction.Delete(index, "2", Some("routing2")),
ElasticSearchAction.Create(index, "3", Some("routing"), json"""{ "field1" : "value3" }"""),
ElasticSearchAction.Update(index, "1", Some("routing"), json"""{ "doc" : {"field2" : "value2"} }""")
)
esClient.bulk(operations).accepted
eventually {
Expand All @@ -123,12 +123,12 @@ class ElasticSearchClientSpec
"run bulk operation with errors" in {
val index = IndexLabel(genString()).rightValue
val operations = List(
ElasticSearchAction.Index(index, "1", json"""{ "field1" : "value1" }"""),
ElasticSearchAction.Delete(index, "2"),
ElasticSearchAction.Index(index, "2", json"""{ "field1" : 27 }"""),
ElasticSearchAction.Delete(index, "3"),
ElasticSearchAction.Create(index, "3", json"""{ "field1" : "value3" }"""),
ElasticSearchAction.Update(index, "5", json"""{ "doc" : {"field2" : "value2"} }""")
ElasticSearchAction.Index(index, "1", None, json"""{ "field1" : "value1" }"""),
ElasticSearchAction.Delete(index, "2", None),
ElasticSearchAction.Index(index, "2", None, json"""{ "field1" : 27 }"""),
ElasticSearchAction.Delete(index, "3", None),
ElasticSearchAction.Create(index, "3", None, json"""{ "field1" : "value3" }"""),
ElasticSearchAction.Update(index, "5", None, json"""{ "doc" : {"field2" : "value2"} }""")
)
val result = esClient.bulk(operations).accepted
result match {
Expand All @@ -146,7 +146,7 @@ class ElasticSearchClientSpec
val index = IndexLabel(genString()).rightValue
val doc = json"""{ "field1" : 1 }"""

val operations = List(ElasticSearchAction.Index(index, "1", doc))
val operations = List(ElasticSearchAction.Index(index, "1", None, doc))
esClient.bulk(operations, Refresh.WaitFor).accepted

esClient.getSource[Json](index, "1").accepted shouldEqual doc
Expand All @@ -157,9 +157,9 @@ class ElasticSearchClientSpec
val index = IndexLabel(genString()).rightValue

val operations = List(
ElasticSearchAction.Index(index, "1", json"""{ "field1" : 1 }"""),
ElasticSearchAction.Index(index, "2", json"""{ "field1" : 2 }"""),
ElasticSearchAction.Index(index, "3", json"""{ "doc" : {"field2" : 4} }""")
ElasticSearchAction.Index(index, "1", None, json"""{ "field1" : 1 }"""),
ElasticSearchAction.Index(index, "2", None, json"""{ "field1" : 2 }"""),
ElasticSearchAction.Index(index, "3", None, json"""{ "doc" : {"field2" : 4} }""")
)
esClient.bulk(operations, Refresh.WaitFor).accepted

Expand All @@ -174,9 +174,9 @@ class ElasticSearchClientSpec
val index = IndexLabel(genString()).rightValue

val operations = List(
ElasticSearchAction.Index(index, "1", json"""{ "field1" : 1 }"""),
ElasticSearchAction.Index(index, "2", json"""{ "field1" : 2 }"""),
ElasticSearchAction.Index(index, "3", json"""{ "doc" : {"field2" : 4} }""")
ElasticSearchAction.Index(index, "1", None, json"""{ "field1" : 1 }"""),
ElasticSearchAction.Index(index, "2", None, json"""{ "field1" : 2 }"""),
ElasticSearchAction.Index(index, "3", None, json"""{ "doc" : {"field2" : 4} }""")
)
esClient.bulk(operations, Refresh.WaitFor).accepted

Expand All @@ -187,9 +187,9 @@ class ElasticSearchClientSpec
val index = IndexLabel(genString()).rightValue

val operations = List(
ElasticSearchAction.Index(index, "1", json"""{ "field1" : 1 }"""),
ElasticSearchAction.Create(index, "3", json"""{ "field1" : 3 }"""),
ElasticSearchAction.Update(index, "1", json"""{ "doc" : {"field2" : "value2"} }""")
ElasticSearchAction.Index(index, "1", None, json"""{ "field1" : 1 }"""),
ElasticSearchAction.Create(index, "3", None, json"""{ "field1" : 3 }"""),
ElasticSearchAction.Update(index, "1", None, json"""{ "doc" : {"field2" : "value2"} }""")
)
esClient.bulk(operations, Refresh.WaitFor).accepted
val query = QueryBuilder(jobj"""{"query": {"bool": {"must": {"exists": {"field": "field1"} } } } }""")
Expand All @@ -208,9 +208,9 @@ class ElasticSearchClientSpec
val index = IndexLabel(genString()).rightValue

val operations = List(
ElasticSearchAction.Index(index, "1", json"""{ "field1" : 1 }"""),
ElasticSearchAction.Create(index, "3", json"""{ "field1" : 3 }"""),
ElasticSearchAction.Update(index, "1", json"""{ "doc" : {"field2" : "value2"} }""")
ElasticSearchAction.Index(index, "1", None, json"""{ "field1" : 1 }"""),
ElasticSearchAction.Create(index, "3", None, json"""{ "field1" : 3 }"""),
ElasticSearchAction.Update(index, "1", None, json"""{ "doc" : {"field2" : "value2"} }""")
)
esClient.bulk(operations).accepted
val query2 = jobj"""{"query": {"bool": {"must": {"term": {"field1": 3} } } } }"""
Expand All @@ -227,9 +227,9 @@ class ElasticSearchClientSpec
val index = IndexLabel(genString()).rightValue

val operations = List(
ElasticSearchAction.Index(index, "1", json"""{ "field1" : 1 }"""),
ElasticSearchAction.Create(index, "2", json"""{ "field1" : 3 }"""),
ElasticSearchAction.Update(index, "1", json"""{ "doc" : {"field2" : "value2"} }""")
ElasticSearchAction.Index(index, "1", None, json"""{ "field1" : 1 }"""),
ElasticSearchAction.Create(index, "2", None, json"""{ "field1" : 3 }"""),
ElasticSearchAction.Update(index, "1", None, json"""{ "doc" : {"field2" : "value2"} }""")
)

def theCountShouldBe(count: Long): IO[Assertion] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ class DefaultIndexDeletionTaskSuite

def toProjectUri(project: ProjectRef) = ResourceUris.project(project).accessUri

def indexAction(id: Int, project: ProjectRef) = {
val json = json"""{ "_project": "${toProjectUri(project)}", "number": $id }"""
ElasticSearchAction.Index(index, id.toString, Some(project.toString), json)
}

val bulk = List(
ElasticSearchAction.Index(index, "1", json"""{ "_project": "${toProjectUri(projectToDelete)}", "number": 1 }"""),
ElasticSearchAction.Index(index, "2", json"""{ "_project": "${toProjectUri(anotherProject)}","number" : 2 }"""),
ElasticSearchAction.Index(index, "3", json"""{ "_project": "${toProjectUri(projectToDelete)}", "number" : 3 }"""),
ElasticSearchAction.Index(index, "4", json"""{ "_project": "${toProjectUri(anotherProject)}", "number" : 4 }""")
indexAction(1, projectToDelete),
indexAction(2, anotherProject),
indexAction(3, projectToDelete),
indexAction(4, anotherProject)
)

def countInIndex(project: ProjectRef) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ class EventMetricsDeletionTaskSuite
val task = new EventMetricsDeletionTask(client, prefix)

val operations = List(
ElasticSearchAction.Index(index, "1", json"""{ "project": "$projectToDelete", "number": 1 }"""),
ElasticSearchAction.Index(index, "2", json"""{ "project": "$anotherProject","number" : 2 }"""),
ElasticSearchAction.Index(index, "3", json"""{ "project": "$projectToDelete", "number" : 3 }"""),
ElasticSearchAction.Index(index, "4", json"""{ "project": "$anotherProject", "number" : 4 }""")
ElasticSearchAction.Index(index, "1", None, json"""{ "project": "$projectToDelete", "number": 1 }"""),
ElasticSearchAction.Index(index, "2", None, json"""{ "project": "$anotherProject","number" : 2 }"""),
ElasticSearchAction.Index(index, "3", None, json"""{ "project": "$projectToDelete", "number" : 3 }"""),
ElasticSearchAction.Index(index, "4", None, json"""{ "project": "$anotherProject", "number" : 4 }""")
)

def countMetrics(project: ProjectRef) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class EventMetricsQuerySuite extends NexusSuite with ElasticSearchClientSetup.Fi
for {
_ <- client.createIndex(index, Some(metricsMapping.value), Some(metricsSettings.value)).assertEquals(true)
bulk = List(event11, event12, event21).zipWithIndex.map { case (event, i) =>
ElasticSearchAction.Index(index, i.toString, event.asJson)
ElasticSearchAction.Index(index, i.toString, None, event.asJson)
}
_ <- client.bulk(bulk).assertEquals(BulkResponse.Success)
_ <- client.refresh(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ class DefaultIndexQuerySuite extends NexusSuite with ElasticSearchClientSetup.Fi
_ <- client.createIndex(defaultIndex, Some(defaultMapping), Some(defaultSettings))
bulk <- allResources.traverse { r =>
r.asDocument.map { d =>
// We create a unique id across all indices
ElasticSearchAction.Index(defaultIndex, genString(), d)
ElasticSearchAction.Index(defaultIndex, genString(), Some(r.project.toString), d)
}
}
_ <- client.bulk(bulk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class GraphAnalyticsSink(
case Noop => acc
case UpdateByQuery(id, types) => acc.update(id, types)
case g: Index =>
val bulkAction = ElasticSearchAction.Index(index, documentId(success), g.asJson)
val bulkAction = ElasticSearchAction.Index(index, documentId(success), None, g.asJson)
acc.add(bulkAction).update(g.id, g.types)
}
//TODO: handle correctly the deletion of individual resources when the feature is implemented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.config.GraphAnalyti
import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.AnalyticsGraph.{Edge, EdgePath, Node}
import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.PropertiesStatistics.Metadata
import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.{AnalyticsGraph, PropertiesStatistics}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schema
import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures
import ch.epfl.bluebrain.nexus.delta.sdk.generators.ProjectGen
Expand Down Expand Up @@ -54,19 +55,19 @@ class GraphAnalyticsSpec(docker: ElasticSearchDocker)
"GraphAnalytics" should {

"initialize" in {
val idx = GraphAnalytics.index(prefix, project.ref)
val idx = GraphAnalytics.index(prefix, project.ref)
client.createIndex(idx, Some(jsonObjectContentOf("elasticsearch/mappings.json")), None).accepted
val robert = iri"http://localhost/Robert"
val sam = iri"http://localhost/Sam"
val fred = iri"http://localhost/fred"
val anna = iri"http://localhost/Anna"
val robert = iri"http://localhost/Robert"
val sam = iri"http://localhost/Sam"
val fred = iri"http://localhost/fred"
val anna = iri"http://localhost/Anna"
def source(self: Iri, brother: Iri) = jsonContentOf("document-source.json", "id" -> self, "brother" -> brother)
client
.bulk(
List(
ElasticSearchAction.Index(idx, "1", jsonContentOf("document-source.json", "id" -> sam, "brother" -> sam)),
ElasticSearchAction
.Index(idx, "2", jsonContentOf("document-source.json", "id" -> anna, "brother" -> robert)),
ElasticSearchAction.Index(idx, "3", jsonContentOf("document-source.json", "id" -> sam, "brother" -> fred))
ElasticSearchAction.Index(idx, "1", None, source(sam, sam)),
ElasticSearchAction.Index(idx, "2", None, source(anna, robert)),
ElasticSearchAction.Index(idx, "3", None, source(sam, fred))
)
)
.accepted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class SearchSpec
val index = projectionIndex(p.projection, p.view.uuid, prefix)
esClient.createIndex(index, Some(mappings), None).accepted
val newBulk = createDocuments(p).zipWithIndex.map { case (json, idx) =>
ElasticSearchAction.Index(index, idx.toString, json)
ElasticSearchAction.Index(index, idx.toString, None, json)
}
bulk ++ newBulk
}
Expand Down

This file was deleted.

0 comments on commit 18dd61f

Please sign in to comment.