diff --git a/protocol/build.gradle b/protocol/build.gradle index 8b01584..0e4b3c6 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -1,8 +1,6 @@ apply from: rootDir.toPath().resolve('logic.gradle').toFile() dependencies { - api libs.reactor - implementation libs.netty implementation libs.json implementation libs.objpool diff --git a/protocol/src/main/java/mc/protocol/NettyServer.java b/protocol/src/main/java/mc/protocol/NettyServer.java index 3020209..da14177 100644 --- a/protocol/src/main/java/mc/protocol/NettyServer.java +++ b/protocol/src/main/java/mc/protocol/NettyServer.java @@ -16,6 +16,8 @@ import mc.protocol.api.Server; import mc.protocol.io.codec.ProtocolDecoder; import mc.protocol.io.codec.ProtocolEncoder; import mc.protocol.io.codec.ProtocolSplitter; +import mc.protocol.packets.ClientSidePacket; +import mc.protocol.utils.EventBus; import mc.protocol.utils.PacketPool; import javax.annotation.Nonnull; @@ -28,6 +30,7 @@ import java.util.function.Consumer; public class NettyServer implements Server { private final PacketPool packetPool; + private final EventBus eventBus; private Consumer> consumerNewConnection; private Consumer> consumerDisconnect; @@ -54,6 +57,11 @@ public class NettyServer implements Server { this.consumerDisconnect = consumer; } + @Override + public

void listenPacket(State state, Class

packetClass, EventBus.EventHandler

eventHandler) { + this.eventBus.subscribe(state, packetClass, eventHandler); + } + private ServerBootstrap createServerBootstrap() { ServerBootstrap bootstrap = new ServerBootstrap(); @@ -81,7 +89,7 @@ public class NettyServer implements Server { map.put("logger", new LoggingHandler(LogLevel.DEBUG)); map.put("packet_decoder", new ProtocolDecoder(true, packetPool, consumerNewConnection, consumerDisconnect)); map.put("packet_encoder", new ProtocolEncoder()); - map.put("packet_handler", new PacketInboundHandler(packetPool)); + map.put("packet_handler", new PacketInboundHandler(packetPool, eventBus)); return map; } diff --git a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java index 6ce4c3e..fb5afad 100644 --- a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java +++ b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java @@ -3,25 +3,20 @@ package mc.protocol; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.RequiredArgsConstructor; -import mc.protocol.api.ConnectionContext; import mc.protocol.packets.ClientSidePacket; +import mc.protocol.utils.EventBus; import mc.protocol.utils.PacketPool; -import reactor.core.publisher.Sinks; @RequiredArgsConstructor public class PacketInboundHandler extends SimpleChannelInboundHandler { private final PacketPool poolPackets; + private final EventBus eventBus; - @SuppressWarnings("rawtypes") @Override protected void channelRead0(ChannelHandlerContext ctx, ClientSidePacket packet) throws Exception { - Sinks.Many packetSinks = ctx.channel().attr(NetworkAttributes.STATE) - .get().getPacketSinks(packet.getClass()); - - if (packetSinks != null) { - packetSinks.tryEmitNext(new NettyConnectionContext<>(ctx, packet)); - } + State state = ctx.channel().attr(NetworkAttributes.STATE).get(); + eventBus.emit(state, new NettyConnectionContext<>(ctx, packet)); poolPackets.returnObject(packet); } diff --git a/protocol/src/main/java/mc/protocol/State.java b/protocol/src/main/java/mc/protocol/State.java index 3e3c4ff..6159b43 100644 --- a/protocol/src/main/java/mc/protocol/State.java +++ b/protocol/src/main/java/mc/protocol/State.java @@ -2,19 +2,15 @@ package mc.protocol; import lombok.Getter; import lombok.RequiredArgsConstructor; -import mc.protocol.api.ConnectionContext; import mc.protocol.packets.ClientSidePacket; import mc.protocol.packets.Packet; import mc.protocol.packets.PingPacket; import mc.protocol.packets.ServerSidePacket; import mc.protocol.packets.client.*; import mc.protocol.packets.server.*; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; import javax.annotation.Nullable; import java.util.Collections; -import java.util.HashMap; import java.util.Map; @RequiredArgsConstructor @@ -85,9 +81,6 @@ public enum State { private final Map> clientSidePackets; private final Map, Integer> serverSidePackets; - @SuppressWarnings("rawtypes") - private final Map, Sinks.Many> observedMap = new HashMap<>(); - State(int id, Map> clientSidePackets) { this.id = id; this.clientSidePackets = clientSidePackets; @@ -103,16 +96,4 @@ public enum State { public Integer getServerSidePacketId(Class clazz) { return serverSidePackets == null ? null : serverSidePackets.get(clazz); } - - - @SuppressWarnings("rawtypes") - public

Sinks.Many getPacketSinks(Class

packetClass) { - return observedMap.get(packetClass); - } - - @SuppressWarnings("unchecked") - public

Flux> packetFlux(Class

packetClass) { - return observedMap.computeIfAbsent(packetClass, aClass -> Sinks.many().multicast().directBestEffort()) - .asFlux().map(ConnectionContext.class::cast); - } } diff --git a/protocol/src/main/java/mc/protocol/api/Server.java b/protocol/src/main/java/mc/protocol/api/Server.java index e06d137..2e4af87 100644 --- a/protocol/src/main/java/mc/protocol/api/Server.java +++ b/protocol/src/main/java/mc/protocol/api/Server.java @@ -1,5 +1,9 @@ package mc.protocol.api; +import mc.protocol.State; +import mc.protocol.packets.ClientSidePacket; +import mc.protocol.utils.EventBus; + import java.util.function.Consumer; public interface Server { @@ -8,4 +12,6 @@ public interface Server { void onNewConnect(Consumer> consumer); void onDisonnect(Consumer> consumer); + +

void listenPacket(State state, Class

packetClass, EventBus.EventHandler

eventHandler); } diff --git a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java index e228b86..b4c1d0d 100644 --- a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java +++ b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java @@ -7,8 +7,10 @@ import mc.protocol.State; import mc.protocol.api.Server; import mc.protocol.packets.ClientSidePacket; import mc.protocol.packets.UnknownPacket; +import mc.protocol.utils.EventBus; import mc.protocol.utils.PacketFactory; import mc.protocol.utils.PacketPool; +import mc.protocol.utils.SimpleEventBus; import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool; @@ -21,8 +23,8 @@ public class ProtocolModule { @Provides @ServerScope - Server provideServer(PacketPool packetPool) { - return new NettyServer(packetPool); + Server provideServer(PacketPool packetPool, EventBus eventBus) { + return new NettyServer(packetPool, eventBus); } @Provides @@ -39,4 +41,10 @@ public class ProtocolModule { return new PacketPool(map); } + + @Provides + @ServerScope + EventBus provideEventBus() { + return new SimpleEventBus(); + } } diff --git a/protocol/src/main/java/mc/protocol/utils/EventBus.java b/protocol/src/main/java/mc/protocol/utils/EventBus.java new file mode 100644 index 0000000..0d7ad36 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/utils/EventBus.java @@ -0,0 +1,17 @@ +package mc.protocol.utils; + +import mc.protocol.State; +import mc.protocol.api.ConnectionContext; +import mc.protocol.packets.ClientSidePacket; + +public interface EventBus { + +

void subscribe(State state, Class

packetClass, EventHandler

eventHandler); + +

void emit(State state, ConnectionContext

channelContext); + + @FunctionalInterface + interface EventHandler

{ + void handle(ConnectionContext

channelContext); + } +} diff --git a/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java b/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java new file mode 100644 index 0000000..d79649d --- /dev/null +++ b/protocol/src/main/java/mc/protocol/utils/SimpleEventBus.java @@ -0,0 +1,25 @@ +package mc.protocol.utils; + +import mc.protocol.State; +import mc.protocol.api.ConnectionContext; +import mc.protocol.packets.ClientSidePacket; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class SimpleEventBus implements EventBus { + + private final Table, EventHandler> table = new Table<>(); + + @Override + public

void subscribe(State state, Class

packetClass, EventHandler

eventHandler) { + table.put(state, packetClass, eventHandler); + } + + @Override + public

void emit(State state, ConnectionContext

channelContext) { + EventHandler eventHandler = table.getColumnAndRow(state, channelContext.clientPacket().getClass()); + + if (eventHandler != null) { + eventHandler.handle(channelContext); + } + } +} diff --git a/protocol/src/main/java/mc/protocol/utils/Table.java b/protocol/src/main/java/mc/protocol/utils/Table.java new file mode 100644 index 0000000..acc1302 --- /dev/null +++ b/protocol/src/main/java/mc/protocol/utils/Table.java @@ -0,0 +1,23 @@ +package mc.protocol.utils; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +public class Table { + + private final Map> map = new HashMap<>(); + + @Nullable + public V getColumnAndRow(C column, R row) { + if (!map.containsKey(column)) { + return null; + } + + return map.get(column).get(row); + } + + public void put(C column, R row, V value) { + map.computeIfAbsent(column, c -> new HashMap<>()).put(row, value); + } +} diff --git a/server/src/main/java/mc/server/Main.java b/server/src/main/java/mc/server/Main.java index 553757a..3f2b7ff 100644 --- a/server/src/main/java/mc/server/Main.java +++ b/server/src/main/java/mc/server/Main.java @@ -56,11 +56,11 @@ public class Main { server.onNewConnect(connectionContext -> connectionContext.setState(State.HANDSHAKING)); server.onDisonnect(connectionContext -> connectionContext.setState(null)); - State.HANDSHAKING.packetFlux(HandshakePacket.class).subscribe(packetHandler::onHandshake); - State.STATUS.packetFlux(PingPacket.class).subscribe(packetHandler::onKeepAlive); - State.STATUS.packetFlux(StatusServerRequestPacket.class).subscribe(packetHandler::onServerStatus); - State.LOGIN.packetFlux(LoginStartPacket.class).subscribe(packetHandler::onLoginStart); - State.PLAY.packetFlux(PingPacket.class).subscribe(packetHandler::onKeepAlivePlay); + server.listenPacket(State.HANDSHAKING, HandshakePacket.class, packetHandler::onHandshake); + server.listenPacket(State.STATUS, PingPacket.class, packetHandler::onKeepAlive); + server.listenPacket(State.STATUS, StatusServerRequestPacket.class, packetHandler::onServerStatus); + server.listenPacket(State.LOGIN, LoginStartPacket.class, packetHandler::onLoginStart); + server.listenPacket(State.PLAY, PingPacket.class, packetHandler::onKeepAlivePlay); server.bind(config.server().host(), config.server().port()); }