Skip to content

Commit 3e7b1cc

Browse files
committed
feat: 加入并发
1 parent 9cdd2bd commit 3e7b1cc

File tree

4 files changed

+78
-61
lines changed

4 files changed

+78
-61
lines changed

Diff for: README-zh.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Channel Monitor 是一个用于监控OneAPI/NewAPI渠道的工具,它直接读
1515
- [x] 支持排除不予监控的渠道和模型
1616
- [x] 支持间隔时间配置
1717
- [x] 支持多种数据库类型(MySQL、SQLite、PostgreSQL、SQL Server)
18-
- [ ] TODO: 多线程并发测试
18+
- [x] 并发测试
1919

2020

2121
## 安装

Diff for: README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Channel Monitor is a tool designed for monitoring OneAPI/NewAPI channels. It dir
1515
- [x] Support exclusion of channels and models from monitoring
1616
- [x] Support configurable intervals
1717
- [x] Support multiple database types, including MySQL, SQLite, PostgreSQL, and SQL Server
18-
- [ ] TODO: Multi-threaded concurrent testing
18+
- [x] Concurrent testing
1919

2020
## Installation
2121

Diff for: VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v0.3.1
1+
v0.3.2

Diff for: main.go

+75-58
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"log"
88
"net/http"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"gorm.io/gorm"
@@ -51,7 +52,7 @@ func fetchChannels() ([]Channel, error) {
5152
c.BaseURL = "https://api.openai.com"
5253
}
5354
}
54-
// 检查是否在排除列表中
55+
// 检查是否在排��列表中
5556
if contains(config.ExcludeChannel, c.ID) {
5657
log.Printf("渠道 %s(ID:%d) 在排除列表中,跳过\n", c.Name, c.ID)
5758
continue
@@ -81,7 +82,9 @@ func containsString(slice []string, item string) bool {
8182
return false
8283
}
8384

84-
func testModels(channel Channel) ([]string, error) {
85+
func testModels(channel Channel, wg *sync.WaitGroup, mu *sync.Mutex) {
86+
defer wg.Done()
87+
8588
var availableModels []string
8689
modelList := []string{}
8790
if config.ForceModels {
@@ -91,7 +94,8 @@ func testModels(channel Channel) ([]string, error) {
9194
// 从/v1/models接口获取模型列表
9295
req, err := http.NewRequest("GET", channel.BaseURL+"/v1/models", nil)
9396
if err != nil {
94-
return nil, fmt.Errorf("创建请求失败:%v", err)
97+
log.Printf("创建请求失败:%v\n", err)
98+
return
9599
}
96100
req.Header.Set("Authorization", "Bearer "+channel.Key)
97101

@@ -104,7 +108,8 @@ func testModels(channel Channel) ([]string, error) {
104108
defer resp.Body.Close()
105109
body, _ := ioutil.ReadAll(resp.Body)
106110
if resp.StatusCode != http.StatusOK {
107-
return nil, fmt.Errorf("获取模型列表失败,状态码:%d,响应:%s", resp.StatusCode, string(body))
111+
log.Printf("获取模型列表失败,状态码:%d,响应:%s\n", resp.StatusCode, string(body))
112+
return
108113
}
109114

110115
// 解析响应JSON
@@ -115,7 +120,8 @@ func testModels(channel Channel) ([]string, error) {
115120
}
116121

117122
if err := json.Unmarshal(body, &response); err != nil {
118-
return nil, fmt.Errorf("解析模型列表失败:%v", err)
123+
log.Printf("解析模型列表失败:%v\n", err)
124+
return
119125
}
120126
// 提取模型ID列表
121127
for _, model := range response.Data {
@@ -127,56 +133,74 @@ func testModels(channel Channel) ([]string, error) {
127133
}
128134
}
129135
}
130-
// 测试模型
136+
// 测试模型并发处理
137+
modelWg := sync.WaitGroup{}
138+
modelMu := sync.Mutex{}
131139
for _, model := range modelList {
132-
url := channel.BaseURL
133-
if !strings.Contains(channel.BaseURL, "/v1/chat/completions") {
134-
if !strings.HasSuffix(channel.BaseURL, "/chat") {
135-
if !strings.HasSuffix(channel.BaseURL, "/v1") {
136-
url += "/v1"
140+
modelWg.Add(1)
141+
go func(model string) {
142+
defer modelWg.Done()
143+
url := channel.BaseURL
144+
if !strings.Contains(channel.BaseURL, "/v1/chat/completions") {
145+
if !strings.HasSuffix(channel.BaseURL, "/chat") {
146+
if !strings.HasSuffix(channel.BaseURL, "/v1") {
147+
url += "/v1"
148+
}
149+
url += "/chat"
137150
}
138-
url += "/chat"
151+
url += "/completions"
139152
}
140-
url += "/completions"
141-
}
142153

143-
// 构造请求
144-
reqBody := map[string]interface{}{
145-
"model": model,
146-
"messages": []map[string]string{
147-
{"role": "user", "content": "Hello! Reply in short"},
148-
},
149-
}
150-
jsonData, _ := json.Marshal(reqBody)
154+
// 构造请求
155+
reqBody := map[string]interface{}{
156+
"model": model,
157+
"messages": []map[string]string{
158+
{"role": "user", "content": "Hello! Reply in short"},
159+
},
160+
}
161+
jsonData, _ := json.Marshal(reqBody)
151162

152-
log.Printf("测试渠道 %s(ID:%d) 的模型 %s\n", channel.Name, channel.ID, model)
163+
log.Printf("测试渠道 %s(ID:%d) 的模型 %s\n", channel.Name, channel.ID, model)
153164

154-
req, err := http.NewRequest("POST", url, strings.NewReader(string(jsonData)))
155-
if err != nil {
156-
log.Println("创建请求失败:", err)
157-
continue
158-
}
159-
req.Header.Set("Content-Type", "application/json")
160-
req.Header.Set("Authorization", "Bearer "+channel.Key)
165+
req, err := http.NewRequest("POST", url, strings.NewReader(string(jsonData)))
166+
if err != nil {
167+
log.Printf("创建请求失败:%v\n", err)
168+
return
169+
}
170+
req.Header.Set("Content-Type", "application/json")
171+
req.Header.Set("Authorization", "Bearer "+channel.Key)
161172

162-
client := &http.Client{Timeout: 10 * time.Second}
163-
resp, err := client.Do(req)
164-
if err != nil {
165-
log.Printf("\033[31m请求失败:%v\033[0m\n", err)
166-
continue
167-
}
168-
defer resp.Body.Close()
173+
client := &http.Client{Timeout: 10 * time.Second}
174+
resp, err := client.Do(req)
175+
if err != nil {
176+
log.Printf("\033[31m请求失败:%v\033[0m\n", err)
177+
return
178+
}
179+
defer resp.Body.Close()
169180

170-
body, _ := ioutil.ReadAll(resp.Body)
171-
if resp.StatusCode == http.StatusOK {
172-
// 根据返回内容判断是否成功
173-
availableModels = append(availableModels, model)
174-
log.Printf("\033[32m渠道 %s(ID:%d) 的模型 %s 测试成功\033[0m\n", channel.Name, channel.ID, model)
175-
} else {
176-
log.Printf("\033[31m渠道 %s(ID:%d) 的模型 %s 测试失败,状态码:%d,响应:%s\033[0m\n", channel.Name, channel.ID, model, resp.StatusCode, string(body))
177-
}
181+
body, _ := ioutil.ReadAll(resp.Body)
182+
if resp.StatusCode == http.StatusOK {
183+
// 根据返回内容判断是否成功
184+
modelMu.Lock()
185+
availableModels = append(availableModels, model)
186+
modelMu.Unlock()
187+
log.Printf("\033[32m渠道 %s(ID:%d) 的模型 %s 测试成功\033[0m\n", channel.Name, channel.ID, model)
188+
} else {
189+
log.Printf("\033[31m渠道 %s(ID:%d) 的模型 %s 测试失败,状态码:%d,响应:%s\033[0m\n", channel.Name, channel.ID, model, resp.StatusCode, string(body))
190+
}
191+
}(model)
192+
}
193+
modelWg.Wait()
194+
195+
// 更新模型
196+
mu.Lock()
197+
err := updateModels(channel.ID, availableModels)
198+
mu.Unlock()
199+
if err != nil {
200+
log.Printf("\033[31m更新渠道 %s(ID:%d) 的模型失败:%v\033[0m\n", channel.Name, channel.ID, err)
201+
} else {
202+
log.Printf("渠道 %s(ID:%d) 可用模型:%v\n", channel.Name, channel.ID, availableModels)
178203
}
179-
return availableModels, nil
180204
}
181205

182206
func updateModels(channelID int, models []string) error {
@@ -349,23 +373,16 @@ func main() {
349373
continue
350374
}
351375

376+
var wg sync.WaitGroup
377+
var mu sync.Mutex
352378
for _, channel := range channels {
353379
if channel.Name == "refresh" {
354380
continue
355381
}
356-
log.Printf("开始测试渠道 %s(ID:%d) 的模型\n", channel.Name, channel.ID)
357-
models, err := testModels(channel)
358-
if err != nil {
359-
log.Printf("\033[31m渠道 %s(ID:%d) 测试模型失败:%v\033[0m\n", channel.Name, channel.ID, err)
360-
continue
361-
}
362-
err = updateModels(channel.ID, models)
363-
if err != nil {
364-
log.Printf("\033[31m更新渠道 %s(ID:%d) 的模型失败:%v\033[0m\n", channel.Name, channel.ID, err)
365-
} else {
366-
log.Printf("渠道 %s(ID:%d) 可用模型:%v\n", channel.Name, channel.ID, models)
367-
}
382+
wg.Add(1)
383+
go testModels(channel, &wg, &mu)
368384
}
385+
wg.Wait()
369386

370387
// 等待下一个周期
371388
<-ticker.C

0 commit comments

Comments
 (0)