0

Hello, Kafka!

This commit is contained in:
2019-08-26 16:44:28 +03:00
parent adb7293654
commit 3e2c06b4a1
3 changed files with 108 additions and 0 deletions

11
pom.xml
View File

@@ -69,6 +69,17 @@
</exclusions> </exclusions>
</dependency> </dependency>
<!-- COMPONENTS -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -0,0 +1,14 @@
package ru.dmitriymx.minecraft.globalchat.mq;
import com.google.gson.annotations.SerializedName;
import lombok.AllArgsConstructor;
import lombok.Data;
@AllArgsConstructor
@Data
public class ChatMessageData {
@SerializedName("name")
private String playerName;
private String message;
}

View File

@@ -0,0 +1,83 @@
package ru.dmitriymx.minecraft.globalchat.mq;
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.*;
public class KafkaService {
private static final Gson GSON = new Gson();
private final String hosts;
private final String topic;
private final long duration;
private final Consumer<Long, String> consumer;
private final Producer<Long, String> producer;
public KafkaService(String hosts, String topic, long duration) {
this.hosts = hosts;
this.topic = topic;
this.duration = duration;
this.consumer = createConsumer();
this.producer = createProducer();
}
public void send(ChatMessageData messageData) {
producer.send(new ProducerRecord<>(topic, GSON.toJson(messageData)));
}
public List<ChatMessageData> get() {
ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMillis(duration));
if (records.count() == 0) {
return Collections.emptyList();
}
final List<ChatMessageData> list = new ArrayList<>();
records.forEach(record -> {
try {
ChatMessageData data = GSON.fromJson(record.value(), ChatMessageData.class);
list.add(data);
} catch (Exception ignore) {
}
});
return list;
}
private Consumer<Long, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DefaultGroup");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
return consumer;
}
private Producer<Long, String> createProducer() {
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DefaultConfig");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
}