Skip to content

Commit bbc6bc4

Browse files
committed
[RORDEV-1263] Data stream support in ROR audit (#1076)
1 parent 3123da0 commit bbc6bc4

File tree

374 files changed

+18080
-2389
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

374 files changed

+18080
-2389
lines changed

core/src/main/scala/tech/beshu/ror/accesscontrol/audit/AuditingTool.scala

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,27 @@ package tech.beshu.ror.accesscontrol.audit
1818

1919
import cats.Show
2020
import cats.data.NonEmptyList
21+
import cats.implicits.*
2122
import monix.eval.Task
2223
import org.apache.logging.log4j.scala.Logging
2324
import org.json.JSONObject
2425
import tech.beshu.ror.accesscontrol.audit.AuditingTool.Settings.AuditSink
2526
import tech.beshu.ror.accesscontrol.audit.AuditingTool.Settings.AuditSink.{Disabled, Enabled}
27+
import tech.beshu.ror.accesscontrol.audit.sink.*
2628
import tech.beshu.ror.accesscontrol.blocks.Block.{History, Verbosity}
2729
import tech.beshu.ror.accesscontrol.blocks.metadata.UserMetadata
2830
import tech.beshu.ror.accesscontrol.blocks.{Block, BlockContext}
29-
import tech.beshu.ror.accesscontrol.domain.{AuditCluster, RorAuditIndexTemplate, RorAuditLoggerName}
31+
import tech.beshu.ror.accesscontrol.domain.{AuditCluster, RorAuditDataStream, RorAuditIndexTemplate, RorAuditLoggerName}
3032
import tech.beshu.ror.accesscontrol.logging.ResponseContext
3133
import tech.beshu.ror.accesscontrol.request.RequestContext
3234
import tech.beshu.ror.audit.instances.DefaultAuditLogSerializer
3335
import tech.beshu.ror.audit.{AuditLogSerializer, AuditRequestContext, AuditResponseContext}
34-
import tech.beshu.ror.es.AuditSinkService
3536
import tech.beshu.ror.implicits.*
3637

3738
import java.time.Clock
3839

39-
class AuditingTool private(auditSinks: NonEmptyList[BaseAuditSink])
40-
(implicit loggingContext: LoggingContext) {
40+
final class AuditingTool private(auditSinks: NonEmptyList[BaseAuditSink])
41+
(implicit loggingContext: LoggingContext) {
4142

4243
def audit[B <: BlockContext](response: ResponseContext[B]): Task[Unit] = {
4344
val auditResponseContext = toAuditResponse(response)
@@ -141,18 +142,23 @@ class AuditingTool private(auditSinks: NonEmptyList[BaseAuditSink])
141142
object AuditingTool extends Logging {
142143

143144
final case class Settings(auditSinks: NonEmptyList[Settings.AuditSink])
145+
144146
object Settings {
145147

146148
sealed trait AuditSink
149+
147150
object AuditSink {
148151
final case class Enabled(config: AuditSink.Config) extends AuditSink
152+
149153
case object Disabled extends AuditSink
150154

151155
sealed trait Config
156+
152157
object Config {
153158
final case class EsIndexBasedSink(logSerializer: AuditLogSerializer,
154159
rorAuditIndexTemplate: RorAuditIndexTemplate,
155160
auditCluster: AuditCluster) extends Config
161+
156162
object EsIndexBasedSink {
157163
val default: EsIndexBasedSink = EsIndexBasedSink(
158164
logSerializer = new DefaultAuditLogSerializer,
@@ -161,8 +167,21 @@ object AuditingTool extends Logging {
161167
)
162168
}
163169

170+
final case class EsDataStreamBasedSink(logSerializer: AuditLogSerializer,
171+
rorAuditDataStream: RorAuditDataStream,
172+
auditCluster: AuditCluster) extends Config
173+
174+
object EsDataStreamBasedSink {
175+
val default: EsDataStreamBasedSink = EsDataStreamBasedSink(
176+
logSerializer = new DefaultAuditLogSerializer,
177+
rorAuditDataStream = RorAuditDataStream.default,
178+
auditCluster = AuditCluster.LocalAuditCluster
179+
)
180+
}
181+
164182
final case class LogBasedSink(logSerializer: AuditLogSerializer,
165183
loggerName: RorAuditLoggerName) extends Config
184+
166185
object LogBasedSink {
167186
val default: LogBasedSink = LogBasedSink(
168187
logSerializer = new DefaultAuditLogSerializer,
@@ -174,16 +193,12 @@ object AuditingTool extends Logging {
174193
}
175194

176195
def create(settings: Settings,
177-
auditSinkServiceCreator: AuditCluster => AuditSinkService)
196+
auditSinkServiceCreator: AuditSinkServiceCreator)
178197
(implicit clock: Clock,
179-
loggingContext: LoggingContext): Option[AuditingTool] = {
180-
createAuditSinks(settings, auditSinkServiceCreator) match {
198+
loggingContext: LoggingContext): Task[Option[AuditingTool]] = {
199+
createAuditSinks(settings, auditSinkServiceCreator).map {
181200
case Some(auditSinks) =>
182-
implicit val auditSinkShow: Show[BaseAuditSink] = Show.show {
183-
case _: EsIndexBasedAuditSink => "index"
184-
case _: LogBasedAuditSink => "log"
185-
}
186-
logger.info(s"The audit is enabled with the given outputs: [${auditSinks.show}]")
201+
logger.info(s"The audit is enabled with the given outputs: [${auditSinks.toList.show}]")
187202
Some(new AuditingTool(auditSinks))
188203
case None =>
189204
logger.info("The audit is disabled because no output is enabled")
@@ -192,20 +207,51 @@ object AuditingTool extends Logging {
192207
}
193208

194209
private def createAuditSinks(settings: Settings,
195-
auditSinkServiceCreator: AuditCluster => AuditSinkService)
196-
(implicit clock: Clock): Option[NonEmptyList[BaseAuditSink]] = {
210+
auditSinkServiceCreator: AuditSinkServiceCreator)
211+
(using Clock): Task[Option[NonEmptyList[SupportedAuditSink]]] = {
197212
settings
198213
.auditSinks
199214
.toList
200-
.flatMap {
201-
case Enabled(AuditSink.Config.EsIndexBasedSink(logSerializer, rorAuditIndexTemplate, auditCluster)) =>
202-
EsIndexBasedAuditSink(logSerializer, rorAuditIndexTemplate, auditSinkServiceCreator(auditCluster)).some
215+
.map[Task[Option[SupportedAuditSink]]] {
216+
case Enabled(config: AuditSink.Config.EsIndexBasedSink) =>
217+
val serviceCreator: IndexBasedAuditSinkServiceCreator = auditSinkServiceCreator match {
218+
case creator: DataStreamAndIndexBasedAuditSinkServiceCreator => creator
219+
case creator: IndexBasedAuditSinkServiceCreator => creator
220+
}
221+
createIndexSink(config, serviceCreator).map(_.some)
222+
case Enabled(config: AuditSink.Config.EsDataStreamBasedSink) =>
223+
auditSinkServiceCreator match {
224+
case creator: DataStreamAndIndexBasedAuditSinkServiceCreator =>
225+
createDataStreamSink(config, creator).map(_.some)
226+
case _: IndexBasedAuditSinkServiceCreator =>
227+
// todo improvement - make this state impossible
228+
Task.raiseError(new IllegalStateException("Data stream audit sink is not supported in this version"))
229+
}
203230
case Enabled(AuditSink.Config.LogBasedSink(serializer, loggerName)) =>
204-
new LogBasedAuditSink(serializer, loggerName).some
231+
Task.delay(new LogBasedAuditSink(serializer, loggerName).some)
205232
case Disabled =>
206-
None
233+
Task.pure(None)
207234
}
208-
.toNel
235+
.sequence
236+
.map(_.flatten.toNel)
209237
}
210238

239+
private def createIndexSink(config: AuditSink.Config.EsIndexBasedSink,
240+
serviceCreator: IndexBasedAuditSinkServiceCreator)(using Clock): Task[SupportedAuditSink] = Task.delay {
241+
val service = serviceCreator.index(config.auditCluster)
242+
EsIndexBasedAuditSink(config.logSerializer, config.rorAuditIndexTemplate, service)
243+
}
244+
245+
private def createDataStreamSink(config: AuditSink.Config.EsDataStreamBasedSink,
246+
serviceCreator: DataStreamAndIndexBasedAuditSinkServiceCreator): Task[SupportedAuditSink] =
247+
Task.delay(serviceCreator.dataStream(config.auditCluster))
248+
.flatMap(EsDataStreamBasedAuditSink.create(config.logSerializer, config.rorAuditDataStream, _))
249+
250+
private type SupportedAuditSink = EsIndexBasedAuditSink | EsDataStreamBasedAuditSink | LogBasedAuditSink
251+
252+
private given Show[SupportedAuditSink] = Show.show {
253+
case _: EsIndexBasedAuditSink => "index"
254+
case _: LogBasedAuditSink => "log"
255+
case _: EsDataStreamBasedAuditSink => "data_stream"
256+
}
211257
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* This file is part of ReadonlyREST.
3+
*
4+
* ReadonlyREST is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* ReadonlyREST is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with ReadonlyREST. If not, see http://www.gnu.org/licenses/
16+
*/
17+
package tech.beshu.ror.accesscontrol.audit.sink
18+
19+
import cats.data.NonEmptyList
20+
import eu.timepit.refined.types.string.NonEmptyString
21+
import monix.eval.Task
22+
import org.apache.logging.log4j.scala.Logging
23+
import tech.beshu.ror.accesscontrol.domain.{DataStreamName, RorAuditDataStream, TemplateName}
24+
import tech.beshu.ror.es.DataStreamService
25+
import tech.beshu.ror.es.DataStreamService.DataStreamSettings
26+
import tech.beshu.ror.es.DataStreamService.DataStreamSettings.*
27+
import tech.beshu.ror.implicits.*
28+
import tech.beshu.ror.utils.RefinedUtils.*
29+
30+
import java.util.concurrent.TimeUnit
31+
32+
final class AuditDataStreamCreator(services: NonEmptyList[DataStreamService]) extends Logging {
33+
34+
def createIfNotExists(dataStreamName: RorAuditDataStream): Task[Unit] = {
35+
services.toList.traverse(createIfNotExists(_, dataStreamName)).map((_: List[Unit]) => ())
36+
}
37+
38+
private def createIfNotExists(service: DataStreamService, dataStreamName: RorAuditDataStream): Task[Unit] = {
39+
service
40+
.checkDataStreamExists(dataStreamName.dataStream)
41+
.flatMap {
42+
case true =>
43+
Task.delay(logger.info(s"Data stream ${dataStreamName.dataStream.show} already exists"))
44+
case false =>
45+
val settings = defaultSettingsFor(dataStreamName.dataStream)
46+
setupDataStream(service, settings)
47+
}
48+
}
49+
50+
private def setupDataStream(service: DataStreamService, settings: DataStreamSettings): Task[Unit] = {
51+
for {
52+
_ <- Task.delay(logger.info(s"Trying to setup ROR audit data stream ${settings.dataStreamName.show} with settings.."))
53+
_ <- service.fullySetupDataStream(settings)
54+
_ <- Task.delay(logger.info(s"ROR audit data stream ${settings.dataStreamName.show} created."))
55+
} yield ()
56+
}
57+
58+
private def defaultSettingsFor(dataStreamName: DataStreamName.Full) = {
59+
val defaultLifecyclePolicy = LifecyclePolicy(
60+
id = NonEmptyString.unsafeFrom(s"${dataStreamName.value.value}-lifecycle-policy"),
61+
hotPhase = LifecyclePolicy.HotPhase(
62+
LifecyclePolicy.Rollover(
63+
maxAge = positiveFiniteDuration(1, TimeUnit.DAYS),
64+
maxPrimaryShardSizeInGb = Some(positiveInt(50))
65+
)
66+
),
67+
warmPhase = Some(LifecyclePolicy.WarmPhase(
68+
minAge = positiveFiniteDuration(14, TimeUnit.DAYS),
69+
shrink = Some(LifecyclePolicy.Shrink(numberOfShards = positiveInt(1))),
70+
forceMerge = Some(LifecyclePolicy.ForceMerge(maxNumSegments = positiveInt(1)))
71+
)),
72+
coldPhase = Some(LifecyclePolicy.ColdPhase(
73+
minAge = positiveFiniteDuration(30, TimeUnit.DAYS),
74+
freeze = true
75+
))
76+
)
77+
78+
val defaultMappings = ComponentTemplateMappings(
79+
templateName = templateNameFrom(s"${dataStreamName.value.value}-mappings"),
80+
timestampField = "@timestamp",
81+
metadata = metadata("Data mappings for ReadonlyREST audit data stream")
82+
)
83+
84+
val settings = ComponentTemplateSettings(
85+
templateName = templateNameFrom(s"${dataStreamName.value.value}-settings"),
86+
lifecyclePolicyId = defaultLifecyclePolicy.id,
87+
metadata = metadata("Index settings for ReadonlyREST audit data stream")
88+
)
89+
90+
val indexTemplate = IndexTemplateSettings(
91+
templateName = templateNameFrom(s"${dataStreamName.value.value}-template"),
92+
dataStreamName = dataStreamName,
93+
componentTemplates = NonEmptyList.of(defaultMappings.templateName, settings.templateName),
94+
metadata = metadata("Index template for ReadonlyREST audit data stream")
95+
)
96+
97+
DataStreamSettings(
98+
dataStreamName,
99+
defaultLifecyclePolicy,
100+
defaultMappings,
101+
settings,
102+
indexTemplate,
103+
)
104+
}
105+
106+
private def templateNameFrom(value: String) = {
107+
TemplateName
108+
.fromString(value)
109+
.getOrElse(throw new IllegalStateException("Template name should be non-empty"))
110+
}
111+
112+
private def metadata(description: String) = Map("description" -> description)
113+
114+
}
115+
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* This file is part of ReadonlyREST.
3+
*
4+
* ReadonlyREST is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* ReadonlyREST is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with ReadonlyREST. If not, see http://www.gnu.org/licenses/
16+
*/
17+
package tech.beshu.ror.accesscontrol.audit.sink
18+
19+
import tech.beshu.ror.accesscontrol.domain.AuditCluster
20+
import tech.beshu.ror.es.{DataStreamBasedAuditSinkService, IndexBasedAuditSinkService}
21+
22+
sealed trait AuditSinkServiceCreator
23+
24+
trait IndexBasedAuditSinkServiceCreator
25+
extends AuditSinkServiceCreator {
26+
27+
def index(cluster: AuditCluster): IndexBasedAuditSinkService
28+
}
29+
30+
trait DataStreamAndIndexBasedAuditSinkServiceCreator
31+
extends AuditSinkServiceCreator
32+
with IndexBasedAuditSinkServiceCreator {
33+
34+
def dataStream(cluster: AuditCluster): DataStreamBasedAuditSinkService
35+
}
36+
37+

core/src/main/scala/tech/beshu/ror/accesscontrol/audit/BaseAuditSink.scala renamed to core/src/main/scala/tech/beshu/ror/accesscontrol/audit/sink/BaseAuditSink.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* You should have received a copy of the GNU General Public License
1515
* along with ReadonlyREST. If not, see http://www.gnu.org/licenses/
1616
*/
17-
package tech.beshu.ror.accesscontrol.audit
17+
package tech.beshu.ror.accesscontrol.audit.sink
1818

1919
import monix.eval.Task
2020
import org.json.JSONObject
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* This file is part of ReadonlyREST.
3+
*
4+
* ReadonlyREST is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* ReadonlyREST is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with ReadonlyREST. If not, see http://www.gnu.org/licenses/
16+
*/
17+
package tech.beshu.ror.accesscontrol.audit.sink
18+
19+
import monix.eval.Task
20+
import org.json.JSONObject
21+
import tech.beshu.ror.accesscontrol.domain.RorAuditDataStream
22+
import tech.beshu.ror.audit.{AuditLogSerializer, AuditResponseContext}
23+
import tech.beshu.ror.es.DataStreamBasedAuditSinkService
24+
25+
private[audit] final class EsDataStreamBasedAuditSink private(serializer: AuditLogSerializer,
26+
rorAuditDataStream: RorAuditDataStream,
27+
auditSinkService: DataStreamBasedAuditSinkService)
28+
extends BaseAuditSink(serializer) {
29+
30+
override protected def submit(event: AuditResponseContext, serializedEvent: JSONObject): Task[Unit] = Task {
31+
auditSinkService.submit(
32+
dataStreamName = rorAuditDataStream.dataStream,
33+
documentId = event.requestContext.id,
34+
jsonRecord = serializedEvent.toString
35+
)
36+
}
37+
38+
override def close(): Task[Unit] = Task.delay(auditSinkService.close())
39+
40+
}
41+
42+
object EsDataStreamBasedAuditSink {
43+
def create(serializer: AuditLogSerializer,
44+
rorAuditDataStream: RorAuditDataStream,
45+
auditSinkService: DataStreamBasedAuditSinkService): Task[EsDataStreamBasedAuditSink] = {
46+
auditSinkService
47+
.dataStreamCreator
48+
.createIfNotExists(rorAuditDataStream)
49+
.map((_: Unit) => new EsDataStreamBasedAuditSink(serializer, rorAuditDataStream, auditSinkService))
50+
}
51+
}

0 commit comments

Comments
 (0)