Skip to content

Commit 24ceae1

Browse files
authored
Optimize thread concurrency (#5514)
* refactor * fix core tests, --filter=[core][unit] * skip nts, --filter=[] * optimize code
1 parent 84601ce commit 24ceae1

File tree

9 files changed

+182
-17
lines changed

9 files changed

+182
-17
lines changed

ext-src/php_swoole.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -1095,8 +1095,6 @@ PHP_RSHUTDOWN_FUNCTION(swoole) {
10951095

10961096
rshutdown_callbacks.execute();
10971097

1098-
swoole_event_free();
1099-
11001098
php_swoole_server_rshutdown();
11011099
php_swoole_http_server_rshutdown();
11021100
php_swoole_websocket_server_rshutdown();
@@ -1110,6 +1108,8 @@ PHP_RSHUTDOWN_FUNCTION(swoole) {
11101108
php_swoole_thread_rshutdown();
11111109
#endif
11121110

1111+
swoole_event_free();
1112+
11131113
SWOOLE_G(req_status) = PHP_SWOOLE_RSHUTDOWN_END;
11141114

11151115
#ifdef PHP_STREAM_FLAG_NO_CLOSE
@@ -1507,5 +1507,9 @@ static PHP_FUNCTION(swoole_test_fn) {
15071507
if (SW_STRCASEEQ(test_case, test_case_len, "fatal_error")) {
15081508
swoole_fatal_error(SW_ERROR_FOR_TEST, "test");
15091509
php_printf("never be executed here\n");
1510+
} else if (SW_STRCASEEQ(test_case, test_case_len, "bailout")) {
1511+
zend_bailout();
1512+
} else if (SW_STRCASEEQ(test_case, test_case_len, "abort")) {
1513+
abort();
15101514
}
15111515
}

ext-src/swoole_http_server.cc

+17-6
Original file line numberDiff line numberDiff line change
@@ -394,10 +394,15 @@ bool swoole_http_server_onBeforeRequest(HttpContext *ctx) {
394394
ctx->onBeforeRequest = nullptr;
395395
ctx->onAfterResponse = swoole_http_server_onAfterResponse;
396396
Server *serv = (Server *) ctx->private_data;
397-
sw_worker()->concurrency++;
398-
sw_atomic_add_fetch(&serv->gs->concurrency, 1);
397+
if (!sw_server() || !sw_worker() || SwooleWG.shutdown) {
398+
return false;
399+
}
400+
401+
auto worker = sw_worker();
399402
swoole_trace("serv->gs->concurrency=%u, max_concurrency=%u", serv->gs->concurrency, serv->gs->max_concurrency);
400-
if (sw_worker()->concurrency > serv->worker_max_concurrency) {
403+
sw_atomic_add_fetch(&serv->gs->concurrency, 1);
404+
worker->concurrency++;
405+
if (worker->concurrency > serv->worker_max_concurrency) {
401406
swoole_trace_log(SW_TRACE_COROUTINE,
402407
"exceed worker_max_concurrency[%u] limit, request[%p] queued",
403408
serv->worker_max_concurrency,
@@ -412,12 +417,18 @@ bool swoole_http_server_onBeforeRequest(HttpContext *ctx) {
412417
void swoole_http_server_onAfterResponse(HttpContext *ctx) {
413418
ctx->onAfterResponse = nullptr;
414419
Server *serv = (Server *) ctx->private_data;
415-
sw_worker()->concurrency--;
416-
sw_atomic_sub_fetch(&serv->gs->concurrency, 1);
420+
if (!sw_server() || !sw_worker() || SwooleWG.shutdown) {
421+
return;
422+
}
423+
424+
auto worker = sw_worker();
417425
swoole_trace("serv->gs->concurrency=%u, max_concurrency=%u", serv->gs->concurrency, serv->gs->max_concurrency);
426+
sw_atomic_sub_fetch(&serv->gs->concurrency, 1);
427+
worker->concurrency--;
428+
418429
if (!queued_http_contexts.empty()) {
419430
HttpContext *ctx = queued_http_contexts.front();
420-
swoole_trace("[POP 1] concurrency=%u, ctx=%p, request=%p", sw_worker()->concurrency, ctx, ctx->request.zobject);
431+
swoole_trace("[POP 1] concurrency=%u, ctx=%p, request=%p", worker->concurrency, ctx, ctx->request.zobject);
421432
queued_http_contexts.pop();
422433
swoole_event_defer(
423434
[](void *private_data) {

ext-src/swoole_server.cc

+6
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ void php_swoole_server_rshutdown() {
100100
serv->drain_worker_pipe();
101101

102102
if (serv->is_started() && serv->worker_is_running() && !serv->is_user_worker()) {
103+
SwooleWG.shutdown = true;
104+
#ifdef SW_THREAD
105+
if (serv->is_thread_mode()) {
106+
serv->abort_worker(sw_worker());
107+
}
108+
#endif
103109
if (php_swoole_is_fatal_error()) {
104110
swoole_error_log(SW_LOG_ERROR,
105111
SW_ERROR_PHP_FATAL_ERROR,

include/swoole_server.h

+1
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,7 @@ class Server {
991991
Connection *add_connection(ListenPort *ls, network::Socket *_socket, int server_fd);
992992
void abort_connection(Reactor *reactor, ListenPort *ls, network::Socket *_socket);
993993
void abort_worker(Worker *worker);
994+
void reset_worker_counter(Worker *worker);
994995
int connection_incoming(Reactor *reactor, Connection *conn);
995996

996997
int get_idle_worker_num();

src/reactor/base.cc

+4-4
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ ReactorImpl *make_reactor_kqueue(Reactor *_reactor, int max_events);
4848

4949
ReactorImpl *make_reactor_select(Reactor *_reactor);
5050

51-
void ReactorImpl::after_removal_failure(network::Socket *_socket) {
51+
void ReactorImpl::after_removal_failure(Socket *_socket) {
5252
if (!_socket->silent_remove) {
5353
swoole_sys_warning("failed to delete events[fd=%d#%d, type=%d, events=%d]",
5454
_socket->fd,
@@ -297,7 +297,7 @@ ssize_t Reactor::_write(Reactor *reactor, Socket *socket, const void *buf, size_
297297
return write_func(reactor, socket, n, send_fn, append_fn);
298298
}
299299

300-
ssize_t Reactor::_writev(Reactor *reactor, network::Socket *socket, const iovec *iov, size_t iovcnt) {
300+
ssize_t Reactor::_writev(Reactor *reactor, Socket *socket, const iovec *iov, size_t iovcnt) {
301301
#ifdef SW_USE_OPENSSL
302302
if (socket->ssl) {
303303
swoole_error_log(SW_LOG_WARNING, SW_ERROR_OPERATION_NOT_SUPPORT, "does not support SSL");
@@ -359,13 +359,13 @@ int Reactor::_writable_callback(Reactor *reactor, Event *ev) {
359359
return SW_OK;
360360
}
361361

362-
void Reactor::drain_write_buffer(swSocket *socket) {
362+
void Reactor::drain_write_buffer(Socket *socket) {
363363
Event event = {};
364364
event.socket = socket;
365365
event.fd = socket->fd;
366366

367367
while (!Buffer::empty(socket->out_buffer)) {
368-
if (socket->wait_event(network::Socket::default_write_timeout, SW_EVENT_WRITE) == SW_ERR) {
368+
if (socket->wait_event(Socket::default_write_timeout, SW_EVENT_WRITE) == SW_ERR) {
369369
break;
370370
}
371371
_writable_callback(this, &event);

src/server/master.cc

+15-5
Original file line numberDiff line numberDiff line change
@@ -1899,11 +1899,10 @@ void Server::abort_connection(Reactor *reactor, ListenPort *ls, Socket *_socket)
18991899
}
19001900
}
19011901

1902-
void Server::abort_worker(Worker *worker) {
1903-
// see https://github.com/swoole/swoole-src/issues/5407
1904-
// see https://github.com/swoole/swoole-src/issues/5432
1902+
// see https://github.com/swoole/swoole-src/issues/5407
1903+
// see https://github.com/swoole/swoole-src/issues/5432
1904+
void Server::reset_worker_counter(Worker *worker) {
19051905
auto value = worker->concurrency;
1906-
19071906
if (value > 0 && sw_atomic_value_cmp_set(&worker->concurrency, value, 0) == value) {
19081907
sw_atomic_sub_fetch(&gs->concurrency, value);
19091908
if ((int) gs->concurrency < 0) {
@@ -1913,14 +1912,25 @@ void Server::abort_worker(Worker *worker) {
19131912
worker->request_count = 0;
19141913
worker->response_count = 0;
19151914
worker->dispatch_count = 0;
1915+
}
1916+
1917+
void Server::abort_worker(Worker *worker) {
1918+
reset_worker_counter(worker);
19161919

1917-
if (!is_process_mode()) {
1920+
if (is_base_mode()) {
19181921
SW_LOOP_N(SW_SESSION_LIST_SIZE) {
19191922
Session *session = get_session(i);
19201923
if (session->reactor_id == worker->id) {
19211924
session->fd = 0;
19221925
}
19231926
}
1927+
} else if (is_thread_mode()) {
1928+
sw_reactor()->destroyed = true;
1929+
foreach_connection([this, worker](Connection *conn) {
1930+
if (conn->reactor_id == worker->id) {
1931+
close(conn->session_id, true);
1932+
}
1933+
});
19241934
}
19251935
}
19261936

src/server/task_worker.cc

+1
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ static void TaskWorker_onStart(ProcessPool *pool, Worker *worker) {
270270
} else {
271271
SwooleWG.run_always = true;
272272
}
273+
SwooleWG.shutdown = false;
273274
}
274275

275276
static void TaskWorker_onStop(ProcessPool *pool, Worker *worker) {

src/server/worker.cc

+13
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,11 @@ void Server::stop_async_worker(Worker *worker) {
382382
worker->status = SW_WORKER_EXIT;
383383
Reactor *reactor = SwooleTG.reactor;
384384

385+
SwooleWG.shutdown = true;
386+
if (worker->type == SW_PROCESS_EVENTWORKER) {
387+
reset_worker_counter(worker);
388+
}
389+
385390
/**
386391
* force to end.
387392
*/
@@ -435,6 +440,14 @@ void Server::stop_async_worker(Worker *worker) {
435440
if (gs->event_workers.push_message(SW_WORKER_MESSAGE_STOP, &msg, sizeof(msg)) < 0) {
436441
swoole_sys_warning("failed to push WORKER_STOP message");
437442
}
443+
} else if (is_thread_mode()) {
444+
foreach_connection([this, reactor](Connection *conn) {
445+
if (conn->reactor_id == reactor->id) {
446+
close(conn->session_id, true);
447+
}
448+
});
449+
} else {
450+
assert(0);
438451
}
439452

440453
reactor->set_wait_exit(true);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
--TEST--
2+
swoole_http_server: reset concurrency [SWOOLE_THREAD]
3+
--SKIPIF--
4+
<?php require __DIR__ . '/../../include/skipif.inc';
5+
skip_if_nts();
6+
?>
7+
--FILE--
8+
<?php
9+
require __DIR__ . '/../../include/bootstrap.php';
10+
11+
use Swoole\Coroutine\Http\Client;
12+
use Swoole\Http\Server;
13+
use Swoole\Http\Request;
14+
use Swoole\Http\Response;
15+
use Swoole\Thread\Atomic;
16+
use Swoole\Thread;
17+
use function Swoole\Coroutine\run;
18+
19+
const N = 64;
20+
const WORKER_NUM = 4;
21+
22+
$port = get_constant_port(__FILE__);
23+
24+
$serv = new Swoole\Http\Server('127.0.0.1', $port, SWOOLE_THREAD);
25+
$serv->set(array(
26+
'worker_num' => WORKER_NUM,
27+
'max_concurrency' => 160,
28+
'log_level' => SWOOLE_LOG_ERROR,
29+
'log_file' => '/dev/null',
30+
'init_arguments' => function () {
31+
global $queue, $atomic1, $atomic2;
32+
$queue = new Swoole\Thread\Queue();
33+
$atomic1 = new Swoole\Thread\Atomic(0);
34+
$atomic2 = new Swoole\Thread\Atomic(0);
35+
return [$queue, $atomic1, $atomic2];
36+
}
37+
));
38+
$serv->on('WorkerStart', function (Server $serv, $workerId) use ($port) {
39+
[$queue, $atomic1, $atomic2] = Thread::getArguments();
40+
if ($atomic1->add() == WORKER_NUM) {
41+
$queue->push("begin\n", Thread\Queue::NOTIFY_ALL);
42+
}
43+
});
44+
$serv->on('WorkerStop', function (Server $serv, $workerId) {
45+
echo 'WORKER STOP', PHP_EOL;
46+
});
47+
$serv->on('pipeMessage', function (Server $serv, $wid, $msg) {
48+
swoole_test_fn('bailout');
49+
});
50+
$serv->on('Request', function (Request $req, Response $resp) use ($serv) {
51+
[$queue, $atomic1, $atomic2] = Thread::getArguments();
52+
$c = $atomic2->add();
53+
if ($c < N) {
54+
Co::sleep(100);
55+
} elseif ($c == N) {
56+
$stats = $serv->stats();
57+
Assert::eq($stats['concurrency'], N);
58+
$wid = $serv->getWorkerId();
59+
for ($i = 0; $i < WORKER_NUM; $i++) {
60+
if ($i !== $wid) {
61+
$serv->sendMessage('error', $i);
62+
}
63+
}
64+
swoole_test_fn('bailout');
65+
} else {
66+
$stats = $serv->stats();
67+
Assert::eq($stats['concurrency'], 1);
68+
$resp->end(json_encode($stats));
69+
}
70+
});
71+
$serv->on('shutdown', function () {
72+
global $queue, $atomic1, $atomic2;
73+
echo 'SHUTDOWN', PHP_EOL;
74+
Assert::eq($atomic1->get(), WORKER_NUM * 2);
75+
Assert::eq($atomic2->get(), N + 1);
76+
});
77+
$serv->addProcess(new Swoole\Process(function ($process) use ($serv, $port) {
78+
[$queue, $atomic1, $atomic2] = Thread::getArguments();
79+
$queue->pop(-1);
80+
run(function () use ($port, $serv, $atomic1, $queue) {
81+
$n = N;
82+
$coroutines = [];
83+
while ($n--) {
84+
$coroutines[] = go(function () use ($port) {
85+
$client = new Client('127.0.0.1', $port);
86+
$client->set(['timeout' => 10]);
87+
Assert::eq($client->get('/'), false);
88+
Assert::eq($client->getStatusCode(), SWOOLE_HTTP_CLIENT_ESTATUS_SERVER_RESET);
89+
});
90+
}
91+
92+
Co::join($coroutines);
93+
94+
while (1) {
95+
if ($atomic1->get() == WORKER_NUM * 2) {
96+
break;
97+
}
98+
Co::sleep(0.1);
99+
}
100+
101+
$client = new Client('127.0.0.1', $port);
102+
Assert::assert($client->get('/'));
103+
$stats = json_decode($client->getBody());
104+
Assert::eq($stats->concurrency, 1);
105+
$serv->shutdown();
106+
107+
echo "DONE\n";
108+
});
109+
$serv->shutdown();
110+
}));
111+
$serv->start();
112+
?>
113+
--EXPECT--
114+
DONE
115+
WORKER STOP
116+
WORKER STOP
117+
WORKER STOP
118+
WORKER STOP
119+
SHUTDOWN

0 commit comments

Comments
 (0)