怎么进行PulsarKafkaClient的简单分析
本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
专注于为中小企业提供成都做网站、成都网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业渝中免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了数千家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。
⌨️引入依赖
org.apache.pulsar pulsar-client-kafka {project.version}
⌨️ 使用 Kafka Schema
>>>添加生产者代码
String topic = "persistent://public/default/test";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer
producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord
(topic, i, Integer.toString(i))); }
producer.close();
>>> 添加消费者代码
String topic = "persistent://public/default/test";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
@SuppressWarnings("resource")
Consumer
consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords
records = consumer.poll(100); records.forEach(record -> {
log.info("Received record: {}", record);
});
// Commit last offset
consumer.commitSync();
}
⌨️使用 Pulsar Schema
@Data
@ToString
@EqualsAndHashCode
public class Foo {
@Nullable
private String field1;
@Nullable
private String field2;
private int field3;
}
@Data
@ToString
@EqualsAndHashCode
public class Bar {
private boolean field1;
}
>>> 生产者端代码
String topic = "persistent://public/default/test-avro";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
AvroSchema
barSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Bar.class).build()); AvroSchema
fooSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Foo.class).build());
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
Producer
producer = new KafkaProducer<>(props, fooSchema, barSchema);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord
(topic, i, foo, bar)); log.info("Message {} sent successfully", i);
}
producer.close();
>>> 消费者端代码
String topic = "persistent://public/default/test-avro";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
AvroSchema
barSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Bar.class).build()); AvroSchema
fooSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Foo.class).build());
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
@SuppressWarnings("resource")
Consumer
consumer = new PulsarKafkaConsumer<>(props, fooSchema, barSchema); consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords
records = consumer.poll(100); records.forEach(record -> {
log.info("Received record: {}", record);
});
// Commit last offset
consumer.commitSync();
}
以上就是怎么进行Pulsar Kafka Client的简单分析,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。
分享文章:怎么进行PulsarKafkaClient的简单分析
网站地址:http://scyanting.com/article/iipsdg.html