From 0f1c9bfb1b94ef627ca8f2d4ccb4b09af1072214 Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Wed, 5 May 2021 20:09:32 +0300 Subject: [PATCH 1/3] =?UTF-8?q?=D0=B8=D0=BD=D1=82=D0=B5=D1=80=D1=84=D0=B5?= =?UTF-8?q?=D0=B9=D1=81=20EventBus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/mc/protocol/utils/EventBus.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 protocol/src/main/java/mc/protocol/utils/EventBus.java diff --git a/protocol/src/main/java/mc/protocol/utils/EventBus.java b/protocol/src/main/java/mc/protocol/utils/EventBus.java new file mode 100644 index 0000000..036b25c --- /dev/null +++ b/protocol/src/main/java/mc/protocol/utils/EventBus.java @@ -0,0 +1,17 @@ +package mc.protocol.utils; + +import mc.protocol.ChannelContext; +import mc.protocol.State; +import mc.protocol.packets.ClientSidePacket; + +public interface EventBus { + +

void subscribe(State state, Class

packetClass, EventHandler

eventHandler); + +

void emit(State state, ChannelContext

channelContext); + + @FunctionalInterface + interface EventHandler

{ + void handle(ChannelContext

channelContext); + } +} From 205e813fc4c011e7cce9bb0483cd9d86ce2bb3ec Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Wed, 5 May 2021 20:09:49 +0300 Subject: [PATCH 2/3] =?UTF-8?q?=D0=BF=D1=80=D0=BE=D1=81=D1=82=D0=B0=D1=8F?= =?UTF-8?q?=20=D1=80=D0=B5=D0=B0=D0=BB=D0=B8=D0=B7=D0=B0=D1=86=D0=B8=D1=8F?= =?UTF-8?q?=20EventBus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/mc/protocol/NettyServer.java | 7 ++++++ .../mc/protocol/PacketInboundHandler.java | 13 ++++------ .../java/mc/protocol/di/ProtocolModule.java | 16 +++++++++--- .../mc/protocol/utils/SimpleEventBus.java | 25 +++++++++++++++++++ .../main/java/mc/protocol/utils/Table.java | 23 +++++++++++++++++ server/src/main/java/mc/server/Main.java | 10 ++++---- 6 files changed, 77 insertions(+), 17 deletions(-) create mode 100644 protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java create mode 100644 protocol/src/main/java/mc/protocol/utils/Table.java diff --git a/protocol/src/main/java/mc/protocol/NettyServer.java b/protocol/src/main/java/mc/protocol/NettyServer.java index 2c92645..476c9a5 100644 --- a/protocol/src/main/java/mc/protocol/NettyServer.java +++ b/protocol/src/main/java/mc/protocol/NettyServer.java @@ -5,12 +5,15 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import mc.protocol.di.DaggerProtocolComponent; import mc.protocol.di.ProtocolComponent; +import mc.protocol.packets.ClientSidePacket; +import mc.protocol.utils.EventBus; @Slf4j @RequiredArgsConstructor public class NettyServer { private final ServerBootstrap serverBootstrap; + private final EventBus eventBus; public void bind(String host, int port) { log.info("Network starting: {}:{}", host, port); @@ -24,6 +27,10 @@ public class NettyServer { } } + public

void listenPacket(State state, Class

packetClass, EventBus.EventHandler

eventHandler) { + this.eventBus.subscribe(state, packetClass, eventHandler); + } + public static NettyServer createServer() { ProtocolComponent component = DaggerProtocolComponent.create(); return component.getNettyServer(); diff --git a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java index a99fb4d..da372c6 100644 --- a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java +++ b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java @@ -4,19 +4,16 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.RequiredArgsConstructor; import mc.protocol.packets.ClientSidePacket; -import reactor.core.publisher.Sinks; +import mc.protocol.utils.EventBus; @RequiredArgsConstructor public class PacketInboundHandler extends SimpleChannelInboundHandler { - @SuppressWarnings("rawtypes") + private final EventBus eventBus; + @Override protected void channelRead0(ChannelHandlerContext ctx, ClientSidePacket packet) { - Sinks.Many packetSinks = ctx.channel().attr(NetworkAttributes.STATE) - .get().getPacketSinks(packet.getClass()); - - if (packetSinks != null) { - packetSinks.tryEmitNext(new ChannelContext<>(ctx, packet)); - } + State state = ctx.channel().attr(NetworkAttributes.STATE).get(); + eventBus.emit(state, new ChannelContext<>(ctx, packet)); } } diff --git a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java index 3e8a8df..8ea6642 100644 --- a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java +++ b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java @@ -16,6 +16,8 @@ import mc.protocol.PacketInboundHandler; import mc.protocol.io.codec.ProtocolDecoder; import mc.protocol.io.codec.ProtocolEncoder; import mc.protocol.io.codec.ProtocolSplitter; +import mc.protocol.utils.EventBus; +import mc.protocol.utils.SimpleEventBus; import javax.annotation.Nonnull; import javax.inject.Provider; @@ -26,8 +28,8 @@ import java.util.Map; public class ProtocolModule { @Provides - NettyServer provideServer(ServerBootstrap serverBootstrap) { - return new NettyServer(serverBootstrap); + NettyServer provideServer(ServerBootstrap serverBootstrap, EventBus eventBus) { + return new NettyServer(serverBootstrap, eventBus); } @Provides @@ -55,15 +57,21 @@ public class ProtocolModule { } @Provides - Map provideChannelHandlerMap() { + Map provideChannelHandlerMap(EventBus eventBus) { Map map = new LinkedHashMap<>(); map.put("packet_splitter", new ProtocolSplitter()); map.put("logger", new LoggingHandler(LogLevel.DEBUG)); map.put("packet_decoder", new ProtocolDecoder(true)); map.put("packet_encoder", new ProtocolEncoder()); - map.put("packet_handler", new PacketInboundHandler()); + map.put("packet_handler", new PacketInboundHandler(eventBus)); return map; } + + @Provides + @ServerScope + EventBus provideEventBus() { + return new SimpleEventBus(); + } } diff --git a/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java b/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java new file mode 100644 index 0000000..32c6bc6 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java @@ -0,0 +1,25 @@ +package mc.protocol.utils; + +import mc.protocol.ChannelContext; +import mc.protocol.State; +import mc.protocol.packets.ClientSidePacket; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class SimpleEventBus implements EventBus { + + private final Table, EventHandler> table = new Table<>(); + + @Override + public

void subscribe(State state, Class

packetClass, EventHandler

eventHandler) { + table.put(state, packetClass, eventHandler); + } + + @Override + public

void emit(State state, ChannelContext

channelContext) { + EventHandler eventHandler = table.getColumnAndRow(state, channelContext.getPacket().getClass()); + + if (eventHandler != null) { + eventHandler.handle(channelContext); + } + } +} diff --git a/protocol/src/main/java/mc/protocol/utils/Table.java b/protocol/src/main/java/mc/protocol/utils/Table.java new file mode 100644 index 0000000..acc1302 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/utils/Table.java @@ -0,0 +1,23 @@ +package mc.protocol.utils; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +public class Table { + + private final Map> map = new HashMap<>(); + + @Nullable + public V getColumnAndRow(C column, R row) { + if (!map.containsKey(column)) { + return null; + } + + return map.get(column).get(row); + } + + public void put(C column, R row, V value) { + map.computeIfAbsent(column, c -> new HashMap<>()).put(row, value); + } +} diff --git a/server/src/main/java/mc/server/Main.java b/server/src/main/java/mc/server/Main.java index fedfe80..2962839 100644 --- a/server/src/main/java/mc/server/Main.java +++ b/server/src/main/java/mc/server/Main.java @@ -49,11 +49,11 @@ public class Main { NettyServer server = NettyServer.createServer(); PacketHandler packetHandler = serverComponent.getPacketHandler(); - State.HANDSHAKING.packetFlux(HandshakePacket.class).subscribe(packetHandler::onHandshake); - State.STATUS.packetFlux(PingPacket.class).subscribe(packetHandler::onKeepAlive); - State.STATUS.packetFlux(StatusServerRequestPacket.class).subscribe(packetHandler::onServerStatus); - State.LOGIN.packetFlux(LoginStartPacket.class).subscribe(packetHandler::onLoginStart); - State.PLAY.packetFlux(PingPacket.class).subscribe(packetHandler::onKeepAlivePlay); + server.listenPacket(State.HANDSHAKING, HandshakePacket.class, packetHandler::onHandshake); + server.listenPacket(State.STATUS, PingPacket.class, packetHandler::onKeepAlive); + server.listenPacket(State.STATUS, StatusServerRequestPacket.class, packetHandler::onServerStatus); + server.listenPacket(State.LOGIN, LoginStartPacket.class, packetHandler::onLoginStart); + server.listenPacket(State.PLAY, PingPacket.class, packetHandler::onKeepAlivePlay); server.bind(config.server().host(), config.server().port()); } From c4a6a019084cfd01e15c1e9b9d3dedf51b037a6a Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Wed, 5 May 2021 20:43:09 +0300 Subject: [PATCH 3/3] =?UTF-8?q?=D1=83=D0=B1=D0=B8=D1=80=D0=B0=D0=B5=D0=BC?= =?UTF-8?q?=20reactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- protocol/build.gradle | 1 - protocol/src/main/java/mc/protocol/State.java | 18 ------------------ 2 files changed, 19 deletions(-) diff --git a/protocol/build.gradle b/protocol/build.gradle index 03ac861..b27f6a5 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -2,7 +2,6 @@ apply from: rootDir.toPath().resolve('logic.gradle').toFile() dependencies { api libs.netty - api libs.reactor implementation libs.json testImplementation libs.lang3 diff --git a/protocol/src/main/java/mc/protocol/State.java b/protocol/src/main/java/mc/protocol/State.java index 180f4d7..e94fe07 100644 --- a/protocol/src/main/java/mc/protocol/State.java +++ b/protocol/src/main/java/mc/protocol/State.java @@ -8,12 +8,9 @@ import mc.protocol.packets.PingPacket; import mc.protocol.packets.ServerSidePacket; import mc.protocol.packets.client.*; import mc.protocol.packets.server.*; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; import javax.annotation.Nullable; import java.util.Collections; -import java.util.HashMap; import java.util.Map; @RequiredArgsConstructor @@ -83,9 +80,6 @@ public enum State { private final Map> clientSidePackets; private final Map, Integer> serverSidePackets; - @SuppressWarnings("rawtypes") - private final Map, Sinks.Many> observedMap = new HashMap<>(); - State(int id, Map> clientSidePackets) { this.id = id; this.clientSidePackets = clientSidePackets; @@ -101,16 +95,4 @@ public enum State { public Integer getServerSidePacketId(Class clazz) { return serverSidePackets == null ? null : serverSidePackets.get(clazz); } - - - @SuppressWarnings("rawtypes") - public

Sinks.Many getPacketSinks(Class

packetClass) { - return observedMap.get(packetClass); - } - - @SuppressWarnings("unchecked") - public

Flux> packetFlux(Class

packetClass) { - return observedMap.computeIfAbsent(packetClass, aClass -> Sinks.many().multicast().directBestEffort()) - .asFlux().map(ChannelContext.class::cast); - } }