Kafka Connect支持独立(单进程)和分布式,在下文华纳云分析关于如何配置、运行和管理Kafka Connect。独立模式下全部工作都在单个进程中执行,此配置的设置和启动会更见到那,只有一个工作进程有意义的情况下很有用大,但是不能从Kafka Connect的某些功能中收益,启动独立进程:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
第一个参数是工作器的配置,包括Kafka连接参数、序列化格式及提交偏移量的频率等设置。提供示例应该适用于适用提供的默认配置运行的本地集群config/server.properties。需要进行调整才能和不同配置或生产部署一起使用。所有工作器需要配置:
bootstrap.servers- 用于引导与 Kafka 连接的 Kafka 服务器列表
key.converter- 用于在 Kafka Connect 格式和写入 Kafka 的序列化格式之间进行转换的转换器类。这控制写入 Kafka 或从 Kafka 读取的消息中的键的格式,并且由于这独立于连接器,因此它允许任何连接器使用任何序列化格式。常见格式的示例包括 JSON 和 Avro。
value.converter- 用于在 Kafka Connect 格式和写入 Kafka 的序列化格式之间进行转换的转换器类。这控制写入 Kafka 或从 Kafka 读取的消息中的值的格式,并且由于这独立于连接器,因此它允许任何连接器使用任何序列化格式。常见格式的示例包括 JSON 和 Avro。独立模式特有的重要配置选项是:
offset.storage.file.filename- 用于存储偏移数据的文件
Kafka Connect中生产者和消费者配置参数的设置方法:
通用配置:Kafka Connect使用同一套参数来配置生产者和消费者,通过添加前缀producer.和consumer.来区分。
特定参数:除了通用参数外,bootstrap.servers参数无需前缀,通常足以满足需求。
安全集群:对于需要额外安全参数的安全集群,相同的参数要在工作器配置中设置三次,分别对应管理访问、Kafka源和Kafka接收器。
启动Kafka集群前,需要设置以下关键配置参数:
group.id:标识Connect集群组的唯一名称,默认为"connect-cluster",不能与消费者组ID冲突。
config.storage.topic:存储连接器和任务配置的主题,默认为"connect-configs",应为单分区、高复制、压缩主题,可能需要手动创建。
offset.storage.topic:存储偏移量的主题,默认为"connect-offsets",应有多个分区、复制和压缩。
status.storage.topic:存储状态的主题,默认为"connect-status",可有多个分区,应复制和压缩。
在分布式模式下,连接器配置不通过命令行传递,而是通过REST API创建、修改和销毁。
连接器配置是键值映射,独立模式下在属性文件中定义,分布式模式下包含在JSON负载中。
常见配置选项包括:
name:连接器的唯一名称。
connector.class:连接器的Java类。
tasks.max:为此连接器创建的最大任务数。
key.converter 和 value.converter:覆盖默认的键和值转换器。
接收器连接器控制输入的选项:
topics:输入主题列表。
topics.regex:输入主题的Java正则表达式。
连接器可配置转换功能,如数据修改和事件路由,指定转换链。
Kafka Connect包括多种转换,如InsertField、ReplaceField、MaskField等,用于数据和路由转换。配置转换时,需要指定转换别名、类型和特定配置。例如,使用InsertField添加字段,ReplaceField重命名字段,MaskField屏蔽字段等。