From 3e2c06b4a1cc5ac9821275d27fc27ab108da3d6d Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 16:44:28 +0300 Subject: [PATCH 1/8] Hello, Kafka! --- pom.xml | 11 +++ .../globalchat/mq/ChatMessageData.java | 14 ++++ .../minecraft/globalchat/mq/KafkaService.java | 83 +++++++++++++++++++ 3 files changed, 108 insertions(+) create mode 100644 src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java create mode 100644 src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java diff --git a/pom.xml b/pom.xml index 1adbfdc..b89b0c1 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,17 @@ + + + org.projectlombok + lombok + 1.18.8 + + + org.apache.kafka + kafka-clients + 2.3.0 + diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java new file mode 100644 index 0000000..1652735 --- /dev/null +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java @@ -0,0 +1,14 @@ +package ru.dmitriymx.minecraft.globalchat.mq; + +import com.google.gson.annotations.SerializedName; +import lombok.AllArgsConstructor; +import lombok.Data; + +@AllArgsConstructor +@Data +public class ChatMessageData { + + @SerializedName("name") + private String playerName; + private String message; +} diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java new file mode 100644 index 0000000..a341155 --- /dev/null +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java @@ -0,0 +1,83 @@ +package ru.dmitriymx.minecraft.globalchat.mq; + +import com.google.gson.Gson; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.time.Duration; +import java.util.*; + +public class KafkaService { + + private static final Gson GSON = new Gson(); + + private final String hosts; + private final String topic; + private final long duration; + + private final Consumer consumer; + private final Producer producer; + + public KafkaService(String hosts, String topic, long duration) { + this.hosts = hosts; + this.topic = topic; + this.duration = duration; + this.consumer = createConsumer(); + this.producer = createProducer(); + } + + public void send(ChatMessageData messageData) { + producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData))); + } + + public List get() { + ConsumerRecords records = consumer.poll(Duration.ofMillis(duration)); + if (records.count() == 0) { + return Collections.emptyList(); + } + + final List list = new ArrayList<>(); + records.forEach(record -> { + try { + ChatMessageData data = GSON.fromJson(record.value(), ChatMessageData.class); + list.add(data); + } catch (Exception ignore) { + } + }); + + return list; + } + + private Consumer createConsumer() { + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "DefaultGroup"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + final Consumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + return consumer; + } + + private Producer createProducer() { + final Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "DefaultConfig"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + return new KafkaProducer<>(props); + } +} From dde16101f7e46a2b708260e394436de649ca669e Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 17:10:07 +0300 Subject: [PATCH 2/8] =?UTF-8?q?=D1=80=D0=B5=D0=B3=D0=B8=D1=81=D1=82=D1=80?= =?UTF-8?q?=D0=B0=D1=86=D0=B8=D1=8F=20=D1=81=D0=BE=D0=B1=D1=8B=D1=82=D0=B8?= =?UTF-8?q?=D0=B9=20=D1=87=D0=B0=D1=82=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../minecraft/globalchat/ChatListener.java | 28 +++++++++++++++ .../minecraft/globalchat/MainPlugin.java | 35 ++++++++++++++++++- .../minecraft/globalchat/mq/KafkaService.java | 17 +++++++-- 3 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java new file mode 100644 index 0000000..d6ed654 --- /dev/null +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java @@ -0,0 +1,28 @@ +package ru.dmitriymx.minecraft.globalchat; + +import lombok.RequiredArgsConstructor; +import org.bukkit.event.EventHandler; +import org.bukkit.event.Listener; +import org.bukkit.event.player.AsyncPlayerChatEvent; +import ru.dmitriymx.minecraft.globalchat.mq.ChatMessageData; +import ru.dmitriymx.minecraft.globalchat.mq.KafkaService; + +@RequiredArgsConstructor +public class ChatListener implements Listener { + + private final KafkaService service; + + @EventHandler + public void onChat(AsyncPlayerChatEvent event) { + if (event.getMessage().startsWith("!")) { + ChatMessageData messageData = new ChatMessageData( + event.getPlayer().getName(), + event.getMessage().substring(1) //without "!" + ); + + service.send(messageData); + + event.setCancelled(true); + } + } +} diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java index 00f936f..a5822ec 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java @@ -1,11 +1,44 @@ package ru.dmitriymx.minecraft.globalchat; +import org.bukkit.Bukkit; import org.bukkit.plugin.java.JavaPlugin; +import ru.dmitriymx.minecraft.globalchat.mq.KafkaService; public class MainPlugin extends JavaPlugin { + private KafkaService service; + private Thread mqThread; + @Override public void onEnable() { - getLogger().info("hello?"); + initKafkaService(); + getServer().getPluginManager().registerEvents(new ChatListener(service), this); + } + + @Override + public void onDisable() { + mqThread.interrupt(); + } + + private void initKafkaService() { + //FIXME перенести в конфигурацию + service = new KafkaService("127.0.0.1:9092", "global-chat", 1000); + + mqThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + service.get().forEach(messageData -> { + //FIXME формат сообщений должен браться из конфига + Bukkit.getServer().broadcastMessage(String.format( + "%s: %s", + messageData.getPlayerName(), + messageData.getMessage() + )); + }); + } + + service.shutdown(); + service = null; + }, "Kafka service listener"); + mqThread.start(); } } diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java index a341155..b89a30b 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java @@ -25,8 +25,8 @@ public class KafkaService { private final String topic; private final long duration; - private final Consumer consumer; - private final Producer producer; + private Consumer consumer; + private Producer producer; public KafkaService(String hosts, String topic, long duration) { this.hosts = hosts; @@ -37,10 +37,17 @@ public class KafkaService { } public void send(ChatMessageData messageData) { + if (producer == null) { + throw new IllegalStateException("Service is offline"); + } producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData))); } public List get() { + if (consumer == null) { + throw new IllegalStateException("Service is offline"); + } + ConsumerRecords records = consumer.poll(Duration.ofMillis(duration)); if (records.count() == 0) { return Collections.emptyList(); @@ -58,6 +65,12 @@ public class KafkaService { return list; } + public void shutdown() { + producer = null; + consumer.unsubscribe(); + consumer = null; + } + private Consumer createConsumer() { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); From 8dc902a62e5468b8c4a8120e9f546f3d02aa096b Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 18:02:00 +0300 Subject: [PATCH 3/8] fix: dependency scope --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index b89b0c1..6293f98 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ org.projectlombok lombok 1.18.8 + provided org.apache.kafka From baecf489bde6d64251a1a7547192b09d36163224 Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 18:02:40 +0300 Subject: [PATCH 4/8] =?UTF-8?q?fix:=20=D0=BD=D0=B0=D0=B8=D0=BC=D0=B5=D0=BD?= =?UTF-8?q?=D0=BE=D0=B2=D0=B0=D0=BD=D0=B8=D0=B5=20=D0=B3=D1=80=D1=83=D0=BF?= =?UTF-8?q?=D0=BF=20kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ru/dmitriymx/minecraft/globalchat/MainPlugin.java | 3 +++ .../dmitriymx/minecraft/globalchat/mq/KafkaService.java | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java index a5822ec..75411c8 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java @@ -21,8 +21,11 @@ public class MainPlugin extends JavaPlugin { } private void initKafkaService() { + ClassLoader originalContext = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(null); //FIXME перенести в конфигурацию service = new KafkaService("127.0.0.1:9092", "global-chat", 1000); + Thread.currentThread().setContextClassLoader(originalContext); mqThread = new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java index b89a30b..a04a9ac 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java @@ -62,19 +62,23 @@ public class KafkaService { } }); + consumer.commitAsync(); + return list; } public void shutdown() { + producer.close(); producer = null; - consumer.unsubscribe(); + + consumer.close(); consumer = null; } private Consumer createConsumer() { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "DefaultGroup"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "WithoutGroup." + UUID.randomUUID().toString()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); From e5816809e05714b3c362f509ffc418802775553a Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 18:27:35 +0300 Subject: [PATCH 5/8] downgrade: spigot 1.12 -> 1.8 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6293f98..4c56810 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ UTF-8 1.8 - 1.12-R0.1-SNAPSHOT + 1.8.8-R0.1-SNAPSHOT ru.dmitriymx.minecraft.globalchat.MainPlugin ${java.encoding} From 9625b324d58e78fbc49e84226c7024fc56a1f599 Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 19:33:16 +0300 Subject: [PATCH 6/8] configuration file --- .../minecraft/globalchat/Config.java | 63 +++++++++++++++++++ .../minecraft/globalchat/MainPlugin.java | 17 +++-- .../minecraft/globalchat/mq/KafkaService.java | 1 + src/main/resources/config.yml | 9 +++ 4 files changed, 85 insertions(+), 5 deletions(-) create mode 100644 src/main/java/ru/dmitriymx/minecraft/globalchat/Config.java create mode 100644 src/main/resources/config.yml diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/Config.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/Config.java new file mode 100644 index 0000000..b25a5ba --- /dev/null +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/Config.java @@ -0,0 +1,63 @@ +package ru.dmitriymx.minecraft.globalchat; + +import lombok.AllArgsConstructor; +import org.bukkit.Bukkit; +import org.bukkit.configuration.file.FileConfiguration; +import org.bukkit.plugin.java.JavaPlugin; + +import java.util.List; +import java.util.StringJoiner; + +@AllArgsConstructor +class Config { + + private JavaPlugin plugin; + private FileConfiguration config; + + String getHosts() { + String hosts; + List hostList = config.getStringList("kafka.hosts"); + if (hostList.size() == 0) { + Bukkit.getServer().getPluginManager().disablePlugin(plugin); + throw new RuntimeException("Empty field 'kafka.hosts'!"); + } else if (hostList.size() == 1) { + hosts = hostList.get(0); + } else { + StringJoiner sj = new StringJoiner(","); + hostList.forEach(sj::add); + hosts = sj.toString(); + } + + return hosts; + } + + String getTopic() { + String topic = config.getString("kafka.topic"); + if (topic == null || topic.trim().isEmpty()) { + Bukkit.getServer().getPluginManager().disablePlugin(plugin); + throw new RuntimeException("Empty field 'kafka.topic'!"); + } + + return topic; + } + + long getDuration() { + long duration = config.getLong("kafka.duration"); + if (duration == 0L) { + plugin.getLogger().warning("Field 'kafka.duration' is verry low. Set default value 1000."); + duration = 1000L; + } + + return duration; + } + + String getFormat() { + String format = config.getString("message_format"); + if (format.trim().isEmpty()) { + plugin.getLogger().warning("Field 'message_format' is empty. Set default value '{0}: {1}'."); + format = "{0}: {1}"; + } + + return format; + } +} diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java index 75411c8..531f8b8 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java @@ -4,13 +4,17 @@ import org.bukkit.Bukkit; import org.bukkit.plugin.java.JavaPlugin; import ru.dmitriymx.minecraft.globalchat.mq.KafkaService; +import java.text.MessageFormat; + public class MainPlugin extends JavaPlugin { + private Config config; private KafkaService service; private Thread mqThread; @Override public void onEnable() { + initConfig(); initKafkaService(); getServer().getPluginManager().registerEvents(new ChatListener(service), this); } @@ -20,19 +24,22 @@ public class MainPlugin extends JavaPlugin { mqThread.interrupt(); } + private void initConfig() { + saveDefaultConfig(); + config = new Config(this, getConfig()); + } + private void initKafkaService() { ClassLoader originalContext = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(null); - //FIXME перенести в конфигурацию - service = new KafkaService("127.0.0.1:9092", "global-chat", 1000); + service = new KafkaService(config.getHosts(), config.getTopic(), config.getDuration()); Thread.currentThread().setContextClassLoader(originalContext); mqThread = new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { service.get().forEach(messageData -> { - //FIXME формат сообщений должен браться из конфига - Bukkit.getServer().broadcastMessage(String.format( - "%s: %s", + Bukkit.getServer().broadcastMessage(MessageFormat.format( + config.getFormat(), messageData.getPlayerName(), messageData.getMessage() )); diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java index a04a9ac..e3af492 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java @@ -59,6 +59,7 @@ public class KafkaService { ChatMessageData data = GSON.fromJson(record.value(), ChatMessageData.class); list.add(data); } catch (Exception ignore) { + //FIXME } }); diff --git a/src/main/resources/config.yml b/src/main/resources/config.yml new file mode 100644 index 0000000..504c4a0 --- /dev/null +++ b/src/main/resources/config.yml @@ -0,0 +1,9 @@ +kafka: + hosts: [ '127.0.0.1:9022' ] + topic: 'global-chat' + duration: 1000 + +# Global message format. +# {0} - player name +# {1} - message +message_format: '{0}: {1}' From ac9aea9d88d14918e468367e7b4ce58125e02b08 Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 20:37:42 +0300 Subject: [PATCH 7/8] fix: default configuration --- .../java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java | 5 +++++ src/main/resources/config.yml | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java index 531f8b8..2d24a70 100644 --- a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java @@ -30,6 +30,11 @@ public class MainPlugin extends JavaPlugin { } private void initKafkaService() { + getLogger().info(String.format( + "Kafka settings: [hosts: %s; topic: %s; duration %d]", + config.getHosts(), config.getTopic(), config.getDuration() + )); + ClassLoader originalContext = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(null); service = new KafkaService(config.getHosts(), config.getTopic(), config.getDuration()); diff --git a/src/main/resources/config.yml b/src/main/resources/config.yml index 504c4a0..f7e11b0 100644 --- a/src/main/resources/config.yml +++ b/src/main/resources/config.yml @@ -1,5 +1,5 @@ kafka: - hosts: [ '127.0.0.1:9022' ] + hosts: [ '127.0.0.1:9092' ] topic: 'global-chat' duration: 1000 From a2635c8b3248263e527c24bc1dc09b338a62ef90 Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 21:29:09 +0300 Subject: [PATCH 8/8] version: 0.1-alpha --- CHANGELOG.MD | 12 ++++++++++++ README.MD | 26 ++++++++++++++++++++++++++ pom.xml | 4 ++-- 3 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 CHANGELOG.MD create mode 100644 README.MD diff --git a/CHANGELOG.MD b/CHANGELOG.MD new file mode 100644 index 0000000..abc697b --- /dev/null +++ b/CHANGELOG.MD @@ -0,0 +1,12 @@ +# Changelog + +Формат базируется на [Keep a Changelog](https://keepachangelog.com/ru/1.0.0/), +и данный проект придерживается [Semantic Versioning](https://semver.org/lang/ru/spec/v2.0.0.html). + +## [0.1-alpha] - 2019-08-26 + +### Добавлено + +- сообщения, написанные в формате `!сообщение`, показываются на всех подключенных серверах. +- настройка формата глобальных сообщений. +- настройка подключения к Kafka. \ No newline at end of file diff --git a/README.MD b/README.MD new file mode 100644 index 0000000..2cebd3e --- /dev/null +++ b/README.MD @@ -0,0 +1,26 @@ +# Global Chat + +![version: 0.1-alpha](https://img.shields.io/badge/version-0.1--alpha-000.svg?style=flat) +![support minecraft: 1.8.8+](https://img.shields.io/badge/support_minecraft-1.8.8+-060.svg?style=flat) + +Глобальный чат, работающий на всех подключенных серверах. + +## Возможности + +- легкое масштабирование: просто запустите новый сервер с этим плагином и глобальный чат уже будет работать + +## Использование + +В чате написать `!сообщение` и сообщение будет видно на всех подключенных серверах. + +## Минимальные требования + +- Java 8 +- Bukkit 1.8.8 +- [Kafka 2.3.0](https://kafka.apache.org/) + +## Сборка + +```bash +mvn clean compile assembly:single +``` diff --git a/pom.xml b/pom.xml index 4c56810..990b0c7 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ ru.dmitriymx.minecraft global-chat - 0.0-SNAPSHOT + 0.1-alpha @@ -119,7 +119,7 @@ maven-assembly-plugin 2.2-beta-5 - ${project.artifactId}-${project.version}-fat + ${project.artifactId}-${project.version} false jar-with-dependencies