Skip to content

Test/realtime chat #1034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="SharpHook" Version="5.3.9" />
<PackageVersion Include="SixLabors.ImageSharp" Version="3.1.7" />
<PackageVersion Include="System.ClientModel" Version="1.3.0" />
<PackageVersion Include="System.ComponentModel.Annotations" Version="5.0.0" />
<PackageVersion Include="System.IdentityModel.Tokens.Jwt" Version="8.0.0" />
<PackageVersion Include="System.Memory.Data" Version="8.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class Agent
public PluginDef Plugin { get; set; }

[JsonIgnore]
public bool Installed => Plugin.Enabled;
public bool Installed => Plugin?.Enabled == true;

/// <summary>
/// Default is True, user will enable this by installing appropriate plugin.
Expand Down Expand Up @@ -168,6 +168,7 @@ public static Agent Clone(Agent agent)
Functions = agent.Functions,
Responses = agent.Responses,
Samples = agent.Samples,
Templates = agent.Templates,
Utilities = agent.Utilities,
McpTools = agent.McpTools,
Knowledges = agent.Knowledges,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ public class AgentTemplate

public AgentTemplate()
{

}

public AgentTemplate(string name, string content)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public interface IKnowledgeService
Task<bool> CreateVectorCollection(string collectionName, string collectionType, int dimension, string provider, string model);
Task<bool> DeleteVectorCollection(string collectionName);
Task<IEnumerable<VectorCollectionConfig>> GetVectorCollections(string? type = null);
Task<VectorCollectionDetails?> GetVectorCollectionDetails(string collectionName);
Task<IEnumerable<VectorSearchResult>> SearchVectorKnowledge(string query, string collectionName, VectorSearchOptions options);
Task<StringIdPagedItems<VectorSearchResult>> GetPagedVectorCollectionData(string collectionName, VectorFilter filter);
Task<bool> DeleteVectorCollectionData(string collectionName, string id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ public interface IContentGeneratingHook
/// <param name="instruction"></param>
/// <param name="functions"></param>
/// <returns></returns>
Task OnSessionUpdated(Agent agent, string instruction, FunctionDef[] functions) => Task.CompletedTask;
Task OnSessionUpdated(Agent agent, string instruction, FunctionDef[] functions, bool isInit = false) => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ public interface IRealTimeCompletion
string Model { get; }
void SetModelName(string model);

Task Connect(RealtimeHubConnection conn,
Task Connect(
RealtimeHubConnection conn,
Action onModelReady,
Action<string, string> onModelAudioDeltaReceived,
Action onModelAudioResponseDone,
Expand All @@ -23,7 +24,7 @@ Task Connect(RealtimeHubConnection conn,
Task SendEventToModel(object message);
Task Disconnect();

Task<string> UpdateSession(RealtimeHubConnection conn);
Task<string> UpdateSession(RealtimeHubConnection conn, bool isInit = false);
Task InsertConversationItem(RoleDialogModel message);
Task RemoveConversationItem(string itemId);
Task TriggerModelInference(string? instructions = null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace BotSharp.Abstraction.Templating.Constants;

public static class TemplateRenderConstant
{
public const string RENDER_AGENT = "render_agent";
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ namespace BotSharp.Abstraction.Templating;
public interface ITemplateRender
{
string Render(string template, Dictionary<string, object> dict);
void Register(Type type);
void RegisterType(Type type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Task<bool> DoesCollectionExist(string collectionName)
=> throw new NotImplementedException();
Task<IEnumerable<string>> GetCollections()
=> throw new NotImplementedException();
Task<VectorCollectionDetails?> GetCollectionDetails(string collectionName)
=> throw new NotImplementedException();
Task<StringIdPagedItems<VectorCollectionData>> GetPagedCollectionData(string collectionName, VectorFilter filter)
=> throw new NotImplementedException();
Task<IEnumerable<VectorCollectionData>> GetCollectionData(string collectionName, IEnumerable<Guid> ids,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
namespace BotSharp.Abstraction.VectorStorage.Models;

public class VectorCollectionDetails
{
[JsonPropertyName("status")]
public string Status { get; set; }

[JsonPropertyName("optimizer_status")]
public string OptimizerStatus { get; set; }

[JsonPropertyName("segments_count")]
public ulong SegmentsCount { get; set; }

[JsonPropertyName("vectors_count")]
public ulong VectorsCount { get; set; }

[JsonPropertyName("indexed_vectors_count")]
public ulong IndexedVectorsCount { get; set; }

[JsonPropertyName("points_count")]
public ulong PointsCount { get; set; }

[JsonPropertyName("inner_config")]
public VectorCollectionDetailConfig? InnerConfig { get; set; }

[JsonPropertyName("basic_info")]
public VectorCollectionConfig? BasicInfo { get; set; }
}

public class VectorCollectionDetailConfig
{
public VectorCollectionDetailConfigParam? Param { get; set; }
}

public class VectorCollectionDetailConfigParam
{
[JsonPropertyName("shard_number")]
public uint? ShardNumber { get; set; }

[JsonPropertyName("sharding_method")]
public string? ShardingMethod { get; set; }

[JsonPropertyName("replication_factor")]
public uint? ReplicationFactor { get; set; }

[JsonPropertyName("write_consistency_factor")]
public uint? WriteConsistencyFactor { get; set; }

[JsonPropertyName("read_fan_out_factor")]
public uint? ReadFanOutFactor { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public async Task<IMcpClient> GetMcpClientAsync(string serverId)
transport = new SseClientTransport(new SseClientTransportOptions
{
Name = config.Name,
Endpoint = new Uri(config.SseConfig.EndPoint)
Endpoint = new Uri(config.SseConfig.EndPoint),
AdditionalHeaders = config.SseConfig.AdditionalHeaders,
ConnectionTimeout = config.SseConfig.ConnectionTimeout
});
}
else if (config.StdioConfig != null)
Expand All @@ -33,7 +35,8 @@ public async Task<IMcpClient> GetMcpClientAsync(string serverId)
Name = config.Name,
Command = config.StdioConfig.Command,
Arguments = config.StdioConfig.Arguments,
EnvironmentVariables = config.StdioConfig.EnvironmentVariables
EnvironmentVariables = config.StdioConfig.EnvironmentVariables,
ShutdownTimeout = config.StdioConfig.ShutdownTimeout
});
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

<ItemGroup>
<PackageReference Include="NAudio" />
<PackageReference Include="System.ClientModel" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace BotSharp.Core.Realtime.Models.Chat;

public class ChatSessionUpdate
{
public string RawResponse { get; set; }

public ChatSessionUpdate()
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace BotSharp.Core.Realtime.Models.Options;

public class ChatSessionOptions
{
public int? BufferSize { get; set; }
public JsonSerializerOptions? JsonOptions { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ public async Task ConnectToModel(Func<string, Task>? responseToUser = null, Func

_completer = _services.GetServices<IRealTimeCompletion>().First(x => x.Provider == settings.Provider);

await _completer.Connect(_conn,
await _completer.Connect(
conn: _conn,
onModelReady: async () =>
{
// Not TriggerModelInference, waiting for user utter.
var instruction = await _completer.UpdateSession(_conn);
var instruction = await _completer.UpdateSession(_conn, isInit: true);
var data = _conn.OnModelReady();
await (init?.Invoke(data) ?? Task.CompletedTask);
await HookEmitter.Emit<IRealtimeHook>(_services, async hook => await hook.OnModelReady(agent, _completer));
await (init?.Invoke(data) ?? Task.CompletedTask);
},
onModelAudioDeltaReceived: async (audioDeltaData, itemId) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public async Task ConnectAsync(string conversationId)
var waveFormat = new WaveFormat(24000, 16, 1); // 24000 Hz, 16-bit PCM, Mono
_bufferedWaveProvider = new BufferedWaveProvider(waveFormat);
_bufferedWaveProvider.BufferDuration = TimeSpan.FromMinutes(10);
//_bufferedWaveProvider.BufferLength = 1024;
_bufferedWaveProvider.DiscardOnBufferOverflow = true;

_waveOut = new WaveOutEvent()
Expand Down
4 changes: 4 additions & 0 deletions src/Infrastructure/BotSharp.Core.Realtime/Using.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@
global using BotSharp.Abstraction.Routing;
global using BotSharp.Abstraction.Agents.Enums;
global using BotSharp.Abstraction.Conversations.Models;

global using BotSharp.Core.Realtime.Models.Chat;
global using BotSharp.Core.Realtime.Models.Options;
global using BotSharp.Core.Realtime.Websocket.Chat;
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using BotSharp.Core.Realtime.Websocket.Common;
using System.ClientModel;
using System.Runtime.CompilerServices;

namespace BotSharp.Core.Realtime.Websocket.Chat;

public class BotSharpRealtimeSession : IDisposable
{
private readonly IServiceProvider _services;
private readonly WebSocket _websocket;
private readonly ChatSessionOptions? _sessionOptions;
private readonly object _singleReceiveLock = new();
private AsyncWebsocketDataCollectionResult _receivedCollectionResult;

public BotSharpRealtimeSession(
IServiceProvider services,
WebSocket websocket,
ChatSessionOptions? sessionOptions)
{
_services = services;
_websocket = websocket;
_sessionOptions = sessionOptions;
}

public async IAsyncEnumerable<ChatSessionUpdate> ReceiveUpdatesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await foreach (ClientResult result in ReceiveInnerUpdatesAsync(cancellationToken))
{
var update = HandleSessionResult(result);
yield return update;
}
}

private async IAsyncEnumerable<ClientResult> ReceiveInnerUpdatesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
lock (_singleReceiveLock)
{
_receivedCollectionResult ??= new(_websocket, _sessionOptions, cancellationToken);
}

await foreach (var result in _receivedCollectionResult)
{
yield return result;
}
}

private ChatSessionUpdate HandleSessionResult(ClientResult result)
{
using var response = result.GetRawResponse();
var bytes = response.Content.ToArray();
var text = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
return new ChatSessionUpdate
{
RawResponse = text
};
}

public async Task SendEvent(string message)
{
if (_websocket.State == WebSocketState.Open)
{
var buffer = Encoding.UTF8.GetBytes(message);
await _websocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
}
}

public async Task Disconnect()
{
if (_websocket.State == WebSocketState.Open)
{
await _websocket.CloseAsync(WebSocketCloseStatus.Empty, null, CancellationToken.None);
}
}

public void Dispose()
{
_websocket.Dispose();
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
using BotSharp.Core.Realtime.Models.Options;
using System.ClientModel;
using System.Net.WebSockets;

namespace BotSharp.Plugin.OpenAI.Providers.Realtime.Session;
namespace BotSharp.Core.Realtime.Websocket.Common;

public class AsyncWebsocketDataCollectionResult : AsyncCollectionResult<ClientResult>
internal class AsyncWebsocketDataCollectionResult : AsyncCollectionResult<ClientResult>
{
private readonly WebSocket _webSocket;
private readonly ChatSessionOptions? _sessionOptions;
private readonly CancellationToken _cancellationToken;

public AsyncWebsocketDataCollectionResult(
WebSocket webSocket,
ChatSessionOptions? sessionOptions,
CancellationToken cancellationToken)
{
_webSocket = webSocket;
_sessionOptions = sessionOptions;
_cancellationToken = cancellationToken;
}

Expand All @@ -23,7 +26,7 @@ public AsyncWebsocketDataCollectionResult(

public override async IAsyncEnumerable<ClientResult> GetRawPagesAsync()
{
await using var enumerator = new AsyncWebsocketDataResultEnumerator(_webSocket, _cancellationToken);
await using var enumerator = new AsyncWebsocketDataResultEnumerator(_webSocket, _sessionOptions, _cancellationToken);
while (await enumerator.MoveNextAsync().ConfigureAwait(false))
{
yield return enumerator.Current;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
using System;
using BotSharp.Core.Realtime.Models.Options;
using System.Buffers;
using System.ClientModel;
using System.Net.WebSockets;

namespace BotSharp.Plugin.OpenAI.Providers.Realtime.Session;
namespace BotSharp.Core.Realtime.Websocket.Common;

public class AsyncWebsocketDataResultEnumerator : IAsyncEnumerator<ClientResult>
internal class AsyncWebsocketDataResultEnumerator : IAsyncEnumerator<ClientResult>
{
private readonly WebSocket _webSocket;
private readonly ChatSessionOptions? _sessionOptions;
private readonly CancellationToken _cancellationToken;
private readonly byte[] _buffer;

private const int DEFAULT_BUFFER_SIZE = 1024 * 32;

public AsyncWebsocketDataResultEnumerator(
WebSocket webSocket,
ChatSessionOptions? sessionOptions,
CancellationToken cancellationToken)
{
_webSocket = webSocket;
_sessionOptions = sessionOptions;
_cancellationToken = cancellationToken;
_buffer = ArrayPool<byte>.Shared.Rent(1024 * 32);
var bufferSize = sessionOptions?.BufferSize > 0 ? sessionOptions.BufferSize.Value : DEFAULT_BUFFER_SIZE;
_buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
}

public ClientResult Current { get; private set; }
Expand All @@ -31,7 +36,7 @@ public ValueTask DisposeAsync()

public async ValueTask<bool> MoveNextAsync()
{
var response = new AiWebsocketPipelineResponse();
var response = new WebsocketPipelineResponse();
while (!response.IsComplete)
{
var receivedResult = await _webSocket.ReceiveAsync(new(_buffer), _cancellationToken);
Expand Down
Loading
Loading