diff --git a/protocol/src/main/java/mc/protocol/NettyServer.java b/protocol/src/main/java/mc/protocol/NettyServer.java index 2c92645..476c9a5 100644 --- a/protocol/src/main/java/mc/protocol/NettyServer.java +++ b/protocol/src/main/java/mc/protocol/NettyServer.java @@ -5,12 +5,15 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import mc.protocol.di.DaggerProtocolComponent; import mc.protocol.di.ProtocolComponent; +import mc.protocol.packets.ClientSidePacket; +import mc.protocol.utils.EventBus; @Slf4j @RequiredArgsConstructor public class NettyServer { private final ServerBootstrap serverBootstrap; + private final EventBus eventBus; public void bind(String host, int port) { log.info("Network starting: {}:{}", host, port); @@ -24,6 +27,10 @@ public class NettyServer { } } + public

void listenPacket(State state, Class

packetClass, EventBus.EventHandler

eventHandler) { + this.eventBus.subscribe(state, packetClass, eventHandler); + } + public static NettyServer createServer() { ProtocolComponent component = DaggerProtocolComponent.create(); return component.getNettyServer(); diff --git a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java index a99fb4d..da372c6 100644 --- a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java +++ b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java @@ -4,19 +4,16 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.RequiredArgsConstructor; import mc.protocol.packets.ClientSidePacket; -import reactor.core.publisher.Sinks; +import mc.protocol.utils.EventBus; @RequiredArgsConstructor public class PacketInboundHandler extends SimpleChannelInboundHandler { - @SuppressWarnings("rawtypes") + private final EventBus eventBus; + @Override protected void channelRead0(ChannelHandlerContext ctx, ClientSidePacket packet) { - Sinks.Many packetSinks = ctx.channel().attr(NetworkAttributes.STATE) - .get().getPacketSinks(packet.getClass()); - - if (packetSinks != null) { - packetSinks.tryEmitNext(new ChannelContext<>(ctx, packet)); - } + State state = ctx.channel().attr(NetworkAttributes.STATE).get(); + eventBus.emit(state, new ChannelContext<>(ctx, packet)); } } diff --git a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java index 3e8a8df..8ea6642 100644 --- a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java +++ b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java @@ -16,6 +16,8 @@ import mc.protocol.PacketInboundHandler; import mc.protocol.io.codec.ProtocolDecoder; import mc.protocol.io.codec.ProtocolEncoder; import mc.protocol.io.codec.ProtocolSplitter; +import mc.protocol.utils.EventBus; +import mc.protocol.utils.SimpleEventBus; import javax.annotation.Nonnull; import javax.inject.Provider; @@ -26,8 +28,8 @@ import java.util.Map; public class ProtocolModule { @Provides - NettyServer provideServer(ServerBootstrap serverBootstrap) { - return new NettyServer(serverBootstrap); + NettyServer provideServer(ServerBootstrap serverBootstrap, EventBus eventBus) { + return new NettyServer(serverBootstrap, eventBus); } @Provides @@ -55,15 +57,21 @@ public class ProtocolModule { } @Provides - Map provideChannelHandlerMap() { + Map provideChannelHandlerMap(EventBus eventBus) { Map map = new LinkedHashMap<>(); map.put("packet_splitter", new ProtocolSplitter()); map.put("logger", new LoggingHandler(LogLevel.DEBUG)); map.put("packet_decoder", new ProtocolDecoder(true)); map.put("packet_encoder", new ProtocolEncoder()); - map.put("packet_handler", new PacketInboundHandler()); + map.put("packet_handler", new PacketInboundHandler(eventBus)); return map; } + + @Provides + @ServerScope + EventBus provideEventBus() { + return new SimpleEventBus(); + } } diff --git a/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java b/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java new file mode 100644 index 0000000..32c6bc6 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java @@ -0,0 +1,25 @@ +package mc.protocol.utils; + +import mc.protocol.ChannelContext; +import mc.protocol.State; +import mc.protocol.packets.ClientSidePacket; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class SimpleEventBus implements EventBus { + + private final Table, EventHandler> table = new Table<>(); + + @Override + public

void subscribe(State state, Class

packetClass, EventHandler

eventHandler) { + table.put(state, packetClass, eventHandler); + } + + @Override + public

void emit(State state, ChannelContext

channelContext) { + EventHandler eventHandler = table.getColumnAndRow(state, channelContext.getPacket().getClass()); + + if (eventHandler != null) { + eventHandler.handle(channelContext); + } + } +} diff --git a/protocol/src/main/java/mc/protocol/utils/Table.java b/protocol/src/main/java/mc/protocol/utils/Table.java new file mode 100644 index 0000000..acc1302 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/utils/Table.java @@ -0,0 +1,23 @@ +package mc.protocol.utils; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +public class Table { + + private final Map> map = new HashMap<>(); + + @Nullable + public V getColumnAndRow(C column, R row) { + if (!map.containsKey(column)) { + return null; + } + + return map.get(column).get(row); + } + + public void put(C column, R row, V value) { + map.computeIfAbsent(column, c -> new HashMap<>()).put(row, value); + } +} diff --git a/server/src/main/java/mc/server/Main.java b/server/src/main/java/mc/server/Main.java index fedfe80..2962839 100644 --- a/server/src/main/java/mc/server/Main.java +++ b/server/src/main/java/mc/server/Main.java @@ -49,11 +49,11 @@ public class Main { NettyServer server = NettyServer.createServer(); PacketHandler packetHandler = serverComponent.getPacketHandler(); - State.HANDSHAKING.packetFlux(HandshakePacket.class).subscribe(packetHandler::onHandshake); - State.STATUS.packetFlux(PingPacket.class).subscribe(packetHandler::onKeepAlive); - State.STATUS.packetFlux(StatusServerRequestPacket.class).subscribe(packetHandler::onServerStatus); - State.LOGIN.packetFlux(LoginStartPacket.class).subscribe(packetHandler::onLoginStart); - State.PLAY.packetFlux(PingPacket.class).subscribe(packetHandler::onKeepAlivePlay); + server.listenPacket(State.HANDSHAKING, HandshakePacket.class, packetHandler::onHandshake); + server.listenPacket(State.STATUS, PingPacket.class, packetHandler::onKeepAlive); + server.listenPacket(State.STATUS, StatusServerRequestPacket.class, packetHandler::onServerStatus); + server.listenPacket(State.LOGIN, LoginStartPacket.class, packetHandler::onLoginStart); + server.listenPacket(State.PLAY, PingPacket.class, packetHandler::onKeepAlivePlay); server.bind(config.server().host(), config.server().port()); }