diff --git a/pom.xml b/pom.xml
index 1adbfdc..b89b0c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,6 +69,17 @@
+
+
+ org.projectlombok
+ lombok
+ 1.18.8
+
+
+ org.apache.kafka
+ kafka-clients
+ 2.3.0
+
diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java
new file mode 100644
index 0000000..1652735
--- /dev/null
+++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java
@@ -0,0 +1,14 @@
+package ru.dmitriymx.minecraft.globalchat.mq;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@AllArgsConstructor
+@Data
+public class ChatMessageData {
+
+ @SerializedName("name")
+ private String playerName;
+ private String message;
+}
diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java
new file mode 100644
index 0000000..a341155
--- /dev/null
+++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java
@@ -0,0 +1,83 @@
+package ru.dmitriymx.minecraft.globalchat.mq;
+
+import com.google.gson.Gson;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.time.Duration;
+import java.util.*;
+
+public class KafkaService {
+
+ private static final Gson GSON = new Gson();
+
+ private final String hosts;
+ private final String topic;
+ private final long duration;
+
+ private final Consumer consumer;
+ private final Producer producer;
+
+ public KafkaService(String hosts, String topic, long duration) {
+ this.hosts = hosts;
+ this.topic = topic;
+ this.duration = duration;
+ this.consumer = createConsumer();
+ this.producer = createProducer();
+ }
+
+ public void send(ChatMessageData messageData) {
+ producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData)));
+ }
+
+ public List get() {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(duration));
+ if (records.count() == 0) {
+ return Collections.emptyList();
+ }
+
+ final List list = new ArrayList<>();
+ records.forEach(record -> {
+ try {
+ ChatMessageData data = GSON.fromJson(record.value(), ChatMessageData.class);
+ list.add(data);
+ } catch (Exception ignore) {
+ }
+ });
+
+ return list;
+ }
+
+ private Consumer createConsumer() {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "DefaultGroup");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+ final Consumer consumer = new KafkaConsumer<>(props);
+
+ consumer.subscribe(Collections.singletonList(topic));
+ return consumer;
+ }
+
+ private Producer createProducer() {
+ final Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "DefaultConfig");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ return new KafkaProducer<>(props);
+ }
+}