Archived
0

создана абстракция над NettyServer

This commit is contained in:
2021-04-27 00:10:55 +03:00
parent eff6dcf148
commit 2a1992b6dd
19 changed files with 194 additions and 201 deletions

View File

@@ -13,6 +13,8 @@ ext {
annotations: 'com.google.code.findbugs:jsr305:3.0.2', annotations: 'com.google.code.findbugs:jsr305:3.0.2',
guava : 'com.google.guava:guava:30.1-jre', guava : 'com.google.guava:guava:30.1-jre',
lang3 : 'org.apache.commons:commons-lang3:3.11', lang3 : 'org.apache.commons:commons-lang3:3.11',
netty : 'io.netty:netty-all:4.1.22.Final',
reactor : 'io.projectreactor:reactor-core:3.4.5'
] ]
libs.logger = [ libs.logger = [

View File

@@ -1,7 +1,8 @@
apply from: rootDir.toPath().resolve('logic.gradle').toFile() apply from: rootDir.toPath().resolve('logic.gradle').toFile()
dependencies { dependencies {
implementation 'io.netty:netty-all:4.1.22.Final' implementation libs.netty
implementation libs.reactor
implementation libs.guava implementation libs.guava
testImplementation libs.lang3 testImplementation libs.lang3

View File

@@ -0,0 +1,20 @@
package mc.protocol;
import io.netty.channel.ChannelHandlerContext;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import mc.protocol.packets.Packet;
@RequiredArgsConstructor
public class ChannelContext<P extends Packet> {
@Getter
private final ChannelHandlerContext ctx;
@Getter
private final P packet;
public void setState(State state) {
ctx.channel().attr(NetworkAttributes.STATE).set(state);
}
}

View File

@@ -0,0 +1,43 @@
package mc.protocol;
import io.netty.bootstrap.ServerBootstrap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.di.DaggerProtocolComponent;
import mc.protocol.di.ProtocolComponent;
import mc.protocol.packets.Packet;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.Map;
@SuppressWarnings("rawtypes")
@Slf4j
@RequiredArgsConstructor
public class NettyServer {
private final ServerBootstrap serverBootstrap;
private final Map<Class<? extends Packet>, Sinks.Many<ChannelContext>> observedMap;
public void bind(String host, int port) {
log.info("Network starting: {}:{}", host, port);
try {
serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
if (log.isTraceEnabled()) {
log.trace("{}: {}", e.getClass().getSimpleName(), e.getMessage(), e);
}
}
}
@SuppressWarnings("unchecked")
public <P extends Packet> Flux<ChannelContext<P>> packetFlux(Class<P> packetClass) {
return observedMap.get(packetClass).asFlux().map(ChannelContext.class::cast);
}
public static NettyServer createServer() {
ProtocolComponent component = DaggerProtocolComponent.create();
return component.getNettyServer();
}
}

View File

@@ -0,0 +1,21 @@
package mc.protocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.RequiredArgsConstructor;
import mc.protocol.packets.Packet;
import reactor.core.publisher.Sinks;
import java.util.Map;
@SuppressWarnings("rawtypes")
@RequiredArgsConstructor
public class PacketInboundHandler extends SimpleChannelInboundHandler<Packet> {
private final Map<Class<? extends Packet>, Sinks.Many<ChannelContext>> observedMap;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) {
observedMap.get(packet.getClass()).tryEmitNext(new ChannelContext<>(ctx, packet));
}
}

View File

@@ -55,6 +55,7 @@ public enum State {
@Getter @Getter
private final int id; private final int id;
@Getter
private final BiMap<Integer, Class<? extends Packet>> serverBoundPackets; private final BiMap<Integer, Class<? extends Packet>> serverBoundPackets;
private final BiMap<Integer, Class<? extends Packet>> clientBoundPackets; private final BiMap<Integer, Class<? extends Packet>> clientBoundPackets;

View File

@@ -0,0 +1,11 @@
package mc.protocol.di;
import dagger.Component;
import mc.protocol.NettyServer;
@Component(modules = ProtocolModule.class)
@ServerScope
public interface ProtocolComponent {
NettyServer getNettyServer();
}

View File

@@ -1,5 +1,6 @@
package mc.server.di; package mc.protocol.di;
import com.google.common.collect.ImmutableMap;
import dagger.Module; import dagger.Module;
import dagger.Provides; import dagger.Provides;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
@@ -11,28 +12,29 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j; import mc.protocol.ChannelContext;
import mc.protocol.NettyServer;
import mc.protocol.PacketInboundHandler;
import mc.protocol.State;
import mc.protocol.io.codec.ProtocolDecoder; import mc.protocol.io.codec.ProtocolDecoder;
import mc.protocol.io.codec.ProtocolEncoder; import mc.protocol.io.codec.ProtocolEncoder;
import mc.protocol.io.codec.ProtocolSplitter; import mc.protocol.io.codec.ProtocolSplitter;
import mc.server.network.Server; import mc.protocol.packets.Packet;
import mc.server.network.netty.handler.HandshakeHandler; import reactor.core.publisher.Sinks;
import mc.server.network.netty.NettyServer;
import mc.server.network.netty.handler.KeepAliveHandler;
import mc.server.network.netty.handler.LoginHandler;
import mc.server.network.netty.handler.StatusHandler;
import javax.inject.Provider; import javax.inject.Provider;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.stream.Stream;
@Module @Module
@Slf4j public class ProtocolModule {
public class NetworkModule {
@SuppressWarnings("rawtypes")
@Provides @Provides
Server provideServer(ServerBootstrap serverBootstrap) { NettyServer provideServer(ServerBootstrap serverBootstrap,
return new NettyServer(serverBootstrap); Map<Class<? extends Packet>, Sinks.Many<ChannelContext>> observedMap) {
return new NettyServer(serverBootstrap, observedMap);
} }
@Provides @Provides
@@ -47,7 +49,9 @@ public class NetworkModule {
} }
@Provides @Provides
ChannelInitializer<SocketChannel> provideChannelChannelInitializer(Provider<Map<String, ChannelHandler>> channelHandlerMapProvider) { ChannelInitializer<SocketChannel> provideChannelChannelInitializer(
Provider<Map<String, ChannelHandler>> channelHandlerMapProvider) {
return new ChannelInitializer<>() { return new ChannelInitializer<>() {
@Override @Override
protected void initChannel(SocketChannel socketChannel) { protected void initChannel(SocketChannel socketChannel) {
@@ -57,36 +61,32 @@ public class NetworkModule {
}; };
} }
@SuppressWarnings("rawtypes")
@Provides @Provides
Map<String, ChannelHandler> provideChannelHandlerMap( Map<String, ChannelHandler> provideChannelHandlerMap(
Provider<StatusHandler> statusHandlerProvider, Map<Class<? extends Packet>, Sinks.Many<ChannelContext>> observedMap) {
Provider<LoginHandler> loginHandlerProvider,
Provider<KeepAliveHandler> keepAliveHandlerProvider
) {
Map<String, ChannelHandler> map = new LinkedHashMap<>(); Map<String, ChannelHandler> map = new LinkedHashMap<>();
map.put("packet_splitter", new ProtocolSplitter());
map.put("logger", new LoggingHandler(LogLevel.DEBUG)); map.put("logger", new LoggingHandler(LogLevel.DEBUG));
map.put("protocol_splitter", new ProtocolSplitter()); map.put("packet_decoder", new ProtocolDecoder(true));
map.put("protocol_decoder", new ProtocolDecoder(true)); map.put("packet_encoder", new ProtocolEncoder());
map.put("protocol_encoder", new ProtocolEncoder()); map.put("packet_handler", new PacketInboundHandler(observedMap));
map.put("handshake_handler", new HandshakeHandler(
statusHandlerProvider, loginHandlerProvider, keepAliveHandlerProvider));
return map; return map;
} }
@SuppressWarnings("rawtypes")
@Provides @Provides
StatusHandler provideStatusHandler() { @ServerScope
return new StatusHandler(); Map<Class<? extends Packet>, Sinks.Many<ChannelContext>> provideObservedMap() {
} ImmutableMap.Builder<Class<? extends Packet>, Sinks.Many<ChannelContext>> builder = ImmutableMap.builder();
@Provides Stream.of(State.values())
LoginHandler provideLoginHandler() { .flatMap(state -> state.getServerBoundPackets().values().stream())
return new LoginHandler(); .forEach(packetClass -> builder.put(packetClass, Sinks.many().multicast().directBestEffort()));
}
@Provides return builder.build();
KeepAliveHandler provideKeepAliveHandler() {
return new KeepAliveHandler();
} }
} }

View File

@@ -0,0 +1,10 @@
package mc.protocol.di;
import javax.inject.Scope;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Scope
@Retention(RetentionPolicy.RUNTIME)
public @interface ServerScope {
}

View File

@@ -15,9 +15,7 @@ dependencies {
implementation libs.logger.logback implementation libs.logger.logback
implementation platform('io.projectreactor:reactor-bom:2020.0.6') implementation libs.netty
implementation 'io.projectreactor:reactor-core' implementation libs.reactor
implementation 'io.netty:netty-all:4.1.22.Final'
implementation libs.guava implementation libs.guava
} }

View File

@@ -1,9 +1,13 @@
package mc.server; package mc.server;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import mc.server.di.DaggerNetworkComponent; import mc.protocol.NettyServer;
import mc.server.di.NetworkComponent; import mc.protocol.packets.PingPacket;
import mc.server.network.Server; import mc.protocol.packets.client.HandshakePacket;
import mc.protocol.packets.client.LoginStartPacket;
import mc.protocol.packets.client.StatusServerRequest;
import mc.protocol.packets.server.DisconnectPacket;
import mc.protocol.packets.server.StatusServerResponse;
@Slf4j @Slf4j
public class Main { public class Main {
@@ -11,8 +15,49 @@ public class Main {
public static void main(String[] args) { public static void main(String[] args) {
log.info("hello"); log.info("hello");
NetworkComponent networkComponent = DaggerNetworkComponent.create(); NettyServer server = NettyServer.createServer();
Server server = networkComponent.getServer();
server.packetFlux(HandshakePacket.class)
.doOnNext(channel -> log.info("{}", channel.getPacket()))
.subscribe(channel -> channel.setState(channel.getPacket().getNextState()));
server.packetFlux(PingPacket.class)
.doOnNext(channel -> log.info("{}", channel.getPacket()))
.subscribe(channel -> channel.getCtx().writeAndFlush(channel.getPacket()).channel().disconnect());
server.packetFlux(StatusServerRequest.class)
.doOnNext(channel -> log.info("{}", channel.getPacket()))
.subscribe(channel -> {
StatusServerResponse response = new StatusServerResponse();
response.setInfo("{\n" +
" \"version\": {\n" +
" \"name\": \"1.12.2\",\n" +
" \"protocol\": 340\n" +
" },\n" +
" \"players\": {\n" +
" \"max\": 0,\n" +
" \"online\": 0,\n" +
" \"sample\": []\n" +
" },\n" +
" \"description\": {\n" +
" \"text\": \"Hello world\"\n" +
" }\n" +
"}");
channel.getCtx().writeAndFlush(response);
});
server.packetFlux(LoginStartPacket.class)
.doOnNext(channel -> log.info("{}", channel.getPacket()))
.subscribe(channel -> {
DisconnectPacket disconnectPacket = new DisconnectPacket();
disconnectPacket.setReason("{\n" +
" \"text\": \"Server is not available.\"\n" +
"}");
channel.getCtx().writeAndFlush(disconnectPacket).channel().disconnect();
});
server.bind("127.0.0.1", 25565); server.bind("127.0.0.1", 25565);
} }
} }

View File

@@ -1,10 +0,0 @@
package mc.server.di;
import dagger.Component;
import mc.server.network.Server;
@Component(modules = NetworkModule.class)
public interface NetworkComponent {
Server getServer();
}

View File

@@ -1,6 +0,0 @@
package mc.server.network;
public interface Server {
void bind(String host, int port);
}

View File

@@ -1,26 +0,0 @@
package mc.server.network.netty;
import io.netty.bootstrap.ServerBootstrap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mc.server.network.Server;
@Slf4j
@RequiredArgsConstructor
public class NettyServer implements Server {
private final ServerBootstrap serverBootstrap;
@Override
public void bind(String host, int port) {
log.info("Network starting: {}:{}", host, port);
try {
serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
if (log.isTraceEnabled()) {
log.trace("{}: {}", e.getClass().getSimpleName(), e.getMessage(), e);
}
}
}
}

View File

@@ -1,17 +0,0 @@
package mc.server.network.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import mc.protocol.packets.Packet;
public abstract class AbstractPacketHandler<P extends Packet> extends SimpleChannelInboundHandler<Packet> {
@SuppressWarnings("unchecked")
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
channelRead1(ctx, (P) msg);
}
@SuppressWarnings("java:S112")
protected abstract void channelRead1(ChannelHandlerContext ctx, P packet) throws Exception;
}

View File

@@ -1,33 +0,0 @@
package mc.server.network.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.NetworkAttributes;
import mc.protocol.State;
import mc.protocol.packets.client.HandshakePacket;
import javax.inject.Provider;
@Slf4j
@RequiredArgsConstructor
public class HandshakeHandler extends AbstractPacketHandler<HandshakePacket> {
private final Provider<StatusHandler> statusHandlerProvider;
private final Provider<LoginHandler> loginHandlerProvider;
private final Provider<KeepAliveHandler> keepAliveHandlerProvider;
@Override
protected void channelRead1(ChannelHandlerContext ctx, HandshakePacket packet) {
log.info("{}", packet);
ctx.channel().attr(NetworkAttributes.STATE).set(packet.getNextState());
if (State.STATUS == packet.getNextState()) {
ctx.pipeline().replace("handshake_handler", "status_handler", statusHandlerProvider.get());
ctx.pipeline().addAfter("status_handler", "keepalive_handler", keepAliveHandlerProvider.get());
} else if (State.LOGIN == packet.getNextState()) {
ctx.channel().pipeline().replace("handshake_handler", "login_handler", loginHandlerProvider.get());
}
}
}

View File

@@ -1,16 +0,0 @@
package mc.server.network.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.packets.PingPacket;
@Slf4j
public class KeepAliveHandler extends AbstractPacketHandler<PingPacket> {
@Override
protected void channelRead1(ChannelHandlerContext ctx, PingPacket packet) {
log.info("{}", packet);
ctx.writeAndFlush(packet).channel().disconnect();
}
}

View File

@@ -1,18 +0,0 @@
package mc.server.network.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import mc.protocol.packets.client.LoginStartPacket;
import mc.protocol.packets.server.DisconnectPacket;
public class LoginHandler extends AbstractPacketHandler<LoginStartPacket> {
@Override
protected void channelRead1(ChannelHandlerContext ctx, LoginStartPacket packet) {
DisconnectPacket disconnectPacket = new DisconnectPacket();
disconnectPacket.setReason("{\n" +
" \"text\": \"Server is not available.\"\n" +
"}");
ctx.channel().writeAndFlush(disconnectPacket).channel().disconnect();
}
}

View File

@@ -1,33 +0,0 @@
package mc.server.network.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.packets.client.StatusServerRequest;
import mc.protocol.packets.server.StatusServerResponse;
@Slf4j
public class StatusHandler extends AbstractPacketHandler<StatusServerRequest> {
@Override
protected void channelRead1(ChannelHandlerContext ctx, StatusServerRequest packet) {
log.info("{}", packet);
StatusServerResponse response = new StatusServerResponse();
response.setInfo("{\n" +
" \"version\": {\n" +
" \"name\": \"1.12.2\",\n" +
" \"protocol\": 340\n" +
" },\n" +
" \"players\": {\n" +
" \"max\": 0,\n" +
" \"online\": 0,\n" +
" \"sample\": []\n" +
" },\n" +
" \"description\": {\n" +
" \"text\": \"Hello world\"\n" +
" }\n" +
"}");
ctx.channel().writeAndFlush(response);
}
}