Commit 1950121: add stream completion + fix conversation history
Parent: 08d51f2

4 files changed: +192 additions, -103 deletions

src/modules/llms/api/athropic.ts

Lines changed: 96 additions & 95 deletions
```diff
@@ -1,12 +1,12 @@
-import axios from 'axios'
-// import { type Readable } from 'stream'
+import axios, { type AxiosResponse } from 'axios'
+import { type Readable } from 'stream'
+import { GrammyError } from 'grammy'
+import { pino } from 'pino'
 
 import config from '../../../config'
-import { type ChatConversation } from '../../types' // , type OnCallBackQueryData, type OnMessageContext,
+import { type OnCallBackQueryData, type OnMessageContext, type ChatConversation } from '../../types'
 import { type LlmCompletion } from './llmApi'
 import { LlmsModelsEnum } from '../types'
-// import { GrammyError } from 'grammy'
-import { pino } from 'pino'
 
 const logger = pino({
   name: 'anthropic - llmsBot',
@@ -55,94 +55,95 @@ export const anthropicCompletion = async (
     price: 0
   }
 }
-// export const anthropicCompletion = async (
-//   conversation: ChatConversation[],
-//   model = LlmsModelsEnum.CLAUDE_OPUS,
-//   ctx: OnMessageContext | OnCallBackQueryData,
-//   msgId: number
-// ): Promise<LlmCompletion> => {
-//   const data = {
-//     model,
-//     stream: true, // Set stream to true to receive the completion as a stream
-//     system: config.openAi.chatGpt.chatCompletionContext,
-//     max_tokens: +config.openAi.chatGpt.maxTokens,
-//     messages: conversation
-//   }
-//   let wordCount = 0
-//   let wordCountMinimum = 2
-//   const url = `${API_ENDPOINT}/anthropic/completions`
-//   if (!ctx.chat?.id) {
-//     throw new Error('Context chat id should not be empty after openAI streaming')
-//   }
-//   const response = await axios.post(url, data, { responseType: 'stream' })
-
-//   // Create a Readable stream from the response
-//   const completionStream: Readable = response.data
-
-//   // Read and process the stream
-//   let completion = ''
-//   let outputTokens = ''
-//   let inputTokens = ''
-//   completionStream.on('data', (chunk: any) => {
-//     const sendMessage = async (completion: string): Promise<void> => {
-//       await ctx.api
-//         .editMessageText(ctx.chat?.id, msgId, completion)
-//         .catch(async (e: any) => {
-//           if (e instanceof GrammyError) {
-//             if (e.error_code !== 400) {
-//               throw e
-//             } else {
-//               logger.error(e)
-//             }
-//           } else {
-//             throw e
-//           }
-//         })
-//     }
-//     const msg = chunk.toString()
-//     if (msg) {
-//       if (msg.startsWith('Input Token')) {
-//         inputTokens = msg.split('Input Token: ')[1]
-//       } else if (msg.startsWith('Text')) {
-//         wordCount++
-//         completion += msg.split('Text: ')[1]
-//         if (wordCount > wordCountMinimum) { // if (chunck === '.' && wordCount > wordCountMinimum) {
-//           if (wordCountMinimum < 64) {
-//             wordCountMinimum *= 2
-//           }
-//           completion = completion.replaceAll('...', '')
-//           completion += '...'
-//           wordCount = 0
-//           if (ctx.chat?.id) {
-//             await sendMessage(completion)
-//           }
-//         }
-//       } else if (msg.startsWith('Output Tokens')) {
-//         outputTokens = msg.split('Output Tokens: ')[1]
-//       }
-//     }
-//   })
-
-//   completionStream.on('end', () => {
-//     const totalOutputTokens = outputTokens // response.headers['x-openai-output-tokens']
-//     const totalInputTokens = inputTokens // response.headers['x-openai-input-tokens']
-//     console.log('FCO stream', completion)
-//     // You can also process the completion content here
 
-//     return {
-//       completion: {
-//         content: completion,
-//         role: 'assistant',
-//         model
-//       },
-//       usage: parseInt(totalOutputTokens, 10) + parseInt(totalInputTokens, 10),
-//       price: 0
-//     }
-//   })
-
-//   return {
-//     completion: undefined,
-//     usage: 0,
-//     price: 0
-//   }
-// }
+export const anthropicStreamCompletion = async (
+  conversation: ChatConversation[],
+  model = LlmsModelsEnum.CLAUDE_OPUS,
+  ctx: OnMessageContext | OnCallBackQueryData,
+  msgId: number,
+  limitTokens = true
+): Promise<LlmCompletion> => {
+  const data = {
+    model,
+    stream: true, // Set stream to true to receive the completion as a stream
+    system: config.openAi.chatGpt.chatCompletionContext,
+    max_tokens: limitTokens ? +config.openAi.chatGpt.maxTokens : undefined,
+    messages: conversation.map(m => { return { content: m.content, role: m.role } })
+  }
+  let wordCount = 0
+  let wordCountMinimum = 2
+  const url = `${API_ENDPOINT}/anthropic/completions`
+  if (!ctx.chat?.id) {
+    throw new Error('Context chat id should not be empty after openAI streaming')
+  }
+  const response: AxiosResponse = await axios.post(url, data, { responseType: 'stream' })
+  // Create a Readable stream from the response
+  const completionStream: Readable = response.data
+  // Read and process the stream
+  let completion = ''
+  let outputTokens = ''
+  let inputTokens = ''
+  for await (const chunk of completionStream) {
+    const msg = chunk.toString()
+    if (msg) {
+      if (msg.startsWith('Input Token')) {
+        inputTokens = msg.split('Input Token: ')[1]
+      } else if (msg.startsWith('Text')) {
+        wordCount++
+        completion += msg.split('Text: ')[1]
+        if (wordCount > wordCountMinimum) { // if (chunck === '.' && wordCount > wordCountMinimum) {
+          if (wordCountMinimum < 64) {
+            wordCountMinimum *= 2
+          }
+          completion = completion.replaceAll('...', '')
+          completion += '...'
+          wordCount = 0
+          if (ctx.chat?.id) {
+            await ctx.api
+              .editMessageText(ctx.chat?.id, msgId, completion)
+              .catch(async (e: any) => {
+                if (e instanceof GrammyError) {
+                  if (e.error_code !== 400) {
+                    throw e
+                  } else {
+                    logger.error(e)
+                  }
+                } else {
+                  throw e
+                }
+              })
+          }
+        }
+      } else if (msg.startsWith('Output Tokens')) {
+        outputTokens = msg.split('Output Tokens: ')[1]
+      }
+    }
+  }
+  completion = completion.replaceAll('...', '')
+  await ctx.api
+    .editMessageText(ctx.chat?.id, msgId, completion)
+    .catch((e: any) => {
+      if (e instanceof GrammyError) {
+        if (e.error_code !== 400) {
+          throw e
+        } else {
+          logger.error(e)
+        }
+      } else {
+        throw e
+      }
+    })
+  const totalOutputTokens = outputTokens // response.headers['x-openai-output-tokens']
+  const totalInputTokens = inputTokens // response.headers['x-openai-input-tokens']
+  return {
+    completion: {
+      content: completion,
+      role: 'assistant',
+      model
+    },
+    usage: parseInt(totalOutputTokens, 10) + parseInt(totalInputTokens, 10),
+    price: 0,
+    inputTokens: parseInt(totalInputTokens, 10),
+    outputTokens: parseInt(totalOutputTokens, 10)
+  }
+}
```
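
The new `anthropicStreamCompletion` reads the proxy's stream as labeled plain-text chunks (`Input Token: `, `Text: `, `Output Tokens: `) rather than SSE events, and throttles Telegram edits by doubling `wordCountMinimum` up to 64 so `editMessageText` fires progressively less often. A standalone sketch of that chunk-parsing loop, with a hypothetical mock stream standing in for the real `/anthropic/completions` response:

```ts
import { Readable } from 'stream'

interface ParsedStream { text: string, inputTokens: number, outputTokens: number }

// Accumulates 'Text: ' chunks and picks up the token counters the
// proxy emits as labeled lines (labels inferred from the diff above).
async function parseCompletionStream (stream: Readable): Promise<ParsedStream> {
  let text = ''
  let inputTokens = 0
  let outputTokens = 0
  for await (const chunk of stream) {
    const msg = String(chunk)
    if (msg.startsWith('Input Token: ')) {
      inputTokens = parseInt(msg.split('Input Token: ')[1], 10)
    } else if (msg.startsWith('Text: ')) {
      text += msg.split('Text: ')[1]
    } else if (msg.startsWith('Output Tokens: ')) {
      outputTokens = parseInt(msg.split('Output Tokens: ')[1], 10)
    }
  }
  return { text, inputTokens, outputTokens }
}

// Hypothetical chunk sequence, for illustration only.
const mock = Readable.from(['Input Token: 12', 'Text: Hello', 'Text:  world', 'Output Tokens: 3'])
parseCompletionStream(mock)
  .then(r => { console.log(r) }) // { text: 'Hello world', inputTokens: 12, outputTokens: 3 }
  .catch(console.error)
```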

src/modules/llms/api/llmApi.ts

Lines changed: 8 additions & 1 deletion
```diff
@@ -2,7 +2,8 @@ import axios from 'axios'
 import config from '../../../config'
 import { type ChatConversation } from '../../types'
 import pino from 'pino'
-import { LlmsModelsEnum } from '../types'
+import { LlmsModels, LlmsModelsEnum } from '../types'
+import { type ChatModel } from '../../open-ai/types'
 
 const API_ENDPOINT = config.llms.apiEndpoint // config.llms.apiEndpoint // 'http://localhost:8080' // http://127.0.0.1:5000' // config.llms.apiEndpoint
 
@@ -18,6 +19,8 @@ export interface LlmCompletion {
   completion: ChatConversation | undefined
   usage: number
   price: number
+  inputTokens?: number
+  outputTokens?: number
 }
 
 interface LlmAddUrlDocument {
@@ -33,6 +36,10 @@ interface QueryUrlDocument {
   conversation?: ChatConversation[]
 }
 
+export const getChatModel = (modelName: string): ChatModel => {
+  return LlmsModels[modelName]
+}
+
 export const llmAddUrlDocument = async (args: LlmAddUrlDocument): Promise<string> => {
   const data = { ...args }
   const endpointUrl = `${API_ENDPOINT}/collections/document`
```
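
`getChatModel` is a thin keyed lookup into the `LlmsModels` registry, and `LlmCompletion` gains optional per-direction token counts so callers can price a response after the fact. A self-contained sketch of the lookup under assumed shapes (the real `ChatModel` type and registry entries live in `../types` and `../../open-ai/types`, so the fields, units, and rates below are placeholders, not the project's actual values):

```ts
// Assumed shape of ChatModel; treat these fields and units as placeholders.
interface ChatModel {
  name: string
  inputPrice: number // assumed: cents per 1K input tokens
  outputPrice: number // assumed: cents per 1K output tokens
}

// Hypothetical registry entries standing in for LlmsModels.
const LlmsModels: Record<string, ChatModel> = {
  'claude-3-opus-20240229': { name: 'claude-3-opus-20240229', inputPrice: 1.5, outputPrice: 7.5 },
  'claude-3-sonnet-20240229': { name: 'claude-3-sonnet-20240229', inputPrice: 0.3, outputPrice: 1.5 }
}

// Mirrors the helper added above: a direct lookup by model name.
const getChatModel = (modelName: string): ChatModel => {
  return LlmsModels[modelName]
}

console.log(getChatModel('claude-3-opus-20240229').outputPrice) // 7.5
```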

src/modules/llms/helpers.ts

Lines changed: 14 additions & 5 deletions
```diff
@@ -8,7 +8,9 @@ import {
 import { type ParseMode } from 'grammy/types'
 import { LlmsModelsEnum } from './types'
 import { type Message } from 'grammy/out/types'
-import { llmAddUrlDocument } from './api/llmApi'
+import { type LlmCompletion, getChatModel, llmAddUrlDocument } from './api/llmApi'
+import { getChatModelPrice } from '../open-ai/api/openAi'
+import config from '../../config'
 
 export enum SupportedCommands {
   bardF = 'bard',
@@ -213,11 +215,18 @@ export const hasPrefix = (prompt: string): string => {
   )
 }
 
-export const getPromptPrice = (completion: string, data: ChatPayload): { price: number, promptTokens: number, completionTokens: number } => {
+export const getPromptPrice = (completion: LlmCompletion, data: ChatPayload): { price: number, promptTokens: number, completionTokens: number } => {
+  const { ctx, model } = data
+  const modelPrice = getChatModel(model)
+  const price =
+    getChatModelPrice(modelPrice, true, completion.inputTokens ?? 0, completion.outputTokens ?? 0) *
+    config.openAi.chatGpt.priceAdjustment
+  ctx.session.llms.usage += completion.outputTokens ?? 0
+  ctx.session.llms.price += price
   return {
-    price: 0,
-    promptTokens: 10,
-    completionTokens: 60
+    price,
+    promptTokens: completion.inputTokens ?? 0,
+    completionTokens: completion.outputTokens ?? 0
   }
 }
 
```
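
`getPromptPrice` now derives a real price from the completion's token counts instead of returning the hard-coded `price: 0, promptTokens: 10, completionTokens: 60` stub, and it accrues usage and price onto the session. A rough sketch of the arithmetic, with an assumed per-1K-token implementation of `getChatModelPrice` (the real one lives in `../open-ai/api/openAi`; only the call shape comes from the diff) and made-up rates:

```ts
interface ChatModel { inputPrice: number, outputPrice: number } // assumed: cents per 1K tokens

// Assumed implementation for illustration; only the parameter order
// (model, inCents, inputTokens, outputTokens) is taken from the diff.
function getChatModelPrice (model: ChatModel, inCents: boolean, inputTokens: number, outputTokens: number): number {
  const cents = (inputTokens / 1000) * model.inputPrice + (outputTokens / 1000) * model.outputPrice
  return inCents ? cents : cents / 100
}

const opus: ChatModel = { inputPrice: 1.5, outputPrice: 7.5 } // hypothetical rates
const priceAdjustment = 2 // stand-in for config.openAi.chatGpt.priceAdjustment

// 1200 input + 300 output tokens => (1.8 + 2.25) * 2 = 8.1 cents
console.log(getChatModelPrice(opus, true, 1200, 300) * priceAdjustment)
```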

src/modules/llms/index.ts

Lines changed: 74 additions & 2 deletions
```diff
@@ -19,6 +19,7 @@ import { sleep } from '../sd-images/utils'
 import {
   addDocToCollection,
   addUrlToCollection,
+  getPromptPrice,
   hasBardPrefix,
   hasClaudeOpusPrefix,
   hasLlamaPrefix,
@@ -38,7 +39,7 @@ import * as Sentry from '@sentry/node'
 import { now } from '../../utils/perf'
 import { AxiosError } from 'axios'
 import OpenAI from 'openai'
-import { anthropicCompletion } from './api/athropic'
+import { anthropicCompletion, anthropicStreamCompletion } from './api/athropic'
 export class LlmsBot implements PayableBot {
   public readonly module = 'LlmsBot'
   private readonly logger: Logger
@@ -548,6 +549,72 @@
     ctx.transient.analytics.actualResponseTime = now()
   }
 
+  private async completionGen (data: ChatPayload, msgId?: number, outputFormat = 'text'): Promise< { price: number, chat: ChatConversation[] }> {
+    const { conversation, ctx, model } = data
+    try {
+      if (!msgId) {
+        ctx.transient.analytics.firstResponseTime = now()
+        msgId = (
+          await ctx.reply('...', {
+            message_thread_id:
+              ctx.message?.message_thread_id ??
+              ctx.message?.reply_to_message?.message_thread_id
+          })
+        ).message_id
+      }
+      if (outputFormat === 'text') {
+        const isTypingEnabled = config.openAi.chatGpt.isTypingEnabled
+        if (isTypingEnabled) {
+          ctx.chatAction = 'typing'
+        }
+        const completion = await anthropicStreamCompletion(
+          conversation,
+          model as LlmsModelsEnum,
+          ctx,
+          msgId,
+          true // telegram messages has a character limit
+        )
+        if (isTypingEnabled) {
+          ctx.chatAction = null
+        }
+        if (completion) {
+          ctx.transient.analytics.sessionState = RequestState.Success
+          ctx.transient.analytics.actualResponseTime = now()
+          const price = getPromptPrice(completion, data)
+          this.logger.info(
+            `streamChatCompletion result = tokens: ${price.promptTokens + price.completionTokens} | ${model} | price: ${price.price}¢` // }
+          )
+          conversation.push({
+            role: 'assistant',
+            content: completion.completion?.content ?? ''
+          })
+          return {
+            price: price.price,
+            chat: conversation
+          }
+        }
+      } else {
+        const response = await anthropicCompletion(conversation, model as LlmsModelsEnum)
+        conversation.push({
+          role: 'assistant',
+          content: response.completion?.content ?? ''
+        })
+        return {
+          price: response.price,
+          chat: conversation
+        }
+      }
+      return {
+        price: 0,
+        chat: conversation
+      }
+    } catch (e: any) {
+      Sentry.captureException(e)
+      ctx.chatAction = null
+      throw e
+    }
+  }
+
   private async promptGen (data: ChatPayload): Promise<{ price: number, chat: ChatConversation[] }> {
     const { conversation, ctx, model } = data
     if (!ctx.chat?.id) {
@@ -686,7 +753,12 @@
       model: model ?? config.llms.model,
       ctx
     }
-    const result = await this.promptGen(payload)
+    let result: { price: number, chat: ChatConversation[] } = { price: 0, chat: [] }
+    if (model === LlmsModelsEnum.CLAUDE_OPUS || model === LlmsModelsEnum.CLAUDE_SONNET) {
+      result = await this.completionGen(payload) // , prompt.msgId, prompt.outputFormat)
+    } else {
+      result = await this.promptGen(payload)
+    }
     ctx.session.llms.chatConversation = [...result.chat]
     if (
       !(await this.payments.pay(ctx as OnMessageContext, result.price))
```
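
The handler now branches on the model: Claude Opus and Sonnet requests take the new streaming `completionGen` path, while every other model keeps the existing `promptGen` path. A minimal sketch of that dispatch (the enum string values here are assumptions for illustration; the real ones live in `./types`):

```ts
// Hypothetical enum values standing in for LlmsModelsEnum.
enum LlmsModelsEnum {
  CLAUDE_OPUS = 'claude-3-opus-20240229',
  CLAUDE_SONNET = 'claude-3-sonnet-20240229',
  BISON = 'chat-bison'
}

// Claude models stream via completionGen (the placeholder message is
// edited in place); everything else falls back to promptGen.
function pickGenerator (model: LlmsModelsEnum): 'completionGen' | 'promptGen' {
  return (model === LlmsModelsEnum.CLAUDE_OPUS || model === LlmsModelsEnum.CLAUDE_SONNET)
    ? 'completionGen'
    : 'promptGen'
}

console.log(pickGenerator(LlmsModelsEnum.CLAUDE_OPUS)) // 'completionGen'
```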
