diff --git a/CHANGELOG.MD b/CHANGELOG.MD
new file mode 100644
index 0000000..abc697b
--- /dev/null
+++ b/CHANGELOG.MD
@@ -0,0 +1,12 @@
+# Changelog
+
+Формат базируется на [Keep a Changelog](https://keepachangelog.com/ru/1.0.0/),
+и данный проект придерживается [Semantic Versioning](https://semver.org/lang/ru/spec/v2.0.0.html).
+
+## [0.1-alpha] - 2019-08-26
+
+### Добавлено
+
+- сообщения, написанные в формате `!сообщение`, показываются на всех подключенных серверах.
+- настройка формата глобальных сообщений.
+- настройка подключения к Kafka.
\ No newline at end of file
diff --git a/README.MD b/README.MD
new file mode 100644
index 0000000..2cebd3e
--- /dev/null
+++ b/README.MD
@@ -0,0 +1,26 @@
+# Global Chat
+
+
+
+
+Глобальный чат, работающий на всех подключенных серверах.
+
+## Возможности
+
+- легкое масштабирование: просто запустите новый сервер с этим плагином и глобальный чат уже будет работать
+
+## Использование
+
+В чате написать `!сообщение` и сообщение будет видно на всех подключенных серверах.
+
+## Минимальные требования
+
+- Java 8
+- Bukkit 1.8.8
+- [Kafka 2.3.0](https://kafka.apache.org/)
+
+## Сборка
+
+```bash
+mvn clean compile assembly:single
+```
diff --git a/pom.xml b/pom.xml
index 1adbfdc..990b0c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
ru.dmitriymx.minecraft
global-chat
- 0.0-SNAPSHOT
+ 0.1-alpha
@@ -20,7 +20,7 @@
UTF-8
1.8
- 1.12-R0.1-SNAPSHOT
+ 1.8.8-R0.1-SNAPSHOT
ru.dmitriymx.minecraft.globalchat.MainPlugin
${java.encoding}
@@ -69,6 +69,18 @@
+
+
+ org.projectlombok
+ lombok
+ 1.18.8
+ provided
+
+
+ org.apache.kafka
+ kafka-clients
+ 2.3.0
+
@@ -107,7 +119,7 @@
maven-assembly-plugin
2.2-beta-5
- ${project.artifactId}-${project.version}-fat
+ ${project.artifactId}-${project.version}
false
jar-with-dependencies
diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java
new file mode 100644
index 0000000..d6ed654
--- /dev/null
+++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/ChatListener.java
@@ -0,0 +1,28 @@
+package ru.dmitriymx.minecraft.globalchat;
+
+import lombok.RequiredArgsConstructor;
+import org.bukkit.event.EventHandler;
+import org.bukkit.event.Listener;
+import org.bukkit.event.player.AsyncPlayerChatEvent;
+import ru.dmitriymx.minecraft.globalchat.mq.ChatMessageData;
+import ru.dmitriymx.minecraft.globalchat.mq.KafkaService;
+
+@RequiredArgsConstructor
+public class ChatListener implements Listener {
+
+ private final KafkaService service;
+
+ @EventHandler
+ public void onChat(AsyncPlayerChatEvent event) {
+ if (event.getMessage().startsWith("!")) {
+ ChatMessageData messageData = new ChatMessageData(
+ event.getPlayer().getName(),
+ event.getMessage().substring(1) //without "!"
+ );
+
+ service.send(messageData);
+
+ event.setCancelled(true);
+ }
+ }
+}
diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/Config.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/Config.java
new file mode 100644
index 0000000..b25a5ba
--- /dev/null
+++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/Config.java
@@ -0,0 +1,63 @@
+package ru.dmitriymx.minecraft.globalchat;
+
+import lombok.AllArgsConstructor;
+import org.bukkit.Bukkit;
+import org.bukkit.configuration.file.FileConfiguration;
+import org.bukkit.plugin.java.JavaPlugin;
+
+import java.util.List;
+import java.util.StringJoiner;
+
+@AllArgsConstructor
+class Config {
+
+ private JavaPlugin plugin;
+ private FileConfiguration config;
+
+ String getHosts() {
+ String hosts;
+ List hostList = config.getStringList("kafka.hosts");
+ if (hostList.size() == 0) {
+ Bukkit.getServer().getPluginManager().disablePlugin(plugin);
+ throw new RuntimeException("Empty field 'kafka.hosts'!");
+ } else if (hostList.size() == 1) {
+ hosts = hostList.get(0);
+ } else {
+ StringJoiner sj = new StringJoiner(",");
+ hostList.forEach(sj::add);
+ hosts = sj.toString();
+ }
+
+ return hosts;
+ }
+
+ String getTopic() {
+ String topic = config.getString("kafka.topic");
+ if (topic == null || topic.trim().isEmpty()) {
+ Bukkit.getServer().getPluginManager().disablePlugin(plugin);
+ throw new RuntimeException("Empty field 'kafka.topic'!");
+ }
+
+ return topic;
+ }
+
+ long getDuration() {
+ long duration = config.getLong("kafka.duration");
+ if (duration == 0L) {
+ plugin.getLogger().warning("Field 'kafka.duration' is verry low. Set default value 1000.");
+ duration = 1000L;
+ }
+
+ return duration;
+ }
+
+ String getFormat() {
+ String format = config.getString("message_format");
+ if (format.trim().isEmpty()) {
+ plugin.getLogger().warning("Field 'message_format' is empty. Set default value '{0}: {1}'.");
+ format = "{0}: {1}";
+ }
+
+ return format;
+ }
+}
diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java
index 00f936f..2d24a70 100644
--- a/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java
+++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/MainPlugin.java
@@ -1,11 +1,59 @@
package ru.dmitriymx.minecraft.globalchat;
+import org.bukkit.Bukkit;
import org.bukkit.plugin.java.JavaPlugin;
+import ru.dmitriymx.minecraft.globalchat.mq.KafkaService;
+
+import java.text.MessageFormat;
public class MainPlugin extends JavaPlugin {
+ private Config config;
+ private KafkaService service;
+ private Thread mqThread;
+
@Override
public void onEnable() {
- getLogger().info("hello?");
+ initConfig();
+ initKafkaService();
+ getServer().getPluginManager().registerEvents(new ChatListener(service), this);
+ }
+
+ @Override
+ public void onDisable() {
+ mqThread.interrupt();
+ }
+
+ private void initConfig() {
+ saveDefaultConfig();
+ config = new Config(this, getConfig());
+ }
+
+ private void initKafkaService() {
+ getLogger().info(String.format(
+ "Kafka settings: [hosts: %s; topic: %s; duration %d]",
+ config.getHosts(), config.getTopic(), config.getDuration()
+ ));
+
+ ClassLoader originalContext = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(null);
+ service = new KafkaService(config.getHosts(), config.getTopic(), config.getDuration());
+ Thread.currentThread().setContextClassLoader(originalContext);
+
+ mqThread = new Thread(() -> {
+ while (!Thread.currentThread().isInterrupted()) {
+ service.get().forEach(messageData -> {
+ Bukkit.getServer().broadcastMessage(MessageFormat.format(
+ config.getFormat(),
+ messageData.getPlayerName(),
+ messageData.getMessage()
+ ));
+ });
+ }
+
+ service.shutdown();
+ service = null;
+ }, "Kafka service listener");
+ mqThread.start();
}
}
diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java
new file mode 100644
index 0000000..1652735
--- /dev/null
+++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java
@@ -0,0 +1,14 @@
+package ru.dmitriymx.minecraft.globalchat.mq;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@AllArgsConstructor
+@Data
+public class ChatMessageData {
+
+ @SerializedName("name")
+ private String playerName;
+ private String message;
+}
diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java
new file mode 100644
index 0000000..e3af492
--- /dev/null
+++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java
@@ -0,0 +1,101 @@
+package ru.dmitriymx.minecraft.globalchat.mq;
+
+import com.google.gson.Gson;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.time.Duration;
+import java.util.*;
+
+public class KafkaService {
+
+ private static final Gson GSON = new Gson();
+
+ private final String hosts;
+ private final String topic;
+ private final long duration;
+
+ private Consumer consumer;
+ private Producer producer;
+
+ public KafkaService(String hosts, String topic, long duration) {
+ this.hosts = hosts;
+ this.topic = topic;
+ this.duration = duration;
+ this.consumer = createConsumer();
+ this.producer = createProducer();
+ }
+
+ public void send(ChatMessageData messageData) {
+ if (producer == null) {
+ throw new IllegalStateException("Service is offline");
+ }
+ producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData)));
+ }
+
+ public List get() {
+ if (consumer == null) {
+ throw new IllegalStateException("Service is offline");
+ }
+
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(duration));
+ if (records.count() == 0) {
+ return Collections.emptyList();
+ }
+
+ final List list = new ArrayList<>();
+ records.forEach(record -> {
+ try {
+ ChatMessageData data = GSON.fromJson(record.value(), ChatMessageData.class);
+ list.add(data);
+ } catch (Exception ignore) {
+ //FIXME
+ }
+ });
+
+ consumer.commitAsync();
+
+ return list;
+ }
+
+ public void shutdown() {
+ producer.close();
+ producer = null;
+
+ consumer.close();
+ consumer = null;
+ }
+
+ private Consumer createConsumer() {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "WithoutGroup." + UUID.randomUUID().toString());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+ final Consumer consumer = new KafkaConsumer<>(props);
+
+ consumer.subscribe(Collections.singletonList(topic));
+ return consumer;
+ }
+
+ private Producer createProducer() {
+ final Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "DefaultConfig");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ return new KafkaProducer<>(props);
+ }
+}
diff --git a/src/main/resources/config.yml b/src/main/resources/config.yml
new file mode 100644
index 0000000..f7e11b0
--- /dev/null
+++ b/src/main/resources/config.yml
@@ -0,0 +1,9 @@
+kafka:
+ hosts: [ '127.0.0.1:9092' ]
+ topic: 'global-chat'
+ duration: 1000
+
+# Global message format.
+# {0} - player name
+# {1} - message
+message_format: '{0}: {1}'