Archived
0

рефакторинг протокола

This commit is contained in:
2021-05-06 13:14:42 +03:00
parent 87dc18f009
commit 5f431ff138
13 changed files with 200 additions and 120 deletions

View File

@@ -1,8 +1,9 @@
apply from: rootDir.toPath().resolve('logic.gradle').toFile()
dependencies {
api libs.netty
api libs.reactor
implementation libs.netty
implementation libs.json
testImplementation libs.lang3

View File

@@ -1,20 +0,0 @@
package mc.protocol;
import io.netty.channel.ChannelHandlerContext;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import mc.protocol.packets.ClientSidePacket;
@RequiredArgsConstructor
public class ChannelContext<P extends ClientSidePacket> {
@Getter
private final ChannelHandlerContext ctx;
@Getter
private final P packet;
public void setState(State state) {
ctx.channel().attr(NetworkAttributes.STATE).set(state);
}
}

View File

@@ -0,0 +1,49 @@
package mc.protocol;
import io.netty.channel.ChannelHandlerContext;
import lombok.RequiredArgsConstructor;
import mc.protocol.api.ConnectionContext;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.packets.ServerSidePacket;
@RequiredArgsConstructor
public class NettyConnectionContext<P extends ClientSidePacket> implements ConnectionContext<P> {
private final ChannelHandlerContext ctx;
private final P packet;
@Override
public State getState() {
return ctx.channel().attr(NetworkAttributes.STATE).get();
}
@Override
public void setState(State state) {
ctx.channel().attr(NetworkAttributes.STATE).set(state);
}
@Override
public P clientPacket() {
return packet;
}
@Override
public void send(ServerSidePacket packet) {
ctx.write(packet);
}
@Override
public void sendNow(ServerSidePacket packet) {
ctx.writeAndFlush(packet);
}
@Override
public void flushSending() {
ctx.flush();
}
@Override
public void disconnect() {
ctx.disconnect();
}
}

View File

@@ -1,22 +1,40 @@
package mc.protocol;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.di.DaggerProtocolComponent;
import mc.protocol.di.ProtocolComponent;
import mc.protocol.api.ConnectionContext;
import mc.protocol.api.Server;
import mc.protocol.io.codec.ProtocolDecoder;
import mc.protocol.io.codec.ProtocolEncoder;
import mc.protocol.io.codec.ProtocolSplitter;
import javax.annotation.Nonnull;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
@Slf4j
@RequiredArgsConstructor
public class NettyServer {
public class NettyServer implements Server {
private final ServerBootstrap serverBootstrap;
private Consumer<ConnectionContext<?>> consumerNewConnection;
private Consumer<ConnectionContext<?>> consumerDisconnect;
@Override
public void bind(String host, int port) {
log.info("Network starting: {}:{}", host, port);
try {
serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
createServerBootstrap().bind(host, port).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
if (log.isTraceEnabled()) {
log.trace("{}: {}", e.getClass().getSimpleName(), e.getMessage(), e);
@@ -24,8 +42,45 @@ public class NettyServer {
}
}
public static NettyServer createServer() {
ProtocolComponent component = DaggerProtocolComponent.create();
return component.getNettyServer();
@Override
public void onNewConnect(Consumer<ConnectionContext<?>> consumer) {
this.consumerNewConnection = consumer;
}
@Override
public void onDisonnect(Consumer<ConnectionContext<?>> consumer) {
this.consumerDisconnect = consumer;
}
private ServerBootstrap createServerBootstrap() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(createChannelChannelInitializer());
return bootstrap;
}
private ChannelInitializer<SocketChannel> createChannelChannelInitializer() {
return new ChannelInitializer<>() {
@Override
protected void initChannel(@Nonnull SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
createChannelHandlerMap().forEach(pipeline::addLast);
}
};
}
private Map<String, ChannelHandler> createChannelHandlerMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<>();
map.put("packet_splitter", new ProtocolSplitter());
map.put("logger", new LoggingHandler(LogLevel.DEBUG));
map.put("packet_decoder", new ProtocolDecoder(true, consumerNewConnection, consumerDisconnect));
map.put("packet_encoder", new ProtocolEncoder());
map.put("packet_handler", new PacketInboundHandler());
return map;
}
}

View File

@@ -3,6 +3,7 @@ package mc.protocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.RequiredArgsConstructor;
import mc.protocol.api.ConnectionContext;
import mc.protocol.packets.ClientSidePacket;
import reactor.core.publisher.Sinks;
@@ -12,11 +13,11 @@ public class PacketInboundHandler extends SimpleChannelInboundHandler<ClientSide
@SuppressWarnings("rawtypes")
@Override
protected void channelRead0(ChannelHandlerContext ctx, ClientSidePacket packet) {
Sinks.Many<ChannelContext> packetSinks = ctx.channel().attr(NetworkAttributes.STATE)
Sinks.Many<ConnectionContext> packetSinks = ctx.channel().attr(NetworkAttributes.STATE)
.get().getPacketSinks(packet.getClass());
if (packetSinks != null) {
packetSinks.tryEmitNext(new ChannelContext<>(ctx, packet));
packetSinks.tryEmitNext(new NettyConnectionContext<>(ctx, packet));
}
}
}

View File

@@ -2,6 +2,7 @@ package mc.protocol;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import mc.protocol.api.ConnectionContext;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.packets.Packet;
import mc.protocol.packets.PingPacket;
@@ -84,7 +85,7 @@ public enum State {
private final Map<Class<? extends ServerSidePacket>, Integer> serverSidePackets;
@SuppressWarnings("rawtypes")
private final Map<Class<? extends ClientSidePacket>, Sinks.Many<ChannelContext>> observedMap = new HashMap<>();
private final Map<Class<? extends ClientSidePacket>, Sinks.Many<ConnectionContext>> observedMap = new HashMap<>();
State(int id, Map<Integer, Class<? extends ClientSidePacket>> clientSidePackets) {
this.id = id;
@@ -104,13 +105,13 @@ public enum State {
@SuppressWarnings("rawtypes")
public <P extends ClientSidePacket> Sinks.Many<ChannelContext> getPacketSinks(Class<P> packetClass) {
public <P extends ClientSidePacket> Sinks.Many<ConnectionContext> getPacketSinks(Class<P> packetClass) {
return observedMap.get(packetClass);
}
@SuppressWarnings("unchecked")
public <P extends ClientSidePacket> Flux<ChannelContext<P>> packetFlux(Class<P> packetClass) {
public <P extends ClientSidePacket> Flux<ConnectionContext<P>> packetFlux(Class<P> packetClass) {
return observedMap.computeIfAbsent(packetClass, aClass -> Sinks.many().multicast().directBestEffort())
.asFlux().map(ChannelContext.class::cast);
.asFlux().map(ConnectionContext.class::cast);
}
}

View File

@@ -0,0 +1,19 @@
package mc.protocol.api;
import mc.protocol.State;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.packets.ServerSidePacket;
public interface ConnectionContext<P extends ClientSidePacket> {
State getState();
void setState(State state);
P clientPacket();
void send(ServerSidePacket packet);
void sendNow(ServerSidePacket packet);
void flushSending();
void disconnect();
}

View File

@@ -0,0 +1,11 @@
package mc.protocol.api;
import java.util.function.Consumer;
public interface Server {
void bind(String host, int port);
void onNewConnect(Consumer<ConnectionContext<?>> consumer);
void onDisonnect(Consumer<ConnectionContext<?>> consumer);
}

View File

@@ -1,11 +1,11 @@
package mc.protocol.di;
import dagger.Component;
import mc.protocol.NettyServer;
import mc.protocol.api.Server;
@Component(modules = ProtocolModule.class)
@ServerScope
public interface ProtocolComponent {
NettyServer getNettyServer();
Server getServer();
}

View File

@@ -2,68 +2,16 @@ package mc.protocol.di;
import dagger.Module;
import dagger.Provides;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import mc.protocol.NettyServer;
import mc.protocol.PacketInboundHandler;
import mc.protocol.io.codec.ProtocolDecoder;
import mc.protocol.io.codec.ProtocolEncoder;
import mc.protocol.io.codec.ProtocolSplitter;
import javax.annotation.Nonnull;
import javax.inject.Provider;
import java.util.LinkedHashMap;
import java.util.Map;
import mc.protocol.api.Server;
@Module
public class ProtocolModule {
@Provides
NettyServer provideServer(ServerBootstrap serverBootstrap) {
return new NettyServer(serverBootstrap);
@ServerScope
Server provideServer() {
return new NettyServer();
}
@Provides
ServerBootstrap provideServerBootstrap(ChannelInitializer<SocketChannel> channelChannelInitializer) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(channelChannelInitializer);
return bootstrap;
}
@Provides
ChannelInitializer<SocketChannel> provideChannelChannelInitializer(
Provider<Map<String, ChannelHandler>> channelHandlerMapProvider) {
return new ChannelInitializer<>() {
@Override
protected void initChannel(@Nonnull SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
channelHandlerMapProvider.get().forEach(pipeline::addLast);
}
};
}
@Provides
Map<String, ChannelHandler> provideChannelHandlerMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<>();
map.put("packet_splitter", new ProtocolSplitter());
map.put("logger", new LoggingHandler(LogLevel.DEBUG));
map.put("packet_decoder", new ProtocolDecoder(true));
map.put("packet_encoder", new ProtocolEncoder());
map.put("packet_handler", new PacketInboundHandler());
return map;
}
}

View File

@@ -5,30 +5,36 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.NettyConnectionContext;
import mc.protocol.NetworkAttributes;
import mc.protocol.State;
import mc.protocol.api.ConnectionContext;
import mc.protocol.io.NetByteBuf;
import mc.protocol.packets.ClientSidePacket;
import mc.protocol.packets.UnknownPacket;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
@Slf4j
@RequiredArgsConstructor
public class ProtocolDecoder extends ByteToMessageDecoder {
private final boolean readUnknownPackets;
private final Consumer<ConnectionContext<?>> consumerNewConnection;
private final Consumer<ConnectionContext<?>> consumerDisconnect;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().attr(NetworkAttributes.STATE).set(State.HANDSHAKING);
public void channelActive(@Nonnull ChannelHandlerContext ctx) throws Exception {
consumerNewConnection.accept(new NettyConnectionContext<>(ctx, null));
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().attr(NetworkAttributes.STATE).set(null);
public void channelInactive(@Nonnull ChannelHandlerContext ctx) throws Exception {
consumerDisconnect.accept(new NettyConnectionContext<>(ctx, null));
super.channelInactive(ctx);
}