Skip to content

Commit

Permalink
Merge branch 'master' into rxsocks
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Mar 4, 2025
2 parents 81ba5c4 + f1f9d0f commit 3123f63
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 32 deletions.
32 changes: 25 additions & 7 deletions rxlib/src/main/java/org/rx/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.rx.bean.$;
import org.rx.bean.RandomList;
import org.rx.bean.Tuple;
import org.rx.core.*;
Expand All @@ -12,7 +13,9 @@
import org.rx.net.TransportFlags;
import org.rx.net.dns.DnsClient;
import org.rx.net.dns.DnsServer;
import org.rx.net.http.*;
import org.rx.net.http.AuthenticProxy;
import org.rx.net.http.HttpClient;
import org.rx.net.http.HttpServer;
import org.rx.net.rpc.Remoting;
import org.rx.net.rpc.RpcClientConfig;
import org.rx.net.rpc.RpcServerConfig;
Expand All @@ -27,6 +30,8 @@
import org.rx.net.transport.TcpClientConfig;
import org.rx.net.transport.TcpServerConfig;
import org.rx.util.function.Action;
import org.rx.util.function.BiFunc;
import org.rx.util.function.Func;
import org.rx.util.function.TripleAction;

import java.math.BigInteger;
Expand All @@ -38,6 +43,7 @@
import java.util.Map;
import java.util.Objects;

import static org.rx.bean.$.$;
import static org.rx.core.Extends.*;
import static org.rx.core.Sys.toJsonString;
import static org.rx.core.Tasks.awaitQuietly;
Expand Down Expand Up @@ -110,6 +116,7 @@ static void launchClient(Map<String, String> options, Integer port, Integer conn
}

RandomList<UpstreamSupport> shadowServers = new RandomList<>();
$<UpstreamSupport> defSS = $();
RandomList<DnsServer.ResolveInterceptor> dnsInterceptors = new RandomList<>();
SocksConfig frontConf = new SocksConfig(port);
YamlConfiguration watcher = new YamlConfiguration("conf.yml").enableWatch();
Expand All @@ -129,17 +136,18 @@ static void launchClient(Map<String, String> options, Integer port, Integer conn
}
shadowServers.clear();
dnsInterceptors.clear();
int defW = 0;
for (AuthenticEndpoint shadowServer : svrs) {
RpcClientConfig<SocksSupport> rpcConf = RpcClientConfig.poolMode(Sockets.newEndpoint(shadowServer.getEndpoint(), shadowServer.getEndpoint().getPort() + 1),
conf.rpcMinSize, conf.rpcMaxSize);
TcpClientConfig tcpConfig = rpcConf.getTcpConfig();
tcpConfig.setTransportFlags(TransportFlags.BACKEND_AES_COMBO.flags());
String weight = shadowServer.getParameters().get("w");
if (Strings.isEmpty(weight)) {
int weight = Reflects.convertQuietly(shadowServer.getParameters().get("w"), int.class, 0);
if (weight <= 0) {
continue;
}
SocksSupport facade = Remoting.createFacade(SocksSupport.class, rpcConf);
shadowServers.add(new UpstreamSupport(shadowServer, new SocksSupport() {
UpstreamSupport upstream = new UpstreamSupport(shadowServer, new SocksSupport() {
@Override
public void fakeEndpoint(BigInteger hash, String realEndpoint) {
facade.fakeEndpoint(hash, realEndpoint);
Expand All @@ -157,7 +165,12 @@ public List<InetAddress> resolveHost(String host) {
public void addWhiteList(InetAddress endpoint) {
facade.addWhiteList(endpoint);
}
}), Integer.parseInt(weight));
});
shadowServers.add(upstream, weight);
if (defW < weight) {
defSS.v = upstream;
defW = weight;
}
}
dnsInterceptors.addAll(Linq.from(shadowServers).<DnsServer.ResolveInterceptor>select(UpstreamSupport::getSupport).toList());
log.info("reload svrs {}", toJsonString(svrs));
Expand Down Expand Up @@ -215,8 +228,13 @@ public void addWhiteList(InetAddress endpoint) {
e.setHandled(true);
}
};
BiFunc<SocksContext, Func<UpstreamSupport>> routerFn = e -> {
InetAddress srcHost = e.getSource().getAddress();
// String destHost = e.getFirstDestination().getHost();
return () -> shadowServers.next(srcHost, conf.steeringTTL, true);
};
frontSvr.onRoute.replace(firstRoute, (s, e) -> {
e.setUpstream(new Socks5Upstream(e.getFirstDestination(), frontConf, () -> shadowServers.next(e.getSource(), conf.steeringTTL, true)));
e.setUpstream(new Socks5Upstream(e.getFirstDestination(), frontConf, routerFn.apply(e)));
});
frontSvr.onUdpRoute.replace(firstRoute, (s, e) -> {
UnresolvedEndpoint dstEp = e.getFirstDestination();
Expand Down Expand Up @@ -246,7 +264,7 @@ public void addWhiteList(InetAddress endpoint) {
// }
// return;
// }
e.setUpstream(new Socks5UdpUpstream(dstEp, frontConf, () -> shadowServers.next(e.getSource(), conf.steeringTTL, true)));
e.setUpstream(new Socks5UdpUpstream(dstEp, frontConf, routerFn.apply(e)));
});
frontSvr.setAesRouter(SocksProxyServer.DNS_AES_ROUTER);
Main app = new Main(frontSvr);
Expand Down
26 changes: 26 additions & 0 deletions rxlib/src/main/java/org/rx/net/Sockets.java
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,32 @@ public static boolean isLanIp(InetAddress ip) {
return isBypass(RxConfig.INSTANCE.getNet().getLanIps(), hostAddress);
}

/**
* 10.0.0.0/8
* 172.16.0.0/12
* 192.168.0.0/16
*
* @param ip
* @return
*/
public static boolean isPrivateIp(InetAddress ip) {
byte[] ipBytes = ip.getAddress();
// 将字节数组转换为无符号整数,用于比较
int first = ipBytes[0] & 0xFF; // 第一段
int second = ipBytes[1] & 0xFF; // 第二段

// 检查是否在 10.0.0.0 - 10.255.255.255 范围内
if (first == 10) {
return true;
}
// 检查是否在 172.16.0.0 - 172.31.255.255 范围内
if (first == 172 && second >= 16 && second <= 31) {
return true;
}
// 检查是否在 192.168.0.0 - 192.168.255.255 范围内
return first == 192 && second == 168;
}

public static boolean isValidIp(String ip) {
return NetUtil.isValidIpV4Address(ip) || NetUtil.isValidIpV6Address(ip);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
log.debug("connect to backend {}[{}]", finalDestinationEp, realEp);
Channel outbound = f.channel();
outbound.pipeline().addLast(BackendRelayHandler.DEFAULT);

SocksSupport.ENDPOINT_TRACER.link(inbound, outbound);
outbound.pipeline().addLast(BackendRelayHandler.DEFAULT);
});

ctx.fireChannelRead(msg).pipeline().remove(this);
Expand Down
4 changes: 1 addition & 3 deletions rxlib/src/main/java/org/rx/net/socks/ProxyManageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import java.net.InetSocketAddress;

import static org.rx.core.Extends.tryAs;

@Slf4j
public class ProxyManageHandler extends ChannelTrafficShapingHandler {
public static ProxyManageHandler get(ChannelHandlerContext ctx) {
Expand All @@ -36,7 +34,7 @@ public ProxyManageHandler(Authenticator authenticator, long checkInterval) {

public void setUser(@NonNull SocksUser user, ChannelHandlerContext ctx) {
this.user = user;
InetSocketAddress realEp = (InetSocketAddress) SocksSupport.ENDPOINT_TRACER.head(ctx.channel());
InetSocketAddress realEp = SocksSupport.ENDPOINT_TRACER.head(ctx.channel());
info = user.getLoginIps().computeIfAbsent(realEp.getAddress(), SocksUser.LoginInfo::new);
if (user.getMaxIpCount() != -1 && user.getLoginIps().size() > user.getMaxIpCount()) {
log.error("SocksUser {} maxIpCount={}\nconnectedIps={} incomingIp={}", user.getUsername(), user.getMaxIpCount(), user.getLoginIps().keySet(), realEp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import lombok.extern.slf4j.Slf4j;
import org.rx.core.StringBuilder;
import org.rx.core.Tasks;
import org.rx.exception.TraceHandler;
import org.rx.net.AESCodec;
import org.rx.net.Sockets;
import org.rx.net.TransportFlags;
Expand Down Expand Up @@ -99,6 +98,7 @@ private void connect(Channel inbound, Socks5AddressType dstAddrType, SocksContex
return;
}
Channel outbound = f.channel();
SocksSupport.ENDPOINT_TRACER.link(inbound, outbound);
StringBuilder aesMsg = new StringBuilder();
Socks5ProxyHandler proxyHandler;
SocksConfig config = server.getConfig();
Expand Down
36 changes: 17 additions & 19 deletions rxlib/src/main/java/org/rx/net/support/EndpointTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,31 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;

import static org.rx.core.Sys.fastCacheKey;

@Slf4j
public final class EndpointTracer {
static final InetSocketAddress unknownAddr = Sockets.newAnyEndpoint(0);
final Cache<String, SocketAddress> index = Cache.getInstance(MemoryCache.class);
static final InetSocketAddress unknownEp = Sockets.newAnyEndpoint(0);
final Cache<InetSocketAddress, InetSocketAddress> index = Cache.getInstance(MemoryCache.class);

String key(SocketAddress sa) {
return fastCacheKey("EpTrace", sa);
InetSocketAddress key(SocketAddress sa) {
return ((InetSocketAddress) sa);
}

public void link(Channel inbound, Channel outbound) {
SocketAddress addr = index.get(key(outbound.localAddress()), k -> inbound.remoteAddress());
// log.info("EpTracer link {} <- {} {}", data.head, inbound, outbound);
InetSocketAddress source = index.get(key(inbound.remoteAddress()), k -> (InetSocketAddress) inbound.remoteAddress());
index.put(key(outbound.localAddress()), source);
log.info("EpTracer link {} <- ({} -> {})", Sockets.toString(source), inbound, outbound);
}

public SocketAddress head(Channel channel) {
//inbound channel
SocketAddress addr = index.get(key(channel.remoteAddress()));
if (addr == null) {
//outbound channel
addr = index.get(key(channel.localAddress()));
}
if (addr == null) {
addr = unknownAddr;
public InetSocketAddress head(Channel inbound) {
return head((InetSocketAddress) inbound.remoteAddress());
}

public InetSocketAddress head(InetSocketAddress remoteAddr) {
InetSocketAddress source = index.get(key(remoteAddr));
if (source == null || Sockets.isPrivateIp(source.getAddress())) {
source = unknownEp;
}
// log.info("EpTracer head {} <- {}", head, channel);
return addr;
log.info("EpTracer head {} <- ({})", Sockets.toString(source), remoteAddr);
return source;
}
}

0 comments on commit 3123f63

Please sign in to comment.