Kafka Connect的作用

KafkaConnect是一个将数据从数据库、文件或其它地方读取到kafak中,或将kafka中的消息落地到数据库、elastic search中的一个工具。一进一出,这两个功能可以独立使用。

KafkaConnect是一个标准的接口或使用方式,它有很多的connect插件来完成它各种各样的功能,自己也可以写一个connect插件来满足自己的需求。通过插件可以实现例如:将数据库某个表增量地一行一条消息地输出到kafka中;将kafka中的消息落地到elastic search中。

KafkaConnect有两种运行方式,standalone和distributed,前者适合自测时使用,它是一个独立的进程在跑任务,关闭了就停止了;后者适合起多个相同的进程,它会自己调度着跑,后者适合于线上环境。

非镜像方式使用

kafka压缩包自带了kafka connect,但是并没有带各种插件,需要自己去下载并放到kafka libs目录下。例如最用的jdbc连mysql,就需要把kafka-connect-jdbc-5.3.1.jar和mysql-connector-java-8.0.18.jar这两个jar包放到libs目录下。

下载一个最新的kafka.tar.gz并解压,修改config/connect-distributed.properties,一般只需要修改bootstrap.servers就好了:

bootstrap.servers: kafka broker地址,ip:port,ip:port
# group.id 可以用默认的,相同的group idconnect集群的相同分组
# key.converter  这个和下面这个可以使用默认的json方式
# value.converter

然后启动:

cd bin # 进入到bin目录下
./connect-distributed.sh ../config/connect-distributed.properties

正常的话启动不会报错,并监听在8083端口。参考下面常用方式进行操作。

镜像方式使用

使用这个镜像:cp-kafka-connect:5.3.1 然后配置环境变量:

CONNECT_BOOTSTRAP_SERVERS=192.168.0.100:9092   # 实际的broker ip地址列表,逗号隔开
CONNECT_CONFIG_STORAGE_TOPIC=connect-configs   # 默认这个名称即可
CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets   # 默认这个名称即可
CONNECT_STATUS_STORAGE_TOPIC=connect-status    # 默认这个名称即可
CONNECT_GROUP_ID=connect-cluster               # 默认这个名称即可
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter  # 默认这个名称即可
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter  # 默认这个名称即可
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter  # 默认这个名称即可
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter  # 默认这个名称即可
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 # 重要:这个要根据实际kafka broker数量填
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 # 重要:这个要根据实际kafka broker数量填
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 # 重要:这个要根据实际kafka broker数量填
CONNECT_REST_ADVERTISED_HOST_NAME=connect # 这个没啥用,但是不填不行

启动之后,把容器内部的8083端口暴露出来即可。

这个镜像自带了很多插件,例如kafka-connect-jdbc-5.3.1.jar,放在镜像中/usr/share/java/下,要注意的是,这个目录下的jar包并不会生效,需要把jar包拷贝到/usr/share/java/kafka/ 下才生效。

一般来说,我们会基于这个镜像,把我们自己的mysql、orable等驱动拷贝进去,再制作出一个自己的镜像。

常用方式

  1. 查看所有支持的connectors
    curl -X GET http://localhost:8083/connector-plugins
    默认自带了FileStreamSinkConnector、FileStreamSourceConnector
    如果自己放了插件进去,可以通过这个接口查询到插件是否生效,如果可以查出自己放进去的Connector,就表示生效了

  2. 查看所有在运行的connectors
    curl -X GET http://localhost:8083/connectors

  3. 删除指定的connector
    curl -s -X DELETE http://localhost:8083/connectors/connector名称

从mysql导出数据到kafka例子

把kafka-connect-jdbc-5.3.1.jar和mysql-connector-java-8.0.18.jar放到对应的目录下,然后通过curl -X GET http://localhost:8083/connector-plugins可以看到JdbcSourceConnector,然后执行:

echo '{"name":"mysql-test-connector","config":{"connector.class":"JdbcSourceConnector","connection.url":"jdbc:mysql://192.168.0.100:3306/test?user=root&password=root","mode":"incrementing","table.whitelist":"student","validate.non.null":false,"incrementing.column.name":"id","topic.prefix":"mysql."}}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type:application/json"

就可以启动一个connector了,上面命令中的主要设置:

connection.url # 数据库连接url,自行修改ip端口用户名密码
mode 增量方式,它支持全量bulktimestampincrementing几种方式
table.whitelist 表名
topic.prefix 生成的kafka主题的前缀

mode:timestamp && “timestamp.column.name”:”login_time”,表示识别根据login_time时间列来识别增量数据,一旦这一列值发生变化,就会有一天新的记录写到kafka主题

mode:incrementing && “incrementing.column.id”:”id”,适合还有自增列的表,一旦有新的记录入mysq,就会有新的记录写到kafka主题

关于kafka connect从数据库转换出来的decimal或numberic数据转换问题

这个在读取MSSQL Server转换出来的数据时发现的,假设数据库是numberic(10,3),3位小数,那么收到的数据实际上是个字符串,需要这样转换:

// 后面的数字要根据实际数据库的值来设置,数据库是2位小数就是index,数据库是3位小数就是3
BigDecimal bigDecimal = new BigDecimal(new BigInteger(Base64.getDecoder().decode("RcQ=")),3);
System.out.println(bigDecimal);

参考博客

  1. https://www.jianshu.com/p/46b6fa53cae4
  2. https://my.oschina.net/hnrpf/blog/1555915
文档更新时间: 2019-11-01 16:13   作者:nick