Skip to content

Commit 7e1be4e

Browse files
committed
tmp
1 parent d5f4ab2 commit 7e1be4e

File tree

2 files changed

+68
-11
lines changed

2 files changed

+68
-11
lines changed

src/frontend/src/session.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,15 +1577,15 @@ impl SessionManagerImpl {
15771577
}
15781578

15791579
// Check HBA configuration for LDAP authentication
1580-
let connection_type = match peer_addr.as_ref() {
1581-
Address::Tcp(_) => ConnectionType::Host,
1582-
Address::Unix(_) => ConnectionType::Local,
1583-
};
1584-
1585-
let client_addr = match peer_addr.as_ref() {
1586-
Address::Tcp(socket_addr) => Some(&socket_addr.ip()),
1587-
_ => None,
1580+
let (connection_type, client_addr) = match peer_addr.as_ref() {
1581+
Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1582+
Address::Unix(_) => (ConnectionType::Local, None),
15881583
};
1584+
tracing::debug!(
1585+
"receive connection: type={:?}, client_addr={:?}",
1586+
connection_type,
1587+
client_addr
1588+
);
15891589

15901590
let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
15911591
&connection_type,

src/utils/pgwire/src/pg_server.rs

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,35 @@ pub async fn pg_serve(
285285
let listener = Listener::bind(addr).await?;
286286
tracing::info!(addr, "server started");
287287

288+
// FIXME: only for debugging, remove it after hba config feature is complete.
289+
// check and create another unix socket if addr is binding locally with a port
290+
let extra_unix_listener = match &listener {
291+
Listener::Tcp(_) => {
292+
let host_and_port = addr.split(':').collect::<Vec<_>>();
293+
let port = if host_and_port.len() == 2 {
294+
host_and_port[1].parse::<i16>().ok()
295+
} else {
296+
None
297+
};
298+
if let Some(port) = port {
299+
let local_unix_addr = format!("unix:/tmp/.s.PGSQL.{}", port);
300+
match Listener::bind(&local_unix_addr).await {
301+
Ok(l) => {
302+
tracing::info!(local_unix_addr, "also bind a local unix socket");
303+
Some(l)
304+
}
305+
Err(e) => {
306+
tracing::warn!(error = %e.as_report(), local_unix_addr, "failed to bind a local unix socket");
307+
None
308+
}
309+
}
310+
} else {
311+
None
312+
}
313+
}
314+
Listener::Unix(_) => None,
315+
};
316+
288317
let acceptor_runtime = BackgroundShutdownRuntime::from({
289318
let mut builder = tokio::runtime::Builder::new_multi_thread();
290319
builder.worker_threads(1);
@@ -300,17 +329,20 @@ pub async fn pg_serve(
300329
#[cfg(madsim)]
301330
let worker_runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
302331
let session_mgr_clone = session_mgr.clone();
332+
let worker_runtime_clone = worker_runtime.clone();
333+
let context_clone = context.clone();
334+
let tcp_keepalive_clone = tcp_keepalive.clone();
303335
let f = async move {
304336
loop {
305-
let conn_ret = listener.accept(&tcp_keepalive).await;
337+
let conn_ret = listener.accept(&tcp_keepalive_clone).await;
306338
match conn_ret {
307339
Ok((stream, peer_addr)) => {
308340
tracing::info!(%peer_addr, "accept connection");
309-
worker_runtime.spawn(handle_connection(
341+
worker_runtime_clone.spawn(handle_connection(
310342
stream,
311343
session_mgr_clone.clone(),
312344
Arc::new(peer_addr),
313-
context.clone(),
345+
context_clone.clone(),
314346
));
315347
}
316348

@@ -322,6 +354,31 @@ pub async fn pg_serve(
322354
};
323355
acceptor_runtime.spawn(f);
324356

357+
if let Some(local_unix_listener) = extra_unix_listener {
358+
let session_mgr_clone = session_mgr.clone();
359+
let f = async move {
360+
loop {
361+
let conn_ret = local_unix_listener.accept(&tcp_keepalive).await;
362+
match conn_ret {
363+
Ok((stream, peer_addr)) => {
364+
tracing::info!(%peer_addr, "accept connection from local unix socket");
365+
worker_runtime.spawn(handle_connection(
366+
stream,
367+
session_mgr_clone.clone(),
368+
Arc::new(peer_addr),
369+
context.clone(),
370+
));
371+
}
372+
373+
Err(e) => {
374+
tracing::error!(error = %e.as_report(), "failed to accept connection from local unix socket",);
375+
}
376+
}
377+
}
378+
};
379+
acceptor_runtime.spawn(f);
380+
}
381+
325382
// Wait for the shutdown signal.
326383
shutdown.cancelled().await;
327384

0 commit comments

Comments
 (0)