Add the ability to query and get stats for the default index #5277

Merged · 3 commits · Feb 13, 2025
@@ -111,6 +111,8 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
)(api, uuidF)
}

+  make[MigrateDefaultIndexing].from { (xas: Transactors) => MigrateDefaultIndexing(xas) }

make[ElasticSearchCoordinator].fromEffect {
(
views: ElasticSearchViews,
@@ -119,16 +121,18 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
supervisor: Supervisor,
client: ElasticSearchClient,
config: ElasticSearchViewsConfig,
-      cr: RemoteContextResolution @Id("aggregate")
+      cr: RemoteContextResolution @Id("aggregate"),
+      migration: MigrateDefaultIndexing
) =>
-      ElasticSearchCoordinator(
-        views,
-        graphStream,
-        registry,
-        supervisor,
-        client,
-        config
-      )(cr)
+      migration.run >>
+        ElasticSearchCoordinator(
+          views,
+          graphStream,
+          registry,
+          supervisor,
+          client,
+          config
+        )(cr)
}
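Here `migration.run >> ElasticSearchCoordinator(...)` relies on `>>` sequencing the two `IO` values, so the (possibly no-op) migration finishes before the coordinator effect is evaluated. A stand-alone sketch of that ordering, with illustrative names only:

```scala
import cats.effect.{IO, IOApp}

object SequencingSketch extends IOApp.Simple {
  // Illustrative effects standing in for migration.run and the coordinator.
  val migration: IO[Unit]     = IO.println("migration finished")
  val coordinator: IO[String] = IO.println("coordinator started").as("coordinator")

  // `>>` discards the left result and only then runs the right effect.
  def run: IO[Unit] = (migration >> coordinator).void
}
```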

make[DefaultIndexingCoordinator].fromEffect {
@@ -139,6 +143,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
client: ElasticSearchClient,
files: ElasticSearchFiles,
config: ElasticSearchViewsConfig,
+      baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate")
) =>
DefaultIndexingCoordinator(
@@ -149,7 +154,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
files.defaultMapping,
files.defaultSettings,
config
-      )(cr)
+      )(baseUri, cr)
}

make[EventMetricsProjection].fromEffect {
@@ -229,6 +234,17 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
)
}

+  make[DefaultIndexRoutes].from {
+    (
+      identities: Identities,
+      aclCheck: AclCheck,
+      defaultIndexQuery: DefaultIndexQuery,
+      projections: Projections,
+      cr: RemoteContextResolution @Id("aggregate"),
+      ordering: JsonKeyOrdering
+    ) => new DefaultIndexRoutes(identities, aclCheck, defaultIndexQuery, projections)(cr, ordering)
+  }

make[ListingRoutes].from {
(
identities: Identities,
@@ -385,6 +401,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
(
es: ElasticSearchViewsRoutes,
query: ListingRoutes,
+      defaultIndex: DefaultIndexRoutes,
indexing: ElasticSearchIndexingRoutes,
idResolutionRoute: IdResolutionRoutes,
historyRoutes: ElasticSearchHistoryRoutes,
@@ -397,6 +414,7 @@
schemeDirectives,
es.routes,
query.routes,
+        defaultIndex.routes,
indexing.routes,
idResolutionRoute.routes,
historyRoutes.routes
@@ -135,23 +135,6 @@ final class ElasticSearchViews private (
} yield res
}.span("createElasticSearchView")

-  /**
-    * Creates a new ElasticSearchView without checking whether the provided project is deprecated or not.
-    *
-    * @param iri
-    *   id of the view to be created
-    * @param project
-    *   project in which the view has to be created
-    * @param value
-    *   configuration of the view to be created
-    */
-  private[elasticsearch] def internalCreate(
-      iri: Iri,
-      project: ProjectRef,
-      value: ElasticSearchViewValue
-  )(implicit subject: Subject): IO[ViewResource] =
-    eval(CreateElasticSearchView(iri, project, value, value.toJson(iri), subject))
-
/**
* Updates an existing ElasticSearchView.
*
@@ -401,7 +384,7 @@ object ElasticSearchViews {
/**
* The default Elasticsearch API mappings
*/
-  val mappings: ApiMappings = ApiMappings("view" -> schema.original, "documents" -> defaultViewId)
+  val mappings: ApiMappings = ApiMappings("view" -> schema.original)

def projectionName(viewDef: ActiveViewDef): String =
projectionName(viewDef.ref.project, viewDef.ref.viewId, viewDef.indexingRev)
@@ -0,0 +1,41 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.effect.IO
import cats.effect.std.Env
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.defaultViewId
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import doobie.syntax.all._

trait MigrateDefaultIndexing {
def run: IO[Unit]
}

object MigrateDefaultIndexing {

private val logger = Logger[MigrateDefaultIndexing]

private def trigger = Env[IO].get("MIGRATE_DEFAULT_INDEXING").map(_.getOrElse("false").toBoolean)

def apply(xas: Transactors): MigrateDefaultIndexing = new MigrateDefaultIndexing {

private def deleteDefaultViews() =
sql"""
DELETE FROM scoped_events WHERE type = 'elasticsearch' AND id = $defaultViewId;
DELETE FROM scoped_states WHERE type = 'elasticsearch' AND id = $defaultViewId;
DELETE FROM projection_offsets WHERE module = 'elasticsearch' AND resource_id = $defaultViewId;
""".stripMargin.update.run.void.transact(xas.write)

override def run: IO[Unit] =
trigger.flatMap { enabled =>
IO.whenA(enabled) {
logger.info("Deleting existing default elasticsearch views and their progress...") >>
deleteDefaultViews() >>
logger.info("Deleting existing default elasticsearch views is complete.")
}
}
}

}
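A note on how the migration is triggered: `run` is a no-op unless the `MIGRATE_DEFAULT_INDEXING` environment variable is set. A minimal, runnable sketch of the same gating logic (the object name is made up for illustration; note that `String#toBoolean` throws for values other than "true"/"false", case-insensitive):

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Env

object MigrationGateDemo extends IOApp.Simple {

  // Mirrors the `trigger` logic above: a missing variable defaults to "false".
  private def trigger: IO[Boolean] =
    Env[IO].get("MIGRATE_DEFAULT_INDEXING").map(_.getOrElse("false").toBoolean)

  def run: IO[Unit] =
    trigger.flatMap { enabled =>
      IO.println(s"Default indexing migration enabled: $enabled")
    }
}
```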
@@ -43,6 +43,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength:
private val scriptPath = "_scripts"
private val docPath = "_doc"
private val allIndexPath = "_all"
+  private val aliasPath = "_aliases"
private val bulkPath = "_bulk"
private val refreshPath = "_refresh"
private val indexTemplate = "_index_template"
@@ -466,6 +467,31 @@
case resp if resp.status.isSuccess() => discardEntity(resp)
}

+  def createAlias(indexAlias: IndexAlias): IO[Unit] = {
+    val aliasPayload = Json.obj(
+      "index"   := indexAlias.index.value,
+      "alias"   := indexAlias.alias.value,
+      "routing" := indexAlias.routing,
+      "filter"  := indexAlias.filter
+    )
+    aliasAction(Json.obj("add" := aliasPayload))
+  }
+
+  def removeAlias(index: IndexLabel, alias: IndexLabel): IO[Unit] = {
+    val aliasPayload = Json.obj(
+      "index" := index.value,
+      "alias" := alias.value
+    )
+    aliasAction(Json.obj("remove" := aliasPayload))
+  }
+
+  private def aliasAction(aliasAction: Json) = {
+    val aliasWrap = Json.obj("actions" := Json.arr(aliasAction))
+    client.run(Post(endpoint / aliasPath, aliasWrap).withHttpCredentials) {
+      case resp if resp.status.isSuccess() => discardEntity(resp)
+    }
+  }
+
private def discardEntity(resp: HttpResponse) =
IO.delay(resp.discardEntityBytes()) >> IO.unit

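For reference, `createAlias` posts a standard Elasticsearch `_aliases` actions document. A sketch of the body it would produce; the index, alias, routing and filter values below are made-up examples, not names used by the plugin:

```scala
import io.circe.Json
import io.circe.syntax._

// Hypothetical payload for aliasing one project's slice of the default index.
val addAliasBody: Json = Json.obj(
  "actions" := Json.arr(
    Json.obj(
      "add" := Json.obj(
        "index"   := "default-index",
        "alias"   := "default-index-myorg-myproject",
        "routing" := "myorg/myproject",
        "filter"  := Json.obj("term" := Json.obj("_project" := "myorg/myproject"))
      )
    )
  )
)
```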
@@ -0,0 +1,16 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client

import io.circe.JsonObject

/**
 * Allows creating an alias on the given index, with optional routing and filter
* @param index
* the target index
* @param alias
* the name of the alias to create
* @param routing
* the optional routing to route requests for this alias to a specific shard
* @param filter
* the optional filter to limit the documents an alias can access
*/
final case class IndexAlias(index: IndexLabel, alias: IndexLabel, routing: Option[String], filter: Option[JsonObject])
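For illustration, this is how a per-project alias on the default index could be described with `IndexAlias`. A sketch only: the helper and the `_project` field are hypothetical, `index` and `alias` stand for already-validated `IndexLabel` values, and the surrounding `client` package is assumed to be in scope:

```scala
import io.circe.{Json, JsonObject}
import io.circe.syntax._

// Hypothetical helper: restrict an alias to a single project's documents
// and route them to one shard via the project reference.
def projectAlias(index: IndexLabel, alias: IndexLabel, project: String): IndexAlias =
  IndexAlias(
    index,
    alias,
    routing = Some(project),
    filter = Some(JsonObject("term" := Json.obj("_project" := project)))
  )
```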
@@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.DefaultIndexConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.defaultProjectTargetAlias
import ch.epfl.bluebrain.nexus.delta.sdk.deletion.ProjectDeletionTask
import ch.epfl.bluebrain.nexus.delta.sdk.deletion.model.ProjectDeletionReport
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceUris}
@@ -13,15 +14,19 @@ final class DefaultIndexDeletionTask(client: ElasticSearchClient, defaultIndexCo
baseUri: BaseUri
) extends ProjectDeletionTask {

-  override def apply(project: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectDeletionReport.Stage] =
+  private val reportStage =
+    ProjectDeletionReport.Stage("default-index", "The project has been successfully removed from the default index.")
+
+  override def apply(project: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectDeletionReport.Stage] = {
+    val targetIndex = defaultIndexConfig.index
+    val targetAlias = defaultProjectTargetAlias(defaultIndexConfig, project)
searchByProject(project).flatMap { search =>
-      client
-        .deleteByQuery(search, defaultIndexConfig.index)
-        .as(
-          ProjectDeletionReport
-            .Stage("default-index", "The project has been successfully removed from the default index.")
-        )
+      client.removeAlias(targetIndex, targetAlias) >>
+        client
+          .deleteByQuery(search, defaultIndexConfig.index)
+          .as(reportStage)
+  }
}

private[deletion] def searchByProject(project: ProjectRef) =
IO.fromEither {
@@ -4,7 +4,7 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.DefaultIndexConfig
-import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.DefaultIndexingCoordinator.{defaultIndexingId, defaultIndexingPipeline}
+import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.DefaultIndexingCoordinator.defaultIndexingPipeline
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
@@ -25,7 +25,7 @@ final class DefaultIndexingAction(sink: Sink, override val timeout: FiniteDurati

private def compile(project: ProjectRef, elem: Elem[GraphResource]) =
CompiledProjection.compile(
-      DefaultIndexingCoordinator.defaultIndexingMetadata(project),
+      defaultIndexingProjectionMetadata(project),
ExecutionStrategy.TransientSingleNode,
Source(_ => Stream(elem)),
defaultIndexingPipeline,
@@ -8,9 +8,8 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchC
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{DefaultMapping, DefaultSettings}
-import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode
-import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
+import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
Expand Down Expand Up @@ -49,7 +48,8 @@ object DefaultIndexingCoordinator {
fetchProjects: Offset => ElemStream[ProjectDef],
graphStream: GraphResourceStream,
supervisor: Supervisor,
-      sink: Sink
+      sink: Sink,
+      createAlias: ProjectRef => IO[Unit]
)(implicit cr: RemoteContextResolution)
extends DefaultIndexingCoordinator {

@@ -64,7 +64,7 @@
private def compile(project: ProjectRef): IO[CompiledProjection] =
IO.fromEither(
CompiledProjection.compile(
-          defaultIndexingMetadata(project),
+          defaultIndexingProjectionMetadata(project),
ExecutionStrategy.PersistentSingleNode,
Source(graphStream.continuous(project, SelectFilter.latest, _)),
defaultIndexingPipeline,
@@ -76,6 +76,7 @@
private def start(project: ProjectRef): IO[Unit] =
for {
compiled <- compile(project)
+        _ <- createAlias(project)
status <- supervisor.describe(compiled.metadata.name)
_ <- status match {
case Some(value) if value.status == ExecutionStatus.Running =>
@@ -90,22 +91,11 @@
private def destroy(project: ProjectRef): IO[Unit] = {
logger.info(s"Project '$project' has been marked as deleted, stopping the default indexing...") >>
supervisor
-        .destroy(projectionName(project))
+        .destroy(defaultIndexingProjection(project))
.void
}
}

-  val defaultIndexingId: IriOrBNode.Iri = nxv + "default-indexing"
-
-  private[indexing] def projectionName(ref: ProjectRef): String = s"default-indexing-$ref"
-
-  def defaultIndexingMetadata(project: ProjectRef): ProjectionMetadata = ProjectionMetadata(
-    "default-indexing",
-    projectionName(project),
-    Some(project),
-    Some(defaultIndexingId)
-  )

def defaultIndexingPipeline(implicit cr: RemoteContextResolution): NonEmptyChain[Operation] =
NonEmptyChain(
DefaultLabelPredicates.withConfig(()),
@@ -123,21 +113,21 @@
defaultMapping: DefaultMapping,
defaultSettings: DefaultSettings,
config: ElasticSearchViewsConfig
-  )(implicit cr: RemoteContextResolution): IO[DefaultIndexingCoordinator] =
+  )(implicit baseUri: BaseUri, cr: RemoteContextResolution): IO[DefaultIndexingCoordinator] =
if (config.indexingEnabled) {
-      client.createIndex(config.defaultIndex.index, Some(defaultMapping.value), Some(defaultSettings.value)) >>
-        apply(
-          projects.states(_).map(_.map { p => ProjectDef(p.project, p.markedForDeletion) }),
-          graphStream,
-          supervisor,
-          ElasticSearchSink.defaultIndexing(
-            client,
-            config.batch.maxElements,
-            config.batch.maxInterval,
-            config.defaultIndex.index,
-            Refresh.False
-          )
-        )
+      val batch       = config.batch
+      val targetIndex = config.defaultIndex.index
+
+      def fetchProjects(offset: Offset) =
+        projects.states(offset).map(_.map { p => ProjectDef(p.project, p.markedForDeletion) })
+
+      def elasticsearchSink =
+        ElasticSearchSink.defaultIndexing(client, batch.maxElements, batch.maxInterval, targetIndex, Refresh.False)
+
+      def createAlias(project: ProjectRef) = client.createAlias(indexingAlias(config.defaultIndex, project))
+
+      client.createIndex(targetIndex, Some(defaultMapping.value), Some(defaultSettings.value)) >>
+        apply(fetchProjects, graphStream, supervisor, elasticsearchSink, createAlias)
} else {
Noop.log.as(Noop)
}
@@ -146,14 +136,10 @@
fetchProjects: Offset => ElemStream[ProjectDef],
graphStream: GraphResourceStream,
supervisor: Supervisor,
-      sink: Sink
+      sink: Sink,
+      createAlias: ProjectRef => IO[Unit]
)(implicit cr: RemoteContextResolution): IO[DefaultIndexingCoordinator] = {
-    val coordinator = new Active(
-      fetchProjects,
-      graphStream,
-      supervisor,
-      sink
-    )
+    val coordinator = new Active(fetchProjects, graphStream, supervisor, sink, createAlias)
val compiled =
CompiledProjection.fromStream(metadata, ExecutionStrategy.EveryNode, offset => coordinator.run(offset))
supervisor.run(compiled).as(coordinator)