0

fix: наименование групп kafka

This commit is contained in:
2019-08-26 18:02:40 +03:00
parent 8dc902a62e
commit baecf489bd
2 changed files with 9 additions and 2 deletions

View File

@@ -21,8 +21,11 @@ public class MainPlugin extends JavaPlugin {
}
private void initKafkaService() {
ClassLoader originalContext = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
//FIXME перенести в конфигурацию
service = new KafkaService("127.0.0.1:9092", "global-chat", 1000);
Thread.currentThread().setContextClassLoader(originalContext);
mqThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {

View File

@@ -62,19 +62,23 @@ public class KafkaService {
}
});
consumer.commitAsync();
return list;
}
public void shutdown() {
producer.close();
producer = null;
consumer.unsubscribe();
consumer.close();
consumer = null;
}
private Consumer<Long, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DefaultGroup");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "WithoutGroup." + UUID.randomUUID().toString());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());