diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java new file mode 100644 index 0000000..d6ed654 --- /dev/null +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java @@ -0,0 +1,28 @@ +package ru.dmitriymx.minecraft.globalchat; + +import lombok.RequiredArgsConstructor; +import org.bukkit.event.EventHandler; +import org.bukkit.event.Listener; +import org.bukkit.event.player.AsyncPlayerChatEvent; +import ru.dmitriymx.minecraft.globalchat.mq.ChatMessageData; +import ru.dmitriymx.minecraft.globalchat.mq.KafkaService; + +@RequiredArgsConstructor +public class ChatListener implements Listener { + + private final KafkaService service; + + @EventHandler + public void onChat(AsyncPlayerChatEvent event) { + if (event.getMessage().startsWith("!")) { + ChatMessageData messageData = new ChatMessageData( + event.getPlayer().getName(), + event.getMessage().substring(1) //without "!" + ); + + service.send(messageData); + + event.setCancelled(true); + } + } +} diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java index 00f936f..a5822ec 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java @@ -1,11 +1,44 @@ package ru.dmitriymx.minecraft.globalchat; +import org.bukkit.Bukkit; import org.bukkit.plugin.java.JavaPlugin; +import ru.dmitriymx.minecraft.globalchat.mq.KafkaService; public class MainPlugin extends JavaPlugin { + private KafkaService service; + private Thread mqThread; + @Override public void onEnable() { - getLogger().info("hello?"); + initKafkaService(); + getServer().getPluginManager().registerEvents(new ChatListener(service), this); + } + + @Override + public void onDisable() { + mqThread.interrupt(); + } + + private void initKafkaService() { + //FIXME перенести в конфигурацию + service = new KafkaService("127.0.0.1:9092", "global-chat", 1000); + + mqThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + service.get().forEach(messageData -> { + //FIXME формат сообщений должен браться из конфига + Bukkit.getServer().broadcastMessage(String.format( + "%s: %s", + messageData.getPlayerName(), + messageData.getMessage() + )); + }); + } + + service.shutdown(); + service = null; + }, "Kafka service listener"); + mqThread.start(); } } diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java index a341155..b89a30b 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java @@ -25,8 +25,8 @@ public class KafkaService { private final String topic; private final long duration; - private final Consumer consumer; - private final Producer producer; + private Consumer consumer; + private Producer producer; public KafkaService(String hosts, String topic, long duration) { this.hosts = hosts; @@ -37,10 +37,17 @@ public class KafkaService { } public void send(ChatMessageData messageData) { + if (producer == null) { + throw new IllegalStateException("Service is offline"); + } producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData))); } public List get() { + if (consumer == null) { + throw new IllegalStateException("Service is offline"); + } + ConsumerRecords records = consumer.poll(Duration.ofMillis(duration)); if (records.count() == 0) { return Collections.emptyList(); @@ -58,6 +65,12 @@ public class KafkaService { return list; } + public void shutdown() { + producer = null; + consumer.unsubscribe(); + consumer = null; + } + private Consumer createConsumer() { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);