博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka实战系列--Kafka API使用体验
阅读量:5276 次
发布时间:2019-06-14

本文共 4201 字,大约阅读时间需要 14 分钟。

 

前言:

  kafka是linkedin开源的消息队列, 淘宝的metaq就是基于kafka而研发. 而消息队列作为一个分布式组件, 在服务解耦/异步化, 扮演非常重要的角色. 本系列主要研究kafka的思想和使用, 本文主要讲解kafka的一些基本概念和api的使用.

*) 准备工作

1) 配置maven依赖

  
org.apache.kafka
  
kafka_2.9.2
  
0.8.1.1

2).配置hosts

vim /etc/hosts
把kafka集群相关的ip及其hostname, 配置到kafka客户端的本地机器

*) Kafka的基础知识

1). Broker, Zookeeper, Producer, Consumer
Broker具体承担消息存储转发工作, Zookeeper则用与元信息的存储(topic的定义/消费进度), Producer则是消息的生产者, Consumer则是消息的消费者.

2). Topic, Partition, Replication, Consumer Group

  Topic对应一个具体的队列, 在Kafka的概念中, 一个应用一个队列. 应用数据往往呈现部分有序的特点, 因此对kafka的队列, 引入partition的概念, 即可topic划分为多个partition. 单个Partition内保证有序, Partition间不保证. 这样作的好处, 是充分利用了集群的能力, 均匀负载和提高性能.
  Replication主要为了高可用性, 保证部分节点失效的恶劣情况下, 队列数据能不丢.
  Consumer Group的概念的引入, 很有创新性, 把以往传统队列(topic模式, queue模式)的属性从队列本身挪到了消费端. 若要使用queue模式, 则所有的消费端都采用统一个consumer group, 若采用topic模式, 则所有的客户端都设置为不同的consumer group. 其partition的消费进度在zookeeper有所保存.

 *) Kafka API的简单样列代码

1). 生产者代码

分区类代码片段

public class SimplePartitioner implements Partitioner {  public SimplePartitioner (VerifiableProperties props) {  }  public int partition(Object key, int numPartitions) {    return (key.hashCode() & 0x0FFFFFFF) % numPartitions;  }}

评注: SimplePartitioner用于对消息进行分发到具体的partition中, 有消息的key来决定, 这个有点像map/reduce中的partition机制.

生产者代码片段

Properties props = new Properties();// 配置metadata.broker.list, 为了高可用, 最好配两个broker实例props.put("metadata.broker.list", "127.0.0.1:9092");// serializer.class为消息的序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");// 设置Partition类, 对队列进行合理的划分props.put("partitioner.class", "mmxf.kafka.practise.SimplePartitioner");// ACK机制, 消息发送需要kafka服务端确认props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);Producer
producer = new Producer
(config);// KeyedMessage
//   K对应Partition Key的类型//   V对应消息本身的类型 //   topic: "test", key: "key", message: "message"KeyedMessage
message = new KeyedMessage
("test", "key", "message");producer.send(message);// 关闭producer实例producer.close();

2). 消费者代码

使用High Level Consumer的API 线程模型和Partition数最好能保持一致, 即One Thread For Partition
参考sample样例: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
代码片段如下:

public static void main(String[] args) {  // *) 创建ConsumerConfig  Properties props = new Properties();  // 设置zookeeper的链接地址  props.put("zookeeper.connect", "127.0.0.1:2181");  // 设置group id  props.put("group.id", "group_id");  // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新  props.put("auto.commit.interval.ms", "1000");  ConsumerConfig consumerConfig = new ConsumerConfig(props);  ConsumerConnector consumer = (ConsumerConnector) Consumer.createJavaConsumerConnector(consumerConfig);  String topic = "test";  int threadNum = 1;  // *) 设置Topic=>Thread Num映射关系, 构建具体的流  Map
topicCountMap = new HashMap
();  topicCountMap.put(topic,threadNum);  Map
>> consumerMap = consumer.createMessageStreams(topicCountMap);  List
> streams = consumerMap.get(topic);  // *) 启动线程池去消费对应的消息  ExecutorService executor = Executors.newCachedThreadPool();  for ( final KafkaStream
stream : streams ) {    executor.submit(new Runnable() {      public void run() {        ConsumerIterator
iter = stream.iterator();        while ( iter.hasNext() ) {          MessageAndMetadata
mam = iter.next();          System.out.println(            String.format("thread_id: %d, key: %s, value: %s",                Thread.currentThread().getId(),                new String(mam.key()),                new String(mam.message())              )          );          }      }    });  }  try {    Thread.sleep(1000 * 10);  } catch (InterruptedException e) {    e.printStackTrace();  }  // *) 优雅地退出  consumer.shutdown();  executor.shutdown();  while ( !executor.isTerminated() ) {    try {      executor.awaitTermination(1, TimeUnit.SECONDS);    } catch (InterruptedException e) {    }  }}

 结果输出:

thread_id: 18, key: key, value: message

转载于:https://www.cnblogs.com/mumuxinfei/p/3859193.html

你可能感兴趣的文章
git - 搭建最简单的git server
查看>>
.net中从GridView中导出数据到excel(详细)
查看>>
[LeetCode]Single Number II
查看>>
poj3216 Prime Path(BFS)
查看>>
使用IntelliJ IDEA 2016创建maven管理的Java Web项目
查看>>
R语言 线性回归
查看>>
Ubuntu下用cue文件对ape和wav文件自动分轨
查看>>
会话控制
查看>>
推荐一款UI设计软件Balsamiq Mockups
查看>>
DRF的版本控制,认证,权限和频率限制
查看>>
Linux crontab 命令格式与详细例子
查看>>
百度地图Api进阶教程-地图鼠标左右键操作实例和鼠标样式6.html
查看>>
游标使用
查看>>
LLBL Gen Pro 设计器使用指南
查看>>
SetCapture() & ReleaseCapture() 捕获窗口外的【松开左键事件】: WM_LBUTTONUP
查看>>
PLSQL Developer使用技巧
查看>>
Android 设置界面的圆角选项
查看>>
百度地图api服务端根据经纬度得到地址
查看>>
使用yum更新时不升级Linux内核的方法
查看>>
sqlserver计算时间差DATEDIFF 函数
查看>>