Archived
0

Merge branch 'dev/event-bus' into dev/network-api

# Conflicts:
#	protocol/build.gradle
#	protocol/src/main/java/mc/protocol/NettyServer.java
#	protocol/src/main/java/mc/protocol/PacketInboundHandler.java
#	protocol/src/main/java/mc/protocol/State.java
#	protocol/src/main/java/mc/protocol/di/ProtocolModule.java
#	server/src/main/java/mc/server/Main.java
This commit is contained in:
2021-05-06 13:42:01 +03:00
10 changed files with 99 additions and 38 deletions

View File

@@ -1,8 +1,6 @@
apply from: rootDir.toPath().resolve('logic.gradle').toFile()
dependencies {
api libs.reactor
implementation libs.netty
implementation libs.json
implementation libs.objpool

View File

@@ -16,6 +16,8 @@ import mc.protocol.api.Server;
import mc.protocol.io.codec.ProtocolDecoder;
import mc.protocol.io.codec.ProtocolEncoder;
import mc.protocol.io.codec.ProtocolSplitter;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.utils.EventBus;
import mc.protocol.utils.PacketPool;
import javax.annotation.Nonnull;
@@ -28,6 +30,7 @@ import java.util.function.Consumer;
public class NettyServer implements Server {
private final PacketPool packetPool;
private final EventBus eventBus;
private Consumer<ConnectionContext<?>> consumerNewConnection;
private Consumer<ConnectionContext<?>> consumerDisconnect;
@@ -54,6 +57,11 @@ public class NettyServer implements Server {
this.consumerDisconnect = consumer;
}
@Override
public <P extends ClientSidePacket> void listenPacket(State state, Class<P> packetClass, EventBus.EventHandler<P> eventHandler) {
this.eventBus.subscribe(state, packetClass, eventHandler);
}
private ServerBootstrap createServerBootstrap() {
ServerBootstrap bootstrap = new ServerBootstrap();
@@ -81,7 +89,7 @@ public class NettyServer implements Server {
map.put("logger", new LoggingHandler(LogLevel.DEBUG));
map.put("packet_decoder", new ProtocolDecoder(true, packetPool, consumerNewConnection, consumerDisconnect));
map.put("packet_encoder", new ProtocolEncoder());
map.put("packet_handler", new PacketInboundHandler(packetPool));
map.put("packet_handler", new PacketInboundHandler(packetPool, eventBus));
return map;
}

View File

@@ -3,25 +3,20 @@ package mc.protocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.RequiredArgsConstructor;
import mc.protocol.api.ConnectionContext;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.utils.EventBus;
import mc.protocol.utils.PacketPool;
import reactor.core.publisher.Sinks;
@RequiredArgsConstructor
public class PacketInboundHandler extends SimpleChannelInboundHandler<ClientSidePacket> {
private final PacketPool poolPackets;
private final EventBus eventBus;
@SuppressWarnings("rawtypes")
@Override
protected void channelRead0(ChannelHandlerContext ctx, ClientSidePacket packet) throws Exception {
Sinks.Many<ConnectionContext> packetSinks = ctx.channel().attr(NetworkAttributes.STATE)
.get().getPacketSinks(packet.getClass());
if (packetSinks != null) {
packetSinks.tryEmitNext(new NettyConnectionContext<>(ctx, packet));
}
State state = ctx.channel().attr(NetworkAttributes.STATE).get();
eventBus.emit(state, new NettyConnectionContext<>(ctx, packet));
poolPackets.returnObject(packet);
}

View File

@@ -2,19 +2,15 @@ package mc.protocol;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import mc.protocol.api.ConnectionContext;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.packets.Packet;
import mc.protocol.packets.PingPacket;
import mc.protocol.packets.ServerSidePacket;
import mc.protocol.packets.client.*;
import mc.protocol.packets.server.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@RequiredArgsConstructor
@@ -85,9 +81,6 @@ public enum State {
private final Map<Integer, Class<? extends ClientSidePacket>> clientSidePackets;
private final Map<Class<? extends ServerSidePacket>, Integer> serverSidePackets;
@SuppressWarnings("rawtypes")
private final Map<Class<? extends ClientSidePacket>, Sinks.Many<ConnectionContext>> observedMap = new HashMap<>();
State(int id, Map<Integer, Class<? extends ClientSidePacket>> clientSidePackets) {
this.id = id;
this.clientSidePackets = clientSidePackets;
@@ -103,16 +96,4 @@ public enum State {
public Integer getServerSidePacketId(Class<? extends Packet> clazz) {
return serverSidePackets == null ? null : serverSidePackets.get(clazz);
}
@SuppressWarnings("rawtypes")
public <P extends ClientSidePacket> Sinks.Many<ConnectionContext> getPacketSinks(Class<P> packetClass) {
return observedMap.get(packetClass);
}
@SuppressWarnings("unchecked")
public <P extends ClientSidePacket> Flux<ConnectionContext<P>> packetFlux(Class<P> packetClass) {
return observedMap.computeIfAbsent(packetClass, aClass -> Sinks.many().multicast().directBestEffort())
.asFlux().map(ConnectionContext.class::cast);
}
}

View File

@@ -1,5 +1,9 @@
package mc.protocol.api;
import mc.protocol.State;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.utils.EventBus;
import java.util.function.Consumer;
public interface Server {
@@ -8,4 +12,6 @@ public interface Server {
void onNewConnect(Consumer<ConnectionContext<?>> consumer);
void onDisonnect(Consumer<ConnectionContext<?>> consumer);
<P extends ClientSidePacket> void listenPacket(State state, Class<P> packetClass, EventBus.EventHandler<P> eventHandler);
}

View File

@@ -7,8 +7,10 @@ import mc.protocol.State;
import mc.protocol.api.Server;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.packets.UnknownPacket;
import mc.protocol.utils.EventBus;
import mc.protocol.utils.PacketFactory;
import mc.protocol.utils.PacketPool;
import mc.protocol.utils.SimpleEventBus;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
@@ -21,8 +23,8 @@ public class ProtocolModule {
@Provides
@ServerScope
Server provideServer(PacketPool packetPool) {
return new NettyServer(packetPool);
Server provideServer(PacketPool packetPool, EventBus eventBus) {
return new NettyServer(packetPool, eventBus);
}
@Provides
@@ -39,4 +41,10 @@ public class ProtocolModule {
return new PacketPool(map);
}
@Provides
@ServerScope
EventBus provideEventBus() {
return new SimpleEventBus();
}
}

View File

@@ -0,0 +1,17 @@
package mc.protocol.utils;
import mc.protocol.State;
import mc.protocol.api.ConnectionContext;
import mc.protocol.packets.ClientSidePacket;
public interface EventBus {
<P extends ClientSidePacket> void subscribe(State state, Class<P> packetClass, EventHandler<P> eventHandler);
<P extends ClientSidePacket> void emit(State state, ConnectionContext<P> channelContext);
@FunctionalInterface
interface EventHandler<P extends ClientSidePacket> {
void handle(ConnectionContext<P> channelContext);
}
}

View File

@@ -0,0 +1,25 @@
package mc.protocol.utils;
import mc.protocol.State;
import mc.protocol.api.ConnectionContext;
import mc.protocol.packets.ClientSidePacket;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class SimpleEventBus implements EventBus {
private final Table<State, Class<? extends ClientSidePacket>, EventHandler> table = new Table<>();
@Override
public <P extends ClientSidePacket> void subscribe(State state, Class<P> packetClass, EventHandler<P> eventHandler) {
table.put(state, packetClass, eventHandler);
}
@Override
public <P extends ClientSidePacket> void emit(State state, ConnectionContext<P> channelContext) {
EventHandler eventHandler = table.getColumnAndRow(state, channelContext.clientPacket().getClass());
if (eventHandler != null) {
eventHandler.handle(channelContext);
}
}
}

View File

@@ -0,0 +1,23 @@
package mc.protocol.utils;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
public class Table<C, R, V> {
private final Map<C, Map<R, V>> map = new HashMap<>();
@Nullable
public V getColumnAndRow(C column, R row) {
if (!map.containsKey(column)) {
return null;
}
return map.get(column).get(row);
}
public void put(C column, R row, V value) {
map.computeIfAbsent(column, c -> new HashMap<>()).put(row, value);
}
}