11using System . Threading . Channels ;
2- using System . Text ;
32
43namespace GrokSdk ;
54
65/// <summary>
76/// Base message type for different kinds of responses, nullable enabled
87/// </summary>
9- public abstract record GrokMessage
8+ public abstract record GrokMessageBase
109{
1110}
1211
1312/// <summary>
1413/// Text message type inheriting from GrokMessage
1514/// </summary>
1615/// <param name="Message"></param>
17- public record GrokTextMessage ( string Message ) : GrokMessage
16+ public record GrokTextMessage ( string Message ) : GrokMessageBase
1817{
1918}
2019
2120/// <summary>
2221/// Service based messages from Grok
2322/// </summary>
2423/// <param name="Message"></param>
25- public record GrokServiceMessage ( string Message ) : GrokMessage
24+ public record GrokServiceMessage ( string Message ) : GrokMessageBase
2625{
2726}
2827
2928/// <summary>
3029/// Exception handle indicating a failure occured
3130/// </summary>
3231/// <param name="Exception"></param>
33- public record GrokError ( Exception Exception ) : GrokMessage
32+ public record GrokError ( Exception Exception ) : GrokMessageBase
3433{
3534
3635}
@@ -39,155 +38,164 @@ public record GrokError(Exception Exception) : GrokMessage
3938/// The State of the stream
4039/// </summary>
4140/// <param name="StreamState"></param>
42- public record GrokStreamState ( StreamState StreamState ) : GrokMessage
41+ public record GrokStreamState ( StreamState StreamState ) : GrokMessageBase
4342{
4443
4544}
4645
47- // Manages the conversation thread
46+ /// <summary>
47+ /// Manages the conversation thread with Grok, handling messages and tool calls.
48+ /// </summary>
4849public class GrokThread ( GrokClient client )
4950{
5051 private readonly GrokClient _client = client ?? throw new ArgumentNullException ( nameof ( client ) ) ;
51- private readonly List < Message > _history = [ ] ;
52+ private readonly List < GrokMessage > _history = new ( ) ;
53+ private readonly Dictionary < string , GrokToolDefinition > _tools = new ( ) ;
5254
5355 /// <summary>
5456 /// Provide instruction to the system on how it should respond to the user.
5557 /// </summary>
56- /// <param name="message"></param>
58+ /// <param name="message">The instruction message to add. </param>
5759 public void AddSystemInstruction ( string message )
5860 {
59- _history . Add ( new SystemMessage ( ) { Content = message } ) ;
61+ _history . Add ( new GrokSystemMessage { Content = message } ) ;
6062 }
6163
6264 /// <summary>
63- /// Asks a question and streams response text parts as an IAsyncEnumerable.
65+ /// Registers a tool with the thread, making it available for Grok to use.
66+ /// </summary>
67+ /// <param name="tool">The tool definition to register.</param>
68+ public void RegisterTool ( GrokToolDefinition tool )
69+ {
70+ if ( tool == null )
71+ throw new ArgumentNullException ( nameof ( tool ) ) ;
72+
73+
74+ #if NETSTANDARD2_0
75+ if ( _tools . ContainsKey ( tool . Name ) )
76+ {
77+ throw new ArgumentException ( $ "A tool with name '{ tool . Name } ' already exists.") ;
78+ }
79+ _tools . Add ( tool . Name , tool ) ;
80+ #else
81+ if ( ! _tools . TryAdd ( tool . Name , tool ) )
82+ throw new ArgumentException ( $ "A tool with name '{ tool . Name } ' already exists.") ;
83+ #endif
84+
85+ }
86+
87+ /// <summary>
88+ /// Asks a question and processes the response, providing status updates and results via an IAsyncEnumerable.
6489 /// </summary>
6590 /// <param name="question">The question to ask.</param>
91+ /// <param name="files">Files to do an analysis on (optional).</param>
6692 /// <param name="model">The model to use (default: "grok-2-latest").</param>
6793 /// <param name="temperature">The temperature for response generation (default: 0).</param>
6894 /// <param name="cancellationToken">Token to cancel the operation.</param>
69- /// <returns>An IAsyncEnumerable of GrokMessage containing the question and responses.</returns>
95+ /// <returns>An IAsyncEnumerable of GrokMessageBase containing status updates and responses.</returns>
7096 /// <exception cref="ArgumentException">Thrown if the question is null or empty.</exception>
71- public IAsyncEnumerable < GrokMessage > AskQuestion (
97+ public IAsyncEnumerable < GrokMessageBase > AskQuestion (
7298 string ? question ,
73- string ? model = "grok-2-latest" ,
99+ List < byte [ ] > ? files = null ,
100+ string model = "grok-2-latest" ,
74101 float temperature = 0 ,
75102 CancellationToken cancellationToken = default )
76103 {
77104 if ( string . IsNullOrEmpty ( question ) )
78105 throw new ArgumentException ( "Question cannot be null or empty." , nameof ( question ) ) ;
79106
80- _history . Add ( new UserMessage { Content = question } ) ;
107+ _history . Add ( new GrokUserMessage { Content = [ new GrokTextPart ( ) { Text = question } ] } ) ;
81108
82- var channel = Channel . CreateUnbounded < GrokMessage > ( ) ;
109+ var channel = Channel . CreateUnbounded < GrokMessageBase > ( ) ;
83110
84- _ = Task . Run ( async ( ) => await StreamResponsesAsync (
85- new ChatCompletionRequest
111+ _ = Task . Run ( async ( ) =>
112+ {
113+ try
86114 {
87- Messages = _history ,
88- Model = model ,
89- Temperature = temperature ,
90- Stream = true
91- } ,
92- channel ,
93- cancellationToken ) , cancellationToken ) ;
115+ await ProcessConversationAsync ( model , temperature , channel , cancellationToken ) ;
116+ }
117+ catch ( Exception ex )
118+ {
119+ channel . Writer . TryWrite ( new GrokError ( ex ) ) ;
120+ channel . Writer . TryWrite ( new GrokStreamState ( StreamState . Error ) ) ;
121+ channel . Writer . TryComplete ( ex ) ;
122+ }
123+ } , cancellationToken ) ;
94124
95125 return channel . Reader . ReadAllAsync ( cancellationToken ) ;
96126 }
97127
98128 /// <summary>
99- /// Streams the question and responses to the channel using the streaming client .
129+ /// Processes the conversation by handling tool calls and sending the final response .
100130 /// </summary>
101- /// <param name="request">The chat completion request.</param>
131+ /// <param name="model">The model to use.</param>
132+ /// <param name="temperature">The temperature for response generation.</param>
102133 /// <param name="channel">The channel to write messages to.</param>
103134 /// <param name="cancellationToken">Token to cancel the operation.</param>
104- private async Task StreamResponsesAsync (
105- ChatCompletionRequest request ,
106- Channel < GrokMessage > channel ,
135+ private async Task ProcessConversationAsync (
136+ string model ,
137+ float temperature ,
138+ Channel < GrokMessageBase > channel ,
107139 CancellationToken cancellationToken )
108140 {
109- var streamingClient = _client . GetStreamingClient ( ) ;
141+ bool toolCallsPending = true ;
110142
111- var responseBuilder = new StringBuilder ( ) ;
143+ while ( toolCallsPending )
144+ {
145+ // Send "Thinking" status before making the API call
146+ channel . Writer . TryWrite ( new GrokStreamState ( StreamState . Thinking ) ) ;
112147
113- const int maxRetries = 3 ;
114- const int defaultDelayMs = 1000 ; // 1 second default delay if Retry-After is missing
115- int retryCount = 0 ;
148+ var request = new GrokChatCompletionRequest
149+ {
150+ Messages = _history ,
151+ Model = model ,
152+ Temperature = temperature ,
153+ Stream = false , // Always non-streaming
154+ Tools = _tools . Any ( ) ? _tools . Values . Select ( t => new GrokTool
155+ {
156+ Type = GrokToolType . Function ,
157+ Function = new GrokFunctionDefinition
158+ {
159+ Name = t . Name ,
160+ Description = t . Description ,
161+ Parameters = t . Parameters
162+ }
163+ } ) . ToList ( ) : null ,
164+ Tool_choice = _tools . Any ( ) ? Tool_choice . Auto : null
165+ } ;
116166
117- streamingClient . OnChunkReceived += OnChunkReceived ;
118- streamingClient . OnStreamCompleted += OnStreamCompleted ;
119- streamingClient . OnStreamError += OnStreamError ;
120- streamingClient . OnStateChanged += OnStateChanged ;
167+ channel . Writer . TryWrite ( new GrokStreamState ( StreamState . Streaming ) ) ;
168+ var response = await _client . CreateChatCompletionAsync ( request , cancellationToken ) ;
169+ var choice = response . Choices . First ( ) ;
121170
122- while ( retryCount <= maxRetries )
123- {
124- try
171+ if ( choice . Message . Tool_calls ? . Count > 0 )
125172 {
126- await streamingClient . StartStreamAsync ( request , cancellationToken ) ;
127- break ; // Success, exit retry loop
128- }
129- catch ( GrokSdkException ex ) when ( ex . StatusCode == 429 && retryCount < maxRetries )
130- {
131- retryCount ++ ;
132- channel . Writer . TryWrite ( new GrokServiceMessage ( $ "Rate limit hit, retrying ({ retryCount } /{ maxRetries } )...") ) ;
133-
134- // Check for Retry-After header
135- int delayMs = defaultDelayMs ;
136- if ( ex . Headers . TryGetValue ( "Retry-After" , out var retryAfterValues ) )
173+ foreach ( var toolCall in choice . Message . Tool_calls )
137174 {
138- var retryAfter = retryAfterValues ? . FirstOrDefault ( ) ;
139- if ( int . TryParse ( retryAfter , out var seconds ) )
175+ if ( _tools . TryGetValue ( toolCall . Function . Name , out var tool ) )
176+ {
177+ channel . Writer . TryWrite ( new GrokStreamState ( StreamState . Streaming ) ) ;
178+ string result = await tool . Execute ( toolCall . Function . Arguments ) ;
179+
180+ _history . Add ( new GrokToolMessage { Content = result , Tool_call_id = toolCall . Id } ) ;
181+
182+ }
183+ else
140184 {
141- delayMs = seconds * 1000 ; // Convert seconds to milliseconds
185+ throw new InvalidOperationException ( $ "Tool ' { toolCall . Function . Name } ' not found." ) ;
142186 }
143187 }
144-
145- await Task . Delay ( delayMs , cancellationToken ) ;
146188 }
147- catch ( Exception ex )
189+ else
148190 {
149- OnStreamError ( this , ex ) ; // Handle other exceptions immediately
150- break ;
191+ toolCallsPending = false ;
192+ _history . Add ( choice . Message ) ;
193+
194+ // Send the final response to the channel
195+ channel . Writer . TryWrite ( new GrokTextMessage ( choice . Message . Content ) ) ;
196+ channel . Writer . TryWrite ( new GrokStreamState ( StreamState . Done ) ) ;
197+ channel . Writer . Complete ( ) ;
151198 }
152199 }
153-
154- streamingClient . OnChunkReceived -= OnChunkReceived ;
155- streamingClient . OnStreamCompleted -= OnStreamCompleted ;
156- streamingClient . OnStreamError -= OnStreamError ;
157- streamingClient . OnStateChanged -= OnStateChanged ;
158-
159- return ;
160-
161- return ;
162-
163- void OnChunkReceived ( object ? sender , ChatCompletionChunk chunk )
164- {
165- var content = chunk . Choices . FirstOrDefault ( ) ? . Delta . Content ;
166- if ( string . IsNullOrEmpty ( content ) ) return ;
167- channel . Writer . TryWrite ( new GrokTextMessage ( content ) ) ;
168- responseBuilder . Append ( content ) ;
169- }
170-
171- void OnStreamError ( object ? sender , Exception ex )
172- {
173- if ( ex is OperationCanceledException )
174- channel . Writer . TryWrite ( new GrokTextMessage ( "Stream canceled" ) ) ;
175- else
176- channel . Writer . TryWrite ( new GrokError ( ex ) ) ;
177- channel . Writer . TryComplete ( ex ) ;
178- }
179-
180- void OnStateChanged ( object ? sender , StreamState e )
181- {
182- channel . Writer . TryWrite ( new GrokStreamState ( e ) ) ;
183- }
184-
185- void OnStreamCompleted ( object ? sender , EventArgs e )
186- {
187- // Record the full response in the chat history for thread context
188- var fullResponse = responseBuilder . ToString ( ) ;
189- _history . Add ( new AssistantMessage { Content = fullResponse } ) ;
190- channel . Writer . Complete ( ) ;
191- }
192200 }
193201}
0 commit comments