1
1
import itertools
2
2
import re
3
3
from abc import abstractmethod
4
- from typing import List , Optional , Tuple
4
+ from typing import Any , List , Optional , Tuple
5
5
6
6
import pydantic
7
7
import structlog
18
18
from codegate .pipeline .output import OutputPipelineContext , OutputPipelineStep
19
19
from codegate .pipeline .secrets .manager import SecretsManager
20
20
from codegate .pipeline .secrets .signatures import CodegateSignatures , Match
21
- from codegate .pipeline .systemmsg import add_or_update_system_message
22
- from codegate .types .common import (
23
- ChatCompletionRequest ,
24
- ChatCompletionSystemMessage ,
25
- Delta ,
26
- ModelResponse ,
27
- StreamingChoices ,
28
- )
29
21
30
22
31
23
logger = structlog .get_logger ("codegate" )
@@ -279,7 +271,7 @@ def _redact_text(
279
271
return text_encryptor .obfuscate (text , snippet )
280
272
281
273
async def process (
282
- self , request : ChatCompletionRequest , context : PipelineContext
274
+ self , reques : Any , context : PipelineContext
283
275
) -> PipelineResult :
284
276
"""
285
277
Process the request to find and protect secrets in all messages.
@@ -292,68 +284,34 @@ async def process(
292
284
PipelineResult containing the processed request and context with redaction metadata
293
285
"""
294
286
295
- ##### NEW CODE PATH #####
296
-
297
- if type (request ) != ChatCompletionRequest :
298
- secrets_manager = context .sensitive .manager
299
- if not secrets_manager or not isinstance (secrets_manager , SecretsManager ):
300
- raise ValueError ("Secrets manager not found in context" )
301
- session_id = context .sensitive .session_id
302
- if not session_id :
303
- raise ValueError ("Session ID not found in context" )
304
-
305
- total_matches = []
306
-
307
- # get last user message block to get index for the first relevant user message
308
- last_user_message = self .get_last_user_message_block (request , context .client )
309
- last_assistant_idx = last_user_message [1 ] - 1 if last_user_message else - 1
310
-
311
- # Process all messages
312
- for i , message in enumerate (request .get_messages ()):
313
- for content in message .get_content ():
314
- txt = content .get_text ()
315
- if txt is not None :
316
- redacted_content , secrets_matched = self ._redact_message_content (
317
- "" .join (txt for txt in content .get_text ()), secrets_manager , session_id , context
318
- )
319
- content .set_text (redacted_content )
320
- if i > last_assistant_idx :
321
- total_matches += secrets_matched
322
-
323
- # Not count repeated secret matches
324
- request = self ._finalize_redaction (context , total_matches , request )
325
- return PipelineResult (request = request , context = context )
326
-
327
- ##### OLD CODE PATH #####
328
-
329
- if "messages" not in request :
330
- return PipelineResult (request = request , context = context )
331
-
332
287
secrets_manager = context .sensitive .manager
333
288
if not secrets_manager or not isinstance (secrets_manager , SecretsManager ):
334
289
raise ValueError ("Secrets manager not found in context" )
335
290
session_id = context .sensitive .session_id
336
291
if not session_id :
337
292
raise ValueError ("Session ID not found in context" )
338
293
339
- new_request = request .copy ()
340
294
total_matches = []
341
295
342
296
# get last user message block to get index for the first relevant user message
343
- last_user_message = self .get_last_user_message_block (new_request , context .client )
297
+ last_user_message = self .get_last_user_message_block (request , context .client )
344
298
last_assistant_idx = last_user_message [1 ] - 1 if last_user_message else - 1
345
299
346
300
# Process all messages
347
- for i , message in enumerate (new_request ["messages" ]):
348
- if "content" in message and message ["content" ]:
349
- redacted_content , secrets_matched = self ._redact_message_content (
350
- message ["content" ], secrets_manager , session_id , context
351
- )
352
- new_request ["messages" ][i ]["content" ] = redacted_content
353
- if i > last_assistant_idx :
354
- total_matches += secrets_matched
355
- new_request = self ._finalize_redaction (context , total_matches , new_request )
356
- return PipelineResult (request = new_request , context = context )
301
+ for i , message in enumerate (request .get_messages ()):
302
+ for content in message .get_content ():
303
+ txt = content .get_text ()
304
+ if txt is not None :
305
+ redacted_content , secrets_matched = self ._redact_message_content (
306
+ "" .join (txt for txt in content .get_text ()), secrets_manager , session_id , context
307
+ )
308
+ content .set_text (redacted_content )
309
+ if i > last_assistant_idx :
310
+ total_matches += secrets_matched
311
+
312
+ # Not count repeated secret matches
313
+ request = self ._finalize_redaction (context , total_matches , request )
314
+ return PipelineResult (request = request , context = context )
357
315
358
316
def _redact_message_content (self , message_content , secrets_manager , session_id , context ):
359
317
# Extract any code snippets
@@ -403,14 +361,7 @@ def _finalize_redaction(self, context, total_matches, new_request):
403
361
logger .info (f"Total secrets redacted since last assistant message: { total_redacted } " )
404
362
context .metadata ["redacted_secrets_count" ] = total_redacted
405
363
if total_redacted > 0 :
406
- if isinstance (new_request , pydantic .BaseModel ):
407
- new_request .add_system_prompt (Config .get_config ().prompts .secrets_redacted )
408
- return new_request
409
- system_message = ChatCompletionSystemMessage (
410
- content = Config .get_config ().prompts .secrets_redacted ,
411
- role = "system" ,
412
- )
413
- return add_or_update_system_message (new_request , system_message , context )
364
+ new_request .add_system_prompt (Config .get_config ().prompts .secrets_redacted )
414
365
return new_request
415
366
416
367
@@ -448,10 +399,10 @@ def _find_complete_redaction(self, text: str) -> tuple[Optional[re.Match[str]],
448
399
449
400
async def process_chunk (
450
401
self ,
451
- chunk : ModelResponse ,
402
+ chunk : Any ,
452
403
context : OutputPipelineContext ,
453
404
input_context : Optional [PipelineContext ] = None ,
454
- ) -> list [ModelResponse ]:
405
+ ) -> list [Any ]:
455
406
"""Process a single chunk of the stream"""
456
407
if not input_context :
457
408
raise ValueError ("Input context not found" )
@@ -460,9 +411,6 @@ async def process_chunk(
460
411
if input_context .sensitive .session_id == "" :
461
412
raise ValueError ("Session ID not found in input context" )
462
413
463
- # if len(chunk.choices) == 0 or not chunk.choices[0].delta.content:
464
- # return [chunk]
465
-
466
414
for content in chunk .get_content ():
467
415
# Check the buffered content
468
416
buffered_content = "" .join (context .buffer )
@@ -517,37 +465,20 @@ class SecretRedactionNotifier(OutputPipelineStep):
517
465
def name (self ) -> str :
518
466
return "secret-redaction-notifier"
519
467
520
- def _create_chunk (self , original_chunk : ModelResponse , content : str ) -> ModelResponse :
468
+ def _create_chunk (self , original_chunk : Any , content : str ) -> Any :
521
469
"""
522
470
Creates a new chunk with the given content, preserving the original chunk's metadata
523
471
"""
524
- if isinstance (original_chunk , ModelResponse ):
525
- return ModelResponse (
526
- id = original_chunk .id ,
527
- choices = [
528
- StreamingChoices (
529
- finish_reason = None ,
530
- index = 0 ,
531
- delta = Delta (content = content , role = "assistant" ),
532
- logprobs = None ,
533
- )
534
- ],
535
- created = original_chunk .created ,
536
- model = original_chunk .model ,
537
- object = "chat.completion.chunk" ,
538
- )
539
- else :
540
- # TODO verify if deep-copy is necessary
541
- copy = original_chunk .model_copy (deep = True )
542
- copy .set_text (content )
543
- return copy
472
+ copy = original_chunk .model_copy (deep = True )
473
+ copy .set_text (content )
474
+ return copy
544
475
545
476
async def process_chunk (
546
477
self ,
547
- chunk : ModelResponse ,
478
+ chunk : Any ,
548
479
context : OutputPipelineContext ,
549
480
input_context : Optional [PipelineContext ] = None ,
550
- ) -> list [ModelResponse ]:
481
+ ) -> list [Any ]:
551
482
"""Process a single chunk of the stream"""
552
483
if (
553
484
not input_context
@@ -567,20 +498,21 @@ async def process_chunk(
567
498
)
568
499
569
500
# Check if this is the first chunk (delta role will be present, others will not)
570
- # if len(chunk.choices) > 0 and chunk.choices[0].delta.role:
571
501
for _ in itertools .takewhile (lambda x : x [0 ] == 1 , enumerate (chunk .get_content ())):
572
502
redacted_count = input_context .metadata ["redacted_secrets_count" ]
573
503
secret_text = "secret" if redacted_count == 1 else "secrets"
574
504
# Create notification chunk
575
505
if tool_name in ["cline" , "kodu" ]:
506
+ # NOTE: Original code was ensuring that role was
507
+ # "assistant" here, we might have to do that as well,
508
+ # but I believe it was defensive programming or
509
+ # leftover of some refactoring.
576
510
notification_chunk = self ._create_chunk (
577
511
chunk ,
578
512
f"<thinking>\n 🛡️ [CodeGate prevented { redacted_count } { secret_text } ]"
579
513
f"(http://localhost:9090/?search=codegate-secrets) from being leaked "
580
514
f"by redacting them.</thinking>\n \n " ,
581
515
)
582
- # TODO fix this
583
- # notification_chunk.choices[0].delta.role = "assistant"
584
516
else :
585
517
notification_chunk = self ._create_chunk (
586
518
chunk ,
0 commit comments