1
+ // Entry Point for the LRS ReRoute Service.
2
+ //
3
+ // This is a NodeJS Express application
4
+ //
5
+ const http = require ( "http" )
6
+ const WebSocket = require ( 'ws' ) ;
7
+ const express = require ( "express" ) ;
8
+ const kafkaConsumer = require ( "simple-kafka-consumer" ) ;
9
+
10
+ const config = require ( "./config" ) ;
11
+
12
+ const APP_PORT = ( process . env . APP_PORT || 3000 ) ;
13
+ const WS_PASSWORD = ( process . env . WS_PASSWORD || config . websocket . password ) ;
14
+
15
+ const app = express ( ) ;
16
+ const server = http . createServer ( app ) ;
17
+
18
+
19
+ /**
20
+ * The point of this page is to montior the status of our Kafka cluster.
21
+ *
22
+ * To that end, our monitoring page will open a socket with this service
23
+ * to relay messages to our page.
24
+ */
25
+ const openSockets = [ ]
26
+ const wss = new WebSocket . Server ( {
27
+ path : config . root + "/kafka" ,
28
+ server,
29
+ } ) ;
30
+
31
+ wss . on ( 'connection' , function ( ws ) {
32
+
33
+ // Assign to our sockets
34
+ openSockets . push ( ws ) ;
35
+ ws . authenticated = false ;
36
+ ws . noMessages = true ;
37
+
38
+ ws . on ( "message" , function ( data ) {
39
+ if ( ws . noMessages && data == WS_PASSWORD ) {
40
+ ws . authenticated = true ;
41
+ console . log ( "AUTHENTICATED SOCKET" )
42
+ } else if ( data != "keep-alive" ) {
43
+ ws . close ( ) ;
44
+ console . log ( "CLOSING: " , data )
45
+ }
46
+ ws . noMessages = false ;
47
+ } ) ;
48
+
49
+ // If the socket is closed, stop sending messages to it
50
+ ws . on ( "close" , function close ( ) {
51
+ let index = openSockets . indexOf ( ws ) ;
52
+ openSockets . splice ( index , 1 ) ;
53
+ } ) ;
54
+
55
+ // Send an message when they connect here
56
+ ws . send ( 'Connected to Kafka Web Socket Monitor!' ) ;
57
+ } ) ;
58
+
59
+ setInterval ( ( ) => {
60
+ for ( let socket of openSockets ) {
61
+ if ( socket . authenticated )
62
+ socket . send ( "keep-alive" ) ;
63
+ }
64
+ } , 5000 )
65
+
66
+ // Broadcast a message to each open socket
67
+ //
68
+ function broadcast ( message ) {
69
+ for ( let k = 0 ; k < openSockets . length ; k ++ ) {
70
+ if ( openSockets [ k ] . authenticated ) {
71
+ openSockets [ k ] . send ( message ) ;
72
+ }
73
+ }
74
+ }
75
+
76
+ var recentCount = 0 ;
77
+ var throttleCount = 100 ;
78
+ var throttleTimerMS = 500 ;
79
+ var throttleWarned = false ;
80
+
81
+ // Configure this with our environment and config values
82
+ kafkaConsumer . configure ( {
83
+ brokers : ( process . env . KAFKA_BROKER || config . kafka . brokers . join ( "," ) ) ,
84
+ saslUser : ( process . env . KAFKA_SASL_USER || config . kafka . sasl ? config . kafka . sasl . username : undefined ) ,
85
+ saslPass : ( process . env . KAFKA_SASL_PASS || config . kafka . sasl ? config . kafka . sasl . password : undefined ) ,
86
+ topics : config . kafka . topics . map ( topic => topic . name ) ,
87
+ consumerGroup : config . kafka . consumerGroup
88
+ } )
89
+
90
+ // Set up our consumer to broadcast its traffic to our web sockets
91
+ kafkaConsumer . initConsumer ( ( topic , offset , message ) => {
92
+
93
+ console . log ( topic , offset , message . length > 100 ? message . substr ( 0 , 100 ) + " ..." : message )
94
+ recentCount ++ ;
95
+
96
+ if ( recentCount >= throttleCount ) {
97
+ if ( throttleWarned == false ) {
98
+ broadcast ( `[${ message . topic } , # ${ offset } , (throttled ${ recentCount } )]\n"High message rates will be throttled for performance with this page."` )
99
+ }
100
+
101
+ throttleWarned = true ;
102
+ return ;
103
+ }
104
+
105
+ broadcast ( `[${ topic } , # ${ offset } ]\n${ message } \n` )
106
+ } )
107
+
108
+ // Limit how many we can receive on a duration
109
+ setInterval ( function ( ) {
110
+ recentCount = 0 ;
111
+ throttleWarned = false ;
112
+ } , throttleTimerMS ) ;
113
+
114
+ /**
115
+ * Lastly, configure that express instance to serve this page.
116
+ */
117
+ app . set ( "view engine" , "ejs" ) ;
118
+ app . use ( express . static ( "public" ) ) ;
119
+ app . use ( express . static ( "scripts" ) ) ;
120
+ app . use ( express . static ( "views" ) ) ;
121
+ app . use ( config . root , express . static ( "public" ) ) ;
122
+ app . use ( config . root , express . static ( "scripts" ) ) ;
123
+ app . use ( config . root , express . static ( "views" ) ) ;
124
+
125
+ app . use ( "*" , function ( req , res , next ) {
126
+
127
+ if ( req . baseUrl . startsWith ( config . root ) == false )
128
+ res . redirect ( config . root . substr ( 1 ) + req . url ) ;
129
+ else
130
+ next ( ) ;
131
+ } ) ;
132
+
133
+ // Main page.
134
+ app . get ( config . root , function ( req , res , next ) {
135
+ res . render ( "index.ejs" , {
136
+ password : WS_PASSWORD ,
137
+ root : config . root ,
138
+ topics : config . kafka . topics
139
+ } ) ;
140
+ } ) ;
141
+
142
+ // Then start the server.
143
+ server . listen ( APP_PORT , function ( ) {
144
+ console . log ( "\nKafka Web Socket Example listening on port %s" , APP_PORT ) ;
145
+ } ) ;
0 commit comments