Skip to content

Commit

Permalink
Merge pull request #9 from PaperMC/dev/3.0.0
Browse files Browse the repository at this point in the history
[pull] main from PaperMC:dev/3.0.0
  • Loading branch information
pull[bot] authored Feb 28, 2025
2 parents 99aa7f2 + d4ea40a commit 5937b4e
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ public boolean isForceKeyAuthentication() {
return forceKeyAuthentication;
}

public boolean isEnableReusePort() {
return advanced.isEnableReusePort();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down Expand Up @@ -716,6 +720,8 @@ private static class Advanced {
private boolean logPlayerConnections = true;
@Expose
private boolean acceptTransfers = false;
@Expose
private boolean enableReusePort = false;

private Advanced() {
}
Expand All @@ -741,6 +747,7 @@ private Advanced(CommentedConfig config) {
this.logCommandExecutions = config.getOrElse("log-command-executions", false);
this.logPlayerConnections = config.getOrElse("log-player-connections", true);
this.acceptTransfers = config.getOrElse("accepts-transfers", false);
this.enableReusePort = config.getOrElse("enable-reuse-port", false);
}
}

Expand Down Expand Up @@ -804,6 +811,10 @@ public boolean isAcceptTransfers() {
return this.acceptTransfers;
}

public boolean isEnableReusePort() {
return enableReusePort;
}

@Override
public String toString() {
return "Advanced{"
Expand All @@ -821,6 +832,7 @@ public String toString() {
+ ", logCommandExecutions=" + logCommandExecutions
+ ", logPlayerConnections=" + logPlayerConnections
+ ", acceptTransfers=" + acceptTransfers
+ ", enableReusePort=" + enableReusePort
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.velocitypowered.proxy.network;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.velocitypowered.api.event.proxy.ListenerBoundEvent;
import com.velocitypowered.api.event.proxy.ListenerCloseEvent;
import com.velocitypowered.api.network.ListenerType;
Expand All @@ -28,14 +30,17 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.unix.UnixChannelOption;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import java.net.InetSocketAddress;
import java.net.http.HttpClient;
import java.util.HashMap;
import java.util.Collection;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -49,7 +54,7 @@ public final class ConnectionManager {
private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark(1 << 20,
1 << 21);
private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class);
private final Map<InetSocketAddress, Endpoint> endpoints = new HashMap<>();
private final Multimap<InetSocketAddress, Endpoint> endpoints = HashMultimap.create();
private final TransportType transportType;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
Expand Down Expand Up @@ -93,7 +98,6 @@ public void logChannelInformation() {
public void bind(final InetSocketAddress address) {
final ServerBootstrap bootstrap = new ServerBootstrap()
.channelFactory(this.transportType.serverSocketChannelFactory)
.group(this.bossGroup, this.workerGroup)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, SERVER_WRITE_MARK)
.childHandler(this.serverChannelInitializer.get())
.childOption(ChannelOption.TCP_NODELAY, true)
Expand All @@ -104,26 +108,50 @@ public void bind(final InetSocketAddress address) {
bootstrap.option(ChannelOption.TCP_FASTOPEN, 3);
}

bootstrap.bind()
.addListener((ChannelFutureListener) future -> {
final Channel channel = future.channel();
if (future.isSuccess()) {
this.endpoints.put(address, new Endpoint(channel, ListenerType.MINECRAFT));

// Warn people with console access that HAProxy is in use, see PR: #1436
if (this.server.getConfiguration().isProxyProtocol()) {
LOGGER.warn("Using HAProxy and listening on {}, please ensure this listener is adequately firewalled.", channel.localAddress());
}
if (server.getConfiguration().isEnableReusePort()) {
// We don't need a boss group, since each worker will bind to the socket
bootstrap.option(UnixChannelOption.SO_REUSEPORT, true)
.group(this.workerGroup);
} else {
bootstrap.group(this.bossGroup, this.workerGroup);
}

LOGGER.info("Listening on {}", channel.localAddress());
final int binds = server.getConfiguration().isEnableReusePort()
? ((MultithreadEventExecutorGroup) this.workerGroup).executorCount() : 1;

// Fire the proxy bound event after the socket is bound
server.getEventManager().fireAndForget(
new ListenerBoundEvent(address, ListenerType.MINECRAFT));
} else {
LOGGER.error("Can't bind to {}", address, future.cause());
}
});
for (int bind = 0; bind < binds; bind++) {
// Wait for each bind to open. If we encounter any errors, don't try to bind again.
int finalBind = bind;
ChannelFuture f = bootstrap.bind()
.addListener((ChannelFutureListener) future -> {
final Channel channel = future.channel();
if (future.isSuccess()) {
this.endpoints.put(address, new Endpoint(channel, ListenerType.MINECRAFT));

LOGGER.info("Listening on {}", channel.localAddress());

if (finalBind == 0) {
// Warn people with console access that HAProxy is in use, see PR: #1436
if (this.server.getConfiguration().isProxyProtocol()) {
LOGGER.warn(
"Using HAProxy and listening on {}, please ensure this listener is adequately firewalled.",
channel.localAddress());
}

// Fire the proxy bound event after the socket is bound
server.getEventManager().fireAndForget(
new ListenerBoundEvent(address, ListenerType.MINECRAFT));
}
} else {
LOGGER.error("Can't bind to {}", address, future.cause());
}
});
f.syncUninterruptibly();

if (!f.isSuccess()) {
break;
}
}
}

/**
Expand Down Expand Up @@ -181,17 +209,20 @@ public Bootstrap createWorker(@Nullable EventLoopGroup group) {
* @param oldBind the endpoint to close
*/
public void close(InetSocketAddress oldBind) {
Endpoint endpoint = endpoints.remove(oldBind);
Collection<Endpoint> endpoints = this.endpoints.removeAll(oldBind);
Preconditions.checkState(!endpoints.isEmpty(), "Endpoint was not registered");

ListenerType type = endpoints.iterator().next().getType();

// Fire proxy close event to notify plugins of socket close. We block since plugins
// should have a chance to be notified before the server stops accepting connections.
server.getEventManager().fire(new ListenerCloseEvent(oldBind, endpoint.getType())).join();

Channel serverChannel = endpoint.getChannel();
server.getEventManager().fire(new ListenerCloseEvent(oldBind, type)).join();

Preconditions.checkState(serverChannel != null, "Endpoint %s not registered", oldBind);
LOGGER.info("Closing endpoint {}", serverChannel.localAddress());
serverChannel.close().syncUninterruptibly();
for (Endpoint endpoint : endpoints) {
Channel serverChannel = endpoint.getChannel();
LOGGER.info("Closing endpoint {}", serverChannel.localAddress());
serverChannel.close().syncUninterruptibly();
}
}

/**
Expand All @@ -200,24 +231,28 @@ public void close(InetSocketAddress oldBind) {
* @param interrupt should closing forward interruptions
*/
public void closeEndpoints(boolean interrupt) {
for (final Map.Entry<InetSocketAddress, Endpoint> entry : this.endpoints.entrySet()) {
for (final Map.Entry<InetSocketAddress, Collection<Endpoint>> entry : this.endpoints.asMap()
.entrySet()) {
final InetSocketAddress address = entry.getKey();
final Endpoint endpoint = entry.getValue();
final Collection<Endpoint> endpoints = entry.getValue();
ListenerType type = endpoints.iterator().next().getType();

// Fire proxy close event to notify plugins of socket close. We block since plugins
// should have a chance to be notified before the server stops accepting connections.
server.getEventManager().fire(new ListenerCloseEvent(address, endpoint.getType())).join();

LOGGER.info("Closing endpoint {}", address);
if (interrupt) {
try {
endpoint.getChannel().close().sync();
} catch (final InterruptedException e) {
LOGGER.info("Interrupted whilst closing endpoint", e);
Thread.currentThread().interrupt();
server.getEventManager().fire(new ListenerCloseEvent(address, type)).join();

for (Endpoint endpoint : endpoints) {
LOGGER.info("Closing endpoint {}", address);
if (interrupt) {
try {
endpoint.getChannel().close().sync();
} catch (final InterruptedException e) {
LOGGER.info("Interrupted whilst closing endpoint", e);
Thread.currentThread().interrupt();
}
} else {
endpoint.getChannel().close().syncUninterruptibly();
}
} else {
endpoint.getChannel().close().syncUninterruptibly();
}
}
this.endpoints.clear();
Expand Down
6 changes: 6 additions & 0 deletions proxy/src/main/resources/default-velocity.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ log-player-connections = true
# Transfer packet (Minecraft 1.20.5) to be received.
accepts-transfers = false

# Enables support for SO_REUSEPORT. This may help the proxy scale better on multicore systems
# with a lot of incoming connections, and provide better CPU utilization than the existing
# strategy of having a single thread accepting connections and distributing them to worker
# threads. Disabled by default. Requires Linux or macOS.
enable-reuse-port = false

[query]
# Whether to enable responding to GameSpy 4 query responses or not.
enabled = false
Expand Down

0 comments on commit 5937b4e

Please sign in to comment.