Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3442] add MultipleSocketAppender #3482

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,25 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Closer;
import org.apache.logging.log4j.status.StatusLogger;

/**
* A simple TCP server implementation reading the accepted connection's input stream into a blocking queue of lines.
* <p>
* The implementation is thread-safe, yet connections are handled sequentially, i.e., no parallelization.
* The implementation is thread-safe.
* The input stream of the connection is decoded in UTF-8.
* </p><p>
* This class can also be used for secure (i.e., SSL) connections.
Expand All @@ -60,9 +65,11 @@ final class LineReadingTcpServer implements AutoCloseable {

private ServerSocket serverSocket;

private Socket clientSocket;
private Map<String, Socket> clientSocketMap;

private Thread readerThread;
private ExecutorService executorService;

private Thread acceptThread;

private final BlockingQueue<String> lines = new LinkedBlockingQueue<>();

Expand All @@ -77,8 +84,10 @@ final class LineReadingTcpServer implements AutoCloseable {
synchronized void start(final String name, final int port) throws IOException {
if (!running) {
running = true;
clientSocketMap = new ConcurrentHashMap<String, Socket>();
executorService = Executors.newCachedThreadPool();
serverSocket = createServerSocket(port);
readerThread = createReaderThread(name);
acceptThread = createAcceptThread(name);
}
}

Expand All @@ -94,8 +103,8 @@ private ServerSocket createServerSocket(final int port) throws IOException {
return serverSocket;
}

private Thread createReaderThread(final String name) {
final String threadName = "LineReadingTcpSocketServerReader-" + name;
private Thread createAcceptThread(final String name) {
final String threadName = "LineReadingTcpSocketServerAcceptor-" + name;
final Thread thread = new Thread(this::acceptClients, threadName);
thread.setDaemon(true); // Avoid blocking JVM exit
thread.setUncaughtExceptionHandler((ignored, error) -> LOGGER.error("uncaught reader thread exception", error));
Expand All @@ -106,28 +115,33 @@ private Thread createReaderThread(final String name) {
private void acceptClients() {
try {
while (running) {
acceptClient();
// Accept the client connection
final Socket clientSocket;
try {
clientSocket = serverSocket.accept();
} catch (SocketException ignored) {
continue;
}
clientSocketMap.put(clientSocket.getRemoteSocketAddress().toString(), clientSocket);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
readClient(clientSocket);
} catch (Exception e) {
// do nothing
}
}
};
executorService.submit(runnable);
}
} catch (final Exception error) {
LOGGER.error("failed accepting client connections", error);
}
}

private void acceptClient() throws Exception {

// Accept the client connection
final Socket clientSocket;
try {
clientSocket = serverSocket.accept();
} catch (SocketException ignored) {
return;
}
private void readClient(Socket clientSocket) throws Exception {
clientSocket.setSoLinger(true, 0); // Enable immediate forceful close
synchronized (this) {
if (running) {
this.clientSocket = clientSocket;
}
}

// Read from the client
try (final InputStream clientInputStream = clientSocket.getInputStream();
Expand All @@ -137,7 +151,7 @@ private void acceptClient() throws Exception {
while (running) {
final String line = clientBufferedReader.readLine();
if (line == null) {
break;
continue;
}
lines.put(line);
}
Expand All @@ -155,13 +169,7 @@ private void acceptClient() throws Exception {
// Clean up the client connection.
finally {
try {
synchronized (this) {
if (!clientSocket.isClosed()) {
clientSocket.shutdownOutput();
clientSocket.close();
}
this.clientSocket = null;
}
clientSocket.close();
} catch (final Exception error) {
LOGGER.error("failed closing client socket", error);
}
Expand All @@ -172,30 +180,27 @@ private void acceptClient() throws Exception {
public void close() throws Exception {

// Stop the reader, if running
Thread stoppedReaderThread = null;
Thread stoppedAcceptThread = null;
synchronized (this) {
if (running) {
running = false;
// `acceptClient()` might have closed the client socket due to a connection failure and haven't created
// a new one yet. Hence, here we double-check if the client connection is in place.
if (clientSocket != null && !clientSocket.isClosed()) {
// Interrupting a thread is not sufficient to unblock operations waiting on socket I/O:
// https://stackoverflow.com/a/4426050/1278899 Hence, here we close the client socket to unblock the
// read from the client socket.
clientSocket.close();
}
serverSocket.close();
stoppedReaderThread = readerThread;
clientSocket = null;
serverSocket = null;
readerThread = null;
stoppedAcceptThread = acceptThread;
acceptThread = null;
for (Map.Entry<String, Socket> entry : clientSocketMap.entrySet()) {
Closer.closeSilently(entry.getValue());
}
clientSocketMap.clear();
clientSocketMap = null;
executorService.awaitTermination(0, TimeUnit.MILLISECONDS);
}
}

// We wait for the termination of the reader thread outside the synchronized block. Otherwise, there is a chance
// of deadlock with this `join()` and the synchronized block inside the `acceptClient()`.
if (stoppedReaderThread != null) {
stoppedReaderThread.join();
if (stoppedAcceptThread != null) {
stoppedAcceptThread.join();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.core.appender;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class MultipleSocketAppenderBuilderTest {
@Test
public void test() {
Assertions.assertEquals(MultipleSocketAppender.newBuilder().getConnectTimeoutMillis(), 0);
Assertions.assertEquals(MultipleSocketAppender.newBuilder().getReconnectionDelayMillis(), 0);
Assertions.assertTrue(MultipleSocketAppender.newBuilder().getImmediateFlush());
Assertions.assertTrue(MultipleSocketAppender.newBuilder().getImmediateFail());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.core.appender;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
import org.apache.logging.log4j.plugins.di.DI;
import org.apache.logging.log4j.status.StatusLogger;
import org.apache.logging.log4j.test.junit.UsingStatusListener;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

public class MultipleSocketAppenderTest {

private static final String CLASS_NAME = MultipleSocketAppenderTest.class.getSimpleName();

private static final int EPHEMERAL_PORT = 0;

private static final String APPENDER_NAME = "TestMultipleSocket";

private static final AtomicInteger LOGGER_CONTEXT_COUNTER = new AtomicInteger();

@Test
@UsingStatusListener
void test1() throws Exception {
try (final LineReadingTcpServer server = new LineReadingTcpServer()) {
server.start("Main", EPHEMERAL_PORT);
final String serverHost = server.getServerSocket().getInetAddress().getHostAddress();
final int serverPort = server.getServerSocket().getLocalPort();
final Configuration config = createConfiguration(serverHost + ":" + serverPort);
try (final LoggerContext loggerContext = createStartedLoggerContext(config)) {
final BufferingErrorHandler errorHandler = new BufferingErrorHandler();
loggerContext.getConfiguration().getAppender(APPENDER_NAME).setHandler(errorHandler);

StatusLogger.getLogger().info("verifyLoggingSuccess...");
verifyLoggingSuccess(loggerContext, server, errorHandler);

server.close();

StatusLogger.getLogger().info("verifyLoggingFailure...");
verifyLoggingFailure(loggerContext, errorHandler);

server.start("Main", serverPort);
// reconnecting
Thread.sleep(10000);

StatusLogger.getLogger().info("verifyLoggingSuccess...");
verifyLoggingSuccess(loggerContext, server, errorHandler);
}
}
}

@Test
@UsingStatusListener
void test2() throws Exception {
try (final LineReadingTcpServer server = new LineReadingTcpServer()) {
server.start("Main", EPHEMERAL_PORT);
final String serverHost = server.getServerSocket().getInetAddress().getHostAddress();
final int serverPort = server.getServerSocket().getLocalPort();
final Configuration config =
createConfiguration(serverHost + ":" + serverPort + "," + serverHost + ":" + serverPort);
try (final LoggerContext loggerContext = createStartedLoggerContext(config)) {
final BufferingErrorHandler errorHandler = new BufferingErrorHandler();
loggerContext.getConfiguration().getAppender(APPENDER_NAME).setHandler(errorHandler);

StatusLogger.getLogger().info("verifyLoggingSuccess...");
verifyLoggingSuccess(loggerContext, server, errorHandler);

server.close();

StatusLogger.getLogger().info("verifyLoggingFailure...");
verifyLoggingFailure(loggerContext, errorHandler);

server.start("Main", serverPort);
// reconnecting
Thread.sleep(10000);

StatusLogger.getLogger().info("verifyLoggingSuccess...");
verifyLoggingSuccess(loggerContext, server, errorHandler);
}
}
}

private static Configuration createConfiguration(String serverList) {
// Create the configuration builder
final ConfigurationBuilder<BuiltConfiguration> configBuilder =
ConfigurationBuilderFactory.newConfigurationBuilder()
.setStatusLevel(Level.INFO)
.setConfigurationName(MultipleSocketAppenderTest.class.getSimpleName());
// Create the appender configuration
final AppenderComponentBuilder appenderComponentBuilder = configBuilder
.newAppender(APPENDER_NAME, "MultipleSocket")
.addAttribute("serverList", serverList)
.addAttribute("ignoreExceptions", false)
.addAttribute("reconnectionDelayMillis", 5000)
.addAttribute("immediateFlush", true)
.add(configBuilder.newLayout("PatternLayout").addAttribute("pattern", "%m%n"));
// Create the configuration
return configBuilder
.add(appenderComponentBuilder)
.add(configBuilder.newRootLogger(Level.ALL).add(configBuilder.newAppenderRef(APPENDER_NAME)))
.build(false);
}

private static LoggerContext createStartedLoggerContext(final Configuration configuration) {
final String name = String.format(
"%s-%02d", MultipleSocketAppenderTest.class.getSimpleName(), LOGGER_CONTEXT_COUNTER.getAndIncrement());
final LoggerContext loggerContext = new LoggerContext(name, null, (String) null, DI.createInitializedFactory());
loggerContext.start(configuration);
return loggerContext;
}

private static void verifyLoggingSuccess(
final LoggerContext loggerContext,
final LineReadingTcpServer server,
final BufferingErrorHandler errorHandler)
throws Exception {
// Report status
StatusLogger.getLogger().debug("[{}] verifying logging success", CLASS_NAME);
// Create messages to log
final int messageCount = 2;
final List<String> expectedMessages = IntStream.range(0, messageCount)
.mapToObj(messageIndex -> String.format("m%02d", messageIndex))
.collect(Collectors.toList());
// Log the 1st message
final Logger logger = loggerContext.getRootLogger();
Awaitility.await("first socket append")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(120, TimeUnit.SECONDS)
.ignoreExceptions()
.untilAsserted(() -> {
final String message = expectedMessages.get(0);
logger.info(message);
});
// Reset the error handler
errorHandler.clear();
// Log the rest of the messages
for (int messageIndex = 1; messageIndex < expectedMessages.size(); messageIndex++) {
final String message = expectedMessages.get(messageIndex);
logger.info(message);
}
// Verify the messages received by the server
final List<String> actualMessages = server.pollLines(messageCount);
Assertions.assertThat(actualMessages).containsExactlyElementsOf(expectedMessages);
// Verify the error handler state
Assertions.assertThat(errorHandler.getBuffer()).isEmpty();
}

private static void verifyLoggingFailure(
final LoggerContext loggerContext, final BufferingErrorHandler errorHandler) {
StatusLogger.getLogger().debug("[{}] verifying logging failure", CLASS_NAME);
final Logger logger = loggerContext.getRootLogger();
final int retryCount = 3;
for (int i = 0; i < retryCount; i++) {
try {
logger.info("should fail #" + i);
Assertions.fail("should have failed #" + i);
} catch (final AppenderLoggingException ignored) {
Assertions.assertThat(errorHandler.getBuffer()).hasSize(2 * (i + 1));
}
}
}
}
Loading