spring boot配置spring-kafka与原生kafka

阅读数:824 评论数:0

跳转到新版页面

分类

python/Java

正文

一、概述

springboot中有两种方式使用kafka时,直接使用kafka-client连接kafka服务;另一种是使用spring-kafka框架来连接kafka。

1、版本兼容

使用时要注意版本问题,详情参考:https://spring.io/projects/spring-kafka#overview

主要的版本兼容体现在

springboot与spring-kafka之间

spring-kafka和kafka-client之间

kafka-client和kafka服务端之间

的兼容。

2、spring框架的发展

2004年 Spring框架发布了第一个正式版本1.0,这个版本包括IoC容器、AOP、数据访问等核心特性
2006年 Spring框架发布了2.0版本,这个版本包括了对Java 5的支持,支持注解配置,引入了Spring MVC等新特性
2009年 Spring框架发布了3.0版本,这个版本引入了Java配置方式,改进了AOP和数据访问功能,提供了RESTful Web服务支持
2013年 Spring框架发布了4.0版本,这个版本主要支持Java 8,引入了Spring WebSocket和Spring Security的大量改进
2017年 Spring框架发布了5.0版本,这个版本主要支持Java 9和Java EE 8,同时引入了响应式编程模型和WebFlux框架。

3、消息中间件

消息中间件是基于队列与消息的传递技术,在分布式、高并发的应用场景中高效、稳定的进行数据交换。

解耦  将消息写入消息队列,需要消息的系统自己从消息队列中订阅;
异步 异步方式可以提高执行效率,减少执行时间,生产消息后,无需同步等待消息结果,任务并行处理;
最终一致 最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情;
流量削峰 允许消息积压,根据处理能力慢慢处理积压的消息,典型的使用场景就是秒杀业务用于流量削峰场景;
广播  只需要关心消息是否送达了队列,减少了开发和联调的工作量;

二、spring-kafka

1、添加pom依赖

<dependencies>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

2、配置参数说明

因为spring-boot提供了对spring-kafka的自动装配,所以直接在application配置文件中配置就可以,当然也可以手动配置。

##consumer的配置参数(开始)##
#如果enable.auto.commit为true,则消费者偏移自动提交给kafka的频率
#(以毫秒为单位)默认5000
spring.kafka.consumer.auto-commit-interval;
#当kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,
#默认值为latest,表示自动将偏移量重置为最新的偏移量
#可选的值为latest、earliest、none
spring.kafka.consumer.auto-offset-reset=latest;
#以逗号分隔的ip:port列表,用于建立与kafka集群的初始连接
spring.kafka.consumer.bootstrap-servers;
#ID在发出请求时传递给服务器,用于服务器端日志记录。
spring.kafka.consumer.client-id;
#如果为true,则消费者的偏移量将在后台定期提交,默认值为true
spring.kafka.consumer.enable-auto-commit=true;
#如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答
#获取请求之前将阻塞的最长时间(单位ms),默认500
spring.kafka.consumer.fetch-max-wait;
#服务器以字节为单位返回获取请求的最小数据量,默认为1
spring.kafka.consumer.fetch-min-size;
#用于标识此使用者所属的使用者组的唯一字符串
spring.kafka.consumer.group-id;
#心跳周期,默认3000ms
spring.kafka.consumer.heartbeat-interval;
#key的反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#value的反序列类
spring.kafka.consumer.value-deserializer=org.apache.common.serialization.StringDeserializer
#一次调用poll()操作时返回的最大记录数,默认值为500
spring.kafka.consumer.max-poll-records;
## consumer的配置参数(结束 )##
## producer的配置参数(开始) ##
#producer要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化
# acks=0 , 如果设置为0,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。
#在这种情况下,无法保证服务顺已收到记录,并且重试配置将不会生效,为每条记录返回的偏移量始终为-1。
#acks=1,这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应。
#但在将数据复制到所有副本服务之前,有数据丢失的可能性。
#acks=all,这意味着leader将等待完整的同步副本以确认记录。
spring.kafka.producer.acks=1
#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求。默认为16384字节
spring.kafka.producer.batch-size=16384
 
spring.kafka.producer.bootstrap-servers
#生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认33554432
spring.kafka.producer.buffer-memory;
 
spring.kafka.producer.client-di
#生产者生成的所数据的压缩类型,此配置接受标准压缩编解码器(gzip, snappy, lz4)
#还接受uncompressed以及producer(保留生产者设置的原始压缩编解码器),
#默认producer
spring.kafka.producer.compression-type=producer
 
spring.kafka.producer.key-serializer
 
spring.kafka.producer.value-serializer
#失败重试的次数
spring.kafka.producer.retries
## producers的配置参数(结束 ) ##
##listener的配置参数 ##
#侦听器的ackmode,当enable.auto.commit为false生效
spring.kafka.listener.ack-mode;
#在侦听器容器中运行的线程数
spring.kafka.listener.concurrency;
#轮询消费者时使用的超时(ms)
spring.kafka.listener.poll-timeout;
#当ackMode为COUNT或COUNT_TIME时,偏移提交之间的记录数
spring.kafka.listener.ack-count;
#当ackMode为COUNT或COUNT_TIME时,偏移提交之间的时间(ms)
spring.kafka.listener.ack-time;

配置示例

kafka.consumer.zookeeper.connect=zookeeper-ip:2181
kafka.consumer.servers=kafka-ip:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10
 
kafka.producer.servers=kafka-ip:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

3、添加KafkaConsumer配置类

package com.databps.bigdaf.admin.config;
 
import com.databps.bigdaf.admin.manager.HomePageManager;
import com.databps.bigdaf.admin.vo.HomePageVo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
import java.util.HashMap;
import java.util.Map;
/**
 * @author haipeng
 * @create 17-11-2 上午11:39
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Autowired
    private HomePageManager homePageManager;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }
 
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
 
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
//        propsMap.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}

4、添加KafkaProducer配置类

package com.databps.bigdaf.admin.config;
 
 
import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
 * @author haipeng
 * @create 17-11-2 上午11:37
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
 
 
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
 
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
 
}

5、生产调用过程

(1)添加KafkaTemplate注入

  @Autowired
  private KafkaTemplate kafkaTemplate;

(2)发送

    AuditVo auditVo=new AuditVo();
    long sortData=Long.parseLong(DateUtils.getNowDateTime());
    auditVo.setId("sdfdf");
    auditVo.setCmpyId(cmpyId);
    auditVo.setUser("whp");
    auditVo.setPluginIp("192.168.1.53");
    auditVo.setAccessTime(DateUtils.getNowDateTime());
    auditVo.setAccessType("WRITE");
    auditVo.setAction("write");
    auditVo.setAccessResult("success");
    auditVo.setServiceType("hbase");
    auditVo.setResourcePath("/whp");
    Gson gson=new Gson();
    kafkaTemplate.send("test", gson.toJson(auditVo));

6、消费,只要在方法上添加注解就可以了。

@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"test"})
    public void processMessage(String content) {
        System.out.println("消息被消费"+content);
    }
}

三、使用原生Kafka的Java API

1、maven依赖

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>0.10.2.0</version>
</dependency>

2、使用

package com.example.demo.kafka;
 
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
 
 
public class kafkaConsumer {
 
  private String topic="test";
 
  @Test
  public void Producer(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "master1.hdp.com:6667");
    props.put("acks", "all"); //ack方式,all,会等所有的commit最慢的方式
    props.put("retries", 0); //失败是否重试,设置会有可能产生重复数据
    props.put("batch.size", 16384); //对于每个partition的batch buffer大小
    props.put("linger.ms", 1);  //等多久,如果buffer没满,比如设为1,即消息发送会多1ms的延迟,如果buffer没满
    props.put("buffer.memory", 33554432); //整个producer可以用于buffer的内存大小
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, String>(topic, "", Integer.toString(1)));
 
 
    producer.close();
  }
 
  private  ConsumerConnector consumer;
 
  @Test
  public  void kafkaConsumer() {
 
    Properties props = new Properties();
 
    // zookeeper 配置
 
    props.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
 
    // group 代表一个消费组
 
    props.put("group.id", "jd-group");
 
    // zk连接超时
 
    props.put("zookeeper.session.timeout.ms", "4000");
 
    props.put("zookeeper.sync.time.ms", "200");
 
    props.put("auto.commit.interval.ms", "1000");
 
    props.put("auto.offset.reset", "largest");
 
    // 序列化类
 
    props.put("serializer.class", "kafka.serializer.StringEncoder");
 
    ConsumerConfig config = new ConsumerConfig(props);
 
    consumer = (ConsumerConnector) kafka.consumer.Consumer.createJavaConsumerConnector(config);
 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
 
    topicCountMap.put("test", new Integer(1));
 
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
 
    StringDecoder valueDecoder = new StringDecoder(
        new VerifiableProperties());
    Map<String, List<KafkaStream<String, String>>> consumerMap =
        consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get(
        "test").get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext())
      System.out.println(it.next().message());
  }
}



相关推荐

consumer是非线程安全的,它不能在多线程中使用,所以在shutdownHook线程中不能使用consumer的close方法。如果你没有在程序退出前很好关闭consumer,最明显的行为主

afka在有新消费者加入或者撤出时,会触发rebalance操作,在subscribe订阅主题的时候,我们可以编写回掉函数,在触发rebalance操作之前和之后,提交相应偏移量和获取偏移量。<

一、概述 broker 一台kafka服务器就是一个broker topic topic是一个存储消息的逻辑概念,可以认为是一个消息集合。物理上来说,不同的Topic的消息是分开存储的

当然可以自己写redis的工具类,或者使用第三方开源jar包或代码,这里使用spring boot的集成类。 一、pom依赖 <dependency> <gro

websocket协议基于tcp的网络协议,它实现浏览器与器全双工通信。 spring boot2 +websocket 1、添加依赖 <pre clas

背景: 之前用spring boot+mybatis+oracle,现在要改成spring boot_mybatis+postgresql。 为了想让一套代码即可

共同点 都是用来表示Spring某个类是否可以接收HTTP请求。 不同点 @Controller标识一个spring类是Spring MVC c

系统异常捕获 参见:spring boot 2 全局异常处理 @ControllerAdvice(annotations = {RestController.class}) public class

从SSH(Structs/Spring/Hibernate)到SSM(SpringMVC/Spring/MyBatis),到现在一个S(Spring)就够的年代,可见Spring越来越重要了。<

很多应用程序使用Log4j记录日志,如何使用Kafka实时的收集与存储这些Log4j产生的日志呢?一种方案是使用其他组件(比如Flume,或者自己开发程序)实时监控这些日志文件,然后发送至Kaf