diff --git a/libs.gradle b/libs.gradle
index 691322b..402450a 100644
--- a/libs.gradle
+++ b/libs.gradle
@@ -19,7 +19,8 @@ ext {
yaml : 'org.yaml:snakeyaml:1.28',
json : 'com.eclipsesource.minimal-json:minimal-json:0.9.5',
ioutils : 'commons-io:commons-io:2.6',
- jopt : 'net.sf.jopt-simple:jopt-simple:6.0-alpha-3'
+ jopt : 'net.sf.jopt-simple:jopt-simple:6.0-alpha-3',
+ objpool : 'org.apache.commons:commons-pool2:2.9.0'
]
libs.logger = [
diff --git a/protocol/build.gradle b/protocol/build.gradle
index 03ac861..0e4b3c6 100644
--- a/protocol/build.gradle
+++ b/protocol/build.gradle
@@ -1,9 +1,9 @@
apply from: rootDir.toPath().resolve('logic.gradle').toFile()
dependencies {
- api libs.netty
- api libs.reactor
+ implementation libs.netty
implementation libs.json
+ implementation libs.objpool
testImplementation libs.lang3
}
diff --git a/protocol/src/main/java/mc/protocol/ChannelContext.java b/protocol/src/main/java/mc/protocol/ChannelContext.java
deleted file mode 100644
index 51d7a89..0000000
--- a/protocol/src/main/java/mc/protocol/ChannelContext.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package mc.protocol;
-
-import io.netty.channel.ChannelHandlerContext;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import mc.protocol.packets.ClientSidePacket;
-
-@RequiredArgsConstructor
-public class ChannelContext
{
-
- @Getter
- private final ChannelHandlerContext ctx;
-
- @Getter
- private final P packet;
-
- public void setState(State state) {
- ctx.channel().attr(NetworkAttributes.STATE).set(state);
- }
-}
diff --git a/protocol/src/main/java/mc/protocol/NettyConnectionContext.java b/protocol/src/main/java/mc/protocol/NettyConnectionContext.java
new file mode 100644
index 0000000..23fe235
--- /dev/null
+++ b/protocol/src/main/java/mc/protocol/NettyConnectionContext.java
@@ -0,0 +1,52 @@
+package mc.protocol;
+
+import io.netty.channel.ChannelHandlerContext;
+import lombok.EqualsAndHashCode;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import mc.protocol.api.ConnectionContext;
+import mc.protocol.packets.ServerSidePacket;
+import mc.protocol.pool.Passivable;
+
+@EqualsAndHashCode
+public class NettyConnectionContext implements ConnectionContext, Passivable {
+
+ @Accessors(chain = true)
+ @Setter
+ private ChannelHandlerContext ctx;
+
+ @Override
+ public State getState() {
+ return ctx.channel().attr(NetworkAttributes.STATE).get();
+ }
+
+ @Override
+ public void setState(State state) {
+ ctx.channel().attr(NetworkAttributes.STATE).set(state);
+ }
+
+ @Override
+ public void send(ServerSidePacket packet) {
+ ctx.write(packet);
+ }
+
+ @Override
+ public void sendNow(ServerSidePacket packet) {
+ ctx.writeAndFlush(packet);
+ }
+
+ @Override
+ public void flushSending() {
+ ctx.flush();
+ }
+
+ @Override
+ public void disconnect() {
+ ctx.disconnect();
+ }
+
+ @Override
+ public void passivate() {
+ this.ctx = null;
+ }
+}
diff --git a/protocol/src/main/java/mc/protocol/NettyServer.java b/protocol/src/main/java/mc/protocol/NettyServer.java
index 2c92645..70ff2cb 100644
--- a/protocol/src/main/java/mc/protocol/NettyServer.java
+++ b/protocol/src/main/java/mc/protocol/NettyServer.java
@@ -1,22 +1,46 @@
package mc.protocol;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import mc.protocol.di.DaggerProtocolComponent;
-import mc.protocol.di.ProtocolComponent;
+import mc.protocol.api.ConnectionContext;
+import mc.protocol.api.Server;
+import mc.protocol.event.EventBus;
+import mc.protocol.io.codec.ProtocolDecoder;
+import mc.protocol.io.codec.ProtocolEncoder;
+import mc.protocol.io.codec.ProtocolSplitter;
+import mc.protocol.packets.ClientSidePacket;
+
+import javax.annotation.Nonnull;
+import javax.inject.Provider;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.Consumer;
@Slf4j
@RequiredArgsConstructor
-public class NettyServer {
+public class NettyServer implements Server {
- private final ServerBootstrap serverBootstrap;
+ private final Provider protocolDecoderProvider;
+ private final Provider packetInboundHandlerProvider;
+ private final EventBus eventBus;
+ private Consumer consumerNewConnection;
+ private Consumer consumerDisconnect;
+ @Override
public void bind(String host, int port) {
log.info("Network starting: {}:{}", host, port);
try {
- serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
+ createServerBootstrap().bind(host, port).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
if (log.isTraceEnabled()) {
log.trace("{}: {}", e.getClass().getSimpleName(), e.getMessage(), e);
@@ -24,8 +48,56 @@ public class NettyServer {
}
}
- public static NettyServer createServer() {
- ProtocolComponent component = DaggerProtocolComponent.create();
- return component.getNettyServer();
+ @Override
+ public void onNewConnect(Consumer consumer) {
+ this.consumerNewConnection = consumer;
+ }
+
+ @Override
+ public void onDisonnect(Consumer consumer) {
+ this.consumerDisconnect = consumer;
+ }
+
+ @Override
+ @SuppressWarnings("java:S2326") // Сонар, ты бредишь
+ public void listenPacket(State state, Class
packetClass, EventBus.EventHandler
eventHandler) {
+ this.eventBus.subscribe(state, packetClass, eventHandler);
+ }
+
+ private ServerBootstrap createServerBootstrap() {
+ ServerBootstrap bootstrap = new ServerBootstrap();
+
+ bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
+ .channel(NioServerSocketChannel.class)
+ .childHandler(createChannelChannelInitializer());
+
+ return bootstrap;
+ }
+
+ private ChannelInitializer createChannelChannelInitializer() {
+ return new ChannelInitializer<>() {
+ @Override
+ protected void initChannel(@Nonnull SocketChannel socketChannel) {
+ ChannelPipeline pipeline = socketChannel.pipeline();
+ createChannelHandlerMap().forEach(pipeline::addLast);
+ }
+ };
+ }
+
+ private Map createChannelHandlerMap() {
+ Map map = new LinkedHashMap<>();
+
+ map.put("packet_splitter", new ProtocolSplitter());
+ map.put("logger", new LoggingHandler(LogLevel.DEBUG));
+
+ ProtocolDecoder protocolDecoder = protocolDecoderProvider.get();
+ protocolDecoder.setConsumerNewConnection(consumerNewConnection);
+ protocolDecoder.setConsumerDisconnect(consumerDisconnect);
+ map.put("packet_decoder", protocolDecoder);
+
+ map.put("packet_encoder", new ProtocolEncoder());
+ map.put("packet_handler", packetInboundHandlerProvider.get());
+
+ return map;
}
}
diff --git a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java
index a99fb4d..81a3e0d 100644
--- a/protocol/src/main/java/mc/protocol/PacketInboundHandler.java
+++ b/protocol/src/main/java/mc/protocol/PacketInboundHandler.java
@@ -4,19 +4,25 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.RequiredArgsConstructor;
import mc.protocol.packets.ClientSidePacket;
-import reactor.core.publisher.Sinks;
+import mc.protocol.event.EventBus;
+import mc.protocol.pool.PacketPool;
+import org.apache.commons.pool2.ObjectPool;
@RequiredArgsConstructor
public class PacketInboundHandler extends SimpleChannelInboundHandler {
- @SuppressWarnings("rawtypes")
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ClientSidePacket packet) {
- Sinks.Many packetSinks = ctx.channel().attr(NetworkAttributes.STATE)
- .get().getPacketSinks(packet.getClass());
+ private final ObjectPool poolNettyConnectionContext;
+ private final PacketPool poolPackets;
+ private final EventBus eventBus;
- if (packetSinks != null) {
- packetSinks.tryEmitNext(new ChannelContext<>(ctx, packet));
- }
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ClientSidePacket packet) throws Exception {
+ State state = ctx.channel().attr(NetworkAttributes.STATE).get();
+
+ NettyConnectionContext context = poolNettyConnectionContext.borrowObject().setCtx(ctx);
+ eventBus.emit(state, context, packet);
+
+ poolNettyConnectionContext.returnObject(context);
+ poolPackets.returnObject(packet);
}
}
diff --git a/protocol/src/main/java/mc/protocol/State.java b/protocol/src/main/java/mc/protocol/State.java
index 180f4d7..6159b43 100644
--- a/protocol/src/main/java/mc/protocol/State.java
+++ b/protocol/src/main/java/mc/protocol/State.java
@@ -8,12 +8,9 @@ import mc.protocol.packets.PingPacket;
import mc.protocol.packets.ServerSidePacket;
import mc.protocol.packets.client.*;
import mc.protocol.packets.server.*;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
import javax.annotation.Nullable;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
@RequiredArgsConstructor
@@ -80,12 +77,10 @@ public enum State {
@Getter
private final int id;
+ @Getter
private final Map> clientSidePackets;
private final Map, Integer> serverSidePackets;
- @SuppressWarnings("rawtypes")
- private final Map, Sinks.Many> observedMap = new HashMap<>();
-
State(int id, Map> clientSidePackets) {
this.id = id;
this.clientSidePackets = clientSidePackets;
@@ -101,16 +96,4 @@ public enum State {
public Integer getServerSidePacketId(Class extends Packet> clazz) {
return serverSidePackets == null ? null : serverSidePackets.get(clazz);
}
-
-
- @SuppressWarnings("rawtypes")
- public Sinks.Many getPacketSinks(Class packetClass) {
- return observedMap.get(packetClass);
- }
-
- @SuppressWarnings("unchecked")
- public
Flux> packetFlux(Class packetClass) {
- return observedMap.computeIfAbsent(packetClass, aClass -> Sinks.many().multicast().directBestEffort())
- .asFlux().map(ChannelContext.class::cast);
- }
}
diff --git a/protocol/src/main/java/mc/protocol/api/ConnectionContext.java b/protocol/src/main/java/mc/protocol/api/ConnectionContext.java
new file mode 100644
index 0000000..5cf5c3d
--- /dev/null
+++ b/protocol/src/main/java/mc/protocol/api/ConnectionContext.java
@@ -0,0 +1,16 @@
+package mc.protocol.api;
+
+import mc.protocol.State;
+import mc.protocol.packets.ServerSidePacket;
+
+public interface ConnectionContext {
+
+ State getState();
+ void setState(State state);
+
+ void send(ServerSidePacket packet);
+ void sendNow(ServerSidePacket packet);
+ void flushSending();
+
+ void disconnect();
+}
diff --git a/protocol/src/main/java/mc/protocol/api/Server.java b/protocol/src/main/java/mc/protocol/api/Server.java
new file mode 100644
index 0000000..93b213f
--- /dev/null
+++ b/protocol/src/main/java/mc/protocol/api/Server.java
@@ -0,0 +1,18 @@
+package mc.protocol.api;
+
+import mc.protocol.State;
+import mc.protocol.packets.ClientSidePacket;
+import mc.protocol.event.EventBus;
+
+import java.util.function.Consumer;
+
+public interface Server {
+
+ void bind(String host, int port);
+
+ void onNewConnect(Consumer consumer);
+ void onDisonnect(Consumer consumer);
+
+ @SuppressWarnings("java:S2326") // Сонар, ты бредишь
+ void listenPacket(State state, Class
packetClass, EventBus.EventHandler
eventHandler);
+}
diff --git a/protocol/src/main/java/mc/protocol/di/PoolModule.java b/protocol/src/main/java/mc/protocol/di/PoolModule.java
new file mode 100644
index 0000000..756c6c2
--- /dev/null
+++ b/protocol/src/main/java/mc/protocol/di/PoolModule.java
@@ -0,0 +1,42 @@
+package mc.protocol.di;
+
+import dagger.Module;
+import dagger.Provides;
+import mc.protocol.NettyConnectionContext;
+import mc.protocol.State;
+import mc.protocol.packets.ClientSidePacket;
+import mc.protocol.packets.UnknownPacket;
+import mc.protocol.pool.NettyConnectionContextFactory;
+import mc.protocol.pool.PacketFactory;
+import mc.protocol.pool.PacketPool;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Module
+public class PoolModule {
+
+ @Provides
+ @ServerScope
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ PacketPool providePacketPool() {
+ Map, ObjectPool> map = Stream.of(State.values())
+ .flatMap(state -> state.getClientSidePackets().values().stream())
+ .distinct()
+ .collect(Collectors.toMap(
+ packetClass -> packetClass,
+ packetClass -> new GenericObjectPool(new PacketFactory<>(packetClass))));
+ map.put(UnknownPacket.class, new GenericObjectPool(new PacketFactory<>(UnknownPacket.class)));
+
+ return new PacketPool(map);
+ }
+
+ @Provides
+ @ServerScope
+ ObjectPool providePoolNettyConnectionContext() {
+ return new GenericObjectPool<>(new NettyConnectionContextFactory());
+ }
+}
diff --git a/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java b/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java
index a433211..da7725e 100644
--- a/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java
+++ b/protocol/src/main/java/mc/protocol/di/ProtocolComponent.java
@@ -1,11 +1,14 @@
package mc.protocol.di;
import dagger.Component;
-import mc.protocol.NettyServer;
+import mc.protocol.api.Server;
-@Component(modules = ProtocolModule.class)
+@Component(modules = {
+ ProtocolModule.class,
+ PoolModule.class
+})
@ServerScope
public interface ProtocolComponent {
- NettyServer getNettyServer();
+ Server getServer();
}
diff --git a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java
index 3e8a8df..26de985 100644
--- a/protocol/src/main/java/mc/protocol/di/ProtocolModule.java
+++ b/protocol/src/main/java/mc/protocol/di/ProtocolModule.java
@@ -2,68 +2,58 @@ package mc.protocol.di;
import dagger.Module;
import dagger.Provides;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
+import lombok.RequiredArgsConstructor;
+import mc.protocol.NettyConnectionContext;
import mc.protocol.NettyServer;
import mc.protocol.PacketInboundHandler;
+import mc.protocol.api.Server;
+import mc.protocol.event.EventBus;
+import mc.protocol.event.SimpleEventBus;
import mc.protocol.io.codec.ProtocolDecoder;
-import mc.protocol.io.codec.ProtocolEncoder;
-import mc.protocol.io.codec.ProtocolSplitter;
+import mc.protocol.pool.PacketPool;
+import org.apache.commons.pool2.ObjectPool;
-import javax.annotation.Nonnull;
import javax.inject.Provider;
-import java.util.LinkedHashMap;
-import java.util.Map;
@Module
+@RequiredArgsConstructor
public class ProtocolModule {
+ private final boolean readUnknownPackets;
+
@Provides
- NettyServer provideServer(ServerBootstrap serverBootstrap) {
- return new NettyServer(serverBootstrap);
+ @ServerScope
+ Server provideServer(
+ Provider protocolDecoderProvider,
+ Provider packetInboundHandlerProvider,
+ EventBus eventBus
+ ) {
+ return new NettyServer(protocolDecoderProvider, packetInboundHandlerProvider, eventBus);
}
@Provides
- ServerBootstrap provideServerBootstrap(ChannelInitializer channelChannelInitializer) {
- ServerBootstrap bootstrap = new ServerBootstrap();
-
- bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
- .channel(NioServerSocketChannel.class)
- .childHandler(channelChannelInitializer);
-
- return bootstrap;
+ @ServerScope
+ ProtocolDecoder provideProtocolDecoder(
+ ObjectPool poolNettyConnectionContext,
+ PacketPool poolPackets
+ ) {
+ return new ProtocolDecoder(readUnknownPackets, poolNettyConnectionContext, poolPackets);
}
@Provides
- ChannelInitializer provideChannelChannelInitializer(
- Provider