From 8be8caa05a9120e022e0a36cba9b300622f76282 Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Sat, 21 Apr 2018 09:20:16 +0300 Subject: [PATCH] Keep alive --- .../core/embedded/InMemoryPlayerManager.java | 36 +++++++++++++++++-- src/main/java/mc/core/network/NetChannel.java | 5 ++- .../proto_125/netty/PacketHandler.java | 10 ++++++ .../netty/wrappers/WrapperNetChannel.java | 16 ++++----- .../proto_125/packets/KeepAlivePacket.java | 21 +++++++++++ .../proto_125/packets/PacketManager.java | 1 + 6 files changed, 75 insertions(+), 14 deletions(-) create mode 100644 src/main/java/mc/core/network/proto_125/packets/KeepAlivePacket.java diff --git a/src/main/java/mc/core/embedded/InMemoryPlayerManager.java b/src/main/java/mc/core/embedded/InMemoryPlayerManager.java index d113c36..55161ae 100644 --- a/src/main/java/mc/core/embedded/InMemoryPlayerManager.java +++ b/src/main/java/mc/core/embedded/InMemoryPlayerManager.java @@ -4,6 +4,7 @@ */ package mc.core.embedded; +import lombok.extern.slf4j.Slf4j; import mc.core.Config; import mc.core.Player; import mc.core.PlayerManager; @@ -14,18 +15,24 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -public class InMemoryPlayerManager implements PlayerManager { +@Slf4j +public class InMemoryPlayerManager implements PlayerManager, Runnable { private List players; + private final Object lock = new Object(); @Autowired public InMemoryPlayerManager(Config config) { final int c = config.getMaxPlayers() > 50 ? 50 : config.getMaxPlayers(); players = Collections.synchronizedList(new ArrayList<>(c)); + (new Thread(this, "KeepAlive")).start(); } @Override public void addPlayer(Player player) { - players.add(player); + synchronized (lock) { + players.add(player); + lock.notify(); + } } @Override @@ -53,4 +60,29 @@ public class InMemoryPlayerManager implements PlayerManager { public void bloadcastWrite(final CSPacket packet) { players.forEach(player -> player.getChannel().writeAndFlush(packet)); } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + synchronized (lock) { + while (players.size() == 0) { + try { + lock.wait(); + } catch (InterruptedException e) { + return; + } + } + } + + players.stream() + .parallel() + .forEach(player -> player.getChannel().sendKeepAlive()); + + try { + Thread.sleep(1); + } catch (InterruptedException e) { + return; + } + } + } } diff --git a/src/main/java/mc/core/network/NetChannel.java b/src/main/java/mc/core/network/NetChannel.java index a88f29a..34f560b 100644 --- a/src/main/java/mc/core/network/NetChannel.java +++ b/src/main/java/mc/core/network/NetChannel.java @@ -5,8 +5,7 @@ package mc.core.network; public interface NetChannel { - void write(Object obj); - void flush(); - void writeAndFlush(Object obj); + void sendKeepAlive(); + void writeAndFlush(SCPacket pkt); } diff --git a/src/main/java/mc/core/network/proto_125/netty/PacketHandler.java b/src/main/java/mc/core/network/proto_125/netty/PacketHandler.java index 921b224..04d31db 100644 --- a/src/main/java/mc/core/network/proto_125/netty/PacketHandler.java +++ b/src/main/java/mc/core/network/proto_125/netty/PacketHandler.java @@ -8,8 +8,10 @@ package mc.core.network.proto_125.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; import mc.core.Location; +import mc.core.Player; import mc.core.PlayerManager; import mc.core.network.CSPacket; import mc.core.Config; @@ -25,11 +27,18 @@ import java.util.Random; @Slf4j public class PacketHandler extends SimpleChannelInboundHandler { private static final Random random = new Random(); + private static final AttributeKey ATTR_PLAYER = AttributeKey.newInstance("ATTR_PLAYER"); @Autowired private Config config; @Autowired private PlayerManager playerManager; + @Override + public void channelInactive(ChannelHandlerContext context) throws Exception { + super.channelInactive(context); + playerManager.removePlayer(context.channel().attr(ATTR_PLAYER).get()); + } + @Override protected void channelRead0(ChannelHandlerContext ctx, CSPacket packet) throws Exception { Optional optionalMethod = Arrays.stream(this.getClass().getDeclaredMethods()) @@ -145,6 +154,7 @@ public class PacketHandler extends SimpleChannelInboundHandler { channel.write(posLookPkt); playerManager.addPlayer(player); + channel.attr(ATTR_PLAYER).set(player); channel.flush(); } } diff --git a/src/main/java/mc/core/network/proto_125/netty/wrappers/WrapperNetChannel.java b/src/main/java/mc/core/network/proto_125/netty/wrappers/WrapperNetChannel.java index 68adccc..78525b4 100644 --- a/src/main/java/mc/core/network/proto_125/netty/wrappers/WrapperNetChannel.java +++ b/src/main/java/mc/core/network/proto_125/netty/wrappers/WrapperNetChannel.java @@ -7,23 +7,21 @@ package mc.core.network.proto_125.netty.wrappers; import io.netty.channel.Channel; import lombok.RequiredArgsConstructor; import mc.core.network.NetChannel; +import mc.core.network.SCPacket; +import mc.core.network.proto_125.packets.KeepAlivePacket; +import mc.core.network.proto_125.packets.PingPacket; @RequiredArgsConstructor public class WrapperNetChannel implements NetChannel { private final Channel channel; @Override - public void write(Object obj) { - channel.write(obj); + public void sendKeepAlive() { + channel.writeAndFlush(new KeepAlivePacket()); } @Override - public void flush() { - channel.flush(); - } - - @Override - public void writeAndFlush(Object obj) { - channel.writeAndFlush(obj); + public void writeAndFlush(SCPacket pkt) { + channel.writeAndFlush(pkt); } } diff --git a/src/main/java/mc/core/network/proto_125/packets/KeepAlivePacket.java b/src/main/java/mc/core/network/proto_125/packets/KeepAlivePacket.java new file mode 100644 index 0000000..b082b13 --- /dev/null +++ b/src/main/java/mc/core/network/proto_125/packets/KeepAlivePacket.java @@ -0,0 +1,21 @@ +/* + * DmitriyMX + * 2018-04-21 + */ +package mc.core.network.proto_125.packets; + +import mc.core.network.SCPacket; +import mc.core.network.proto_125.ByteArrayOutputNetStream; + +import java.util.Random; + +public class KeepAlivePacket implements SCPacket { + private static final Random rand = new Random(); + + @Override + public byte[] toByteArray() { + ByteArrayOutputNetStream netStream = new ByteArrayOutputNetStream(); + netStream.writeInt(rand.nextInt(Integer.MAX_VALUE)); + return netStream.toByteArray(); + } +} diff --git a/src/main/java/mc/core/network/proto_125/packets/PacketManager.java b/src/main/java/mc/core/network/proto_125/packets/PacketManager.java index 1496ce0..b10eef5 100644 --- a/src/main/java/mc/core/network/proto_125/packets/PacketManager.java +++ b/src/main/java/mc/core/network/proto_125/packets/PacketManager.java @@ -12,6 +12,7 @@ import mc.core.network.proto_125.packets.*; public class PacketManager { private static final BiMap> packetMap = ImmutableBiMap.>builder() + .put(0x00, KeepAlivePacket.class) .put(0x01, LoginPacket.class) .put(0x02, HandshakePacket.class) .put(0x06, SpawnPositionPacket.class)