From 0448ebdd1ddddd945a61c1d7c7e7294e80f7ce84 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Mon, 22 Jan 2024 17:33:08 +0000 Subject: [PATCH] remove core bin Signed-off-by: Tyler Ohlsen --- core/bin/main/mappings/doc-level-queries.json | 16 - core/bin/main/mappings/scheduled-jobs.json | 694 ------------------ .../org/opensearch/alerting/core/JobRunner.kt | 17 - .../opensearch/alerting/core/JobSweeper.kt | 512 ------------- .../alerting/core/JobSweeperMetrics.kt | 29 - .../alerting/core/ScheduledJobIndices.kt | 65 -- .../core/action/node/ScheduledJobStats.kt | 88 --- .../action/node/ScheduledJobsStatsAction.kt | 25 - .../action/node/ScheduledJobsStatsRequest.kt | 45 -- .../action/node/ScheduledJobsStatsResponse.kt | 78 -- .../node/ScheduledJobsStatsTransportAction.kt | 139 ---- .../RestScheduledJobStatsHandler.kt | 121 --- .../alerting/core/schedule/JobScheduler.kt | 228 ------ .../core/schedule/JobSchedulerMetrics.kt | 48 -- .../LegacyOpenDistroScheduledJobSettings.kt | 49 -- .../core/settings/ScheduledJobSettings.kt | 51 -- .../opensearchapi/OpenSearchExtensions.kt | 207 ------ core/bin/main/settings/doc-level-queries.json | 10 - .../alerting/core/WriteableTests.kt | 26 - .../alerting/core/model/MockScheduledJob.kt | 33 - .../core/schedule/JobSchedulerTest.kt | 190 ----- .../alerting/core/schedule/MockJobRunner.kt | 31 - 22 files changed, 2702 deletions(-) delete mode 100644 core/bin/main/mappings/doc-level-queries.json delete mode 100644 core/bin/main/mappings/scheduled-jobs.json delete mode 100644 core/bin/main/org/opensearch/alerting/core/JobRunner.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/JobSweeper.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/JobSweeperMetrics.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/ScheduledJobIndices.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsAction.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsRequest.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsResponse.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/resthandler/RestScheduledJobStatsHandler.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/schedule/JobScheduler.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/schedule/JobSchedulerMetrics.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/settings/LegacyOpenDistroScheduledJobSettings.kt delete mode 100644 core/bin/main/org/opensearch/alerting/core/settings/ScheduledJobSettings.kt delete mode 100644 core/bin/main/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt delete mode 100644 core/bin/main/settings/doc-level-queries.json delete mode 100644 core/bin/test/org/opensearch/alerting/core/WriteableTests.kt delete mode 100644 core/bin/test/org/opensearch/alerting/core/model/MockScheduledJob.kt delete mode 100644 core/bin/test/org/opensearch/alerting/core/schedule/JobSchedulerTest.kt delete mode 100644 core/bin/test/org/opensearch/alerting/core/schedule/MockJobRunner.kt diff --git a/core/bin/main/mappings/doc-level-queries.json b/core/bin/main/mappings/doc-level-queries.json deleted file mode 100644 index 7f0602df7..000000000 --- a/core/bin/main/mappings/doc-level-queries.json 
+++ /dev/null @@ -1,16 +0,0 @@ -{ - "_meta": { - "schema_version": 1 - }, - "properties": { - "query": { - "type": "percolator_ext" - }, - "monitor_id": { - "type": "text" - }, - "index": { - "type": "text" - } - } -} \ No newline at end of file diff --git a/core/bin/main/mappings/scheduled-jobs.json b/core/bin/main/mappings/scheduled-jobs.json deleted file mode 100644 index 2651c862e..000000000 --- a/core/bin/main/mappings/scheduled-jobs.json +++ /dev/null @@ -1,694 +0,0 @@ -{ - "_meta" : { - "schema_version": 8 - }, - "properties": { - "monitor": { - "dynamic": "false", - "properties": { - "schema_version": { - "type": "integer" - }, - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "owner": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "monitor_type": { - "type": "keyword" - }, - "user": { - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "backend_roles": { - "type" : "text", - "fields" : { - "keyword" : { - "type" : "keyword" - } - } - }, - "roles": { - "type" : "text", - "fields" : { - "keyword" : { - "type" : "keyword" - } - } - }, - "custom_attribute_names": { - "type" : "text", - "fields" : { - "keyword" : { - "type" : "keyword" - } - } - } - } - }, - "type": { - "type": "keyword" - }, - "enabled": { - "type": "boolean" - }, - "enabled_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - }, - "last_update_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - }, - "schedule": { - "properties": { - "period": { - "properties": { - "interval": { - "type": "integer" - }, - "unit": { - "type": "keyword" - } - } - }, - "cron": { - "properties": { - "expression": { - "type": "text" - }, - "timezone": { - "type": "keyword" - } - } - } - } - }, - "inputs": { - "type": "nested", - "properties": { - "search": { - "properties": { - "indices": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "query": { - "type": "object", - "enabled": false - } - } - } - } - }, - "data_sources": { - "properties": { - "alerts_index": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "findings_index": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "query_index": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "query_index_mapping": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - } - } - }, - "group_by_fields": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "triggers": { - "type": "nested", - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "min_time_between_executions": { - "type": "integer" - }, - "condition": { - "type": "object", - "enabled": false - }, - "actions": { - "type": "nested", - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "destination_id": { - "type": "keyword" - }, - "subject_template": { - "type": "object", - "enabled": false - }, - "message_template": { - "type": "object", - "enabled": false - }, - "throttle_enabled": { - "type": 
"boolean" - }, - "throttle": { - "properties": { - "value": { - "type": "integer" - }, - "unit": { - "type": "keyword" - } - } - } - } - }, - "query_level_trigger": { - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "min_time_between_executions": { - "type": "integer" - }, - "condition": { - "type": "object", - "enabled": false - }, - "actions": { - "type": "nested", - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "destination_id": { - "type": "keyword" - }, - "subject_template": { - "type": "object", - "enabled": false - }, - "message_template": { - "type": "object", - "enabled": false - }, - "throttle_enabled": { - "type": "boolean" - }, - "throttle": { - "properties": { - "value": { - "type": "integer" - }, - "unit": { - "type": "keyword" - } - } - } - } - } - } - } - } - }, - "ui_metadata": { - "type": "object", - "enabled": false - } - } - }, - "workflow": { - "dynamic": "false", - "properties": { - "schema_version": { - "type": "integer" - }, - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "owner": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "workflow_type": { - "type": "keyword" - }, - "user": { - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "backend_roles": { - "type" : "text", - "fields" : { - "keyword" : { - "type" : "keyword" - } - } - }, - "roles": { - "type" : "text", - "fields" : { - "keyword" : { - "type" : "keyword" - } - } - }, - "custom_attribute_names": { - "type" : "text", - "fields" : { - "keyword" : { - "type" : "keyword" - } - } - } - } - }, - "type": { - "type": "keyword" - }, - "enabled": { - "type": "boolean" - }, - "audit_delegate_monitor_alerts": { - "type": "boolean" - }, - "enabled_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - }, - "last_update_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - }, - "schedule": { - "properties": { - "period": { - "properties": { - "interval": { - "type": "integer" - }, - "unit": { - "type": "keyword" - } - } - }, - "cron": { - "properties": { - "expression": { - "type": "text" - }, - "timezone": { - "type": "keyword" - } - } - } - } - }, - "inputs": { - "type": "nested", - "properties": { - "composite_input": { - "type": "nested", - "properties": { - "sequence": { - "properties": { - "delegates": { - "type": "nested", - "properties": { - "order": { - "type": "integer" - }, - "monitor_id": { - "type": "keyword" - }, - "chained_monitor_findings": { - "properties": { - "monitor_id": { - "type": "keyword" - } - } - } - } - } - } - } - } - } - } - }, - "group_by_fields": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - } - } - }, - "destination": { - "dynamic": "false", - "properties": { - "schema_version": { - "type": "integer" - }, - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "user": { - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "backend_roles": { - "type" : "text", - "fields" : { - "keyword" : { - "type" : "keyword" - } - } - }, - "roles": { - "type" : "text", - "fields" : { - 
"keyword" : { - "type" : "keyword" - } - } - }, - "custom_attribute_names": { - "type" : "text", - "fields" : { - "keyword" : { - "type" : "keyword" - } - } - } - } - }, - "type": { - "type": "keyword" - }, - "last_update_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - }, - "chime": { - "properties": { - "url": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - } - } - }, - "slack": { - "properties": { - "url": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - } - } - }, - "custom_webhook": { - "properties": { - "url": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "scheme": { - "type": "keyword" - }, - "host": { - "type": "text" - }, - "port": { - "type": "integer" - }, - "path": { - "type": "keyword" - }, - "query_params": { - "type": "object", - "enabled": false - }, - "header_params": { - "type": "object", - "enabled": false - }, - "username": { - "type": "text" - }, - "password": { - "type": "text" - } - } - }, - "email": { - "properties": { - "email_account_id": { - "type": "keyword" - }, - "recipients": { - "type": "nested", - "properties": { - "type": { - "type": "keyword" - }, - "email_group_id": { - "type": "keyword" - }, - "email": { - "type": "text" - } - } - } - } - } - } - }, - "email_account": { - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "host": { - "type": "text" - }, - "port": { - "type": "integer" - }, - "method": { - "type": "text" - }, - "from": { - "type": "text" - } - } - }, - "email_group": { - "properties": { - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "emails": { - "type": "nested", - "properties": { - "email": { - "type": "text" - } - } - } - } - }, - "metadata" : { - "properties": { - "monitor_id": { - "type": "keyword" - }, - "last_action_execution_times": { - "type": "nested", - "properties": { - "action_id": { - "type": "keyword" - }, - "execution_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - } - } - }, - "last_run_context": { - "type": "object", - "enabled": false - }, - "source_to_query_index_mapping": { - "type": "object", - "enabled": false - } - } - }, - "workflow_metadata" : { - "properties": { - "workflow_id": { - "type": "keyword" - }, - "monitor_ids": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 1000 - } - } - }, - "latest_run_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - }, - "latest_execution_id": { - "type": "keyword" - } - } - } - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/JobRunner.kt b/core/bin/main/org/opensearch/alerting/core/JobRunner.kt deleted file mode 100644 index c251c8c6a..000000000 --- a/core/bin/main/org/opensearch/alerting/core/JobRunner.kt +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core - -import org.opensearch.commons.alerting.model.ScheduledJob -import java.time.Instant - -interface JobRunner { - fun postDelete(jobId: String) - - fun postIndex(job: ScheduledJob) - - fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant) -} diff --git a/core/bin/main/org/opensearch/alerting/core/JobSweeper.kt 
b/core/bin/main/org/opensearch/alerting/core/JobSweeper.kt deleted file mode 100644 index 6ba910707..000000000 --- a/core/bin/main/org/opensearch/alerting/core/JobSweeper.kt +++ /dev/null @@ -1,512 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core - -import org.apache.logging.log4j.LogManager -import org.opensearch.action.bulk.BackoffPolicy -import org.opensearch.action.search.SearchRequest -import org.opensearch.alerting.core.schedule.JobScheduler -import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.REQUEST_TIMEOUT -import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEPER_ENABLED -import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_BACKOFF_MILLIS -import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_BACKOFF_RETRY_COUNT -import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_PAGE_SIZE -import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_PERIOD -import org.opensearch.alerting.opensearchapi.firstFailureOrNull -import org.opensearch.alerting.opensearchapi.retry -import org.opensearch.client.Client -import org.opensearch.cluster.ClusterChangedEvent -import org.opensearch.cluster.ClusterStateListener -import org.opensearch.cluster.routing.IndexShardRoutingTable -import org.opensearch.cluster.routing.Murmur3HashFunction -import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.lifecycle.LifecycleListener -import org.opensearch.common.logging.Loggers -import org.opensearch.common.lucene.uid.Versions -import org.opensearch.common.settings.Settings -import org.opensearch.common.unit.TimeValue -import org.opensearch.common.util.concurrent.OpenSearchExecutors -import org.opensearch.common.xcontent.LoggingDeprecationHandler -import org.opensearch.common.xcontent.XContentHelper -import org.opensearch.common.xcontent.XContentType -import org.opensearch.commons.alerting.model.ScheduledJob -import org.opensearch.core.common.Strings -import org.opensearch.core.common.bytes.BytesReference -import org.opensearch.core.index.shard.ShardId -import org.opensearch.core.rest.RestStatus -import org.opensearch.core.xcontent.NamedXContentRegistry -import org.opensearch.core.xcontent.XContentParser -import org.opensearch.core.xcontent.XContentParserUtils -import org.opensearch.index.engine.Engine -import org.opensearch.index.query.BoolQueryBuilder -import org.opensearch.index.query.QueryBuilders -import org.opensearch.index.shard.IndexingOperationListener -import org.opensearch.search.builder.SearchSourceBuilder -import org.opensearch.search.sort.FieldSortBuilder -import org.opensearch.threadpool.Scheduler -import org.opensearch.threadpool.ThreadPool -import java.util.TreeMap -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors - -typealias JobId = String -typealias JobVersion = Long - -/** - * 'Sweeping' is the process of listening for new and updated [ScheduledJob]s and deciding if they should be scheduled for - * execution on this node. The [JobSweeper] runs on every node, sweeping all local active shards that are present on the node. - * - * A [consistent hash][ShardNodes] is used to distribute jobs across all nodes that contain an active instance of the same shard. - * This minimizes any interruptions in job execution when the cluster configuration changes. 
- * - * There are two types of sweeps: - * *Full sweeps* occur when the [routing table][IndexShardRoutingTable] for the shard changes (for e.g. a replica has been - * added or removed). The full sweep re-reads all jobs in the shard, deciding which ones to run locally. All full sweeps - * happen asynchronously in the background in a serial manner. See the [sweepAllShards] method. - * *Single job sweeps* occur when a new version of the job is indexed or deleted. An [IndexingOperationListener] listens - * for index changes and synchronously schedules or removes the job from the scheduler. - */ -class JobSweeper( - private val settings: Settings, - private val client: Client, - private val clusterService: ClusterService, - private val threadPool: ThreadPool, - private val xContentRegistry: NamedXContentRegistry, - private val scheduler: JobScheduler, - private val sweepableJobTypes: List<String> -) : ClusterStateListener, IndexingOperationListener, LifecycleListener() { - private val logger = LogManager.getLogger(javaClass) - - private val fullSweepExecutor = Executors.newSingleThreadExecutor(OpenSearchExecutors.daemonThreadFactory("opendistro_job_sweeper")) - - private val sweptJobs = ConcurrentHashMap<ShardId, ConcurrentHashMap<JobId, JobVersion>>() - - private var scheduledFullSweep: Scheduler.Cancellable? = null - - @Volatile private var lastFullSweepTimeNano = System.nanoTime() - - @Volatile private var requestTimeout = REQUEST_TIMEOUT.get(settings) - - @Volatile private var sweepPeriod = SWEEP_PERIOD.get(settings) - - @Volatile private var sweeperEnabled = SWEEPER_ENABLED.get(settings) - - @Volatile private var sweepPageSize = SWEEP_PAGE_SIZE.get(settings) - - @Volatile private var sweepBackoffMillis = SWEEP_BACKOFF_MILLIS.get(settings) - - @Volatile private var sweepBackoffRetryCount = SWEEP_BACKOFF_RETRY_COUNT.get(settings) - - @Volatile private var sweepSearchBackoff = BackoffPolicy.exponentialBackoff(sweepBackoffMillis, sweepBackoffRetryCount) - - init { - clusterService.addListener(this) - clusterService.addLifecycleListener(this) - clusterService.clusterSettings.addSettingsUpdateConsumer(SWEEP_PERIOD) { - // if the sweep period changes, restart the background sweep with the new sweep period - logger.debug("Reinitializing background full sweep with period: ${sweepPeriod.minutes()}") - sweepPeriod = it - initBackgroundSweep() - } - clusterService.clusterSettings.addSettingsUpdateConsumer(SWEEPER_ENABLED) { - sweeperEnabled = it - if (!sweeperEnabled) disable() else enable() - } - clusterService.clusterSettings.addSettingsUpdateConsumer(SWEEP_BACKOFF_MILLIS) { - sweepBackoffMillis = it - sweepSearchBackoff = BackoffPolicy.exponentialBackoff(sweepBackoffMillis, sweepBackoffRetryCount) - } - clusterService.clusterSettings.addSettingsUpdateConsumer(SWEEP_BACKOFF_RETRY_COUNT) { - sweepBackoffRetryCount = it - sweepSearchBackoff = BackoffPolicy.exponentialBackoff(sweepBackoffMillis, sweepBackoffRetryCount) - } - clusterService.clusterSettings.addSettingsUpdateConsumer(SWEEP_PAGE_SIZE) { sweepPageSize = it } - clusterService.clusterSettings.addSettingsUpdateConsumer(REQUEST_TIMEOUT) { requestTimeout = it } - } - - override fun afterStart() { - initBackgroundSweep() - } - - override fun beforeStop() { - scheduledFullSweep?.cancel() - } - - override fun beforeClose() { - fullSweepExecutor.shutdown() - } - - /** - * Initiates a full sweep of all local shards when the index routing table is changed (for e.g. when the node joins - * the cluster, a replica is added, removed or promoted to primary).
- * - * This callback won't be invoked concurrently since cluster state changes are applied serially to the node - * in the order they occur on the cluster manager. However we can't block this callback for the duration of a full sweep so - * we perform the sweep in the background in a single threaded executor [fullSweepExecutor]. - */ - override fun clusterChanged(event: ClusterChangedEvent) { - if (!isSweepingEnabled()) return - - if (!event.indexRoutingTableChanged(ScheduledJob.SCHEDULED_JOBS_INDEX)) return - - logger.debug("Scheduled Jobs routing table changed. Running full sweep...") - fullSweepExecutor.submit { - sweepAllShards() - } - } - - /** - * This callback is invoked when a new job (or new version of a job) is indexed. If the job is assigned to the node - * it is scheduled. Relies on all indexing operations using optimistic concurrency control to ensure that stale versions - * of jobs are not scheduled. It schedules job only if it is one of the [sweepableJobTypes] - * - */ - override fun postIndex(shardId: ShardId, index: Engine.Index, result: Engine.IndexResult) { - if (!isSweepingEnabled()) return - - if (result.resultType != Engine.Result.Type.SUCCESS) { - val shardJobs = sweptJobs[shardId] ?: emptyMap() - val currentVersion = shardJobs[index.id()] ?: Versions.NOT_FOUND - logger.debug("Indexing failed for ScheduledJob: ${index.id()}. Continuing with current version $currentVersion") - return - } - - if (isOwningNode(shardId, index.id())) { - val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, index.source(), XContentType.JSON) - if (isSweepableJobType(xcp)) { - val job = parseAndSweepJob(xcp, shardId, index.id(), result.version, index.source(), true) - if (job != null) scheduler.postIndex(job) - } else { - logger.debug("Not a valid job type in document ${index.id()} to sweep.") - } - } - } - - /** - * This callback is invoked when a job is deleted from a shard. The job is descheduled. Relies on all delete operations - * using optimistic concurrency control to ensure that stale versions of jobs are not scheduled. - */ - override fun postDelete(shardId: ShardId, delete: Engine.Delete, result: Engine.DeleteResult) { - if (!isSweepingEnabled()) return - - if (result.resultType != Engine.Result.Type.SUCCESS) { - val shardJobs = sweptJobs[shardId] ?: emptyMap() - val currentVersion = shardJobs[delete.id()] ?: Versions.NOT_FOUND - logger.debug("Deletion failed for ScheduledJob: ${delete.id()}. 
Continuing with current version $currentVersion") - return - } - - if (isOwningNode(shardId, delete.id())) { - if (scheduler.scheduledJobs().contains(delete.id())) { - sweep(shardId, delete.id(), result.version, null) - } - scheduler.postDelete(delete.id()) - } - } - - fun enable() { - // initialize background sweep - initBackgroundSweep() - // set sweeperEnabled flag to true to make the listeners aware of this setting - sweeperEnabled = true - } - - fun disable() { - // cancel background sweep - scheduledFullSweep?.cancel() - // deschedule existing jobs on this node - logger.info("Descheduling all jobs as sweeping is disabled") - scheduler.deschedule(scheduler.scheduledJobs()) - // set sweeperEnabled flag to false to make the listeners aware of this setting - sweeperEnabled = false - } - - public fun isSweepingEnabled(): Boolean { - // Although it is a single link check, keeping it as a separate function, so we - // can abstract out logic of finding out whether to proceed or not - return sweeperEnabled == true - } - - private fun initBackgroundSweep() { - // if sweeping disabled, background sweep should not be triggered - if (!isSweepingEnabled()) return - - // cancel existing background thread if present - scheduledFullSweep?.cancel() - - // Manually sweep all shards before scheduling the background sweep so it picks up any changes immediately - // since the first run of a task submitted with scheduleWithFixedDelay() happens after the interval has passed. - logger.debug("Performing sweep of scheduled jobs.") - fullSweepExecutor.submit { - sweepAllShards() - } - - // Setup an anti-entropy/self-healing background sweep, in case a sweep that was triggered by an event fails. - val scheduledSweep = Runnable { - val elapsedTime = getFullSweepElapsedTime() - - // Rate limit to at most one full sweep per sweep period - // The schedule runs may wake up a few milliseconds early. - // Delta will be giving some buffer on the schedule to allow waking up slightly earlier. - val delta = sweepPeriod.millis - elapsedTime.millis - if (delta < 20L) { // give 20ms buffer. - fullSweepExecutor.submit { - logger.debug("Performing background sweep of scheduled jobs.") - sweepAllShards() - } - } - } - scheduledFullSweep = threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, ThreadPool.Names.SAME) - } - - private fun sweepAllShards() { - val clusterState = clusterService.state() - if (!clusterState.routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)) { - scheduler.deschedule(scheduler.scheduledJobs()) - sweptJobs.clear() - lastFullSweepTimeNano = System.nanoTime() - return - } - - // Find all shards that are currently assigned to this node. - val localNodeId = clusterState.nodes.localNodeId - val localShards = clusterState.routingTable.allShards(ScheduledJob.SCHEDULED_JOBS_INDEX) - // Find all active shards - .filter { it.active() } - // group by shardId - .groupBy { it.shardId() } - // assigned to local node - .filter { (_, shards) -> shards.any { it.currentNodeId() == localNodeId } } - - // Remove all jobs on shards that are no longer assigned to this node. - val removedShards = sweptJobs.keys - localShards.keys - removedShards.forEach { shardId -> - val shardJobs = sweptJobs.remove(shardId) ?: emptyMap() - scheduler.deschedule(shardJobs.keys) - } - - // resweep all shards that are assigned to this node. 
- localShards.forEach { (shardId, shards) -> - try { - sweepShard(shardId, ShardNodes(localNodeId, shards.map { it.currentNodeId() })) - } catch (e: Exception) { - val shardLogger = Loggers.getLogger(javaClass, shardId) - shardLogger.error("Error while sweeping shard $shardId", e) - } - } - lastFullSweepTimeNano = System.nanoTime() - } - - private fun sweepShard(shardId: ShardId, shardNodes: ShardNodes, startAfter: String = "") { - val logger = Loggers.getLogger(javaClass, shardId) - logger.debug("Sweeping shard $shardId") - - // Remove any jobs that are currently scheduled that are no longer owned by this node - val currentJobs = sweptJobs.getOrPut(shardId) { ConcurrentHashMap() } - currentJobs.keys.filterNot { shardNodes.isOwningNode(it) }.forEach { - scheduler.deschedule(it) - currentJobs.remove(it) - } - - // sweep the shard for new and updated jobs. Uses a search after query to paginate, assuming that any concurrent - // updates and deletes are handled by the index operation listener. - var searchAfter: String? = startAfter - while (searchAfter != null) { - val boolQueryBuilder = BoolQueryBuilder() - sweepableJobTypes.forEach { boolQueryBuilder.should(QueryBuilders.existsQuery(it)) } - val jobSearchRequest = SearchRequest() - .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) - .preference("_shards:${shardId.id}|_only_local") - .source( - SearchSourceBuilder.searchSource() - .version(true) - .sort( - FieldSortBuilder("_id") - .unmappedType("keyword") - .missing("_last") - ) - .searchAfter(arrayOf(searchAfter)) - .size(sweepPageSize) - .query(boolQueryBuilder) - ) - - val response = sweepSearchBackoff.retry { - client.search(jobSearchRequest).actionGet(requestTimeout) - } - if (response.status() != RestStatus.OK) { - logger.error("Error sweeping shard $shardId.", response.firstFailureOrNull()) - return - } - for (hit in response.hits) { - if (shardNodes.isOwningNode(hit.id)) { - val xcp = XContentHelper.createParser( - xContentRegistry, - LoggingDeprecationHandler.INSTANCE, - hit.sourceRef, - XContentType.JSON - ) - parseAndSweepJob(xcp, shardId, hit.id, hit.version, hit.sourceRef) - } - } - searchAfter = response.hits.lastOrNull()?.id - } - } - - private fun sweep( - shardId: ShardId, - jobId: JobId, - newVersion: JobVersion, - job: ScheduledJob?, - failedToParse: Boolean = false - ) { - sweptJobs.getOrPut(shardId) { ConcurrentHashMap() } - // Use [compute] to update atomically in case another thread concurrently indexes/deletes the same job - .compute(jobId) { _, currentVersion -> - val jobCurrentlyScheduled = scheduler.scheduledJobs().contains(jobId) - - if (newVersion <= (currentVersion ?: Versions.NOT_FOUND)) { - if (unchangedJobToBeRescheduled(newVersion, currentVersion, jobCurrentlyScheduled, job)) { - logger.debug("Not skipping job $jobId since it is an unchanged job slated to be rescheduled") - } else { - logger.debug("Skipping job $jobId, $newVersion <= $currentVersion") - return@compute currentVersion - } - } - - // deschedule the currently scheduled version - if (jobCurrentlyScheduled) { - scheduler.deschedule(jobId) - } - - if (failedToParse) { - return@compute currentVersion - } - if (job != null) { - if (job.enabled) { - scheduler.schedule(job) - } - return@compute newVersion - } else { - return@compute null - } - } - } - - /* - * During the job sweep, normally jobs where the currentVersion is equal to the newVersion are skipped since - * there was no change. 
- * - * However, there exists an edge-case where a job could have been de-scheduled by flipping [SWEEPER_ENABLED] - * to false and then not have undergone any changes when the sweeper is re-enabled. In this case, the job should - * not be skipped so it can be re-scheduled. This utility method checks for this condition so the sweep() method - * can account for it. - */ - private fun unchangedJobToBeRescheduled( - newVersion: JobVersion, - currentVersion: JobVersion?, - jobCurrentlyScheduled: Boolean, - job: ScheduledJob? - ): Boolean { - // newVersion should not be [Versions.NOT_FOUND] here since it's passed in from existing search hits - // or successful doc delete operations - val versionWasUnchanged = newVersion == (currentVersion ?: Versions.NOT_FOUND) - val jobEnabled = job?.enabled ?: false - - return versionWasUnchanged && !jobCurrentlyScheduled && jobEnabled - } - - private fun parseAndSweepJob( - xcp: XContentParser, - shardId: ShardId, - jobId: JobId, - jobVersion: JobVersion, - jobSource: BytesReference, - typeIsParsed: Boolean = false - ): ScheduledJob? { - return try { - val job = parseScheduledJob(xcp, jobId, jobVersion, typeIsParsed) - sweep(shardId, jobId, jobVersion, job) - job - } catch (e: Exception) { - logger.warn( - "Unable to parse ScheduledJob source: {}", - Strings.cleanTruncate(jobSource.utf8ToString(), 1000) - ) - sweep(shardId, jobId, jobVersion, null, true) - null - } - } - - private fun parseScheduledJob(xcp: XContentParser, jobId: JobId, jobVersion: JobVersion, typeIsParsed: Boolean): ScheduledJob { - return if (typeIsParsed) { - ScheduledJob.parse(xcp, xcp.currentName(), jobId, jobVersion) - } else { - ScheduledJob.parse(xcp, jobId, jobVersion) - } - } - - private fun getFullSweepElapsedTime(): TimeValue { - return TimeValue.timeValueNanos(System.nanoTime() - lastFullSweepTimeNano) - } - - fun getJobSweeperMetrics(): JobSweeperMetrics { - if (!isSweepingEnabled()) { - return JobSweeperMetrics(-1, true) - } - val elapsedTime = getFullSweepElapsedTime() - return JobSweeperMetrics(elapsedTime.millis, elapsedTime.millis <= sweepPeriod.millis) - } - - private fun isSweepableJobType(xcp: XContentParser): Boolean { - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) - val jobType = xcp.currentName() - return sweepableJobTypes.contains(jobType) - } - - private fun isOwningNode(shardId: ShardId, jobId: JobId): Boolean { - val localNodeId = clusterService.localNode().id - val shardNodeIds = clusterService.state().routingTable.shardRoutingTable(shardId) - .filter { it.active() } - .map { it.currentNodeId() } - val shardNodes = ShardNodes(localNodeId, shardNodeIds) - return shardNodes.isOwningNode(jobId) - } -} - -/** - * A group of nodes in the cluster that contain active instances of a single OpenSearch shard. This uses a consistent hash to divide - * the jobs indexed in that shard amongst the nodes such that each job is "owned" by exactly one of the nodes. - * The local node must have an active instance of the shard. - * - * Implementation notes: This class is not thread safe. It uses the same [hash function][Murmur3HashFunction] that OpenSearch uses - * for routing. For each real node `100` virtual nodes are added to provide a good distribution. 
- */ -private class ShardNodes(val localNodeId: String, activeShardNodeIds: Collection<String>) { - - private val circle = TreeMap<Int, String>() - - companion object { - private const val VIRTUAL_NODE_COUNT = 100 - } - - init { - for (node in activeShardNodeIds) { - for (i in 0 until VIRTUAL_NODE_COUNT) { - circle[Murmur3HashFunction.hash(node + i)] = node - } - } - } - - fun isOwningNode(id: JobId): Boolean { - if (circle.isEmpty()) { - return false - } - val hash = Murmur3HashFunction.hash(id) - val nodeId = (circle.higherEntry(hash) ?: circle.firstEntry()).value - return (localNodeId == nodeId) - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/JobSweeperMetrics.kt b/core/bin/main/org/opensearch/alerting/core/JobSweeperMetrics.kt deleted file mode 100644 index 9a10586d1..000000000 --- a/core/bin/main/org/opensearch/alerting/core/JobSweeperMetrics.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core - -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.common.io.stream.Writeable -import org.opensearch.core.xcontent.ToXContent -import org.opensearch.core.xcontent.ToXContentFragment -import org.opensearch.core.xcontent.XContentBuilder - -data class JobSweeperMetrics(val lastFullSweepTimeMillis: Long, val fullSweepOnTime: Boolean) : ToXContentFragment, Writeable { - - constructor(si: StreamInput) : this(si.readLong(), si.readBoolean()) - - override fun writeTo(out: StreamOutput) { - out.writeLong(lastFullSweepTimeMillis) - out.writeBoolean(fullSweepOnTime) - } - - override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder.field("last_full_sweep_time_millis", lastFullSweepTimeMillis) - builder.field("full_sweep_on_time", fullSweepOnTime) - return builder - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/ScheduledJobIndices.kt b/core/bin/main/org/opensearch/alerting/core/ScheduledJobIndices.kt deleted file mode 100644 index a71a7e64f..000000000 --- a/core/bin/main/org/opensearch/alerting/core/ScheduledJobIndices.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core - -import org.opensearch.action.admin.indices.create.CreateIndexRequest -import org.opensearch.action.admin.indices.create.CreateIndexResponse -import org.opensearch.client.AdminClient -import org.opensearch.cluster.health.ClusterIndexHealth -import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.settings.Settings -import org.opensearch.commons.alerting.model.ScheduledJob -import org.opensearch.core.action.ActionListener - -/** - * Initialize the OpenSearch components required to run [ScheduledJobs]. - * - * [initScheduledJobIndex] is called before indexing a new scheduled job. It verifies that the index exists before - * allowing the index to go through. This is to ensure the correct mappings exist for [ScheduledJob]. - */ -class ScheduledJobIndices(private val client: AdminClient, private val clusterService: ClusterService) { - - companion object { - @JvmStatic - fun scheduledJobMappings(): String { - return ScheduledJobIndices::class.java.classLoader.getResource("mappings/scheduled-jobs.json").readText() - } - } - /** - * Initialize the indices required for scheduled jobs.
- * First check if the index exists, and if not create the index with the provided callback listeners. - * - * @param actionListener A callback listener for the index creation call. Generally in the form of onSuccess, onFailure - */ - fun initScheduledJobIndex(actionListener: ActionListener<CreateIndexResponse>) { - if (!scheduledJobIndexExists()) { - var indexRequest = CreateIndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) - .mapping(scheduledJobMappings()) - .settings(Settings.builder().put("index.hidden", true).build()) - client.indices().create(indexRequest, actionListener) - } - } - - fun scheduledJobIndexExists(): Boolean { - val clusterState = clusterService.state() - return clusterState.routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX) - } - - /** - * Check the health of the scheduled job index. If the index does not exist, return null. - */ - fun scheduledJobIndexHealth(): ClusterIndexHealth? { - var indexHealth: ClusterIndexHealth? = null - - if (scheduledJobIndexExists()) { - val indexRoutingTable = clusterService.state().routingTable.index(ScheduledJob.SCHEDULED_JOBS_INDEX) - val indexMetaData = clusterService.state().metadata().index(ScheduledJob.SCHEDULED_JOBS_INDEX) - - indexHealth = ClusterIndexHealth(indexMetaData, indexRoutingTable) - } - return indexHealth - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt b/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt deleted file mode 100644 index 07792d553..000000000 --- a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core.action.node - -import org.opensearch.action.support.nodes.BaseNodeResponse -import org.opensearch.alerting.core.JobSweeperMetrics -import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler -import org.opensearch.alerting.core.schedule.JobSchedulerMetrics -import org.opensearch.cluster.node.DiscoveryNode -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.xcontent.ToXContent -import org.opensearch.core.xcontent.ToXContentFragment -import org.opensearch.core.xcontent.XContentBuilder -import java.util.Locale - -/** - * Scheduled job stats that will be generated by each node. - */ -class ScheduledJobStats : BaseNodeResponse, ToXContentFragment { - - enum class ScheduleStatus(val status: String) { - RED("red"), - GREEN("green"); - - override fun toString(): String { - return status - } - } - - var status: ScheduleStatus - var jobSweeperMetrics: JobSweeperMetrics? = null - var jobInfos: Array<JobSchedulerMetrics>? = null - - constructor(si: StreamInput) : super(si) { - this.status = si.readEnum(ScheduleStatus::class.java) - this.jobSweeperMetrics = si.readOptionalWriteable { JobSweeperMetrics(it) } - this.jobInfos = si.readOptionalArray({ sti: StreamInput -> JobSchedulerMetrics(sti) }, { size -> arrayOfNulls<JobSchedulerMetrics>(size) }) - } - - constructor( - node: DiscoveryNode, - status: ScheduleStatus, - jobSweeperMetrics: JobSweeperMetrics?, - jobsInfo: Array<JobSchedulerMetrics>?
- ) : super(node) { - this.status = status - this.jobSweeperMetrics = jobSweeperMetrics - this.jobInfos = jobsInfo - } - - companion object { - @JvmStatic - fun readScheduledJobStatus(si: StreamInput) = ScheduledJobStats(si) - } - - override fun writeTo(out: StreamOutput) { - super.writeTo(out) - out.writeEnum(status) - out.writeOptionalWriteable(jobSweeperMetrics) - out.writeOptionalArray(jobInfos) - } - - override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder.field("name", node.name) - builder.field("schedule_status", status) - builder.field("roles", node.roles.map { it.roleName().uppercase(Locale.getDefault()) }) - if (jobSweeperMetrics != null) { - builder.startObject(RestScheduledJobStatsHandler.JOB_SCHEDULING_METRICS) - jobSweeperMetrics!!.toXContent(builder, params) - builder.endObject() - } - - if (jobInfos != null) { - builder.startObject(RestScheduledJobStatsHandler.JOBS_INFO) - for (job in jobInfos!!) { - builder.startObject(job.scheduledJobId) - job.toXContent(builder, params) - builder.endObject() - } - builder.endObject() - } - return builder - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsAction.kt b/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsAction.kt deleted file mode 100644 index 698c6c44e..000000000 --- a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsAction.kt +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core.action.node - -import org.opensearch.action.ActionType -import org.opensearch.core.common.io.stream.Writeable - -class ScheduledJobsStatsAction : ActionType<ScheduledJobsStatsResponse>(NAME, reader) { - companion object { - val INSTANCE = ScheduledJobsStatsAction() - const val NAME = "cluster:admin/opendistro/_scheduled_jobs/stats" - - val reader = Writeable.Reader { - val response = ScheduledJobsStatsResponse(it) - response - } - } - - override fun getResponseReader(): Writeable.Reader<ScheduledJobsStatsResponse> { - return reader - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsRequest.kt b/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsRequest.kt deleted file mode 100644 index 6a82e8204..000000000 --- a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsRequest.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core.action.node - -import org.opensearch.action.support.nodes.BaseNodesRequest -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import java.io.IOException - -/** - * A request to get node (cluster) level ScheduledJobsStatus. - * By default all the parameters will be true.
- */ -class ScheduledJobsStatsRequest : BaseNodesRequest<ScheduledJobsStatsRequest> { - var jobSchedulingMetrics: Boolean = true - var jobsInfo: Boolean = true - - constructor(si: StreamInput) : super(si) { - jobSchedulingMetrics = si.readBoolean() - jobsInfo = si.readBoolean() - } - constructor(nodeIds: Array<String>) : super(*nodeIds) - - @Throws(IOException::class) - override fun writeTo(out: StreamOutput) { - super.writeTo(out) - out.writeBoolean(jobSchedulingMetrics) - out.writeBoolean(jobsInfo) - } - - fun all(): ScheduledJobsStatsRequest { - jobSchedulingMetrics = true - jobsInfo = true - return this - } - - fun clear(): ScheduledJobsStatsRequest { - jobSchedulingMetrics = false - jobsInfo = false - return this - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsResponse.kt b/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsResponse.kt deleted file mode 100644 index edfcc0cce..000000000 --- a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsResponse.kt +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core.action.node - -import org.opensearch.action.FailedNodeException -import org.opensearch.action.support.nodes.BaseNodesResponse -import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings -import org.opensearch.alerting.core.settings.ScheduledJobSettings -import org.opensearch.cluster.ClusterName -import org.opensearch.cluster.health.ClusterIndexHealth -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.xcontent.ToXContent -import org.opensearch.core.xcontent.ToXContentFragment -import org.opensearch.core.xcontent.XContentBuilder - -/** - * ScheduledJobsStatsResponse is a class that will contain all the responses from each node. - */ -class ScheduledJobsStatsResponse : BaseNodesResponse<ScheduledJobStats>, ToXContentFragment { - - private var scheduledJobEnabled: Boolean = false - private var indexExists: Boolean? = null - private var indexHealth: ClusterIndexHealth? = null - - constructor(si: StreamInput) : super(si) { - this.scheduledJobEnabled = si.readBoolean() - this.indexExists = si.readBoolean() - this.indexHealth = si.readOptionalWriteable { ClusterIndexHealth(si) } - } - - constructor( - clusterName: ClusterName, - nodeResponses: List<ScheduledJobStats>, - failures: List<FailedNodeException>, - scheduledJobEnabled: Boolean, - indexExists: Boolean, - indexHealth: ClusterIndexHealth?
- ) : super(clusterName, nodeResponses, failures) { - this.scheduledJobEnabled = scheduledJobEnabled - this.indexExists = indexExists - this.indexHealth = indexHealth - } - - override fun writeNodesTo( - out: StreamOutput, - nodes: MutableList<ScheduledJobStats> - ) { - out.writeList(nodes) - } - - override fun readNodesFrom(si: StreamInput): MutableList<ScheduledJobStats> { - return si.readList { ScheduledJobStats.readScheduledJobStatus(it) } - } - - override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder.field(LegacyOpenDistroScheduledJobSettings.SWEEPER_ENABLED.key, scheduledJobEnabled) - builder.field(ScheduledJobSettings.SWEEPER_ENABLED.key, scheduledJobEnabled) - builder.field("scheduled_job_index_exists", indexExists) - builder.field("scheduled_job_index_status", indexHealth?.status?.name?.lowercase()) - val nodesOnSchedule = nodes.count { it.status == ScheduledJobStats.ScheduleStatus.GREEN } - val nodesNotOnSchedule = nodes.count { it.status == ScheduledJobStats.ScheduleStatus.RED } - builder.field("nodes_on_schedule", nodesOnSchedule) - builder.field("nodes_not_on_schedule", nodesNotOnSchedule) - builder.startObject("nodes") - for (scheduledJobStatus in nodes) { - builder.startObject(scheduledJobStatus.node.id) - scheduledJobStatus.toXContent(builder, params) - builder.endObject() - } - builder.endObject() - - return builder - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt b/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt deleted file mode 100644 index ac6f8f3a1..000000000 --- a/core/bin/main/org/opensearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core.action.node - -import org.apache.logging.log4j.LogManager -import org.opensearch.action.FailedNodeException -import org.opensearch.action.support.ActionFilters -import org.opensearch.action.support.nodes.BaseNodeRequest -import org.opensearch.action.support.nodes.TransportNodesAction -import org.opensearch.alerting.core.JobSweeper -import org.opensearch.alerting.core.JobSweeperMetrics -import org.opensearch.alerting.core.ScheduledJobIndices -import org.opensearch.alerting.core.schedule.JobScheduler -import org.opensearch.alerting.core.schedule.JobSchedulerMetrics -import org.opensearch.cluster.health.ClusterIndexHealth -import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.inject.Inject -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.threadpool.ThreadPool -import org.opensearch.transport.TransportService -import java.io.IOException - -private val log = LogManager.getLogger(ScheduledJobsStatsTransportAction::class.java) - -class ScheduledJobsStatsTransportAction : TransportNodesAction<ScheduledJobsStatsRequest, ScheduledJobsStatsResponse, ScheduledJobsStatsTransportAction.ScheduledJobStatusRequest, ScheduledJobStats> { - - private val jobSweeper: JobSweeper - private val jobScheduler: JobScheduler - private val scheduledJobIndices: ScheduledJobIndices - - @Inject - constructor( - threadPool: ThreadPool, - clusterService: ClusterService, - transportService: TransportService, - actionFilters: ActionFilters, - jobSweeper: JobSweeper, - jobScheduler: JobScheduler, - scheduledJobIndices: ScheduledJobIndices - ) : super( - ScheduledJobsStatsAction.NAME, - threadPool, - clusterService, - transportService, - actionFilters, - { ScheduledJobsStatsRequest(it) }, - {
ScheduledJobStatusRequest(it) }, - ThreadPool.Names.MANAGEMENT, - ScheduledJobStats::class.java - ) { - this.jobSweeper = jobSweeper - this.jobScheduler = jobScheduler - this.scheduledJobIndices = scheduledJobIndices - } - - override fun newNodeRequest(request: ScheduledJobsStatsRequest): ScheduledJobStatusRequest { - return ScheduledJobStatusRequest(request) - } - - override fun newNodeResponse(si: StreamInput): ScheduledJobStats { - return ScheduledJobStats(si) - } - - override fun newResponse( - request: ScheduledJobsStatsRequest, - responses: MutableList<ScheduledJobStats>, - failures: MutableList<FailedNodeException> - ): ScheduledJobsStatsResponse { - val scheduledJobEnabled = jobSweeper.isSweepingEnabled() - val scheduledJobIndexExist = scheduledJobIndices.scheduledJobIndexExists() - val indexHealth: ClusterIndexHealth? = if (scheduledJobIndexExist) scheduledJobIndices.scheduledJobIndexHealth() else null - - return ScheduledJobsStatsResponse( - clusterService.clusterName, - responses, - failures, - scheduledJobEnabled, - scheduledJobIndexExist, - indexHealth - ) - } - - override fun nodeOperation(request: ScheduledJobStatusRequest): ScheduledJobStats { - return createScheduledJobStatus(request.request) - } - - private fun createScheduledJobStatus( - scheduledJobsStatusRequest: ScheduledJobsStatsRequest - ): ScheduledJobStats { - val jobSweeperMetrics = jobSweeper.getJobSweeperMetrics() - val jobSchedulerMetrics = jobScheduler.getJobSchedulerMetric() - - val status: ScheduledJobStats.ScheduleStatus = evaluateStatus(jobSchedulerMetrics, jobSweeperMetrics) - return ScheduledJobStats( - this.transportService.localNode, - status, - if (scheduledJobsStatusRequest.jobSchedulingMetrics) jobSweeperMetrics else null, - if (scheduledJobsStatusRequest.jobsInfo) jobSchedulerMetrics.toTypedArray() else null - ) - } - - private fun evaluateStatus( - jobsInfo: List<JobSchedulerMetrics>, - jobSweeperMetrics: JobSweeperMetrics - ): ScheduledJobStats.ScheduleStatus { - val allJobsRunningOnTime = jobsInfo.all { it.runningOnTime } - if (allJobsRunningOnTime && jobSweeperMetrics.fullSweepOnTime) { - return ScheduledJobStats.ScheduleStatus.GREEN - } - log.info("Jobs Running on time: $allJobsRunningOnTime, Sweeper on time: ${jobSweeperMetrics.fullSweepOnTime}") - return ScheduledJobStats.ScheduleStatus.RED - } - - class ScheduledJobStatusRequest : BaseNodeRequest { - - lateinit var request: ScheduledJobsStatsRequest - - constructor() : super() - - constructor(si: StreamInput) : super(si) { - request = ScheduledJobsStatsRequest(si) - } - - constructor(request: ScheduledJobsStatsRequest) : super() { - this.request = request - } - - @Throws(IOException::class) - override fun writeTo(out: StreamOutput) { - super.writeTo(out) - request.writeTo(out) - } - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/resthandler/RestScheduledJobStatsHandler.kt b/core/bin/main/org/opensearch/alerting/core/resthandler/RestScheduledJobStatsHandler.kt deleted file mode 100644 index c4f800ab3..000000000 --- a/core/bin/main/org/opensearch/alerting/core/resthandler/RestScheduledJobStatsHandler.kt +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core.resthandler - -import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction -import org.opensearch.alerting.core.action.node.ScheduledJobsStatsRequest -import org.opensearch.client.node.NodeClient -import org.opensearch.core.common.Strings -import org.opensearch.rest.BaseRestHandler -import
org.opensearch.rest.RestHandler -import org.opensearch.rest.RestHandler.Route -import org.opensearch.rest.RestRequest -import org.opensearch.rest.RestRequest.Method.GET -import org.opensearch.rest.action.RestActions -import java.util.Locale -import java.util.TreeSet - -/** - * RestScheduledJobStatsHandler is the handler for getting scheduled job stats. - */ -class RestScheduledJobStatsHandler(private val path: String) : BaseRestHandler() { - - companion object { - const val JOB_SCHEDULING_METRICS: String = "job_scheduling_metrics" - const val JOBS_INFO: String = "jobs_info" - private val METRICS = mapOf<String, (ScheduledJobsStatsRequest) -> Unit>( - JOB_SCHEDULING_METRICS to { it -> it.jobSchedulingMetrics = true }, - JOBS_INFO to { it -> it.jobsInfo = true } - ) - } - - override fun getName(): String { - return "${path}_jobs_stats" - } - - override fun routes(): List<Route> { - return listOf() - } - - override fun replacedRoutes(): MutableList<RestHandler.ReplacedRoute> { - return mutableListOf( - RestHandler.ReplacedRoute( - GET, - "/_plugins/$path/{nodeId}/stats/", - GET, - "/_opendistro/$path/{nodeId}/stats/" - ), - RestHandler.ReplacedRoute( - GET, - "/_plugins/$path/{nodeId}/stats/{metric}", - GET, - "/_opendistro/$path/{nodeId}/stats/{metric}" - ), - RestHandler.ReplacedRoute( - GET, - "/_plugins/$path/stats/", - GET, - "/_opendistro/$path/stats/" - ), - RestHandler.ReplacedRoute( - GET, - "/_plugins/$path/stats/{metric}", - GET, - "/_opendistro/$path/stats/{metric}" - ) - ) - } - - override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val scheduledJobNodesStatsRequest = getRequest(request) - return RestChannelConsumer { channel -> - client.execute( - ScheduledJobsStatsAction.INSTANCE, - scheduledJobNodesStatsRequest, - RestActions.NodesResponseRestListener(channel) - ) - } - } - - private fun getRequest(request: RestRequest): ScheduledJobsStatsRequest { - val nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")) - val metrics = Strings.tokenizeByCommaToSet(request.param("metric")) - val scheduledJobsStatsRequest = ScheduledJobsStatsRequest(nodesIds) - scheduledJobsStatsRequest.timeout(request.param("timeout")) - - if (metrics.isEmpty()) { - return scheduledJobsStatsRequest - } else if (metrics.size == 1 && metrics.contains("_all")) { - scheduledJobsStatsRequest.all() - } else if (metrics.contains("_all")) { - throw IllegalArgumentException( - String.format( - Locale.ROOT, - "request [%s] contains _all and individual metrics [%s]", - request.path(), - request.param("metric") - ) - ) - } else { - // use a sorted set so the unrecognized parameters appear in a reliable sorted order - scheduledJobsStatsRequest.clear() - val invalidMetrics = TreeSet<String>() - for (metric in metrics) { - val handler = METRICS[metric] - if (handler != null) { - handler.invoke(scheduledJobsStatsRequest) - } else { - invalidMetrics.add(metric) - } - } - - if (!invalidMetrics.isEmpty()) { - throw IllegalArgumentException(unrecognized(request, invalidMetrics, METRICS.keys, "metric")) - } - } - return scheduledJobsStatsRequest - } -} diff --git a/core/bin/main/org/opensearch/alerting/core/schedule/JobScheduler.kt b/core/bin/main/org/opensearch/alerting/core/schedule/JobScheduler.kt deleted file mode 100644 index a4a729121..000000000 --- a/core/bin/main/org/opensearch/alerting/core/schedule/JobScheduler.kt +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.core.schedule - -import org.apache.logging.log4j.LogManager -import
org.opensearch.alerting.core.JobRunner -import org.opensearch.common.unit.TimeValue -import org.opensearch.commons.alerting.model.ScheduledJob -import org.opensearch.threadpool.Scheduler -import org.opensearch.threadpool.ThreadPool -import java.time.Duration -import java.time.Instant -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.TimeUnit -import java.util.stream.Collectors - -/** - * JobScheduler is a class for scheduling and descheduling ScheduledJobs. This class keeps a list of ScheduledJob ids that are currently scheduled. - * - * JobScheduler is unaware of the ScheduledJob version, and it is up to callers to first deschedule the older version of a ScheduledJob and then schedule the new version. - */ -class JobScheduler(private val threadPool: ThreadPool, private val jobRunner: JobRunner) { - private val logger = LogManager.getLogger(JobScheduler::class.java) - - /** - * Map of ScheduledJob id to info of the ScheduledJob. - */ - private val scheduledJobIdToInfo = ConcurrentHashMap<String, ScheduledJobInfo>() - - /** - * Schedules the jobs in [jobsToSchedule] for execution. - * - * @return List of jobs that could not be scheduled - */ - fun schedule(vararg jobsToSchedule: ScheduledJob): List<ScheduledJob> { - return jobsToSchedule.filter { - !this.schedule(it) - } - } - - /** - * Schedules a single [scheduledJob]. - * - * [schedule] does not check for a new version of the ScheduledJob. - * The caller should be aware of updates that happened to the [ScheduledJob] and must first call [deschedule] if the job version is updated, followed by [schedule]. - * - * [schedule] is considered successful when - * 1. The cron expression is out of schedule, e.g. a past year such as 2016. - * 2. The schedule already exists. This is to keep the function idempotent. - * 3. We are able to schedule the job via [ThreadPool.schedule]. - * - * [schedule] is considered unsuccessful when - * 1. The schedule is disabled. - * 2. In a rare race condition, the scheduledJob is already marked [ScheduledJobInfo.descheduled] true at the time of calling [ThreadPool.schedule]. - * 3. Any unexpected failure occurs. - * - * @return true if the ScheduledJob is scheduled successfully; - * false otherwise. - */ - fun schedule(scheduledJob: ScheduledJob): Boolean { - logger.info("Scheduling jobId : ${scheduledJob.id}, name: ${scheduledJob.name}") - - if (!scheduledJob.enabled) { - // ensure that the ScheduledJob is not enabled. The caller should also be checking this before calling this function. - return false - } - - val scheduledJobInfo = scheduledJobIdToInfo.getOrPut(scheduledJob.id) { - ScheduledJobInfo(scheduledJob.id, scheduledJob) - } - if (scheduledJobInfo.scheduledCancellable != null) { - // This means that the given ScheduledJob already has a schedule running. We should not schedule any more. - return true - } - - // Start the first schedule. - return this.reschedule(scheduledJob, scheduledJobInfo) - } - - /** - * Deschedules the jobs given the ScheduledJob [ids]. - * - * The caller should retry any [deschedule] that failed. - * - * @return List of job ids that failed to deschedule. - */ - fun deschedule(ids: Collection<String>): List<String> { - return ids.filter { - !this.deschedule(it) - }.also { - if (it.isNotEmpty()) { - logger.error("Unable to deschedule jobs $it") - } - } - } - - /** - * Marks the scheduledJob as descheduled and tries to cancel any future schedule for the given scheduledJob id. - * - * [deschedule] is considered successful when - * 1. The ScheduledJob id does not exist. - * 2. The ScheduledJob is complete. - * 3.
-     * 3. The ScheduledJob is not complete and is successfully cancelled.
-     *
-     * The caller should retry if [deschedule] fails.
-     *
-     * @return true if the job is successfully descheduled;
-     * false otherwise.
-     */
-    fun deschedule(id: String): Boolean {
-        val scheduledJobInfo = scheduledJobIdToInfo[id]
-        if (scheduledJobInfo == null) {
-            logger.info("JobId $id does not exist.")
-            return true
-        } else {
-            logger.info("Descheduling jobId : $id")
-            scheduledJobInfo.descheduled = true
-            scheduledJobInfo.actualPreviousExecutionTime = null
-            scheduledJobInfo.expectedNextExecutionTime = null
-            var result = true
-            val scheduledFuture = scheduledJobInfo.scheduledCancellable
-
-            if (scheduledFuture != null && !scheduledFuture.isCancelled) {
-                result = scheduledFuture.cancel()
-            }
-
-            if (result) {
-                // If we have successfully descheduled the job, remove it from the info map.
-                scheduledJobIdToInfo.remove(scheduledJobInfo.scheduledJobId, scheduledJobInfo)
-            }
-            return result
-        }
-    }
-
-    /**
-     * @return the set of job ids that are currently scheduled.
-     */
-    fun scheduledJobs(): Set<String> {
-        return scheduledJobIdToInfo.keys
-    }
-
-    private fun reschedule(scheduleJob: ScheduledJob, scheduledJobInfo: ScheduledJobInfo): Boolean {
-        if (scheduleJob.enabledTime == null) {
-            logger.info("${scheduleJob.name} has no enabled time. This job should never have been scheduled.")
-            return false
-        }
-        scheduledJobInfo.expectedNextExecutionTime = scheduleJob.schedule.getExpectedNextExecutionTime(
-            scheduleJob.enabledTime!!, scheduledJobInfo.expectedNextExecutionTime
-        )
-
-        // Validate whether there is a next execution that needs to happen,
-        // e.g. a cron job scheduled for the 30th of February (which doesn't exist): "0/5 * 30 2 *"
-        if (scheduledJobInfo.expectedNextExecutionTime == null) {
-            logger.info("${scheduleJob.name} has no next execution time.")
-            return true
-        }
-
-        val duration = Duration.between(Instant.now(), scheduledJobInfo.expectedNextExecutionTime)
-
-        // Create an anonymous runnable.
-        val runnable = Runnable {
-            // Check again whether the scheduled job has been marked descheduled.
-            if (scheduledJobInfo.descheduled) {
-                return@Runnable // Skip running the job if it is marked descheduled.
-            }
-
-            // The order of operations here matters: we specifically call getPeriodEndingAt before reschedule because
-            // reschedule updates expectedNextExecutionTime to the next one, which would throw off the startTime/endTime.
-            val (startTime, endTime) = scheduleJob.schedule.getPeriodEndingAt(scheduledJobInfo.expectedNextExecutionTime)
-            scheduledJobInfo.actualPreviousExecutionTime = Instant.now()
-
-            this.reschedule(scheduleJob, scheduledJobInfo)
-
-            jobRunner.runJob(scheduleJob, startTime, endTime)
-        }
-
-        // Check the descheduled flag as close as possible to the point where we actually schedule the job.
-        // This way we can minimize race conditions.
-        if (scheduledJobInfo.descheduled) {
-            // Do not reschedule if the job has been marked descheduled.
-            return false
-        }
-
-        // Finally, schedule the job in the ThreadPool with the next time to execute.
-        val scheduledCancellable = threadPool.schedule(runnable, TimeValue(duration.toNanos(), TimeUnit.NANOSECONDS), ThreadPool.Names.SAME)
-        scheduledJobInfo.scheduledCancellable = scheduledCancellable
-
-        return true
-    }
-
-    fun getJobSchedulerMetric(): List<JobSchedulerMetrics> {
-        return scheduledJobIdToInfo.entries.stream()
-            .map { entry ->
-                JobSchedulerMetrics(
-                    entry.value.scheduledJobId,
-                    entry.value.actualPreviousExecutionTime?.toEpochMilli(),
-                    entry.value.scheduledJob.schedule.runningOnTime(entry.value.actualPreviousExecutionTime)
-                )
-            }
-            .collect(Collectors.toList())
-    }
-
-    fun postIndex(job: ScheduledJob) {
-        jobRunner.postIndex(job)
-    }
-
-    fun postDelete(jobId: String) {
-        jobRunner.postDelete(jobId)
-    }
-
-    /**
-     * ScheduledJobInfo is used to check whether a job should be descheduled.
-     * Some ideas for further use of this class:
-     * 1. Total number of runs.
-     * 2. Tracking the number of failed runs (helps control error handling).
-     */
-    private data class ScheduledJobInfo(
-        val scheduledJobId: String,
-        val scheduledJob: ScheduledJob,
-        var descheduled: Boolean = false,
-        var actualPreviousExecutionTime: Instant? = null,
-        var expectedNextExecutionTime: Instant? = null,
-        var scheduledCancellable: Scheduler.ScheduledCancellable? = null
-    )
-}
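The scheduling contract documented above (idempotent schedule, permissive deschedule) can be exercised with the test doubles that this patch also removes. A rough sketch, assuming `MockScheduledJob` and `MockJobRunner` from the deleted test sources:

    import org.opensearch.alerting.core.model.MockScheduledJob
    import org.opensearch.alerting.core.schedule.JobScheduler
    import org.opensearch.alerting.core.schedule.MockJobRunner
    import org.opensearch.common.settings.Settings
    import org.opensearch.commons.alerting.model.IntervalSchedule
    import org.opensearch.threadpool.ThreadPool
    import java.time.Instant
    import java.time.temporal.ChronoUnit
    import java.util.concurrent.TimeUnit

    fun main() {
        val threadPool = ThreadPool(Settings.builder().put("node.name", "node-0").build())
        val scheduler = JobScheduler(threadPool, MockJobRunner())
        val job = MockScheduledJob(
            "job-1", 1L, "job-1-name", "MockScheduledJob", true,
            IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now()
        )

        check(scheduler.schedule(job))                     // scheduled
        check(scheduler.schedule(job))                     // idempotent: scheduling again is still true
        check(scheduler.scheduledJobs() == setOf("job-1"))
        check(scheduler.deschedule("job-1"))
        check(scheduler.deschedule("no-such-id"))          // unknown ids also deschedule "successfully"
        ThreadPool.terminate(threadPool, 1, TimeUnit.SECONDS)
    }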
diff --git a/core/bin/main/org/opensearch/alerting/core/schedule/JobSchedulerMetrics.kt b/core/bin/main/org/opensearch/alerting/core/schedule/JobSchedulerMetrics.kt
deleted file mode 100644
index dff1ecd52..000000000
--- a/core/bin/main/org/opensearch/alerting/core/schedule/JobSchedulerMetrics.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.alerting.core.schedule
-
-import org.opensearch.core.common.io.stream.StreamInput
-import org.opensearch.core.common.io.stream.StreamOutput
-import org.opensearch.core.common.io.stream.Writeable
-import org.opensearch.core.xcontent.ToXContent
-import org.opensearch.core.xcontent.ToXContentFragment
-import org.opensearch.core.xcontent.XContentBuilder
-import java.time.Instant
-
-class JobSchedulerMetrics : ToXContentFragment, Writeable {
-    val scheduledJobId: String
-    val lastExecutionTime: Long?
-    val runningOnTime: Boolean
-
-    constructor(scheduledJobId: String, lastExecutionTime: Long?, runningOnTime: Boolean) {
-        this.scheduledJobId = scheduledJobId
-        this.lastExecutionTime = lastExecutionTime
-        this.runningOnTime = runningOnTime
-    }
-
-    constructor(si: StreamInput) {
-        scheduledJobId = si.readString()
-        lastExecutionTime = si.readOptionalLong()
-        runningOnTime = si.readBoolean()
-    }
-
-    override fun writeTo(out: StreamOutput) {
-        out.writeString(scheduledJobId)
-        out.writeOptionalLong(lastExecutionTime)
-        out.writeBoolean(runningOnTime)
-    }
-
-    override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
-        if (lastExecutionTime != null)
-            builder.timeField(
-                "last_execution_time", "last_execution_time_in_millis",
-                Instant.ofEpochMilli(lastExecutionTime).toEpochMilli()
-            )
-        builder.field("running_on_time", runningOnTime)
-        return builder
-    }
-}
diff --git a/core/bin/main/org/opensearch/alerting/core/settings/LegacyOpenDistroScheduledJobSettings.kt b/core/bin/main/org/opensearch/alerting/core/settings/LegacyOpenDistroScheduledJobSettings.kt
deleted file mode 100644
index 3a37ff97f..000000000
--- a/core/bin/main/org/opensearch/alerting/core/settings/LegacyOpenDistroScheduledJobSettings.kt
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.opensearch.alerting.core.settings
-
-import org.opensearch.common.settings.Setting
-import org.opensearch.common.unit.TimeValue
-
-/**
- * Legacy OpenDistro settings used for [ScheduledJob]s. These include backoff settings, retry counts, timeouts, etc.
- */
-class LegacyOpenDistroScheduledJobSettings {
-
-    companion object {
-        val SWEEPER_ENABLED = Setting.boolSetting(
-            "opendistro.scheduled_jobs.enabled",
-            true,
-            Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
-        )
-
-        val REQUEST_TIMEOUT = Setting.positiveTimeSetting(
-            "opendistro.scheduled_jobs.request_timeout",
-            TimeValue.timeValueSeconds(10),
-            Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
-        )
-
-        val SWEEP_BACKOFF_MILLIS = Setting.positiveTimeSetting(
-            "opendistro.scheduled_jobs.sweeper.backoff_millis",
-            TimeValue.timeValueMillis(50),
-            Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
-        )
-
-        val SWEEP_BACKOFF_RETRY_COUNT = Setting.intSetting(
-            "opendistro.scheduled_jobs.retry_count",
-            3,
-            Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
-        )
-
-        val SWEEP_PERIOD = Setting.positiveTimeSetting(
-            "opendistro.scheduled_jobs.sweeper.period",
-            TimeValue.timeValueMinutes(5),
-            Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
-        )
-
-        val SWEEP_PAGE_SIZE = Setting.intSetting(
-            "opendistro.scheduled_jobs.sweeper.page_size",
-            100,
-            Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
-        )
-    }
-}
diff --git a/core/bin/main/org/opensearch/alerting/core/settings/ScheduledJobSettings.kt b/core/bin/main/org/opensearch/alerting/core/settings/ScheduledJobSettings.kt
deleted file mode 100644
index 6bdb18bec..000000000
--- a/core/bin/main/org/opensearch/alerting/core/settings/ScheduledJobSettings.kt
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.alerting.core.settings
-
-import org.opensearch.common.settings.Setting
-
-/**
- * Settings used for [ScheduledJob]s. These include backoff settings, retry counts, timeouts, etc.
- * Each setting uses its legacy OpenDistro counterpart as a fallback.
- */
-class ScheduledJobSettings {
-
-    companion object {
-        val SWEEPER_ENABLED = Setting.boolSetting(
-            "plugins.scheduled_jobs.enabled",
-            LegacyOpenDistroScheduledJobSettings.SWEEPER_ENABLED,
-            Setting.Property.NodeScope, Setting.Property.Dynamic
-        )
-        val REQUEST_TIMEOUT = Setting.positiveTimeSetting(
-            "plugins.scheduled_jobs.request_timeout",
-            LegacyOpenDistroScheduledJobSettings.REQUEST_TIMEOUT,
-            Setting.Property.NodeScope, Setting.Property.Dynamic
-        )
-
-        val SWEEP_BACKOFF_MILLIS = Setting.positiveTimeSetting(
-            "plugins.scheduled_jobs.sweeper.backoff_millis",
-            LegacyOpenDistroScheduledJobSettings.SWEEP_BACKOFF_MILLIS,
-            Setting.Property.NodeScope, Setting.Property.Dynamic
-        )
-
-        val SWEEP_BACKOFF_RETRY_COUNT = Setting.intSetting(
-            "plugins.scheduled_jobs.retry_count",
-            LegacyOpenDistroScheduledJobSettings.SWEEP_BACKOFF_RETRY_COUNT,
-            Setting.Property.NodeScope, Setting.Property.Dynamic
-        )
-
-        val SWEEP_PERIOD = Setting.positiveTimeSetting(
-            "plugins.scheduled_jobs.sweeper.period",
-            LegacyOpenDistroScheduledJobSettings.SWEEP_PERIOD,
-            Setting.Property.NodeScope, Setting.Property.Dynamic
-        )
-
-        val SWEEP_PAGE_SIZE = Setting.intSetting(
-            "plugins.scheduled_jobs.sweeper.page_size",
-            LegacyOpenDistroScheduledJobSettings.SWEEP_PAGE_SIZE,
-            Setting.Property.NodeScope, Setting.Property.Dynamic
-        )
-    }
-}
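Each `plugins.scheduled_jobs.*` setting above passes its `opendistro.*` counterpart as the fallback `Setting`, so values written under the deprecated key still take effect until the new key is set. A minimal sketch of that fallback behavior, with one setting pair re-declared locally for illustration:

    import org.opensearch.common.settings.Setting
    import org.opensearch.common.settings.Settings

    // The same fallback pattern as above, re-declared locally for illustration.
    val legacyEnabled: Setting<Boolean> = Setting.boolSetting(
        "opendistro.scheduled_jobs.enabled", true,
        Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
    )
    val sweeperEnabled: Setting<Boolean> = Setting.boolSetting(
        "plugins.scheduled_jobs.enabled", legacyEnabled,
        Setting.Property.NodeScope, Setting.Property.Dynamic
    )

    fun main() {
        // Only the deprecated key is set; the new setting inherits its value.
        val legacyOnly = Settings.builder().put("opendistro.scheduled_jobs.enabled", false).build()
        println(sweeperEnabled.get(legacyOnly)) // false

        // When the new key is set, it wins over the legacy one.
        val both = Settings.builder()
            .put("opendistro.scheduled_jobs.enabled", false)
            .put("plugins.scheduled_jobs.enabled", true)
            .build()
        println(sweeperEnabled.get(both)) // true
    }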
diff --git a/core/bin/main/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt b/core/bin/main/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt
deleted file mode 100644
index 3e87f207f..000000000
--- a/core/bin/main/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.alerting.opensearchapi
-
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ThreadContextElement
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.withContext
-import org.apache.logging.log4j.Logger
-import org.opensearch.OpenSearchException
-import org.opensearch.action.bulk.BackoffPolicy
-import org.opensearch.action.search.SearchResponse
-import org.opensearch.action.search.ShardSearchFailure
-import org.opensearch.client.OpenSearchClient
-import org.opensearch.common.settings.Settings
-import org.opensearch.common.util.concurrent.ThreadContext
-import org.opensearch.common.xcontent.XContentHelper
-import org.opensearch.common.xcontent.XContentType
-import org.opensearch.commons.InjectSecurity
-import org.opensearch.commons.authuser.User
-import org.opensearch.commons.notifications.NotificationsPluginInterface
-import org.opensearch.core.action.ActionListener
-import org.opensearch.core.rest.RestStatus
-import org.opensearch.core.rest.RestStatus.BAD_GATEWAY
-import org.opensearch.core.rest.RestStatus.GATEWAY_TIMEOUT
-import org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE
-import org.opensearch.core.xcontent.ToXContent
-import org.opensearch.index.query.BoolQueryBuilder
-import org.opensearch.index.query.QueryBuilders
-import org.opensearch.search.builder.SearchSourceBuilder
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-import kotlin.coroutines.suspendCoroutine
-
-/** Convert an object to a maps-and-lists representation. */
-fun ToXContent.convertToMap(): Map<String, Any> {
-    val bytesReference = XContentHelper.toXContent(this, XContentType.JSON, false)
-    return XContentHelper.convertToMap(bytesReference, false, XContentType.JSON).v2()
-}
-
-/**
- * Backs off and retries a lambda that makes a request. This should not be called on any of the [standard][ThreadPool]
- * executors since those executors are not meant to be blocked by sleeping.
- */
-fun <T> BackoffPolicy.retry(block: () -> T): T {
-    val iter = iterator()
-    do {
-        try {
-            return block()
-        } catch (e: OpenSearchException) {
-            if (iter.hasNext() && e.isRetriable()) {
-                Thread.sleep(iter.next().millis)
-            } else {
-                throw e
-            }
-        }
-    } while (true)
-}
-
-/**
- * Backs off and retries a lambda that makes a request. This retries on any Exception unless it detects that the
- * Notifications plugin is not installed.
- *
- * @param logger - logger used to log intermediate failures
- * @param block - the block of code to retry. This should be a suspend function.
- */
-suspend fun <T> BackoffPolicy.retryForNotification(
-    logger: Logger,
-    block: suspend () -> T
-): T {
-    val iter = iterator()
-    do {
-        try {
-            return block()
-        } catch (e: java.lang.Exception) {
-            val isMissingNotificationPlugin = e.message?.contains("failed to find action") ?: false
-            if (isMissingNotificationPlugin) {
-                throw OpenSearchException("Notification plugin is not installed. Please install the Notification plugin.", e)
-            } else if (iter.hasNext()) {
-                val backoff = iter.next()
-                logger.warn("Notification operation failed. Retrying in $backoff.", e)
-                delay(backoff.millis)
-            } else {
-                throw e
-            }
-        }
-    } while (true)
-}
-
-/**
- * Retries the given [block] of code as specified by the receiver [BackoffPolicy] if [block] throws an [OpenSearchException]
- * that is retriable (502, 503, 504).
- *
- * If all retries fail, the final exception is rethrown. Exceptions caught during intermediate retries are
- * logged as warnings to [logger]. Similar to [org.opensearch.action.bulk.Retry], except this retries on
- * 502, 503 and 504 error codes as well as 429.
- *
- * @param logger - logger used to log intermediate failures
- * @param retryOn - any additional [RestStatus] values that should be retried
- * @param block - the block of code to retry. This should be a suspend function.
- */
-suspend fun <T> BackoffPolicy.retry(
-    logger: Logger,
-    retryOn: List<RestStatus> = emptyList(),
-    block: suspend () -> T
-): T {
-    val iter = iterator()
-    do {
-        try {
-            return block()
-        } catch (e: OpenSearchException) {
-            if (iter.hasNext() && (e.isRetriable() || retryOn.contains(e.status()))) {
-                val backoff = iter.next()
-                logger.warn("Operation failed. Retrying in $backoff.", e)
-                delay(backoff.millis)
-            } else {
-                throw e
-            }
-        }
-    } while (true)
-}
-
-/**
- * Retries on 502, 503 and 504 per the Elastic client's behavior: https://github.com/elastic/elasticsearch-net/issues/2061
- * 429 must be retried manually as it's not clear whether it's ok to retry for requests other than Bulk requests.
- */
-fun OpenSearchException.isRetriable(): Boolean {
-    return (status() in listOf(BAD_GATEWAY, SERVICE_UNAVAILABLE, GATEWAY_TIMEOUT))
-}
-
-fun SearchResponse.firstFailureOrNull(): ShardSearchFailure? {
-    return shardFailures?.getOrNull(0)
-}
-
-fun addFilter(user: User, searchSourceBuilder: SearchSourceBuilder, fieldName: String) {
-    val filterBackendRoles = QueryBuilders.termsQuery(fieldName, user.backendRoles)
-    val queryBuilder = searchSourceBuilder.query() as BoolQueryBuilder
-    searchSourceBuilder.query(queryBuilder.filter(filterBackendRoles))
-}
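A sketch of how the suspending `retry` above might be driven with an exponential `BackoffPolicy`; `fetchFromCluster` is a hypothetical stand-in for a real client call:

    import kotlinx.coroutines.runBlocking
    import org.apache.logging.log4j.LogManager
    import org.opensearch.action.bulk.BackoffPolicy
    import org.opensearch.alerting.opensearchapi.retry
    import org.opensearch.common.unit.TimeValue
    import org.opensearch.core.rest.RestStatus

    // Hypothetical stand-in for a real client call that may throw an OpenSearchException.
    suspend fun fetchFromCluster(): String = "ok"

    fun main() = runBlocking {
        val logger = LogManager.getLogger("retry-demo")
        // Exponential backoff: start at 50ms and back off for up to 3 retries.
        val policy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 3)

        // 502/503/504 are retried by default; 429 is added here via retryOn.
        val result = policy.retry(logger, retryOn = listOf(RestStatus.TOO_MANY_REQUESTS)) {
            fetchFromCluster()
        }
        println(result)
    }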
-
-/**
- * Converts [OpenSearchClient] methods that take a callback into Kotlin suspending functions.
- *
- * @param block - a block of code that is passed an [ActionListener] that should be passed to the OpenSearch client API.
- */
-suspend fun <T, C : OpenSearchClient> C.suspendUntil(block: C.(ActionListener<T>) -> Unit): T =
-    suspendCoroutine { cont ->
-        block(object : ActionListener<T> {
-            override fun onResponse(response: T) = cont.resume(response)
-
-            override fun onFailure(e: Exception) = cont.resumeWithException(e)
-        })
-    }
-
-/**
- * Converts [NotificationsPluginInterface] methods that take a callback into Kotlin suspending functions.
- *
- * @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API.
- */
-suspend fun <T> NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener<T>) -> Unit): T =
-    suspendCoroutine { cont ->
-        block(object : ActionListener<T> {
-            override fun onResponse(response: T) = cont.resume(response)
-
-            override fun onFailure(e: Exception) = cont.resumeWithException(e)
-        })
-    }
-
-class InjectorContextElement(
-    id: String,
-    settings: Settings,
-    threadContext: ThreadContext,
-    private val roles: List<String>?,
-    private val user: User? = null
-) : ThreadContextElement<Unit> {
-
-    companion object Key : CoroutineContext.Key<InjectorContextElement>
-    override val key: CoroutineContext.Key<*>
-        get() = Key
-
-    var rolesInjectorHelper = InjectSecurity(id, settings, threadContext)
-
-    override fun updateThreadContext(context: CoroutineContext) {
-        rolesInjectorHelper.injectRoles(roles)
-        // This is where plugins extract backend roles from. It should be passed when calling the APIs of other plugins.
-        rolesInjectorHelper.injectUserInfo(user)
-    }
-
-    override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
-        rolesInjectorHelper.close()
-    }
-}
-
-suspend fun <T> withClosableContext(
-    context: InjectorContextElement,
-    block: suspend CoroutineScope.() -> T
-): T {
-    try {
-        return withContext(context) { block() }
-    } finally {
-        context.rolesInjectorHelper.close()
-    }
-}
diff --git a/core/bin/main/settings/doc-level-queries.json b/core/bin/main/settings/doc-level-queries.json
deleted file mode 100644
index c5cbfa445..000000000
--- a/core/bin/main/settings/doc-level-queries.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{
-  "index": {
-    "mapping": {
-      "total_fields": {
-        "limit": 10000
-      }
-    },
-    "hidden": true
-  }
-}
\ No newline at end of file
diff --git a/core/bin/test/org/opensearch/alerting/core/WriteableTests.kt b/core/bin/test/org/opensearch/alerting/core/WriteableTests.kt
deleted file mode 100644
index f48ffa370..000000000
--- a/core/bin/test/org/opensearch/alerting/core/WriteableTests.kt
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.alerting.core
-
-import org.joda.time.DateTime
-import org.junit.Test
-import org.opensearch.alerting.core.schedule.JobSchedulerMetrics
-import org.opensearch.common.io.stream.BytesStreamOutput
-import org.opensearch.core.common.io.stream.StreamInput
-import org.opensearch.test.OpenSearchTestCase.assertEquals
-
-class WriteableTests {
-
-    @Test
-    fun `test jobschedule metrics as stream`() {
-        val metrics = JobSchedulerMetrics("test", DateTime.now().millis, false)
-        val out = BytesStreamOutput()
-        metrics.writeTo(out)
-        val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
-        val newMetrics = JobSchedulerMetrics(sin)
-        assertEquals("Round tripping metrics doesn't work", metrics.scheduledJobId, newMetrics.scheduledJobId)
-    }
-}
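The `suspendUntil` extension above is what lets callers await a callback-based client API inside a coroutine instead of blocking on `actionGet()`. A sketch, under the assumption that `client` is a node `Client` (which extends `OpenSearchClient`) and that an index named `my-index` exists:

    import kotlinx.coroutines.runBlocking
    import org.opensearch.action.search.SearchRequest
    import org.opensearch.action.search.SearchResponse
    import org.opensearch.alerting.opensearchapi.suspendUntil
    import org.opensearch.client.Client

    fun searchOnce(client: Client): SearchResponse = runBlocking {
        // Inside the block, `this` is the client and `it` is the ActionListener<SearchResponse>;
        // the coroutine resumes when onResponse or onFailure fires.
        val response: SearchResponse = client.suspendUntil { search(SearchRequest("my-index"), it) }
        response
    }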
diff --git a/core/bin/test/org/opensearch/alerting/core/model/MockScheduledJob.kt b/core/bin/test/org/opensearch/alerting/core/model/MockScheduledJob.kt
deleted file mode 100644
index 08e3fb8c4..000000000
--- a/core/bin/test/org/opensearch/alerting/core/model/MockScheduledJob.kt
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.opensearch.alerting.core.model
-
-import org.opensearch.commons.alerting.model.Schedule
-import org.opensearch.commons.alerting.model.ScheduledJob
-import org.opensearch.core.common.io.stream.StreamOutput
-import org.opensearch.core.xcontent.ToXContent
-import org.opensearch.core.xcontent.XContentBuilder
-import java.io.IOException
-import java.time.Instant
-
-class MockScheduledJob(
-    override val id: String,
-    override val version: Long,
-    override val name: String,
-    override val type: String,
-    override val enabled: Boolean,
-    override val schedule: Schedule,
-    override var lastUpdateTime: Instant,
-    override val enabledTime: Instant?
-) : ScheduledJob {
-    override fun fromDocument(id: String, version: Long): ScheduledJob {
-        TODO("not implemented")
-    }
-
-    override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
-        TODO("not implemented")
-    }
-
-    @Throws(IOException::class)
-    override fun writeTo(out: StreamOutput) {
-        TODO("not implemented")
-    }
-}
diff --git a/core/bin/test/org/opensearch/alerting/core/schedule/JobSchedulerTest.kt b/core/bin/test/org/opensearch/alerting/core/schedule/JobSchedulerTest.kt
deleted file mode 100644
index a0453e935..000000000
--- a/core/bin/test/org/opensearch/alerting/core/schedule/JobSchedulerTest.kt
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.alerting.core.schedule
-
-import org.junit.Before
-import org.opensearch.alerting.core.model.MockScheduledJob
-import org.opensearch.common.settings.Settings
-import org.opensearch.commons.alerting.model.CronSchedule
-import org.opensearch.commons.alerting.model.IntervalSchedule
-import org.opensearch.threadpool.ThreadPool
-import java.time.Instant
-import java.time.ZoneId
-import java.time.temporal.ChronoUnit
-import kotlin.test.Test
-import kotlin.test.assertEquals
-import kotlin.test.assertFalse
-import kotlin.test.assertTrue
-
-class JobSchedulerTest {
-
-    private var testSettings: Settings = Settings.builder().put("node.name", "node-0").build()
-    private val testThreadPool = ThreadPool(testSettings)
-    private var jobRunner: MockJobRunner = MockJobRunner()
-    private var jobScheduler: JobScheduler = JobScheduler(ThreadPool(testSettings), jobRunner)
-
-    @Before
-    fun `setup`() {
-        jobRunner = MockJobRunner()
-        jobScheduler = JobScheduler(ThreadPool(testSettings), jobRunner)
-    }
-
-    @Test
-    fun `schedule and deschedule`() {
-        val mockScheduledJob = MockScheduledJob(
-            "mockScheduledJob-id",
-            1L,
-            "mockScheduledJob-name",
-            "MockScheduledJob",
-            true,
-            IntervalSchedule(1, ChronoUnit.MINUTES),
-            Instant.now(),
-            Instant.now()
-        )
-
-        assertTrue(jobScheduler.schedule(mockScheduledJob))
-
-        assertEquals(setOf("mockScheduledJob-id"), jobScheduler.scheduledJobs(), "Set of scheduled jobs is not the same.")
-        assertEquals(0, jobRunner.numberOfRun, "Number of JobRunner runs is wrong.")
-        assertTrue(jobScheduler.deschedule("mockScheduledJob-id"), "Descheduling should be true.")
-    }
-
-    @Test
-    fun `schedule cron past year`() {
-        // This cron is set for Feb 30, which never occurs, so the job should never run.
-        val cronExpression = "0/5 * 30 2 *"
-        val jobRunner = MockJobRunner()
-        val jobScheduler = JobScheduler(testThreadPool, jobRunner)
-        val mockScheduledJob = MockScheduledJob(
-            "mockScheduledJob-id",
-            1L,
-            "mockScheduledJob-name",
-            "MockScheduledJob",
-            true,
-            CronSchedule(cronExpression, ZoneId.of("UTC")),
-            Instant.now(),
-            Instant.now()
-        )
-
-        assertTrue(jobScheduler.schedule(mockScheduledJob))
-        assertEquals(setOf("mockScheduledJob-id"), jobScheduler.scheduledJobs(), "Set of scheduled jobs is not the same.")
-
-        assertEquals(0, jobRunner.numberOfRun, "Number of JobRunner runs is wrong.")
-
-        assertTrue(jobScheduler.deschedule("mockScheduledJob-id"), "Descheduling should be true.")
-    }
-
-    @Test
-    fun `schedule disabled`() {
-        val cronExpression = "0/5 * * * *"
-        val jobRunner = MockJobRunner()
-        val jobScheduler = JobScheduler(testThreadPool, jobRunner)
-        val mockScheduledJob = MockScheduledJob(
-            "mockScheduledJob-id",
-            1L,
-            "mockScheduledJob-name",
-            "MockScheduledJob",
-            false,
-            CronSchedule(cronExpression, ZoneId.of("UTC")),
-            Instant.now(),
-            Instant.now()
-        )
-
-        assertFalse(jobScheduler.schedule(mockScheduledJob), "We should return false if we try to schedule a disabled job.")
-        assertEquals(setOf(), jobScheduler.scheduledJobs(), "Set of scheduled jobs is not the same.")
-    }
-
-    @Test
-    fun `deschedule non existing schedule`() {
-        val cronExpression = "0/5 * * * *"
-        val jobRunner = MockJobRunner()
-        val jobScheduler = JobScheduler(testThreadPool, jobRunner)
-        val mockScheduledJob = MockScheduledJob(
-            "mockScheduledJob-id",
-            1L,
-            "mockScheduledJob-name",
-            "MockScheduledJob",
-            true,
-            CronSchedule(cronExpression, ZoneId.of("UTC")),
-            Instant.now(),
-            Instant.now()
-        )
-
-        assertTrue(jobScheduler.schedule(mockScheduledJob))
-        assertEquals(setOf("mockScheduledJob-id"), jobScheduler.scheduledJobs(), "Set of scheduled jobs is not the same.")
-
-        assertEquals(0, jobRunner.numberOfRun, "Number of JobRunner runs is wrong.")
-
-        assertTrue(jobScheduler.deschedule("mockScheduledJob-invalid"), "Descheduling should be true.")
-        assertTrue(jobScheduler.deschedule("mockScheduledJob-id"), "Descheduling should be true.")
-    }
-
-    @Test
-    fun `schedule multiple jobs`() {
-        val cronExpression = "0/5 * * * *"
-        val mockScheduledJob1 = MockScheduledJob(
-            "mockScheduledJob-1",
-            1L,
-            "mockScheduledJob-name",
-            "MockScheduledJob",
-            true,
-            CronSchedule(cronExpression, ZoneId.of("UTC")),
-            Instant.now(),
-            Instant.now()
-        )
-        val mockScheduledJob2 = MockScheduledJob(
-            "mockScheduledJob-2",
-            1L,
-            "mockScheduledJob-name",
-            "MockScheduledJob",
-            true,
-            CronSchedule(cronExpression, ZoneId.of("UTC")),
-            Instant.now(),
-            Instant.now()
-        )
-
-        assertTrue(jobScheduler.schedule(mockScheduledJob1, mockScheduledJob2).isEmpty())
-    }
-
-    @Test
-    fun `schedule null enabled time job`() {
-        val cronExpression = "0/5 * * * *"
-        val mockScheduledJob2 = MockScheduledJob(
-            "mockScheduledJob-2", 1L, "mockScheduledJob-name", "MockScheduledJob", true,
-            CronSchedule(cronExpression, ZoneId.of("UTC")), Instant.now(), null
-        )
-
-        assertFalse(jobScheduler.schedule(mockScheduledJob2))
-    }
-
-    @Test
-    fun `schedule disabled job`() {
-        val cronExpression = "0/5 * * * *"
-        val mockScheduledJob1 = MockScheduledJob(
-            "mockScheduledJob-1", 1L, "mockScheduledJob-name", "MockScheduledJob", false,
-            CronSchedule(cronExpression, ZoneId.of("UTC")), Instant.now(), Instant.now()
-        )
-
-        assertFalse(jobScheduler.schedule(mockScheduledJob1))
-    }
-
-    @Test
-    fun `run Job`() {
-        val cronExpression = "0/5 * * * *"
-        val mockScheduledJob = MockScheduledJob(
-            "mockScheduledJob-id",
-            1L,
-            "mockScheduledJob-name",
-            "MockScheduledJob",
-            true,
-            CronSchedule(cronExpression, ZoneId.of("UTC")),
-            Instant.now(),
-            Instant.now()
-        )
-
-        jobRunner.runJob(mockScheduledJob, Instant.now(), Instant.now())
-    }
-}
diff --git a/core/bin/test/org/opensearch/alerting/core/schedule/MockJobRunner.kt b/core/bin/test/org/opensearch/alerting/core/schedule/MockJobRunner.kt
deleted file mode 100644
index 15fe770b9..000000000
--- a/core/bin/test/org/opensearch/alerting/core/schedule/MockJobRunner.kt
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.alerting.core.schedule
-
-import org.opensearch.alerting.core.JobRunner
-import org.opensearch.commons.alerting.model.ScheduledJob
-import java.time.Instant
-
-class MockJobRunner : JobRunner {
-    var numberOfRun: Int = 0
-        private set
-    var numberOfIndex: Int = 0
-        private set
-    var numberOfDelete: Int = 0
-        private set
-
-    override fun postDelete(jobId: String) {
-        numberOfDelete++
-    }
-
-    override fun postIndex(job: ScheduledJob) {
-        numberOfIndex++
-    }
-
-    override fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant) {
-        numberOfRun++
-    }
-}
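For reference, a non-test `JobRunner` follows the same three-method shape as `MockJobRunner` above: `runJob` receives the execution window computed by the scheduler, while `postIndex` and `postDelete` are sweep callbacks. A sketch; `LoggingJobRunner` is hypothetical, not part of the plugin:

    import org.opensearch.alerting.core.JobRunner
    import org.opensearch.commons.alerting.model.ScheduledJob
    import java.time.Instant

    // Hypothetical JobRunner that just logs what it is asked to do.
    class LoggingJobRunner : JobRunner {
        override fun postDelete(jobId: String) = println("job $jobId was deleted")

        override fun postIndex(job: ScheduledJob) = println("job ${job.id} was indexed/updated")

        override fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant) {
            println("running ${job.name} for window [$periodStart, $periodEnd)")
        }
    }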