spring boot配置spring-kafka与原生kafka
阅读数:1028 评论数:0
跳转到新版页面分类
python/Java
正文
一、概述
springboot中有两种方式使用kafka时,直接使用kafka-client连接kafka服务;另一种是使用spring-kafka框架来连接kafka。
使用时要注意版本问题,详情参考:https://spring.io/projects/spring-kafka#overview
主要的版本兼容体现在
springboot与spring-kafka之间
spring-kafka和kafka-client之间
kafka-client和kafka服务端之间
的兼容。
2004年 | Spring框架发布了第一个正式版本1.0,这个版本包括IoC容器、AOP、数据访问等核心特性 |
2006年 | Spring框架发布了2.0版本,这个版本包括了对Java 5的支持,支持注解配置,引入了Spring MVC等新特性 |
2009年 | Spring框架发布了3.0版本,这个版本引入了Java配置方式,改进了AOP和数据访问功能,提供了RESTful Web服务支持 |
2013年 | Spring框架发布了4.0版本,这个版本主要支持Java 8,引入了Spring WebSocket和Spring Security的大量改进 |
2017年 | Spring框架发布了5.0版本,这个版本主要支持Java 9和Java EE 8,同时引入了响应式编程模型和WebFlux框架。 |
消息中间件是基于队列与消息的传递技术,在分布式、高并发的应用场景中高效、稳定的进行数据交换。
解耦 | 将消息写入消息队列,需要消息的系统自己从消息队列中订阅; |
异步 | 异步方式可以提高执行效率,减少执行时间,生产消息后,无需同步等待消息结果,任务并行处理; |
最终一致 | 最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情; |
流量削峰 | 允许消息积压,根据处理能力慢慢处理积压的消息,典型的使用场景就是秒杀业务用于流量削峰场景; |
广播 | 只需要关心消息是否送达了队列,减少了开发和联调的工作量; |
二、spring-kafka
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
因为spring-boot提供了对spring-kafka的自动装配,所以直接在application配置文件中配置就可以,当然也可以手动配置。
##consumer的配置参数(开始)##
#如果enable.auto.commit为true,则消费者偏移自动提交给kafka的频率
#(以毫秒为单位)默认5000
spring.kafka.consumer.auto-commit-interval;
#当kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,
#默认值为latest,表示自动将偏移量重置为最新的偏移量
#可选的值为latest、earliest、none
spring.kafka.consumer.auto-offset-reset=latest;
#以逗号分隔的ip:port列表,用于建立与kafka集群的初始连接
spring.kafka.consumer.bootstrap-servers;
#ID在发出请求时传递给服务器,用于服务器端日志记录。
spring.kafka.consumer.client-id;
#如果为true,则消费者的偏移量将在后台定期提交,默认值为true
spring.kafka.consumer.enable-auto-commit=true;
#如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答
#获取请求之前将阻塞的最长时间(单位ms),默认500
spring.kafka.consumer.fetch-max-wait;
#服务器以字节为单位返回获取请求的最小数据量,默认为1
spring.kafka.consumer.fetch-min-size;
#用于标识此使用者所属的使用者组的唯一字符串
spring.kafka.consumer.group-id;
#心跳周期,默认3000ms
spring.kafka.consumer.heartbeat-interval;
#key的反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#value的反序列类
spring.kafka.consumer.value-deserializer=org.apache.common.serialization.StringDeserializer
#一次调用poll()操作时返回的最大记录数,默认值为500
spring.kafka.consumer.max-poll-records;
## consumer的配置参数(结束 )##
## producer的配置参数(开始) ##
#producer要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化
# acks=0 , 如果设置为0,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。
#在这种情况下,无法保证服务顺已收到记录,并且重试配置将不会生效,为每条记录返回的偏移量始终为-1。
#acks=1,这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应。
#但在将数据复制到所有副本服务之前,有数据丢失的可能性。
#acks=all,这意味着leader将等待完整的同步副本以确认记录。
spring.kafka.producer.acks=1
#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求。默认为16384字节
spring.kafka.producer.batch-size=16384
spring.kafka.producer.bootstrap-servers
#生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认33554432
spring.kafka.producer.buffer-memory;
spring.kafka.producer.client-di
#生产者生成的所数据的压缩类型,此配置接受标准压缩编解码器(gzip, snappy, lz4)
#还接受uncompressed以及producer(保留生产者设置的原始压缩编解码器),
#默认producer
spring.kafka.producer.compression-type=producer
spring.kafka.producer.key-serializer
spring.kafka.producer.value-serializer
#失败重试的次数
spring.kafka.producer.retries
## producers的配置参数(结束 ) ##
##listener的配置参数 ##
#侦听器的ackmode,当enable.auto.commit为false生效
spring.kafka.listener.ack-mode;
#在侦听器容器中运行的线程数
spring.kafka.listener.concurrency;
#轮询消费者时使用的超时(ms)
spring.kafka.listener.poll-timeout;
#当ackMode为COUNT或COUNT_TIME时,偏移提交之间的记录数
spring.kafka.listener.ack-count;
#当ackMode为COUNT或COUNT_TIME时,偏移提交之间的时间(ms)
spring.kafka.listener.ack-time;
配置示例
kafka.consumer.zookeeper.connect=zookeeper-ip:2181
kafka.consumer.servers=kafka-ip:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10
kafka.producer.servers=kafka-ip:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
package com.databps.bigdaf.admin.config;
import com.databps.bigdaf.admin.manager.HomePageManager;
import com.databps.bigdaf.admin.vo.HomePageVo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author haipeng
* @create 17-11-2 上午11:39
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Autowired
private HomePageManager homePageManager;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// propsMap.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
}
package com.databps.bigdaf.admin.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* @author haipeng
* @create 17-11-2 上午11:37
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
(1)添加KafkaTemplate注入
@Autowired
private KafkaTemplate kafkaTemplate;
(2)发送
AuditVo auditVo=new AuditVo();
long sortData=Long.parseLong(DateUtils.getNowDateTime());
auditVo.setId("sdfdf");
auditVo.setCmpyId(cmpyId);
auditVo.setUser("whp");
auditVo.setPluginIp("192.168.1.53");
auditVo.setAccessTime(DateUtils.getNowDateTime());
auditVo.setAccessType("WRITE");
auditVo.setAction("write");
auditVo.setAccessResult("success");
auditVo.setServiceType("hbase");
auditVo.setResourcePath("/whp");
Gson gson=new Gson();
kafkaTemplate.send("test", gson.toJson(auditVo));
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void processMessage(String content) {
System.out.println("消息被消费"+content);
}
}
三、使用原生Kafka的Java API
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
package com.example.demo.kafka;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
public class kafkaConsumer {
private String topic="test";
@Test
public void Producer(){
Properties props = new Properties();
props.put("bootstrap.servers", "master1.hdp.com:6667");
props.put("acks", "all"); //ack方式,all,会等所有的commit最慢的方式
props.put("retries", 0); //失败是否重试,设置会有可能产生重复数据
props.put("batch.size", 16384); //对于每个partition的batch buffer大小
props.put("linger.ms", 1); //等多久,如果buffer没满,比如设为1,即消息发送会多1ms的延迟,如果buffer没满
props.put("buffer.memory", 33554432); //整个producer可以用于buffer的内存大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topic, "", Integer.toString(1)));
producer.close();
}
private ConsumerConnector consumer;
@Test
public void kafkaConsumer() {
Properties props = new Properties();
// zookeeper 配置
props.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
// group 代表一个消费组
props.put("group.id", "jd-group");
// zk连接超时
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "largest");
// 序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig config = new ConsumerConfig(props);
consumer = (ConsumerConnector) kafka.consumer.Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", new Integer(1));
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(
new VerifiableProperties());
Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
KafkaStream<String, String> stream = consumerMap.get(
"test").get(0);
ConsumerIterator<String, String> it = stream.iterator();
while (it.hasNext())
System.out.println(it.next().message());
}
}
相关推荐
consumer是非线程安全的,它不能在多线程中使用,所以在shutdownHook线程中不能使用consumer的close方法。如果你没有在程序退出前很好关闭consumer,最明显的行为主
afka在有新消费者加入或者撤出时,会触发rebalance操作,在subscribe订阅主题的时候,我们可以编写回掉函数,在触发rebalance操作之前和之后,提交相应偏移量和获取偏移量。<
一、概述
broker
一台kafka服务器就是一个broker
topic
topic是一个存储消息的逻辑概念,可以认为是一个消息集合。物理上来说,不同的Topic的消息是分开存储的
websocket协议基于tcp的网络协议,它实现浏览器与器全双工通信。
spring boot2 +websocket
1、添加依赖
<pre clas
背景:
之前用spring boot+mybatis+oracle,现在要改成spring boot_mybatis+postgresql。
为了想让一套代码即可
共同点
都是用来表示Spring某个类是否可以接收HTTP请求。
不同点
@Controller标识一个spring类是Spring MVC c
系统异常捕获
参见:spring boot 2 全局异常处理
@ControllerAdvice(annotations = {RestController.class})
public class
从SSH(Structs/Spring/Hibernate)到SSM(SpringMVC/Spring/MyBatis),到现在一个S(Spring)就够的年代,可见Spring越来越重要了。<
很多应用程序使用Log4j记录日志,如何使用Kafka实时的收集与存储这些Log4j产生的日志呢?一种方案是使用其他组件(比如Flume,或者自己开发程序)实时监控这些日志文件,然后发送至Kaf