0

регистрация событий чата

This commit is contained in:
2019-08-26 17:10:07 +03:00
parent 3e2c06b4a1
commit dde16101f7
3 changed files with 77 additions and 3 deletions

View File

@@ -0,0 +1,28 @@
package ru.dmitriymx.minecraft.globalchat;
import lombok.RequiredArgsConstructor;
import org.bukkit.event.EventHandler;
import org.bukkit.event.Listener;
import org.bukkit.event.player.AsyncPlayerChatEvent;
import ru.dmitriymx.minecraft.globalchat.mq.ChatMessageData;
import ru.dmitriymx.minecraft.globalchat.mq.KafkaService;
@RequiredArgsConstructor
public class ChatListener implements Listener {
private final KafkaService service;
@EventHandler
public void onChat(AsyncPlayerChatEvent event) {
if (event.getMessage().startsWith("!")) {
ChatMessageData messageData = new ChatMessageData(
event.getPlayer().getName(),
event.getMessage().substring(1) //without "!"
);
service.send(messageData);
event.setCancelled(true);
}
}
}

View File

@@ -1,11 +1,44 @@
package ru.dmitriymx.minecraft.globalchat; package ru.dmitriymx.minecraft.globalchat;
import org.bukkit.Bukkit;
import org.bukkit.plugin.java.JavaPlugin; import org.bukkit.plugin.java.JavaPlugin;
import ru.dmitriymx.minecraft.globalchat.mq.KafkaService;
public class MainPlugin extends JavaPlugin { public class MainPlugin extends JavaPlugin {
private KafkaService service;
private Thread mqThread;
@Override @Override
public void onEnable() { public void onEnable() {
getLogger().info("hello?"); initKafkaService();
getServer().getPluginManager().registerEvents(new ChatListener(service), this);
}
@Override
public void onDisable() {
mqThread.interrupt();
}
private void initKafkaService() {
//FIXME перенести в конфигурацию
service = new KafkaService("127.0.0.1:9092", "global-chat", 1000);
mqThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
service.get().forEach(messageData -> {
//FIXME формат сообщений должен браться из конфига
Bukkit.getServer().broadcastMessage(String.format(
"%s: %s",
messageData.getPlayerName(),
messageData.getMessage()
));
});
}
service.shutdown();
service = null;
}, "Kafka service listener");
mqThread.start();
} }
} }

View File

@@ -25,8 +25,8 @@ public class KafkaService {
private final String topic; private final String topic;
private final long duration; private final long duration;
private final Consumer<Long, String> consumer; private Consumer<Long, String> consumer;
private final Producer<Long, String> producer; private Producer<Long, String> producer;
public KafkaService(String hosts, String topic, long duration) { public KafkaService(String hosts, String topic, long duration) {
this.hosts = hosts; this.hosts = hosts;
@@ -37,10 +37,17 @@ public class KafkaService {
} }
public void send(ChatMessageData messageData) { public void send(ChatMessageData messageData) {
if (producer == null) {
throw new IllegalStateException("Service is offline");
}
producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData))); producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData)));
} }
public List<ChatMessageData> get() { public List<ChatMessageData> get() {
if (consumer == null) {
throw new IllegalStateException("Service is offline");
}
ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMillis(duration)); ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMillis(duration));
if (records.count() == 0) { if (records.count() == 0) {
return Collections.emptyList(); return Collections.emptyList();
@@ -58,6 +65,12 @@ public class KafkaService {
return list; return list;
} }
public void shutdown() {
producer = null;
consumer.unsubscribe();
consumer = null;
}
private Consumer<Long, String> createConsumer() { private Consumer<Long, String> createConsumer() {
final Properties props = new Properties(); final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);