diff --git a/libs.gradle b/libs.gradle index 365ee1a..d6958eb 100644 --- a/libs.gradle +++ b/libs.gradle @@ -13,6 +13,8 @@ ext { annotations: 'com.google.code.findbugs:jsr305:3.0.2', guava : 'com.google.guava:guava:30.1-jre', lang3 : 'org.apache.commons:commons-lang3:3.11', + netty : 'io.netty:netty-all:4.1.22.Final', + reactor : 'io.projectreactor:reactor-core:3.4.5' ] libs.logger = [ diff --git a/protocol/build.gradle b/protocol/build.gradle index 4e4b0da..4677d8a 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -1,7 +1,8 @@ apply from: rootDir.toPath().resolve('logic.gradle').toFile() dependencies { - implementation 'io.netty:netty-all:4.1.22.Final' + implementation libs.netty + implementation libs.reactor implementation libs.guava testImplementation libs.lang3 diff --git a/protocol/src/main/java/mc/protocol/ChannelContext.java b/protocol/src/main/java/mc/protocol/ChannelContext.java new file mode 100644 index 0000000..ba85b49 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/ChannelContext.java @@ -0,0 +1,20 @@ +package mc.protocol; + +import io.netty.channel.ChannelHandlerContext; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import mc.protocol.packets.Packet; + +@RequiredArgsConstructor +public class ChannelContext

{ + + @Getter + private final ChannelHandlerContext ctx; + + @Getter + private final P packet; + + public void setState(State state) { + ctx.channel().attr(NetworkAttributes.STATE).set(state); + } +} diff --git a/protocol/src/main/java/mc/protocol/NettyServer.java b/protocol/src/main/java/mc/protocol/NettyServer.java new file mode 100644 index 0000000..b2f20fc --- /dev/null +++ b/protocol/src/main/java/mc/protocol/NettyServer.java @@ -0,0 +1,43 @@ +package mc.protocol; + +import io.netty.bootstrap.ServerBootstrap; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import mc.protocol.di.DaggerProtocolComponent; +import mc.protocol.di.ProtocolComponent; +import mc.protocol.packets.Packet; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import java.util.Map; + +@SuppressWarnings("rawtypes") +@Slf4j +@RequiredArgsConstructor +public class NettyServer { + + private final ServerBootstrap serverBootstrap; + private final Map, Sinks.Many> observedMap; + + public void bind(String host, int port) { + log.info("Network starting: {}:{}", host, port); + + try { + serverBootstrap.bind(host, port).sync().channel().closeFuture().sync(); + } catch (InterruptedException e) { + if (log.isTraceEnabled()) { + log.trace("{}: {}", e.getClass().getSimpleName(), e.getMessage(), e); + } + } + } + + @SuppressWarnings("unchecked") + public

Flux> packetFlux(Class

packetClass) { + return observedMap.get(packetClass).asFlux().map(ChannelContext.class::cast); + } + + public static NettyServer createServer() { + ProtocolComponent component = DaggerProtocolComponent.create(); + return component.getNettyServer(); + } +} diff --git a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java new file mode 100644 index 0000000..451f3b7 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java @@ -0,0 +1,21 @@ +package mc.protocol; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.RequiredArgsConstructor; +import mc.protocol.packets.Packet; +import reactor.core.publisher.Sinks; + +import java.util.Map; + +@SuppressWarnings("rawtypes") +@RequiredArgsConstructor +public class PacketInboundHandler extends SimpleChannelInboundHandler { + + private final Map, Sinks.Many> observedMap; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Packet packet) { + observedMap.get(packet.getClass()).tryEmitNext(new ChannelContext<>(ctx, packet)); + } +} diff --git a/protocol/src/main/java/mc/protocol/State.java b/protocol/src/main/java/mc/protocol/State.java index b9b9d53..01c2971 100644 --- a/protocol/src/main/java/mc/protocol/State.java +++ b/protocol/src/main/java/mc/protocol/State.java @@ -55,6 +55,7 @@ public enum State { @Getter private final int id; + @Getter private final BiMap> serverBoundPackets; private final BiMap> clientBoundPackets; diff --git a/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java b/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java new file mode 100644 index 0000000..a433211 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java @@ -0,0 +1,11 @@ +package mc.protocol.di; + +import dagger.Component; +import mc.protocol.NettyServer; + +@Component(modules = ProtocolModule.class) +@ServerScope +public interface ProtocolComponent { + + NettyServer getNettyServer(); +} diff --git a/server/src/main/java/mc/server/di/NetworkModule.java b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java similarity index 52% rename from server/src/main/java/mc/server/di/NetworkModule.java rename to protocol/src/main/java/mc/protocol/di/ProtocolModule.java index 851566e..3ee3ba3 100644 --- a/server/src/main/java/mc/server/di/NetworkModule.java +++ b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java @@ -1,5 +1,6 @@ -package mc.server.di; +package mc.protocol.di; +import com.google.common.collect.ImmutableMap; import dagger.Module; import dagger.Provides; import io.netty.bootstrap.ServerBootstrap; @@ -11,28 +12,29 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import lombok.extern.slf4j.Slf4j; +import mc.protocol.ChannelContext; +import mc.protocol.NettyServer; +import mc.protocol.PacketInboundHandler; +import mc.protocol.State; import mc.protocol.io.codec.ProtocolDecoder; import mc.protocol.io.codec.ProtocolEncoder; import mc.protocol.io.codec.ProtocolSplitter; -import mc.server.network.Server; -import mc.server.network.netty.handler.HandshakeHandler; -import mc.server.network.netty.NettyServer; -import mc.server.network.netty.handler.KeepAliveHandler; -import mc.server.network.netty.handler.LoginHandler; -import mc.server.network.netty.handler.StatusHandler; +import mc.protocol.packets.Packet; +import reactor.core.publisher.Sinks; import javax.inject.Provider; import java.util.LinkedHashMap; import java.util.Map; +import java.util.stream.Stream; @Module -@Slf4j -public class NetworkModule { +public class ProtocolModule { + @SuppressWarnings("rawtypes") @Provides - Server provideServer(ServerBootstrap serverBootstrap) { - return new NettyServer(serverBootstrap); + NettyServer provideServer(ServerBootstrap serverBootstrap, + Map, Sinks.Many> observedMap) { + return new NettyServer(serverBootstrap, observedMap); } @Provides @@ -47,7 +49,9 @@ public class NetworkModule { } @Provides - ChannelInitializer provideChannelChannelInitializer(Provider> channelHandlerMapProvider) { + ChannelInitializer provideChannelChannelInitializer( + Provider> channelHandlerMapProvider) { + return new ChannelInitializer<>() { @Override protected void initChannel(SocketChannel socketChannel) { @@ -57,36 +61,32 @@ public class NetworkModule { }; } + @SuppressWarnings("rawtypes") @Provides Map provideChannelHandlerMap( - Provider statusHandlerProvider, - Provider loginHandlerProvider, - Provider keepAliveHandlerProvider - ) { + Map, Sinks.Many> observedMap) { + Map map = new LinkedHashMap<>(); + map.put("packet_splitter", new ProtocolSplitter()); map.put("logger", new LoggingHandler(LogLevel.DEBUG)); - map.put("protocol_splitter", new ProtocolSplitter()); - map.put("protocol_decoder", new ProtocolDecoder(true)); - map.put("protocol_encoder", new ProtocolEncoder()); - map.put("handshake_handler", new HandshakeHandler( - statusHandlerProvider, loginHandlerProvider, keepAliveHandlerProvider)); + map.put("packet_decoder", new ProtocolDecoder(true)); + map.put("packet_encoder", new ProtocolEncoder()); + map.put("packet_handler", new PacketInboundHandler(observedMap)); return map; } + @SuppressWarnings("rawtypes") @Provides - StatusHandler provideStatusHandler() { - return new StatusHandler(); - } + @ServerScope + Map, Sinks.Many> provideObservedMap() { + ImmutableMap.Builder, Sinks.Many> builder = ImmutableMap.builder(); - @Provides - LoginHandler provideLoginHandler() { - return new LoginHandler(); - } + Stream.of(State.values()) + .flatMap(state -> state.getServerBoundPackets().values().stream()) + .forEach(packetClass -> builder.put(packetClass, Sinks.many().multicast().directBestEffort())); - @Provides - KeepAliveHandler provideKeepAliveHandler() { - return new KeepAliveHandler(); + return builder.build(); } } diff --git a/protocol/src/main/java/mc/protocol/di/ServerScope.java b/protocol/src/main/java/mc/protocol/di/ServerScope.java new file mode 100644 index 0000000..b90e94e --- /dev/null +++ b/protocol/src/main/java/mc/protocol/di/ServerScope.java @@ -0,0 +1,10 @@ +package mc.protocol.di; + +import javax.inject.Scope; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Scope +@Retention(RetentionPolicy.RUNTIME) +public @interface ServerScope { +} diff --git a/server/build.gradle b/server/build.gradle index 68acb11..491cc7b 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -15,9 +15,7 @@ dependencies { implementation libs.logger.logback - implementation platform('io.projectreactor:reactor-bom:2020.0.6') - implementation 'io.projectreactor:reactor-core' - - implementation 'io.netty:netty-all:4.1.22.Final' + implementation libs.netty + implementation libs.reactor implementation libs.guava } diff --git a/server/src/main/java/mc/server/Main.java b/server/src/main/java/mc/server/Main.java index a435547..d0a445b 100644 --- a/server/src/main/java/mc/server/Main.java +++ b/server/src/main/java/mc/server/Main.java @@ -1,9 +1,13 @@ package mc.server; import lombok.extern.slf4j.Slf4j; -import mc.server.di.DaggerNetworkComponent; -import mc.server.di.NetworkComponent; -import mc.server.network.Server; +import mc.protocol.NettyServer; +import mc.protocol.packets.PingPacket; +import mc.protocol.packets.client.HandshakePacket; +import mc.protocol.packets.client.LoginStartPacket; +import mc.protocol.packets.client.StatusServerRequest; +import mc.protocol.packets.server.DisconnectPacket; +import mc.protocol.packets.server.StatusServerResponse; @Slf4j public class Main { @@ -11,8 +15,49 @@ public class Main { public static void main(String[] args) { log.info("hello"); - NetworkComponent networkComponent = DaggerNetworkComponent.create(); - Server server = networkComponent.getServer(); + NettyServer server = NettyServer.createServer(); + + server.packetFlux(HandshakePacket.class) + .doOnNext(channel -> log.info("{}", channel.getPacket())) + .subscribe(channel -> channel.setState(channel.getPacket().getNextState())); + + server.packetFlux(PingPacket.class) + .doOnNext(channel -> log.info("{}", channel.getPacket())) + .subscribe(channel -> channel.getCtx().writeAndFlush(channel.getPacket()).channel().disconnect()); + + server.packetFlux(StatusServerRequest.class) + .doOnNext(channel -> log.info("{}", channel.getPacket())) + .subscribe(channel -> { + StatusServerResponse response = new StatusServerResponse(); + response.setInfo("{\n" + + " \"version\": {\n" + + " \"name\": \"1.12.2\",\n" + + " \"protocol\": 340\n" + + " },\n" + + " \"players\": {\n" + + " \"max\": 0,\n" + + " \"online\": 0,\n" + + " \"sample\": []\n" + + " },\n" + + " \"description\": {\n" + + " \"text\": \"Hello world\"\n" + + " }\n" + + "}"); + + channel.getCtx().writeAndFlush(response); + }); + + server.packetFlux(LoginStartPacket.class) + .doOnNext(channel -> log.info("{}", channel.getPacket())) + .subscribe(channel -> { + DisconnectPacket disconnectPacket = new DisconnectPacket(); + disconnectPacket.setReason("{\n" + + " \"text\": \"Server is not available.\"\n" + + "}"); + + channel.getCtx().writeAndFlush(disconnectPacket).channel().disconnect(); + }); + server.bind("127.0.0.1", 25565); } } diff --git a/server/src/main/java/mc/server/di/NetworkComponent.java b/server/src/main/java/mc/server/di/NetworkComponent.java deleted file mode 100644 index ae73ed9..0000000 --- a/server/src/main/java/mc/server/di/NetworkComponent.java +++ /dev/null @@ -1,10 +0,0 @@ -package mc.server.di; - -import dagger.Component; -import mc.server.network.Server; - -@Component(modules = NetworkModule.class) -public interface NetworkComponent { - - Server getServer(); -} diff --git a/server/src/main/java/mc/server/network/Server.java b/server/src/main/java/mc/server/network/Server.java deleted file mode 100644 index b346a17..0000000 --- a/server/src/main/java/mc/server/network/Server.java +++ /dev/null @@ -1,6 +0,0 @@ -package mc.server.network; - -public interface Server { - - void bind(String host, int port); -} diff --git a/server/src/main/java/mc/server/network/netty/NettyServer.java b/server/src/main/java/mc/server/network/netty/NettyServer.java deleted file mode 100644 index 881914c..0000000 --- a/server/src/main/java/mc/server/network/netty/NettyServer.java +++ /dev/null @@ -1,26 +0,0 @@ -package mc.server.network.netty; - -import io.netty.bootstrap.ServerBootstrap; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import mc.server.network.Server; - -@Slf4j -@RequiredArgsConstructor -public class NettyServer implements Server { - - private final ServerBootstrap serverBootstrap; - - @Override - public void bind(String host, int port) { - log.info("Network starting: {}:{}", host, port); - - try { - serverBootstrap.bind(host, port).sync().channel().closeFuture().sync(); - } catch (InterruptedException e) { - if (log.isTraceEnabled()) { - log.trace("{}: {}", e.getClass().getSimpleName(), e.getMessage(), e); - } - } - } -} diff --git a/server/src/main/java/mc/server/network/netty/handler/AbstractPacketHandler.java b/server/src/main/java/mc/server/network/netty/handler/AbstractPacketHandler.java deleted file mode 100644 index 3d2d900..0000000 --- a/server/src/main/java/mc/server/network/netty/handler/AbstractPacketHandler.java +++ /dev/null @@ -1,17 +0,0 @@ -package mc.server.network.netty.handler; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import mc.protocol.packets.Packet; - -public abstract class AbstractPacketHandler

extends SimpleChannelInboundHandler { - - @SuppressWarnings("unchecked") - @Override - protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception { - channelRead1(ctx, (P) msg); - } - - @SuppressWarnings("java:S112") - protected abstract void channelRead1(ChannelHandlerContext ctx, P packet) throws Exception; -} diff --git a/server/src/main/java/mc/server/network/netty/handler/HandshakeHandler.java b/server/src/main/java/mc/server/network/netty/handler/HandshakeHandler.java deleted file mode 100644 index 02b19e0..0000000 --- a/server/src/main/java/mc/server/network/netty/handler/HandshakeHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -package mc.server.network.netty.handler; - -import io.netty.channel.ChannelHandlerContext; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import mc.protocol.NetworkAttributes; -import mc.protocol.State; -import mc.protocol.packets.client.HandshakePacket; - -import javax.inject.Provider; - -@Slf4j -@RequiredArgsConstructor -public class HandshakeHandler extends AbstractPacketHandler { - - private final Provider statusHandlerProvider; - private final Provider loginHandlerProvider; - private final Provider keepAliveHandlerProvider; - - @Override - protected void channelRead1(ChannelHandlerContext ctx, HandshakePacket packet) { - log.info("{}", packet); - - ctx.channel().attr(NetworkAttributes.STATE).set(packet.getNextState()); - - if (State.STATUS == packet.getNextState()) { - ctx.pipeline().replace("handshake_handler", "status_handler", statusHandlerProvider.get()); - ctx.pipeline().addAfter("status_handler", "keepalive_handler", keepAliveHandlerProvider.get()); - } else if (State.LOGIN == packet.getNextState()) { - ctx.channel().pipeline().replace("handshake_handler", "login_handler", loginHandlerProvider.get()); - } - } -} diff --git a/server/src/main/java/mc/server/network/netty/handler/KeepAliveHandler.java b/server/src/main/java/mc/server/network/netty/handler/KeepAliveHandler.java deleted file mode 100644 index ea69c2c..0000000 --- a/server/src/main/java/mc/server/network/netty/handler/KeepAliveHandler.java +++ /dev/null @@ -1,16 +0,0 @@ -package mc.server.network.netty.handler; - -import io.netty.channel.ChannelHandlerContext; -import lombok.extern.slf4j.Slf4j; -import mc.protocol.packets.PingPacket; - -@Slf4j -public class KeepAliveHandler extends AbstractPacketHandler { - - @Override - protected void channelRead1(ChannelHandlerContext ctx, PingPacket packet) { - log.info("{}", packet); - - ctx.writeAndFlush(packet).channel().disconnect(); - } -} diff --git a/server/src/main/java/mc/server/network/netty/handler/LoginHandler.java b/server/src/main/java/mc/server/network/netty/handler/LoginHandler.java deleted file mode 100644 index 8a0d92c..0000000 --- a/server/src/main/java/mc/server/network/netty/handler/LoginHandler.java +++ /dev/null @@ -1,18 +0,0 @@ -package mc.server.network.netty.handler; - -import io.netty.channel.ChannelHandlerContext; -import mc.protocol.packets.client.LoginStartPacket; -import mc.protocol.packets.server.DisconnectPacket; - -public class LoginHandler extends AbstractPacketHandler { - - @Override - protected void channelRead1(ChannelHandlerContext ctx, LoginStartPacket packet) { - DisconnectPacket disconnectPacket = new DisconnectPacket(); - disconnectPacket.setReason("{\n" + - " \"text\": \"Server is not available.\"\n" + - "}"); - - ctx.channel().writeAndFlush(disconnectPacket).channel().disconnect(); - } -} diff --git a/server/src/main/java/mc/server/network/netty/handler/StatusHandler.java b/server/src/main/java/mc/server/network/netty/handler/StatusHandler.java deleted file mode 100644 index 2fbcfe5..0000000 --- a/server/src/main/java/mc/server/network/netty/handler/StatusHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -package mc.server.network.netty.handler; - -import io.netty.channel.ChannelHandlerContext; -import lombok.extern.slf4j.Slf4j; -import mc.protocol.packets.client.StatusServerRequest; -import mc.protocol.packets.server.StatusServerResponse; - -@Slf4j -public class StatusHandler extends AbstractPacketHandler { - - @Override - protected void channelRead1(ChannelHandlerContext ctx, StatusServerRequest packet) { - log.info("{}", packet); - - StatusServerResponse response = new StatusServerResponse(); - response.setInfo("{\n" + - " \"version\": {\n" + - " \"name\": \"1.12.2\",\n" + - " \"protocol\": 340\n" + - " },\n" + - " \"players\": {\n" + - " \"max\": 0,\n" + - " \"online\": 0,\n" + - " \"sample\": []\n" + - " },\n" + - " \"description\": {\n" + - " \"text\": \"Hello world\"\n" + - " }\n" + - "}"); - - ctx.channel().writeAndFlush(response); - } -}