Archived
0

3 Commits

Author SHA1 Message Date
b9400b110a [WIP] try RxJava 2020-05-21 01:21:38 +03:00
c9f396a999 [WIP] try RxJava 2020-05-21 00:51:24 +03:00
980088b157 разделение на два модуля: server-api и server 2020-05-18 11:37:17 +03:00
27 changed files with 270 additions and 147 deletions

View File

@@ -1,3 +1,4 @@
subprojects {
apply plugin: 'java' apply plugin: 'java'
project.group = projectGroup project.group = projectGroup
@@ -16,35 +17,20 @@ repositories {
ext { ext {
slf4j_version = '1.7.30' slf4j_version = '1.7.30'
logback_version = '1.2.3'
netty_version = '4.1.22.Final'
library = [ library = [
guice: ['com.google.inject:guice:4.1.0'], lombok: 'org.projectlombok:lombok:1.18.2',
logger: ["ch.qos.logback:logback-core:$logback_version",
"ch.qos.logback:logback-classic:$logback_version"],
lombok: ['org.projectlombok:lombok:1.18.2'],
netty: ["io.netty:netty-transport:$netty_version",
"io.netty:netty-codec:$netty_version",
"io.netty:netty-handler:$netty_version" /*for logger handler*/],
slf4j: ["org.slf4j:slf4j-api:$slf4j_version", slf4j: ["org.slf4j:slf4j-api:$slf4j_version",
"org.slf4j:jcl-over-slf4j:$slf4j_version"], "org.slf4j:jcl-over-slf4j:$slf4j_version"],
commons: ['commons-io:commons-io:2.6']
] ]
} }
dependencies { dependencies {
/* LOGGER */ /* LOGGER */
implementation library.slf4j implementation library.slf4j
implementation library.logger
/* LOMBOK */ /* LOMBOK */
annotationProcessor library.lombok annotationProcessor library.lombok
compileOnly library.lombok compileOnly library.lombok
}
/* COMPONENTS */
implementation library.guice
implementation library.netty
implementation library.commons
implementation project(':protocol')
} }

View File

@@ -1,6 +1,6 @@
# suppress inspection "UnusedProperty" for whole file # suppress inspection "UnusedProperty" for whole file
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5.1-all.zip distributionUrl=https\://services.gradle.org/distributions/gradle-5.3-all.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

18
server-api/build.gradle Normal file
View File

@@ -0,0 +1,18 @@
ext {
netty_version = '4.1.22.Final';
library = [
rxjava: ['io.reactivex.rxjava3:rxjava:3.0.3'],
netty: [
"io.netty:netty-transport:$netty_version"
]
]
}
dependencies {
/* COMPONENTS */
compile project(':protocol')
compile library.rxjava
implementation library.netty
}

View File

@@ -0,0 +1,12 @@
package mc.server.reactive;
import io.netty.channel.Channel;
import lombok.Data;
import mc.protocol.Packet;
@Data
public class NetworkContainer<T extends Packet> {
private final Channel channel;
private final T packet;
}

View File

@@ -0,0 +1,28 @@
package mc.server.reactive;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;
import mc.protocol.Packet;
import java.util.HashMap;
import java.util.Map;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class RxProvider {
private final Map<Class, Subject> subjectMap = new HashMap<>();
public <T extends Packet> Observable<NetworkContainer<T>> getPacketObservable(Class<T> packetType) {
return giveSubject(packetType);
}
public Observer<NetworkContainer<? extends Packet>> getPacketObserver(Class<? extends Packet> packetType) {
return giveSubject(packetType);
}
private Subject giveSubject(Class<? extends Packet> packetType) {
return subjectMap.computeIfAbsent(packetType, key -> PublishSubject.create());
}
}

22
server/build.gradle Normal file
View File

@@ -0,0 +1,22 @@
ext {
logback_version = '1.2.3'
library = [
guice: ['com.google.inject:guice:4.1.0'],
logger: ["ch.qos.logback:logback-core:$logback_version",
"ch.qos.logback:logback-classic:$logback_version"],
netty: ['io.netty:netty-all:4.1.22.Final'],
commons: ['commons-io:commons-io:2.6']
]
}
dependencies {
/* LOGGER */
implementation library.logger
/* COMPONENTS */
implementation project(':server-api')
implementation library.guice
implementation library.netty
implementation library.commons
}

View File

@@ -2,8 +2,8 @@ package mc.server;
import com.google.inject.Guice; import com.google.inject.Guice;
import com.google.inject.Injector; import com.google.inject.Injector;
import mc.server.config.NetworkModule;
import mc.server.network.Server; import mc.server.network.Server;
import mc.server.network.config.NetworkModule;
public class Main { public class Main {

View File

@@ -0,0 +1,39 @@
package mc.server.logic;
import com.google.inject.Inject;
import io.netty.channel.Channel;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.handshake.client.HandshakePacket;
import mc.server.network.impl.NettyConstants;
import mc.server.reactive.RxProvider;
@Slf4j
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
public class HandshakeLogic {
private final RxProvider rxProvider;
public void setup() {
log.info("HandshakeLogic setup");
rxProvider.getPacketObservable(HandshakePacket.class)
.observeOn(Schedulers.computation())
.doOnNext(container -> handle(container.getChannel(), container.getPacket()))
.subscribe();
}
private void handle(Channel channel, HandshakePacket packet) {
log.error("THREAD: {}|{}", Thread.currentThread().getId(), Thread.currentThread().getName());
log.info("HandshakePacket handle");
channel.attr(NettyConstants.ATTR_STATE).set(packet.getNextState());
// try {
// Thread.sleep(20_000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
}

View File

@@ -0,0 +1,41 @@
package mc.server.logic;
import com.google.inject.Inject;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.login.client.LoginStartPacket;
import mc.protocol.login.server.DisconnectPacket;
import mc.protocol.text.Text;
import mc.protocol.text.TextColor;
import mc.protocol.text.TextStyle;
import mc.server.reactive.NetworkContainer;
import mc.server.reactive.RxProvider;
@Slf4j
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
public class LoginLogic {
private final RxProvider rxProvider;
public void setup() {
log.info("LoginLogic setup");
rxProvider.getPacketObservable(LoginStartPacket.class)
.observeOn(Schedulers.computation())
.map(NetworkContainer::getChannel)
.doOnNext(channel -> {
log.error("THREAD: {}|{}", Thread.currentThread().getId(), Thread.currentThread().getName());
log.info("LoginLogic handle");
DisconnectPacket disconnectPacket = new DisconnectPacket();
disconnectPacket.setReason(Text.builder()
.append(Text.of(TextColor.WHITE, "Server is "))
.color(TextColor.RED).style(TextStyle.BOLD).append("NOT ")
.color(TextColor.WHITE).append("available.")
.build());
channel.writeAndFlush(disconnectPacket).channel().disconnect();
})
.subscribe();
}
}

View File

@@ -1,12 +1,17 @@
package mc.server.network.impl.handler; package mc.server.logic;
import io.netty.channel.ChannelHandlerContext; import com.google.inject.Inject;
import io.netty.channel.Channel;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import mc.protocol.ProtocolConstant; import mc.protocol.ProtocolConstant;
import mc.protocol.dto.ServerInfo; import mc.protocol.dto.ServerInfo;
import mc.protocol.status.PingPacket;
import mc.protocol.status.client.StatusServerRequest; import mc.protocol.status.client.StatusServerRequest;
import mc.protocol.status.server.StatusServerResponse; import mc.protocol.status.server.StatusServerResponse;
import mc.protocol.text.Text; import mc.protocol.text.Text;
import mc.server.reactive.RxProvider;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import java.io.IOException; import java.io.IOException;
@@ -14,12 +19,32 @@ import java.util.Base64;
import java.util.Collections; import java.util.Collections;
@Slf4j @Slf4j
public class StatusHandler extends AbstractPacketHandler<StatusServerRequest> { @RequiredArgsConstructor(onConstructor = @__({ @Inject }))
public class StatusLogic {
private static final String FAVICON_HEADER = "data:image/png;base64,"; private static final String FAVICON_HEADER = "data:image/png;base64,";
private final RxProvider rxProvider;
@Override public void setup() {
protected void channelRead1(ChannelHandlerContext ctx, StatusServerRequest packet) throws Exception { log.info("StatusLogic setup");
rxProvider.getPacketObservable(StatusServerRequest.class)
.observeOn(Schedulers.computation())
.doOnNext(container -> status(container.getChannel(), container.getPacket()))
.subscribe();
rxProvider.getPacketObservable(PingPacket.class)
.observeOn(Schedulers.computation())
.doOnNext(container -> {
log.error("THREAD: {}|{}", Thread.currentThread().getId(), Thread.currentThread().getName());
log.info("StatusLogic ping handle");
container.getChannel().writeAndFlush(container.getPacket()).channel().disconnect();
})
.subscribe();
}
private void status(Channel channel, StatusServerRequest packet) throws IOException {
log.error("THREAD: {}|{}", Thread.currentThread().getId(), Thread.currentThread().getName());
log.info("StatusLogic handle");
log.info("{}", packet); log.info("{}", packet);
final ServerInfo.Version version = new ServerInfo.Version(); final ServerInfo.Version version = new ServerInfo.Version();
@@ -40,7 +65,7 @@ public class StatusHandler extends AbstractPacketHandler<StatusServerRequest> {
StatusServerResponse response = new StatusServerResponse(); StatusServerResponse response = new StatusServerResponse();
response.setServerInfoDto(serverInfo); response.setServerInfoDto(serverInfo);
ctx.channel().writeAndFlush(response); channel.writeAndFlush(response);
} }
private String getEmbeddedIconBase64() throws IOException { private String getEmbeddedIconBase64() throws IOException {

View File

@@ -1,4 +1,4 @@
package mc.server.config; package mc.server.network.config;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
@@ -21,7 +21,8 @@ import mc.server.network.impl.NettyServer;
import mc.server.network.impl.codec.PacketDecoder; import mc.server.network.impl.codec.PacketDecoder;
import mc.server.network.impl.codec.PacketEncoder; import mc.server.network.impl.codec.PacketEncoder;
import mc.server.network.impl.codec.PacketSplitter; import mc.server.network.impl.codec.PacketSplitter;
import mc.server.network.impl.handler.HandshakeHandler; import mc.server.network.impl.handler.CommonPacketHandler;
import mc.server.reactive.RxProvider;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@@ -33,6 +34,7 @@ public class NetworkModule extends AbstractModule {
bind(Server.class).to(NettyServer.class).in(Singleton.class); bind(Server.class).to(NettyServer.class).in(Singleton.class);
bind(EventLoopGroup.class).annotatedWith(Names.named("bossGroup")).toInstance(new NioEventLoopGroup(1)); bind(EventLoopGroup.class).annotatedWith(Names.named("bossGroup")).toInstance(new NioEventLoopGroup(1));
bind(EventLoopGroup.class).annotatedWith(Names.named("workerGroup")).toInstance(new NioEventLoopGroup()); bind(EventLoopGroup.class).annotatedWith(Names.named("workerGroup")).toInstance(new NioEventLoopGroup());
bind(RxProvider.class).toInstance(new RxProvider());
} }
@Provides @Provides
@@ -51,14 +53,14 @@ public class NetworkModule extends AbstractModule {
@Provides @Provides
@Named("channelHandlerMap") @Named("channelHandlerMap")
Map<String, ChannelHandler> channelHandlerMap(HandshakeHandler handshakeHandler) { Map<String, ChannelHandler> channelHandlerMap(CommonPacketHandler commonPacketHandler) {
final Map<String, ChannelHandler> map = new LinkedHashMap<>(); final Map<String, ChannelHandler> map = new LinkedHashMap<>();
map.put("logger", new LoggingHandler()); map.put("logger", new LoggingHandler());
map.put("packet_splitter", new PacketSplitter(new ProtocolSplitter())); map.put("packet_splitter", new PacketSplitter(new ProtocolSplitter()));
map.put("packet_decoder", new PacketDecoder(new ProtocolDecoder(PacketDirection.SERVER_BOUND))); map.put("packet_decoder", new PacketDecoder(new ProtocolDecoder(PacketDirection.SERVER_BOUND)));
map.put("packet_encoder", new PacketEncoder(new ProtocolEncoder(PacketDirection.CLIENT_BOUND))); map.put("packet_encoder", new PacketEncoder(new ProtocolEncoder(PacketDirection.CLIENT_BOUND)));
map.put("handshake_handler", handshakeHandler); map.put("common_handler", commonPacketHandler);
return map; return map;
} }

View File

@@ -3,18 +3,28 @@ package mc.server.network.impl;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Provider; import com.google.inject.Provider;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import mc.server.logic.HandshakeLogic;
import mc.server.logic.LoginLogic;
import mc.server.logic.StatusLogic;
import mc.server.network.Server; import mc.server.network.Server;
@Slf4j @Slf4j
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
public class NettyServer implements Server { public class NettyServer implements Server {
@Inject private final Provider<ServerBootstrap> serverBootstrapProvider;
private Provider<ServerBootstrap> serverBootstrapProvider; private final HandshakeLogic handshakeLogic;
private final StatusLogic statusLogic;
private final LoginLogic loginLogic;
@Override @Override
public void start(String host, int port) { public void start(String host, int port) {
handshakeLogic.setup();
statusLogic.setup();
loginLogic.setup();
try { try {
log.info("Network starting: {}:{}", host, port); log.info("Network starting: {}:{}", host, port);
serverBootstrapProvider.get() serverBootstrapProvider.get()

View File

@@ -0,0 +1,21 @@
package mc.server.network.impl.handler;
import com.google.inject.Inject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.RequiredArgsConstructor;
import mc.protocol.Packet;
import mc.server.reactive.NetworkContainer;
import mc.server.reactive.RxProvider;
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
public class CommonPacketHandler extends SimpleChannelInboundHandler<Packet> {
private final RxProvider rxProvider;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) {
rxProvider.getPacketObserver(packet.getClass())
.onNext(new NetworkContainer<>(ctx.channel(), packet));
}
}

View File

Before

Width:  |  Height:  |  Size: 1.6 KiB

After

Width:  |  Height:  |  Size: 1.6 KiB

View File

@@ -2,3 +2,6 @@ rootProject.name = projectName
include ':protocol' include ':protocol'
project(':protocol').projectDir = new File(settingsDir, '../mc-protocol') project(':protocol').projectDir = new File(settingsDir, '../mc-protocol')
include ':server-api'
include ':server'

View File

@@ -1,15 +0,0 @@
package mc.server.network.impl.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import mc.protocol.Packet;
public abstract class AbstractPacketHandler<P extends Packet> extends SimpleChannelInboundHandler<P> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, P msg) throws Exception {
channelRead1(ctx, msg);
}
protected abstract void channelRead1(ChannelHandlerContext ctx, P packet) throws Exception;
}

View File

@@ -1,34 +0,0 @@
package mc.server.network.impl.handler;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.netty.channel.ChannelHandlerContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mc.protocol.State;
import mc.protocol.handshake.client.HandshakePacket;
import static mc.server.network.impl.NettyConstants.ATTR_STATE;
@Slf4j
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
public class HandshakeHandler extends AbstractPacketHandler<HandshakePacket> {
private final Provider<StatusHandler> statusHandlerProvider;
private final Provider<PingHandler> pingHandlerProvider;
private final Provider<LoginHandler> loginHandlerProvider;
@Override
protected void channelRead1(ChannelHandlerContext ctx, HandshakePacket packet) {
log.info("{}", packet);
ctx.channel().attr(ATTR_STATE).set(packet.getNextState());
if (packet.getNextState() == State.STATUS) {
ctx.channel().pipeline().replace("handshake_handler", "status_handler", statusHandlerProvider.get());
ctx.channel().pipeline().addAfter("status_handler", "ping_handler", pingHandlerProvider.get());
} else if (packet.getNextState() == State.LOGIN) {
ctx.channel().pipeline().replace("handshake_handler", "login_handler", loginHandlerProvider.get());
}
}
}

View File

@@ -1,23 +0,0 @@
package mc.server.network.impl.handler;
import io.netty.channel.ChannelHandlerContext;
import mc.protocol.login.client.LoginStartPacket;
import mc.protocol.login.server.DisconnectPacket;
import mc.protocol.text.Text;
import mc.protocol.text.TextColor;
import mc.protocol.text.TextStyle;
public class LoginHandler extends AbstractPacketHandler<LoginStartPacket> {
@Override
protected void channelRead1(ChannelHandlerContext ctx, LoginStartPacket packet) {
DisconnectPacket disconnectPacket = new DisconnectPacket();
disconnectPacket.setReason(Text.builder()
.append(Text.of(TextColor.WHITE, "Server is "))
.color(TextColor.RED).style(TextStyle.BOLD).append("NOT ")
.color(TextColor.WHITE).append("available.")
.build());
ctx.channel().writeAndFlush(disconnectPacket).channel().disconnect();
}
}

View File

@@ -1,12 +0,0 @@
package mc.server.network.impl.handler;
import io.netty.channel.ChannelHandlerContext;
import mc.protocol.status.PingPacket;
public class PingHandler extends AbstractPacketHandler<PingPacket> {
@Override
protected void channelRead1(ChannelHandlerContext ctx, PingPacket packet) {
ctx.writeAndFlush(packet).channel().disconnect();
}
}