Skip to content

Commit a787b16

Browse files
asteriscosAlexRuiz7
authored andcommitted
Apply POC email notification (#6)
* Apply POC email notification * Roll back changes to IDEs config files * Roll back changes to IDEs config files * Refactor --------- Co-authored-by: Alex Ruiz <[email protected]>
1 parent b2af4e2 commit a787b16

File tree

8 files changed

+247
-10
lines changed

8 files changed

+247
-10
lines changed

Diff for: .gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ $RECYCLE.BIN/
159159
.idea/modules.xml
160160
.idea/*.iml
161161
.idea/modules
162+
.idea/*.xml
162163
*.iml
163164
*.ipr
164165

Diff for: .idea/kotlinc.xml

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: .idea/misc.xml

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: .project

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<projectDescription>
3-
<name>opensearch-reports-scheduler</name>
3+
<name>wazuh-indexer-reports-scheduler</name>
44
<comment>Project reports-scheduler created by Buildship.</comment>
55
<projects>
66
</projects>

Diff for: build.gradle

+32-8
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ repositories {
172172
}
173173

174174
dependencies {
175+
// Needed for integ tests
176+
zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}"
177+
zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}"
175178
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"
176179
implementation "org.opensearch:opensearch:${opensearch_version}"
177180
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
@@ -301,14 +304,35 @@ integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty(
301304
testClusters.integTest {
302305
testDistribution = "INTEG_TEST"
303306
// need to install job-scheduler first, need to assemble job-scheduler first
304-
plugin(provider(new Callable<RegularFile>(){
305-
@Override
306-
RegularFile call() throws Exception {
307-
return new RegularFile() {
308-
@Override
309-
File getAsFile() {
310-
return configurations.zipArchive.asFileTree.getSingleFile()
311-
}
307+
plugin(provider({
308+
new RegularFile() {
309+
@Override
310+
File getAsFile() {
311+
return configurations.zipArchive.asFileTree.matching {
312+
include '**/opensearch-job-scheduler*'
313+
}.singleFile
314+
}
315+
}
316+
}))
317+
318+
plugin(provider({
319+
new RegularFile() {
320+
@Override
321+
File getAsFile() {
322+
return configurations.zipArchive.asFileTree.matching {
323+
include '**/opensearch-notifications-core*'
324+
}.singleFile
325+
}
326+
}
327+
}))
328+
329+
plugin(provider({
330+
new RegularFile() {
331+
@Override
332+
File getAsFile() {
333+
return configurations.zipArchive.asFileTree.matching {
334+
include '**/notifications*'
335+
}.singleFile
312336
}
313337
}
314338
}))

Diff for: src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt

+1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch
104104
repositoriesServiceSupplier: Supplier<RepositoriesService>
105105
): Collection<Any> {
106106
PluginSettings.addSettingsUpdateConsumer(clusterService)
107+
ReportDefinitionJobRunner.initialize(client, clusterService)
107108
ReportDefinitionsIndex.initialize(client, clusterService)
108109
ReportInstancesIndex.initialize(client, clusterService)
109110
return emptyList()

Diff for: src/main/kotlin/org/opensearch/reportsscheduler/scheduler/ReportDefinitionJobRunner.kt

+87
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,72 @@ package org.opensearch.reportsscheduler.scheduler
88
import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.Dispatchers
1010
import kotlinx.coroutines.launch
11+
import org.opensearch.action.search.SearchRequest
12+
import org.opensearch.action.search.SearchResponse
13+
import org.opensearch.client.Client
14+
import org.opensearch.client.node.NodeClient
15+
import org.opensearch.cluster.service.ClusterService
16+
import org.opensearch.commons.notifications.model.NotificationConfigInfo
17+
import org.opensearch.index.query.QueryBuilders
1118
import org.opensearch.jobscheduler.spi.JobExecutionContext
1219
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
1320
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
1421
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
1522
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
1623
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
1724
import org.opensearch.reportsscheduler.model.ReportInstance
25+
import org.opensearch.reportsscheduler.util.NotificationApiUtils.getNotificationConfigInfo
26+
import org.opensearch.reportsscheduler.util.SecureIndexClient
27+
import org.opensearch.reportsscheduler.util.buildReportLink
1828
import org.opensearch.reportsscheduler.util.logger
29+
import org.opensearch.reportsscheduler.util.sendNotificationWithHTML
30+
import org.opensearch.search.builder.SearchSourceBuilder
1931
import java.time.Instant
2032

2133
internal object ReportDefinitionJobRunner : ScheduledJobRunner {
2234
private val log by logger(ReportDefinitionJobRunner::class.java)
2335
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
2436

37+
private lateinit var client: Client
38+
private lateinit var clusterService: ClusterService
39+
40+
/**
41+
* Initialize the class
42+
* @param client The ES client
43+
* @param clusterService The ES cluster service
44+
*/
45+
fun initialize(client: Client, clusterService: ClusterService) {
46+
this.client = SecureIndexClient(client)
47+
this.clusterService = clusterService
48+
}
49+
50+
private suspend fun createNotification(
51+
configInfo: NotificationConfigInfo,
52+
reportDefinitionDetails: ReportDefinitionDetails,
53+
id: String,
54+
hits: Long?
55+
) {
56+
val title: String = reportDefinitionDetails.reportDefinition.delivery!!.title
57+
val textMessage: String = reportDefinitionDetails.reportDefinition.delivery.textDescription
58+
val htmlMessage: String? = reportDefinitionDetails.reportDefinition.delivery.htmlDescription
59+
60+
val urlDefinition: String =
61+
buildReportLink(reportDefinitionDetails.reportDefinition.source.origin, reportDefinitionDetails.tenant, id)
62+
63+
val textWithURL: String =
64+
textMessage.replace("{{urlDefinition}}", urlDefinition).replace("{{hits}}", hits.toString())
65+
val htmlWithURL: String? =
66+
htmlMessage?.replace("{{urlDefinition}}", urlDefinition)?.replace("{{hits}}", hits.toString())
67+
68+
log.info("esto es el mensaje html $htmlMessage")
69+
configInfo.sendNotificationWithHTML(
70+
this.client,
71+
title,
72+
textWithURL,
73+
htmlWithURL
74+
)
75+
}
76+
2577
override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
2678
if (job !is ReportDefinitionDetails) {
2779
log.warn("$LOG_PREFIX:job is not of type ReportDefinitionDetails:${job.javaClass.name}")
@@ -48,6 +100,41 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
48100
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
49101
} else {
50102
log.info("$LOG_PREFIX:runJob-created job:$id")
103+
104+
// Wazuh - Make queries
105+
val builderSearchResponse: SearchSourceBuilder = SearchSourceBuilder()
106+
.query(
107+
QueryBuilders.boolQuery()
108+
.must(
109+
QueryBuilders.rangeQuery("timestamp")
110+
.gt(beginTime)
111+
.lte(currentTime)
112+
)
113+
.must(
114+
QueryBuilders.matchQuery("agent.id", "001")
115+
)
116+
)
117+
val jobSearchRequest: SearchRequest =
118+
SearchRequest().indices("wazuh-alerts-*").source(builderSearchResponse)
119+
val response: SearchResponse = client.search(jobSearchRequest).actionGet()
120+
121+
val reportDefinitionId = reportDefinitionDetails.reportDefinition.delivery!!.configIds[0]
122+
val configInfo: NotificationConfigInfo? = getNotificationConfigInfo(
123+
client as NodeClient,
124+
reportDefinitionId
125+
)
126+
127+
if (configInfo != null) {
128+
createNotification(
129+
configInfo,
130+
reportDefinitionDetails,
131+
id,
132+
response.hits.totalHits?.value
133+
)
134+
log.info("Notification with id $id was sent.")
135+
} else {
136+
log.error("NotificationConfigInfo with id $reportDefinitionId was not found.")
137+
}
51138
}
52139
}
53140
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.reportsscheduler.util
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.OpenSearchSecurityException
10+
import org.opensearch.OpenSearchStatusException
11+
import org.opensearch.client.Client
12+
import org.opensearch.client.node.NodeClient
13+
import org.opensearch.commons.notifications.NotificationsPluginInterface
14+
import org.opensearch.commons.notifications.action.GetNotificationConfigRequest
15+
import org.opensearch.commons.notifications.action.GetNotificationConfigResponse
16+
import org.opensearch.commons.notifications.action.SendNotificationResponse
17+
import org.opensearch.commons.notifications.model.ChannelMessage
18+
import org.opensearch.commons.notifications.model.EventSource
19+
import org.opensearch.commons.notifications.model.NotificationConfigInfo
20+
import org.opensearch.commons.notifications.model.SeverityType
21+
import org.opensearch.core.action.ActionListener
22+
import org.opensearch.core.rest.RestStatus
23+
import kotlin.coroutines.resume
24+
import kotlin.coroutines.resumeWithException
25+
import kotlin.coroutines.suspendCoroutine
26+
27+
object NotificationApiUtils {
28+
29+
private val logger = LogManager.getLogger(NotificationApiUtils::class)
30+
31+
/**
32+
* Gets a NotificationConfigInfo object by ID if it exists.
33+
*/
34+
suspend fun getNotificationConfigInfo(client: NodeClient, id: String): NotificationConfigInfo? {
35+
return try {
36+
val res: GetNotificationConfigResponse =
37+
getNotificationConfig(client, GetNotificationConfigRequest(setOf(id)))
38+
res.searchResult.objectList.firstOrNull()
39+
} catch (e: OpenSearchSecurityException) {
40+
throw e
41+
} catch (e: OpenSearchStatusException) {
42+
if (e.status() == RestStatus.NOT_FOUND) {
43+
logger.debug("Notification config [$id] was not found")
44+
}
45+
null
46+
}
47+
}
48+
49+
private suspend fun getNotificationConfig(
50+
client: NodeClient,
51+
getNotificationConfigRequest: GetNotificationConfigRequest
52+
): GetNotificationConfigResponse {
53+
val getNotificationConfigResponse: GetNotificationConfigResponse =
54+
NotificationsPluginInterface.suspendUntil {
55+
this.getNotificationConfig(
56+
client,
57+
getNotificationConfigRequest,
58+
it
59+
)
60+
}
61+
return getNotificationConfigResponse
62+
}
63+
}
64+
65+
/**
66+
* Extension function for publishing a notification to a channel in the Notification plugin.
67+
*/
68+
suspend fun NotificationConfigInfo.sendNotificationWithHTML(
69+
client: Client,
70+
title: String,
71+
compiledMessage: String,
72+
compiledMessageHTML: String?
73+
): String {
74+
val config = this
75+
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
76+
this.sendNotification(
77+
(client as NodeClient),
78+
EventSource(title, config.configId, SeverityType.INFO),
79+
ChannelMessage(compiledMessage, compiledMessageHTML, null),
80+
listOf(config.configId),
81+
it
82+
)
83+
}
84+
validateResponseStatus(res.getStatus(), res.notificationEvent.toString())
85+
return res.notificationEvent.toString()
86+
}
87+
88+
/**
89+
* Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function.
90+
*
91+
* @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API.
92+
*/
93+
suspend fun <T> NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener<T>) -> Unit): T =
94+
suspendCoroutine { cont ->
95+
block(object : ActionListener<T> {
96+
override fun onResponse(response: T) = cont.resume(response)
97+
98+
override fun onFailure(e: Exception) = cont.resumeWithException(e)
99+
})
100+
}
101+
102+
/**
103+
* All valid response statuses.
104+
*/
105+
private val VALID_RESPONSE_STATUS = setOf(
106+
RestStatus.OK.status,
107+
RestStatus.CREATED.status,
108+
RestStatus.ACCEPTED.status,
109+
RestStatus.NON_AUTHORITATIVE_INFORMATION.status,
110+
RestStatus.NO_CONTENT.status,
111+
RestStatus.RESET_CONTENT.status,
112+
RestStatus.PARTIAL_CONTENT.status,
113+
RestStatus.MULTI_STATUS.status
114+
)
115+
116+
@Throws(OpenSearchStatusException::class)
117+
fun validateResponseStatus(restStatus: RestStatus, responseContent: String) {
118+
if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) {
119+
throw OpenSearchStatusException("Failed: $responseContent", restStatus)
120+
}
121+
}

0 commit comments

Comments
 (0)