package ru.dmitriymx.minecraft.globalchat; import org.bukkit.Bukkit; import org.bukkit.plugin.java.JavaPlugin; import ru.dmitriymx.minecraft.globalchat.mq.KafkaService; import java.text.MessageFormat; public class MainPlugin extends JavaPlugin { private Config config; private KafkaService service; private Thread mqThread; @Override public void onEnable() { initConfig(); initKafkaService(); getServer().getPluginManager().registerEvents(new ChatListener(service), this); } @Override public void onDisable() { mqThread.interrupt(); } private void initConfig() { saveDefaultConfig(); config = new Config(this, getConfig()); } private void initKafkaService() { getLogger().info(String.format( "Kafka settings: [hosts: %s; topic: %s; duration %d]", config.getHosts(), config.getTopic(), config.getDuration() )); ClassLoader originalContext = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(null); service = new KafkaService(config.getHosts(), config.getTopic(), config.getDuration()); Thread.currentThread().setContextClassLoader(originalContext); mqThread = new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { service.get().forEach(messageData -> { Bukkit.getServer().broadcastMessage(MessageFormat.format( config.getFormat(), messageData.getPlayerName(), messageData.getMessage() )); }); } service.shutdown(); service = null; }, "Kafka service listener"); mqThread.start(); } }