From 3e2c06b4a1cc5ac9821275d27fc27ab108da3d6d Mon Sep 17 00:00:00 2001 From: DmitriyMX Date: Mon, 26 Aug 2019 16:44:28 +0300 Subject: [PATCH] Hello, Kafka! --- pom.xml | 11 +++ .../globalchat/mq/ChatMessageData.java | 14 ++++ .../minecraft/globalchat/mq/KafkaService.java | 83 +++++++++++++++++++ 3 files changed, 108 insertions(+) create mode 100644 src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java create mode 100644 src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java diff --git a/pom.xml b/pom.xml index 1adbfdc..b89b0c1 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,17 @@ + + + org.projectlombok + lombok + 1.18.8 + + + org.apache.kafka + kafka-clients + 2.3.0 + diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java new file mode 100644 index 0000000..1652735 --- /dev/null +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/ChatMessageData.java @@ -0,0 +1,14 @@ +package ru.dmitriymx.minecraft.globalchat.mq; + +import com.google.gson.annotations.SerializedName; +import lombok.AllArgsConstructor; +import lombok.Data; + +@AllArgsConstructor +@Data +public class ChatMessageData { + + @SerializedName("name") + private String playerName; + private String message; +} diff --git a/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java new file mode 100644 index 0000000..a341155 --- /dev/null +++ b/src/main/java/ru/dmitriymx/minecraft/globalchat/mq/KafkaService.java @@ -0,0 +1,83 @@ +package ru.dmitriymx.minecraft.globalchat.mq; + +import com.google.gson.Gson; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.time.Duration; +import java.util.*; + +public class KafkaService { + + private static final Gson GSON = new Gson(); + + private final String hosts; + private final String topic; + private final long duration; + + private final Consumer consumer; + private final Producer producer; + + public KafkaService(String hosts, String topic, long duration) { + this.hosts = hosts; + this.topic = topic; + this.duration = duration; + this.consumer = createConsumer(); + this.producer = createProducer(); + } + + public void send(ChatMessageData messageData) { + producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData))); + } + + public List get() { + ConsumerRecords records = consumer.poll(Duration.ofMillis(duration)); + if (records.count() == 0) { + return Collections.emptyList(); + } + + final List list = new ArrayList<>(); + records.forEach(record -> { + try { + ChatMessageData data = GSON.fromJson(record.value(), ChatMessageData.class); + list.add(data); + } catch (Exception ignore) { + } + }); + + return list; + } + + private Consumer createConsumer() { + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "DefaultGroup"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + final Consumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + return consumer; + } + + private Producer createProducer() { + final Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "DefaultConfig"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + return new KafkaProducer<>(props); + } +}