diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/LineReadingTcpServer.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/LineReadingTcpServer.java index 9f4028423a6..9f078e13861 100644 --- a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/LineReadingTcpServer.java +++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/LineReadingTcpServer.java @@ -30,7 +30,11 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -38,12 +42,13 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLHandshakeException; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.util.Closer; import org.apache.logging.log4j.status.StatusLogger; /** * A simple TCP server implementation reading the accepted connection's input stream into a blocking queue of lines. *

- * The implementation is thread-safe, yet connections are handled sequentially, i.e., no parallelization. + * The implementation is thread-safe. * The input stream of the connection is decoded in UTF-8. *

* This class can also be used for secure (i.e., SSL) connections. @@ -60,9 +65,11 @@ final class LineReadingTcpServer implements AutoCloseable { private ServerSocket serverSocket; - private Socket clientSocket; + private Map clientSocketMap; - private Thread readerThread; + private ExecutorService executorService; + + private Thread acceptThread; private final BlockingQueue lines = new LinkedBlockingQueue<>(); @@ -77,8 +84,10 @@ final class LineReadingTcpServer implements AutoCloseable { synchronized void start(final String name, final int port) throws IOException { if (!running) { running = true; + clientSocketMap = new ConcurrentHashMap(); + executorService = Executors.newCachedThreadPool(); serverSocket = createServerSocket(port); - readerThread = createReaderThread(name); + acceptThread = createAcceptThread(name); } } @@ -94,8 +103,8 @@ private ServerSocket createServerSocket(final int port) throws IOException { return serverSocket; } - private Thread createReaderThread(final String name) { - final String threadName = "LineReadingTcpSocketServerReader-" + name; + private Thread createAcceptThread(final String name) { + final String threadName = "LineReadingTcpSocketServerAcceptor-" + name; final Thread thread = new Thread(this::acceptClients, threadName); thread.setDaemon(true); // Avoid blocking JVM exit thread.setUncaughtExceptionHandler((ignored, error) -> LOGGER.error("uncaught reader thread exception", error)); @@ -106,28 +115,33 @@ private Thread createReaderThread(final String name) { private void acceptClients() { try { while (running) { - acceptClient(); + // Accept the client connection + final Socket clientSocket; + try { + clientSocket = serverSocket.accept(); + } catch (SocketException ignored) { + continue; + } + clientSocketMap.put(clientSocket.getRemoteSocketAddress().toString(), clientSocket); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + readClient(clientSocket); + } catch (Exception e) { + // do nothing + } + } + }; + executorService.submit(runnable); } } catch (final Exception error) { LOGGER.error("failed accepting client connections", error); } } - private void acceptClient() throws Exception { - - // Accept the client connection - final Socket clientSocket; - try { - clientSocket = serverSocket.accept(); - } catch (SocketException ignored) { - return; - } + private void readClient(Socket clientSocket) throws Exception { clientSocket.setSoLinger(true, 0); // Enable immediate forceful close - synchronized (this) { - if (running) { - this.clientSocket = clientSocket; - } - } // Read from the client try (final InputStream clientInputStream = clientSocket.getInputStream(); @@ -137,7 +151,7 @@ private void acceptClient() throws Exception { while (running) { final String line = clientBufferedReader.readLine(); if (line == null) { - break; + continue; } lines.put(line); } @@ -155,13 +169,7 @@ private void acceptClient() throws Exception { // Clean up the client connection. finally { try { - synchronized (this) { - if (!clientSocket.isClosed()) { - clientSocket.shutdownOutput(); - clientSocket.close(); - } - this.clientSocket = null; - } + clientSocket.close(); } catch (final Exception error) { LOGGER.error("failed closing client socket", error); } @@ -172,30 +180,27 @@ private void acceptClient() throws Exception { public void close() throws Exception { // Stop the reader, if running - Thread stoppedReaderThread = null; + Thread stoppedAcceptThread = null; synchronized (this) { if (running) { running = false; - // `acceptClient()` might have closed the client socket due to a connection failure and haven't created - // a new one yet. Hence, here we double-check if the client connection is in place. - if (clientSocket != null && !clientSocket.isClosed()) { - // Interrupting a thread is not sufficient to unblock operations waiting on socket I/O: - // https://stackoverflow.com/a/4426050/1278899 Hence, here we close the client socket to unblock the - // read from the client socket. - clientSocket.close(); - } serverSocket.close(); - stoppedReaderThread = readerThread; - clientSocket = null; serverSocket = null; - readerThread = null; + stoppedAcceptThread = acceptThread; + acceptThread = null; + for (Map.Entry entry : clientSocketMap.entrySet()) { + Closer.closeSilently(entry.getValue()); + } + clientSocketMap.clear(); + clientSocketMap = null; + executorService.awaitTermination(0, TimeUnit.MILLISECONDS); } } // We wait for the termination of the reader thread outside the synchronized block. Otherwise, there is a chance // of deadlock with this `join()` and the synchronized block inside the `acceptClient()`. - if (stoppedReaderThread != null) { - stoppedReaderThread.join(); + if (stoppedAcceptThread != null) { + stoppedAcceptThread.join(); } } diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/MultipleSocketAppenderBuilderTest.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/MultipleSocketAppenderBuilderTest.java new file mode 100644 index 00000000000..e969b484c5a --- /dev/null +++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/MultipleSocketAppenderBuilderTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.core.appender; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class MultipleSocketAppenderBuilderTest { + @Test + public void test() { + Assertions.assertEquals(MultipleSocketAppender.newBuilder().getConnectTimeoutMillis(), 0); + Assertions.assertEquals(MultipleSocketAppender.newBuilder().getReconnectionDelayMillis(), 0); + Assertions.assertTrue(MultipleSocketAppender.newBuilder().getImmediateFlush()); + Assertions.assertTrue(MultipleSocketAppender.newBuilder().getImmediateFail()); + } +} diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/MultipleSocketAppenderTest.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/MultipleSocketAppenderTest.java new file mode 100644 index 00000000000..1e242bf9522 --- /dev/null +++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/MultipleSocketAppenderTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.core.appender; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory; +import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration; +import org.apache.logging.log4j.plugins.di.DI; +import org.apache.logging.log4j.status.StatusLogger; +import org.apache.logging.log4j.test.junit.UsingStatusListener; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +public class MultipleSocketAppenderTest { + + private static final String CLASS_NAME = MultipleSocketAppenderTest.class.getSimpleName(); + + private static final int EPHEMERAL_PORT = 0; + + private static final String APPENDER_NAME = "TestMultipleSocket"; + + private static final AtomicInteger LOGGER_CONTEXT_COUNTER = new AtomicInteger(); + + @Test + @UsingStatusListener + void test1() throws Exception { + try (final LineReadingTcpServer server = new LineReadingTcpServer()) { + server.start("Main", EPHEMERAL_PORT); + final String serverHost = server.getServerSocket().getInetAddress().getHostAddress(); + final int serverPort = server.getServerSocket().getLocalPort(); + final Configuration config = createConfiguration(serverHost + ":" + serverPort); + try (final LoggerContext loggerContext = createStartedLoggerContext(config)) { + final BufferingErrorHandler errorHandler = new BufferingErrorHandler(); + loggerContext.getConfiguration().getAppender(APPENDER_NAME).setHandler(errorHandler); + + StatusLogger.getLogger().info("verifyLoggingSuccess..."); + verifyLoggingSuccess(loggerContext, server, errorHandler); + + server.close(); + + StatusLogger.getLogger().info("verifyLoggingFailure..."); + verifyLoggingFailure(loggerContext, errorHandler); + + server.start("Main", serverPort); + // reconnecting + Thread.sleep(10000); + + StatusLogger.getLogger().info("verifyLoggingSuccess..."); + verifyLoggingSuccess(loggerContext, server, errorHandler); + } + } + } + + @Test + @UsingStatusListener + void test2() throws Exception { + try (final LineReadingTcpServer server = new LineReadingTcpServer()) { + server.start("Main", EPHEMERAL_PORT); + final String serverHost = server.getServerSocket().getInetAddress().getHostAddress(); + final int serverPort = server.getServerSocket().getLocalPort(); + final Configuration config = + createConfiguration(serverHost + ":" + serverPort + "," + serverHost + ":" + serverPort); + try (final LoggerContext loggerContext = createStartedLoggerContext(config)) { + final BufferingErrorHandler errorHandler = new BufferingErrorHandler(); + loggerContext.getConfiguration().getAppender(APPENDER_NAME).setHandler(errorHandler); + + StatusLogger.getLogger().info("verifyLoggingSuccess..."); + verifyLoggingSuccess(loggerContext, server, errorHandler); + + server.close(); + + StatusLogger.getLogger().info("verifyLoggingFailure..."); + verifyLoggingFailure(loggerContext, errorHandler); + + server.start("Main", serverPort); + // reconnecting + Thread.sleep(10000); + + StatusLogger.getLogger().info("verifyLoggingSuccess..."); + verifyLoggingSuccess(loggerContext, server, errorHandler); + } + } + } + + private static Configuration createConfiguration(String serverList) { + // Create the configuration builder + final ConfigurationBuilder configBuilder = + ConfigurationBuilderFactory.newConfigurationBuilder() + .setStatusLevel(Level.INFO) + .setConfigurationName(MultipleSocketAppenderTest.class.getSimpleName()); + // Create the appender configuration + final AppenderComponentBuilder appenderComponentBuilder = configBuilder + .newAppender(APPENDER_NAME, "MultipleSocket") + .addAttribute("serverList", serverList) + .addAttribute("ignoreExceptions", false) + .addAttribute("reconnectionDelayMillis", 5000) + .addAttribute("immediateFlush", true) + .add(configBuilder.newLayout("PatternLayout").addAttribute("pattern", "%m%n")); + // Create the configuration + return configBuilder + .add(appenderComponentBuilder) + .add(configBuilder.newRootLogger(Level.ALL).add(configBuilder.newAppenderRef(APPENDER_NAME))) + .build(false); + } + + private static LoggerContext createStartedLoggerContext(final Configuration configuration) { + final String name = String.format( + "%s-%02d", MultipleSocketAppenderTest.class.getSimpleName(), LOGGER_CONTEXT_COUNTER.getAndIncrement()); + final LoggerContext loggerContext = new LoggerContext(name, null, (String) null, DI.createInitializedFactory()); + loggerContext.start(configuration); + return loggerContext; + } + + private static void verifyLoggingSuccess( + final LoggerContext loggerContext, + final LineReadingTcpServer server, + final BufferingErrorHandler errorHandler) + throws Exception { + // Report status + StatusLogger.getLogger().debug("[{}] verifying logging success", CLASS_NAME); + // Create messages to log + final int messageCount = 2; + final List expectedMessages = IntStream.range(0, messageCount) + .mapToObj(messageIndex -> String.format("m%02d", messageIndex)) + .collect(Collectors.toList()); + // Log the 1st message + final Logger logger = loggerContext.getRootLogger(); + Awaitility.await("first socket append") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(120, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> { + final String message = expectedMessages.get(0); + logger.info(message); + }); + // Reset the error handler + errorHandler.clear(); + // Log the rest of the messages + for (int messageIndex = 1; messageIndex < expectedMessages.size(); messageIndex++) { + final String message = expectedMessages.get(messageIndex); + logger.info(message); + } + // Verify the messages received by the server + final List actualMessages = server.pollLines(messageCount); + Assertions.assertThat(actualMessages).containsExactlyElementsOf(expectedMessages); + // Verify the error handler state + Assertions.assertThat(errorHandler.getBuffer()).isEmpty(); + } + + private static void verifyLoggingFailure( + final LoggerContext loggerContext, final BufferingErrorHandler errorHandler) { + StatusLogger.getLogger().debug("[{}] verifying logging failure", CLASS_NAME); + final Logger logger = loggerContext.getRootLogger(); + final int retryCount = 3; + for (int i = 0; i < retryCount; i++) { + try { + logger.info("should fail #" + i); + Assertions.fail("should have failed #" + i); + } catch (final AppenderLoggingException ignored) { + Assertions.assertThat(errorHandler.getBuffer()).hasSize(2 * (i + 1)); + } + } + } +} diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/MultipleSocketAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/MultipleSocketAppender.java new file mode 100644 index 00000000000..9ab375acbc5 --- /dev/null +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/MultipleSocketAppender.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.core.appender; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.core.AbstractLifeCycle; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.net.SocketOptions; +import org.apache.logging.log4j.plugins.Configurable; +import org.apache.logging.log4j.plugins.Plugin; +import org.apache.logging.log4j.plugins.PluginBuilderAttribute; +import org.apache.logging.log4j.plugins.PluginElement; +import org.apache.logging.log4j.plugins.PluginFactory; +import org.apache.logging.log4j.plugins.validation.constraints.Required; + +/** + * Sends log events over multiple sockets. + *

+ * for TCP only + */ +@Configurable(elementType = Appender.ELEMENT_TYPE, printObject = true) +@Plugin("MultipleSocket") +public final class MultipleSocketAppender extends AbstractAppender { + /** + * Builds MultipleSocketAppender instances. + */ + public static class Builder extends AbstractAppender.Builder + implements org.apache.logging.log4j.plugins.util.Builder { + /** + * address:port[,address:port] + */ + @PluginBuilderAttribute + @Required(message = "server list is required") + private String serverList; + + @PluginElement("SocketOptions") + private SocketOptions socketOptions; + + @PluginBuilderAttribute + private int connectTimeoutMillis = 0; + + @PluginBuilderAttribute + private int reconnectionDelayMillis = 0; + + /** + * for future versions + */ + @PluginBuilderAttribute + private boolean immediateFlush = true; + + @PluginBuilderAttribute + private boolean immediateFail = true; + + @Override + public MultipleSocketAppender build() { + final Layout layout = getLayout(); + if (layout == null) { + AbstractLifeCycle.LOGGER.error("No layout provided for MultipleSocketAppender"); + return null; + } + + final String name = getName(); + if (name == null) { + AbstractLifeCycle.LOGGER.error("No name provided for MultipleSocketAppender"); + return null; + } + + final MultipleSocketManager multipleSocketManager = new MultipleSocketManager( + getConfiguration().getLoggerContext(), + getName(), + getConfiguration(), + serverList, + socketOptions, + connectTimeoutMillis, + reconnectionDelayMillis, + immediateFlush, + immediateFail); + + final MultipleSocketAppender multipleSocketAppender = new MultipleSocketAppender( + getName(), + getLayout(), + getFilter(), + isIgnoreExceptions(), + multipleSocketManager, + getPropertyArray()); + + return multipleSocketAppender; + } + + public String getServerList() { + return serverList; + } + + public SocketOptions getSocketOptions() { + return socketOptions; + } + + public int getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + public int getReconnectionDelayMillis() { + return reconnectionDelayMillis; + } + + public boolean getImmediateFlush() { + return immediateFlush; + } + + public boolean getImmediateFail() { + return immediateFail; + } + + public Builder setServerList(final String serverList) { + this.serverList = serverList; + return asBuilder(); + } + + public Builder setSocketOptions(final SocketOptions socketOptions) { + this.socketOptions = socketOptions; + return asBuilder(); + } + + public Builder setConnectTimeoutMillis(final int connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + return asBuilder(); + } + + public Builder setReconnectionDelayMillis(final int reconnectionDelayMillis) { + this.reconnectionDelayMillis = reconnectionDelayMillis; + return asBuilder(); + } + + public Builder setImmediateFlush(final boolean immediateFlush) { + this.immediateFlush = immediateFlush; + return asBuilder(); + } + + public Builder setImmediateFail(final boolean immediateFail) { + this.immediateFail = immediateFail; + return asBuilder(); + } + } + + /** + * @return a builder for a MultipleSocketAppender. + */ + @PluginFactory + public static Builder newBuilder() { + return new Builder(); + } + + private final MultipleSocketManager manager; + + private MultipleSocketAppender( + final String name, + final Layout layout, + final Filter filter, + final boolean ignoreExceptions, + final MultipleSocketManager manager, + final Property[] properties) { + super(name, filter, layout, ignoreExceptions, properties); + Objects.requireNonNull(layout, "layout"); + this.manager = Objects.requireNonNull(manager, "manager"); + } + + @Override + public void start() { + super.start(); + } + + @Override + public void append(final LogEvent event) { + try { + manager.send(getLayout(), event); + } catch (final Exception e) { + error("Unable to send event in appender [" + getName() + "]", event, e); + throw new AppenderLoggingException(e); + } + } + + @Override + public boolean stop(final long timeout, final TimeUnit timeUnit) { + setStopping(); + boolean stopped = super.stop(timeout, timeUnit, false); + stopped &= manager.stop(timeout, timeUnit); + setStopped(); + return stopped; + } + + @Override + public String toString() { + return "MultipleSocketAppender{" + "name=" + getName() + ", state=" + getState() + '}'; + } +} diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/MultipleSocketManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/MultipleSocketManager.java new file mode 100644 index 00000000000..09e56f9c874 --- /dev/null +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/MultipleSocketManager.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.core.appender; + +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.net.SocketOptions; +import org.apache.logging.log4j.core.util.Closer; +import org.apache.logging.log4j.core.util.Log4jThread; + +public class MultipleSocketManager extends AbstractManager { + public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000; + + private final Random random = new Random(); + + private final Configuration configuration; + + private String serverListString; + + private String[] serverList; + + private String[] serverAddressList; + + private Integer[] serverPortList; + + private SocketOptions socketOptions; + + private Socket[] socketList; + + private final int connectTimeoutMillis; + + private final int reconnectionDelayMillis; + + /** + * for future versions + */ + private boolean immediateFlush; + + private boolean immediateFail; + + private ConnectThread connectThread; + + public MultipleSocketManager( + final LoggerContext loggerContext, + final String name, + final Configuration configuration, + final String serverListString, + final SocketOptions socketOptions, + final int connectTimeoutMillis, + final int reconnectionDelayMillis, + final boolean immediateFlush, + final boolean immediateFail) { + super(loggerContext, name); + this.configuration = Objects.requireNonNull(configuration); + this.serverListString = Objects.requireNonNull(serverListString); + try { + this.serverList = serverListString.split(","); + this.serverAddressList = new String[this.serverList.length]; + this.serverPortList = new Integer[this.serverList.length]; + for (int i = 0; i < this.serverList.length; ++i) { + String[] server = this.serverList[i].split(":"); + this.serverAddressList[i] = server[0]; + this.serverPortList[i] = Integer.valueOf(server[1]); + } + } catch (Exception e) { + LOGGER.error("parse server list failed. {}", serverListString, e); + throw new RuntimeException(e); + } + this.socketOptions = socketOptions; + this.socketList = new Socket[this.serverList.length]; + this.connectTimeoutMillis = connectTimeoutMillis; + if (reconnectionDelayMillis > 0) { + this.reconnectionDelayMillis = reconnectionDelayMillis; + } else { + this.reconnectionDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS; + } + this.immediateFlush = immediateFlush; + this.immediateFail = immediateFail; + this.connectThread = new ConnectThread(this); + this.connectThread.setDaemon(true); + this.connectThread.setPriority(Thread.MIN_PRIORITY); + try { + this.connectThread.connect(); + } catch (Exception e) { + LOGGER.error("connectThread.connect failed. {}", serverListString, e); + } + this.connectThread.start(); + } + + public void send(final Layout layout, final LogEvent event) throws Exception { + final byte[] byteArray = layout.toByteArray(event); + for (int i = 0; i < 2; ++i) { + // select socket randomly + final int index = random.nextInt(serverList.length); + synchronized (serverList[index]) { + if (socketList[index] == null && !immediateFail) { + continue; + } + try { + final OutputStream outputStream = socketList[index].getOutputStream(); + outputStream.write(byteArray); + // TODO + // immediateFlush + outputStream.flush(); + break; + } catch (final Exception e) { + LOGGER.error("outputStream.write failed. {} {} {}", index, socketList[index], i, e); + if (!immediateFail) { + continue; + } + throw e; + } + } + } + } + + @Override + public boolean releaseSub(final long timeout, final TimeUnit timeUnit) { + if (connectThread != null) { + connectThread.shutdown(); + connectThread.interrupt(); + connectThread = null; + } + boolean closed = true; + for (int i = 0; i < serverList.length; ++i) { + synchronized (serverList[i]) { + if (socketList[i] != null) { + LOGGER.debug("closing socket {} {} {}", i, serverList[i], socketList[i]); + try { + socketList[i].close(); + } catch (final Exception e) { + LOGGER.error("socket.close failed. {} {}", i, socketList[i], e); + closed = false; + } + socketList[i] = null; + } + } + } + return closed; + } + + /** + * Handles reconnecting to servers on a Thread. + */ + private class ConnectThread extends Log4jThread { + private final Random random = new Random(); + + private boolean shutdown = false; + + private final Object owner; + + public ConnectThread(final MultipleSocketManager owner) { + super("MultipleSocketManager-ConnectThread"); + this.owner = owner; + } + + public void shutdown() { + shutdown = true; + } + + @Override + public void run() { + while (!shutdown) { + try { + sleep(reconnectionDelayMillis + random.nextInt(reconnectionDelayMillis + 1)); + connect(); + } catch (final InterruptedException e) { + LOGGER.debug("Reconnection interrupted.", e); + } catch (final Exception e) { + LOGGER.debug("Unable to connect to {}", serverListString, e); + } + } + } + + void connect() throws Exception { + int successCount = 0; + for (int i = 0; i < serverList.length; ++i) { + LOGGER.debug("resolving server {} {}", i, serverList[i]); + List inetSocketAddressList = null; + try { + inetSocketAddressList = resolveServer(serverAddressList[i], serverPortList[i]); + } catch (Exception e) { + LOGGER.error("could not resolve server. {} {}", i, serverList[i], e); + continue; + } + for (InetSocketAddress inetSocketAddress : inetSocketAddressList) { + LOGGER.debug("creating socket {} {} {}", i, serverList[i], inetSocketAddress); + Socket newSocket = null; + try { + newSocket = createSocket(inetSocketAddress, socketOptions, connectTimeoutMillis, i); + } catch (Exception e) { + LOGGER.error("createSocket failed. {} {} {}", i, serverList[i], inetSocketAddress, e); + Closer.closeSilently(newSocket); + continue; + } + Socket oldSocket = null; + synchronized (serverList[i]) { + oldSocket = socketList[i]; + socketList[i] = newSocket; + } + if (oldSocket != null) { + LOGGER.debug("closing socket {} {} {}", i, serverList[i], oldSocket); + try { + oldSocket.close(); + } catch (final Exception e) { + LOGGER.error("socket.close failed. {} {} {}", i, serverList[i], oldSocket, e); + } + } + ++successCount; + break; + } + } + if (successCount < 1) { + throw new Exception("connect failed." + successCount + "/" + serverList.length); + } + } + + @Override + public String toString() { + return "ConnectThread [shutdown=" + shutdown + "]"; + } + } + + public List resolveServer(final String address, final int port) throws Exception { + final InetAddress[] inetAddressList = InetAddress.getAllByName(address); + final List inetSocketAddressList = new ArrayList<>(inetAddressList.length); + for (InetAddress inetAddress : inetAddressList) { + inetSocketAddressList.add(new InetSocketAddress(inetAddress, port)); + } + return inetSocketAddressList; + } + + protected static Socket createSocket( + final InetSocketAddress inetSocketAddress, + final SocketOptions socketOptions, + final int connectTimeoutMillis, + final int index) + throws Exception { + final Socket socket = new Socket(); + if (socketOptions != null) { + // Not sure which options must be applied before or after the connect() call. + socketOptions.apply(socket); + } + LOGGER.debug("connecting socket {} {} {}", index, inetSocketAddress.toString(), connectTimeoutMillis); + socket.connect(inetSocketAddress, connectTimeoutMillis); + if (socketOptions != null) { + // Not sure which options must be applied before or after the connect() call. + socketOptions.apply(socket); + } + return socket; + } +} diff --git a/log4j-parent/pom.xml b/log4j-parent/pom.xml index ca73d461df2..c85949361b4 100644 --- a/log4j-parent/pom.xml +++ b/log4j-parent/pom.xml @@ -104,6 +104,7 @@ 3.17.0 4.0.0 0.9.0 + 2.36.0 7.0.5 4.0.25 33.4.0-jre