Archived
0

[WIP] try RxJava

This commit is contained in:
2020-05-21 01:21:38 +03:00
parent c9f396a999
commit b9400b110a
3 changed files with 19 additions and 0 deletions

View File

@@ -2,6 +2,8 @@ package mc.server.logic;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import mc.protocol.handshake.client.HandshakePacket; import mc.protocol.handshake.client.HandshakePacket;
@@ -17,12 +19,21 @@ public class HandshakeLogic {
public void setup() { public void setup() {
log.info("HandshakeLogic setup"); log.info("HandshakeLogic setup");
rxProvider.getPacketObservable(HandshakePacket.class) rxProvider.getPacketObservable(HandshakePacket.class)
.observeOn(Schedulers.computation())
.doOnNext(container -> handle(container.getChannel(), container.getPacket())) .doOnNext(container -> handle(container.getChannel(), container.getPacket()))
.subscribe(); .subscribe();
} }
private void handle(Channel channel, HandshakePacket packet) { private void handle(Channel channel, HandshakePacket packet) {
log.error("THREAD: {}|{}", Thread.currentThread().getId(), Thread.currentThread().getName());
log.info("HandshakePacket handle"); log.info("HandshakePacket handle");
channel.attr(NettyConstants.ATTR_STATE).set(packet.getNextState()); channel.attr(NettyConstants.ATTR_STATE).set(packet.getNextState());
// try {
// Thread.sleep(20_000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
} }
} }

View File

@@ -1,6 +1,7 @@
package mc.server.logic; package mc.server.logic;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import mc.protocol.login.client.LoginStartPacket; import mc.protocol.login.client.LoginStartPacket;
@@ -20,8 +21,10 @@ public class LoginLogic {
public void setup() { public void setup() {
log.info("LoginLogic setup"); log.info("LoginLogic setup");
rxProvider.getPacketObservable(LoginStartPacket.class) rxProvider.getPacketObservable(LoginStartPacket.class)
.observeOn(Schedulers.computation())
.map(NetworkContainer::getChannel) .map(NetworkContainer::getChannel)
.doOnNext(channel -> { .doOnNext(channel -> {
log.error("THREAD: {}|{}", Thread.currentThread().getId(), Thread.currentThread().getName());
log.info("LoginLogic handle"); log.info("LoginLogic handle");
DisconnectPacket disconnectPacket = new DisconnectPacket(); DisconnectPacket disconnectPacket = new DisconnectPacket();

View File

@@ -2,6 +2,7 @@ package mc.server.logic;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import mc.protocol.ProtocolConstant; import mc.protocol.ProtocolConstant;
@@ -27,11 +28,14 @@ public class StatusLogic {
public void setup() { public void setup() {
log.info("StatusLogic setup"); log.info("StatusLogic setup");
rxProvider.getPacketObservable(StatusServerRequest.class) rxProvider.getPacketObservable(StatusServerRequest.class)
.observeOn(Schedulers.computation())
.doOnNext(container -> status(container.getChannel(), container.getPacket())) .doOnNext(container -> status(container.getChannel(), container.getPacket()))
.subscribe(); .subscribe();
rxProvider.getPacketObservable(PingPacket.class) rxProvider.getPacketObservable(PingPacket.class)
.observeOn(Schedulers.computation())
.doOnNext(container -> { .doOnNext(container -> {
log.error("THREAD: {}|{}", Thread.currentThread().getId(), Thread.currentThread().getName());
log.info("StatusLogic ping handle"); log.info("StatusLogic ping handle");
container.getChannel().writeAndFlush(container.getPacket()).channel().disconnect(); container.getChannel().writeAndFlush(container.getPacket()).channel().disconnect();
}) })
@@ -39,6 +43,7 @@ public class StatusLogic {
} }
private void status(Channel channel, StatusServerRequest packet) throws IOException { private void status(Channel channel, StatusServerRequest packet) throws IOException {
log.error("THREAD: {}|{}", Thread.currentThread().getId(), Thread.currentThread().getName());
log.info("StatusLogic handle"); log.info("StatusLogic handle");
log.info("{}", packet); log.info("{}", packet);