From 5f431ff1389599ab4a0ea5d630be00f0f160ef5a Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Thu, 6 May 2021 13:14:42 +0300 Subject: [PATCH] =?UTF-8?q?=D1=80=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D0=BD=D0=B3=20=D0=BF=D1=80=D0=BE=D1=82=D0=BE=D0=BA?= =?UTF-8?q?=D0=BE=D0=BB=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- protocol/build.gradle | 3 +- .../main/java/mc/protocol/ChannelContext.java | 20 ------ .../mc/protocol/NettyConnectionContext.java | 49 +++++++++++++ .../main/java/mc/protocol/NettyServer.java | 71 ++++++++++++++++--- .../mc/protocol/PacketInboundHandler.java | 5 +- protocol/src/main/java/mc/protocol/State.java | 9 +-- .../mc/protocol/api/ConnectionContext.java | 19 +++++ .../src/main/java/mc/protocol/api/Server.java | 11 +++ .../mc/protocol/di/ProtocolComponent.java | 4 +- .../java/mc/protocol/di/ProtocolModule.java | 60 ++-------------- .../mc/protocol/io/codec/ProtocolDecoder.java | 14 ++-- server/src/main/java/mc/server/Main.java | 11 ++- .../main/java/mc/server/PacketHandler.java | 44 ++++++------ 13 files changed, 200 insertions(+), 120 deletions(-) delete mode 100644 protocol/src/main/java/mc/protocol/ChannelContext.java create mode 100644 protocol/src/main/java/mc/protocol/NettyConnectionContext.java create mode 100644 protocol/src/main/java/mc/protocol/api/ConnectionContext.java create mode 100644 protocol/src/main/java/mc/protocol/api/Server.java diff --git a/protocol/build.gradle b/protocol/build.gradle index 03ac861..b8fb582 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -1,8 +1,9 @@ apply from: rootDir.toPath().resolve('logic.gradle').toFile() dependencies { - api libs.netty api libs.reactor + + implementation libs.netty implementation libs.json testImplementation libs.lang3 diff --git a/protocol/src/main/java/mc/protocol/ChannelContext.java b/protocol/src/main/java/mc/protocol/ChannelContext.java deleted file mode 100644 index 51d7a89..0000000 --- a/protocol/src/main/java/mc/protocol/ChannelContext.java +++ /dev/null @@ -1,20 +0,0 @@ -package mc.protocol; - -import io.netty.channel.ChannelHandlerContext; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import mc.protocol.packets.ClientSidePacket; - -@RequiredArgsConstructor -public class ChannelContext

{ - - @Getter - private final ChannelHandlerContext ctx; - - @Getter - private final P packet; - - public void setState(State state) { - ctx.channel().attr(NetworkAttributes.STATE).set(state); - } -} diff --git a/protocol/src/main/java/mc/protocol/NettyConnectionContext.java b/protocol/src/main/java/mc/protocol/NettyConnectionContext.java new file mode 100644 index 0000000..a4cdc65 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/NettyConnectionContext.java @@ -0,0 +1,49 @@ +package mc.protocol; + +import io.netty.channel.ChannelHandlerContext; +import lombok.RequiredArgsConstructor; +import mc.protocol.api.ConnectionContext; +import mc.protocol.packets.ClientSidePacket; +import mc.protocol.packets.ServerSidePacket; + +@RequiredArgsConstructor +public class NettyConnectionContext

implements ConnectionContext

{ + + private final ChannelHandlerContext ctx; + private final P packet; + + @Override + public State getState() { + return ctx.channel().attr(NetworkAttributes.STATE).get(); + } + + @Override + public void setState(State state) { + ctx.channel().attr(NetworkAttributes.STATE).set(state); + } + + @Override + public P clientPacket() { + return packet; + } + + @Override + public void send(ServerSidePacket packet) { + ctx.write(packet); + } + + @Override + public void sendNow(ServerSidePacket packet) { + ctx.writeAndFlush(packet); + } + + @Override + public void flushSending() { + ctx.flush(); + } + + @Override + public void disconnect() { + ctx.disconnect(); + } +} diff --git a/protocol/src/main/java/mc/protocol/NettyServer.java b/protocol/src/main/java/mc/protocol/NettyServer.java index 2c92645..16325ac 100644 --- a/protocol/src/main/java/mc/protocol/NettyServer.java +++ b/protocol/src/main/java/mc/protocol/NettyServer.java @@ -1,22 +1,40 @@ package mc.protocol; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import mc.protocol.di.DaggerProtocolComponent; -import mc.protocol.di.ProtocolComponent; +import mc.protocol.api.ConnectionContext; +import mc.protocol.api.Server; +import mc.protocol.io.codec.ProtocolDecoder; +import mc.protocol.io.codec.ProtocolEncoder; +import mc.protocol.io.codec.ProtocolSplitter; + +import javax.annotation.Nonnull; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Consumer; @Slf4j @RequiredArgsConstructor -public class NettyServer { +public class NettyServer implements Server { - private final ServerBootstrap serverBootstrap; + private Consumer> consumerNewConnection; + private Consumer> consumerDisconnect; + @Override public void bind(String host, int port) { log.info("Network starting: {}:{}", host, port); try { - serverBootstrap.bind(host, port).sync().channel().closeFuture().sync(); + createServerBootstrap().bind(host, port).sync().channel().closeFuture().sync(); } catch (InterruptedException e) { if (log.isTraceEnabled()) { log.trace("{}: {}", e.getClass().getSimpleName(), e.getMessage(), e); @@ -24,8 +42,45 @@ public class NettyServer { } } - public static NettyServer createServer() { - ProtocolComponent component = DaggerProtocolComponent.create(); - return component.getNettyServer(); + @Override + public void onNewConnect(Consumer> consumer) { + this.consumerNewConnection = consumer; + } + + @Override + public void onDisonnect(Consumer> consumer) { + this.consumerDisconnect = consumer; + } + + private ServerBootstrap createServerBootstrap() { + ServerBootstrap bootstrap = new ServerBootstrap(); + + bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup()) + .channel(NioServerSocketChannel.class) + .childHandler(createChannelChannelInitializer()); + + return bootstrap; + } + + private ChannelInitializer createChannelChannelInitializer() { + return new ChannelInitializer<>() { + @Override + protected void initChannel(@Nonnull SocketChannel socketChannel) { + ChannelPipeline pipeline = socketChannel.pipeline(); + createChannelHandlerMap().forEach(pipeline::addLast); + } + }; + } + + private Map createChannelHandlerMap() { + Map map = new LinkedHashMap<>(); + + map.put("packet_splitter", new ProtocolSplitter()); + map.put("logger", new LoggingHandler(LogLevel.DEBUG)); + map.put("packet_decoder", new ProtocolDecoder(true, consumerNewConnection, consumerDisconnect)); + map.put("packet_encoder", new ProtocolEncoder()); + map.put("packet_handler", new PacketInboundHandler()); + + return map; } } diff --git a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java index a99fb4d..3f170e9 100644 --- a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java +++ b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java @@ -3,6 +3,7 @@ package mc.protocol; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.RequiredArgsConstructor; +import mc.protocol.api.ConnectionContext; import mc.protocol.packets.ClientSidePacket; import reactor.core.publisher.Sinks; @@ -12,11 +13,11 @@ public class PacketInboundHandler extends SimpleChannelInboundHandler packetSinks = ctx.channel().attr(NetworkAttributes.STATE) + Sinks.Many packetSinks = ctx.channel().attr(NetworkAttributes.STATE) .get().getPacketSinks(packet.getClass()); if (packetSinks != null) { - packetSinks.tryEmitNext(new ChannelContext<>(ctx, packet)); + packetSinks.tryEmitNext(new NettyConnectionContext<>(ctx, packet)); } } } diff --git a/protocol/src/main/java/mc/protocol/State.java b/protocol/src/main/java/mc/protocol/State.java index 180f4d7..72e39b4 100644 --- a/protocol/src/main/java/mc/protocol/State.java +++ b/protocol/src/main/java/mc/protocol/State.java @@ -2,6 +2,7 @@ package mc.protocol; import lombok.Getter; import lombok.RequiredArgsConstructor; +import mc.protocol.api.ConnectionContext; import mc.protocol.packets.ClientSidePacket; import mc.protocol.packets.Packet; import mc.protocol.packets.PingPacket; @@ -84,7 +85,7 @@ public enum State { private final Map, Integer> serverSidePackets; @SuppressWarnings("rawtypes") - private final Map, Sinks.Many> observedMap = new HashMap<>(); + private final Map, Sinks.Many> observedMap = new HashMap<>(); State(int id, Map> clientSidePackets) { this.id = id; @@ -104,13 +105,13 @@ public enum State { @SuppressWarnings("rawtypes") - public

Sinks.Many getPacketSinks(Class

packetClass) { + public

Sinks.Many getPacketSinks(Class

packetClass) { return observedMap.get(packetClass); } @SuppressWarnings("unchecked") - public

Flux> packetFlux(Class

packetClass) { + public

Flux> packetFlux(Class

packetClass) { return observedMap.computeIfAbsent(packetClass, aClass -> Sinks.many().multicast().directBestEffort()) - .asFlux().map(ChannelContext.class::cast); + .asFlux().map(ConnectionContext.class::cast); } } diff --git a/protocol/src/main/java/mc/protocol/api/ConnectionContext.java b/protocol/src/main/java/mc/protocol/api/ConnectionContext.java new file mode 100644 index 0000000..b2ed00e --- /dev/null +++ b/protocol/src/main/java/mc/protocol/api/ConnectionContext.java @@ -0,0 +1,19 @@ +package mc.protocol.api; + +import mc.protocol.State; +import mc.protocol.packets.ClientSidePacket; +import mc.protocol.packets.ServerSidePacket; + +public interface ConnectionContext

{ + + State getState(); + void setState(State state); + + P clientPacket(); + + void send(ServerSidePacket packet); + void sendNow(ServerSidePacket packet); + void flushSending(); + + void disconnect(); +} diff --git a/protocol/src/main/java/mc/protocol/api/Server.java b/protocol/src/main/java/mc/protocol/api/Server.java new file mode 100644 index 0000000..e06d137 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/api/Server.java @@ -0,0 +1,11 @@ +package mc.protocol.api; + +import java.util.function.Consumer; + +public interface Server { + + void bind(String host, int port); + + void onNewConnect(Consumer> consumer); + void onDisonnect(Consumer> consumer); +} diff --git a/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java b/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java index a433211..84caa5b 100644 --- a/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java +++ b/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java @@ -1,11 +1,11 @@ package mc.protocol.di; import dagger.Component; -import mc.protocol.NettyServer; +import mc.protocol.api.Server; @Component(modules = ProtocolModule.class) @ServerScope public interface ProtocolComponent { - NettyServer getNettyServer(); + Server getServer(); } diff --git a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java index 3e8a8df..c298873 100644 --- a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java +++ b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java @@ -2,68 +2,16 @@ package mc.protocol.di; import dagger.Module; import dagger.Provides; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; import mc.protocol.NettyServer; -import mc.protocol.PacketInboundHandler; -import mc.protocol.io.codec.ProtocolDecoder; -import mc.protocol.io.codec.ProtocolEncoder; -import mc.protocol.io.codec.ProtocolSplitter; - -import javax.annotation.Nonnull; -import javax.inject.Provider; -import java.util.LinkedHashMap; -import java.util.Map; +import mc.protocol.api.Server; @Module public class ProtocolModule { @Provides - NettyServer provideServer(ServerBootstrap serverBootstrap) { - return new NettyServer(serverBootstrap); + @ServerScope + Server provideServer() { + return new NettyServer(); } - @Provides - ServerBootstrap provideServerBootstrap(ChannelInitializer channelChannelInitializer) { - ServerBootstrap bootstrap = new ServerBootstrap(); - - bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup()) - .channel(NioServerSocketChannel.class) - .childHandler(channelChannelInitializer); - - return bootstrap; - } - - @Provides - ChannelInitializer provideChannelChannelInitializer( - Provider> channelHandlerMapProvider) { - - return new ChannelInitializer<>() { - @Override - protected void initChannel(@Nonnull SocketChannel socketChannel) { - ChannelPipeline pipeline = socketChannel.pipeline(); - channelHandlerMapProvider.get().forEach(pipeline::addLast); - } - }; - } - - @Provides - Map provideChannelHandlerMap() { - Map map = new LinkedHashMap<>(); - - map.put("packet_splitter", new ProtocolSplitter()); - map.put("logger", new LoggingHandler(LogLevel.DEBUG)); - map.put("packet_decoder", new ProtocolDecoder(true)); - map.put("packet_encoder", new ProtocolEncoder()); - map.put("packet_handler", new PacketInboundHandler()); - - return map; - } } diff --git a/protocol/src/main/java/mc/protocol/io/codec/ProtocolDecoder.java b/protocol/src/main/java/mc/protocol/io/codec/ProtocolDecoder.java index b7ee9d5..6bc2e65 100644 --- a/protocol/src/main/java/mc/protocol/io/codec/ProtocolDecoder.java +++ b/protocol/src/main/java/mc/protocol/io/codec/ProtocolDecoder.java @@ -5,30 +5,36 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import mc.protocol.NettyConnectionContext; import mc.protocol.NetworkAttributes; import mc.protocol.State; +import mc.protocol.api.ConnectionContext; import mc.protocol.io.NetByteBuf; import mc.protocol.packets.ClientSidePacket; import mc.protocol.packets.UnknownPacket; +import javax.annotation.Nonnull; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; @Slf4j @RequiredArgsConstructor public class ProtocolDecoder extends ByteToMessageDecoder { private final boolean readUnknownPackets; + private final Consumer> consumerNewConnection; + private final Consumer> consumerDisconnect; @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - ctx.channel().attr(NetworkAttributes.STATE).set(State.HANDSHAKING); + public void channelActive(@Nonnull ChannelHandlerContext ctx) throws Exception { + consumerNewConnection.accept(new NettyConnectionContext<>(ctx, null)); super.channelActive(ctx); } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - ctx.channel().attr(NetworkAttributes.STATE).set(null); + public void channelInactive(@Nonnull ChannelHandlerContext ctx) throws Exception { + consumerDisconnect.accept(new NettyConnectionContext<>(ctx, null)); super.channelInactive(ctx); } diff --git a/server/src/main/java/mc/server/Main.java b/server/src/main/java/mc/server/Main.java index fedfe80..553757a 100644 --- a/server/src/main/java/mc/server/Main.java +++ b/server/src/main/java/mc/server/Main.java @@ -7,8 +7,10 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.util.PathConverter; import lombok.extern.slf4j.Slf4j; -import mc.protocol.NettyServer; import mc.protocol.State; +import mc.protocol.api.Server; +import mc.protocol.di.DaggerProtocolComponent; +import mc.protocol.di.ProtocolComponent; import mc.protocol.packets.PingPacket; import mc.protocol.packets.client.HandshakePacket; import mc.protocol.packets.client.LoginStartPacket; @@ -38,6 +40,8 @@ public class Main { private void run(OptionSet optionSet) { log.info("mc-project launch"); + ProtocolComponent protocolComponent = DaggerProtocolComponent.create(); + ConfigModule configModule = new ConfigModule((Path) optionSet.valueOf(CLI_CONFIG)); ServerComponent serverComponent = DaggerServerComponent.builder() @@ -46,9 +50,12 @@ public class Main { Config config = serverComponent.getConfig(); - NettyServer server = NettyServer.createServer(); + Server server = protocolComponent.getServer(); PacketHandler packetHandler = serverComponent.getPacketHandler(); + server.onNewConnect(connectionContext -> connectionContext.setState(State.HANDSHAKING)); + server.onDisonnect(connectionContext -> connectionContext.setState(null)); + State.HANDSHAKING.packetFlux(HandshakePacket.class).subscribe(packetHandler::onHandshake); State.STATUS.packetFlux(PingPacket.class).subscribe(packetHandler::onKeepAlive); State.STATUS.packetFlux(StatusServerRequestPacket.class).subscribe(packetHandler::onServerStatus); diff --git a/server/src/main/java/mc/server/PacketHandler.java b/server/src/main/java/mc/server/PacketHandler.java index 49ebee7..be7786c 100644 --- a/server/src/main/java/mc/server/PacketHandler.java +++ b/server/src/main/java/mc/server/PacketHandler.java @@ -3,6 +3,7 @@ package mc.server; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import mc.protocol.*; +import mc.protocol.api.ConnectionContext; import mc.protocol.model.Location; import mc.protocol.model.Look; import mc.protocol.model.ServerInfo; @@ -27,22 +28,23 @@ import java.util.UUID; @RequiredArgsConstructor public class PacketHandler { - private final Config config; private final Random random = new Random(System.currentTimeMillis()); + private final Config config; - public void onHandshake(ChannelContext channel) { - channel.setState(channel.getPacket().getNextState()); + public void onHandshake(ConnectionContext context) { + context.setState(context.clientPacket().getNextState()); } - public void onKeepAlive(ChannelContext channel) { - channel.getCtx().writeAndFlush(channel.getPacket()).channel().disconnect(); + public void onKeepAlive(ConnectionContext context) { + context.sendNow(context.clientPacket()); + context.disconnect(); } - public void onKeepAlivePlay(ChannelContext channel) { - channel.getCtx().writeAndFlush(channel.getPacket()); + public void onKeepAlivePlay(ConnectionContext context) { + context.sendNow(context.clientPacket()); } - public void onServerStatus(ChannelContext channel) { + public void onServerStatus(ConnectionContext context) { ServerInfo serverInfo = new ServerInfo(); serverInfo.version().name(ProtocolConstant.PROTOCOL_NAME); serverInfo.version().protocol(ProtocolConstant.PROTOCOL_NUMBER); @@ -58,18 +60,18 @@ public class PacketHandler { StatusServerResponse response = new StatusServerResponse(); response.setInfo(serverInfo); - channel.getCtx().writeAndFlush(response); + context.sendNow(response); } - public void onLoginStart(ChannelContext channel) { - LoginStartPacket loginStartPacket = channel.getPacket(); + public void onLoginStart(ConnectionContext context) { + LoginStartPacket loginStartPacket = context.clientPacket(); var loginSuccessPacket = new LoginSuccessPacket(); loginSuccessPacket.setUuid(UUID.randomUUID()); loginSuccessPacket.setName(loginStartPacket.getName()); - channel.getCtx().writeAndFlush(loginSuccessPacket); - channel.setState(State.PLAY); + context.sendNow(loginSuccessPacket); + context.setState(State.PLAY); var joinGamePacket = new JoinGamePacket(); joinGamePacket.setEntityId(random.nextInt()); @@ -78,14 +80,14 @@ public class PacketHandler { joinGamePacket.setDifficulty(Difficulty.PEACEFUL); joinGamePacket.setLevelType(LevelType.FLAT); - channel.getCtx().write(joinGamePacket); + context.send(joinGamePacket); Location spawnLocation = new Location(0d, 63d, 0d); var spawnPositionPacket = new SpawnPositionPacket(); spawnPositionPacket.setSpawn(spawnLocation); - channel.getCtx().write(spawnPositionPacket); + context.send(spawnPositionPacket); var playerAbilitiesPacket = new PlayerAbilitiesPacket(); playerAbilitiesPacket.setCatFly(true); @@ -95,29 +97,29 @@ public class PacketHandler { playerAbilitiesPacket.setFieldOfView(0.0f); playerAbilitiesPacket.setFlyingSpeed(0.05f); - channel.getCtx().write(playerAbilitiesPacket); + context.send(playerAbilitiesPacket); - channel.getCtx().flush(); + context.flushSending(); var chunkDataPacket = new ChunkDataPacket(); chunkDataPacket.setX(0); chunkDataPacket.setZ(0); - channel.getCtx().writeAndFlush(chunkDataPacket); + context.sendNow(chunkDataPacket); var playerPositionAndLookPacket = new SPlayerPositionAndLookPacket(); playerPositionAndLookPacket.setPosition(spawnLocation); playerPositionAndLookPacket.setLook(new Look(0f, 0f)); playerPositionAndLookPacket.setTeleportId(random.nextInt()); - channel.getCtx().write(playerPositionAndLookPacket); + context.send(playerPositionAndLookPacket); PingPacket pingPacket = new PingPacket(); pingPacket.setPayload(System.currentTimeMillis()); - channel.getCtx().write(pingPacket); + context.send(pingPacket); - channel.getCtx().flush(); + context.flushSending(); } private static String faviconToBase64(Path iconPath) {