基于k8s、strimzi的kafka connect实战 -欧洲杯足彩官网

`
wiselyman
  • 浏览: 2076981 次
  • 性别:
  • 来自: 合肥
博主相关
  • 博客
  • 微博
  • 相册
  • 收藏
  • 博客专栏
    点睛spring4.1
    浏览量:80745
    点睛spring mvc4...
    浏览量:129940
    社区版块
    • ( 11)
    • ( 19)
    • ( 0)
    存档分类
    最新评论

    基于k8s、strimzi的kafka connect实战

    0. 源码地址

    1. operator framework

    operator framework是一个用来管理k8s原生应用(operator)的开源工具。

    operator framework支持的operator分享地址:。

    如安装kafka使用strimzi apache kafka operator,地址为: 。

    打开strimzi apache kafka operator页面,右侧有install按钮,按照页面提示进行operator安装。

    2. 安装operator lifecycle manager

    operator lifecycle manager是operator framework的一部分,olm扩展了k8s提供声明式方法安装、管理、更新operator以及他们的依赖。

    点击页面上的install显示如何安装strimzi apache kafka operator,我们首先第一步要安装operator lifecycle manager(不要执行下句命令):

    curl -sl https://github.com/operator-framework/operator-lifecycle-manager/releases/download/0.12.0/install.sh | bash -s 0.12.0

    该命令需要使用quay.io的镜像,我们需采取从源码安装,并修改源码中的镜像地址加速。

    源码地址:,当前最新版本为0.12.0

    • 下载:

    • 下载:

    olm.yml中:

    quay.io ->  quay.azk8s.cn

    执行安装:

    kubectl apply -f crds.yaml
    kubectl apply -f olm.yaml

    3. 安装strimzi apache kafka operator

    kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml

    使用下面命令观察operator启动情况

    kubectl get csv -n operators

    显示如下则安装成功

    wangyunfeis-macbook-pro:olm wangyunfei$ kubectl get csv -n operators
    name                               display                         version   replaces                           phase
    strimzi-cluster-operator.v0.14.0   strimzi apache kafka operator   0.14.0    strimzi-cluster-operator.v0.13.0   succeeded
    

    4. 安装kafka集群

    下载,主要修改的是所需存储空间为5gi作为测试条件,这里的存储需要k8s集群中有默认的storageclass

    apiversion: kafka.strimzi.io/v1beta1
    kind: kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        version: 2.3.0
        replicas: 3
        listeners:
          plain: {}
          tls: {}
        config:
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          delete.topic.enable: "true"
          transaction.state.log.min.isr: 2
          log.message.format.version: "2.3"
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 5gi
            deleteclaim: false
      zookeeper:
        replicas: 3
        storage:
          type: persistent-claim
          size: 5gi
          deleteclaim: false
      entityoperator:
        topicoperator: {}
        useroperator: {}
    kubectl apply -f kafka-persistent.yml -n kafka 
    • 发送消息测试
    kubectl exec -i -n kafka my-cluster-kafka-0 -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic strimizi-my-topic
    • 接受消息测试
    kubectl exec -i -n kafka  my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic strimizi-my-topic --from-beginning
    • 显示集群topic
    kubectl exec -n kafka my-cluster-kafka-0   -- bin/kafka-topics.sh --list --zookeeper localhost:2181

    5. kafka connect

    本节将外部的sql server中的表person(字段只有idname)通过kafka connect同步至k8s集群里的postgresql中。

    5.1 开启sql server数据库的cdc(change data capture)功能

    5.1.1 启用数据库cdc

    use bs_portal
    exec sys.sp_cdc_enable_db;

    bs_portal为数据库名,此时会自动给我们创建cdc的schema和相关表:

    • captured_columns
    • change_tables
    • dbo_person_ct
    • ddl_history
    • index_columns
    • lsn_time_mapping

    可使用下面sql语句查询已开启cdc的数据库:

    select * from sys.databases where is_cdc_enabled = 1 

    5.1.2 启用表的cdc

    use bs_portal 
    exec sys.sp_cdc_enable_table  
        @source_schema = 'dbo',  
        @source_name = 'person',  
        @role_name = 'cdc_admin',
        @supports_net_changes = 1;

    @source_name为表名,查询表开启cds的sql语句:

    select name, is_tracked_by_cdc from sys.tables where object_id = object_id('dbo.person')  

    查看新增的job

    select job_id,name,enabled,date_created,date_modified from msdb.dbo.sysjobs order by date_created

    确定用户有权限访问cdc表

    exec sys.sp_cdc_help_change_data_capture;
    

    5.1.3 开启“sql server 代理”

    检查安装了sql server的操作系统中“服务”中是否开启了“sql server 代理”。

    5.1.4 关闭cdc

    关闭数据库的cdc

    use bs_portal
    exec sys.sp_cdc_disable_db;

    关闭表的cdc

    use bs_portal
    exec sys.sp_cdc_disable_table   
        @source_schema = 'dbo',  
        @source_name = 'person',  
        @capture_instance = 'all';
    

    5.2 sql server to posgresql

    5.2.1 准备kafka connect镜像

    输入插件(source):下载sql server connector plugin:;输出插件(sink):下载kafka connect jdbc:。

    新建dockerfile文件,将debezium-connector-sqlserver-0.10.0.final-plugin.zip解压放置到dockerfile相同目录下的plugins目录;在plugins目录下新建目录kafka-connect-jdbc,解压confluentinc-kafka-connect-jdbc-5.3.1.zip,将lib下的kafka-connect-jdbc-5.3.1.jarpostgresql-9.4.1212.jar放置在kafka-connect-jdbc

    编写dockerfile

    from strimzi/kafka:0.14.0-kafka-2.3.0
    user root:root
    copy ./plugins/ /opt/kafka/plugins/
    user 1001
    maintainer 285414629@qq.com

    使用阿里云“容器镜像服务”()编译镜像,目前我们的源码地址位于:。

    • “镜像仓库”->“创建镜像仓库”:

      1. 仓库名称:kafka-connect-form-sql-to-jdbc

      2. 仓库类型:公开

    • 下一步后,选择“github”标签页,使用自己的github库,“构建设置”只勾选“海外机器构建”,然后点击“创建镜像仓库”。

    • 点击镜像仓库列表中的“kafka-connect-mysql-postgres”->“构建”->“添加规则”:

      1. 类型:branch

      2. branch/tag:master

      3. dockerfile目录:/sqlserver-to-jdbc/

      4. dockfile文件名:dockerfile

      5. 镜像版本:0.1

    • 确认后,“构建规则设置”->“立即构建”,“构建日志”显示“构建状态”为“成功”即可。

    5.2.2 安装kafka connect

    编写kafka connect集群部署文件kafka-connect-sql-postgres.yml

    apiversion: kafka.strimzi.io/v1beta1
    kind: kafkaconnect
    metadata:
      name: my-connect-cluster
    spec:
      version: 2.3.0
      replicas: 1
      bootstrapservers: 'my-cluster-kafka-bootstrap:9093'
      image: registry.cn-hangzhou.aliyuncs.com/wiselyman/kafka-connect-from-sql-to-jdbc:0.1
      tls:
        trustedcertificates:
          - secretname: my-cluster-cluster-ca-cert
            certificate: ca.crt

    执行安装

    kubectl apply -f kafka-connect-sql-postgres.yml -n kafka

    查询已安装的插件

    kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -x get http://my-connect-cluster-connect-api:8083/connector-plugins

    结果如:

    [{
    	"class": "io.confluent.connect.jdbc.jdbcsinkconnector",
    	"type": "sink",
    	"version": "5.3.1"
    }, {
    	"class": "io.confluent.connect.jdbc.jdbcsourceconnector",
    	"type": "source",
    	"version": "5.3.1"
    }, {
    	"class": "io.debezium.connector.sqlserver.sqlserverconnector",
    	"type": "source",
    	"version": "0.10.0.final"
    }, {
    	"class": "org.apache.kafka.connect.file.filestreamsinkconnector",
    	"type": "sink",
    	"version": "2.3.0"
    }, {
    	"class": "org.apache.kafka.connect.file.filestreamsourceconnector",
    	"type": "source",
    	"version": "2.3.0"
    }]

    5.2.3 使用helm安装postgresql

    使用helm安装postgresql,这里的postgresql库来自于,可在helm中配置。

    对postgresql的账号、密码、初始化数据库、服务类型进行定制后安装:

    helm install --name my-pg --set global.storageclass=standard,postgresuser=wisely,postgrespassword=zzzzzz,postgresdatabase=center,service.type=nodeport,service.nodeport=5432 stable/postgresql

    5.2.4 kafka connect source配置

    编写source配置:sql-server-source.json

    {
      "name": "sql-server-connector",
      "config": {
        "connector.class" : "io.debezium.connector.sqlserver.sqlserverconnector",
        "tasks.max" : "1",
        "database.server.name" : "exam",
        "database.hostname" : "172.16.8.221",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "sa",
        "database.dbname" : "bs_portal",
        "database.history.kafka.bootstrap.servers" : "my-cluster-kafka-bootstrap:9092",
        "database.history.kafka.topic": "schema-changes.person",
        "table.whitelist": "dbo.person"
      }
    }

    编写sink配置:postgres-sink.json

    {
      "name": "postgres-sink",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector",
        "tasks.max": "1",
        "topics": "exam.dbo.mh_yczm",
        "connection.url": "jdbc:postgresql://my-pg-postgresql.default.svc.cluster.local:5432/center?user=wisely&password=zzzzzz",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "ipdz",
        "pk.mode": "record_key"
      }
    }

    5.2.5 使用

    将配置文件提交到kafka connect

    cat sql-server-source.json | kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -x post -h "accept:application/json" -h "content-type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-
    cat postgres-sink.json| kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -x post -h "accept:application/json" -h "content-type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-

    查看所有的connector

    kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -x get http://my-connect-cluster-connect-api:8083/connectors

    删除connect

    kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -x delete http://my-connect-cluster-connect-api:8083/connectors/postgres-sink
    

    查看所有的topic

    kubectl exec -n kafka my-cluster-kafka-0   -- bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    查看sql server connector中的数据

    kubectl exec -i -n kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic exam.dbo.person --from-beginning

    我们此时查看postgresql数据库已经有了person表和数据,当对sql server新增、修改、删除数据时,postgresql中也会同步更新。

    0
    0
    分享到:
    评论

    相关推荐

      图解 kafka 之实战指南.pdf

      strimzi-kafka-group-authorizer strimzi kafka operator的简单授权者,可以基于模式为用户组配置acl

      k8s-kafka kubernetes上的kafka容器编辑controller.yaml并更改zookeeper_connect环境以指向您已经拥有的zookeeper实例。 zookeeper的配置不在本文档范围内。 zookeeper列表是逗号分隔的列表,例如host:port,host:...

      图解 kafka 之实战指南.7z

      从zookeeper、kafka的安装,到kafka-connect的配置,有详细的步骤和参数的解释。

      apache kafka实战.pdf..

      kafka实战pdf

      kafka connect redis 用于redis的kafka源和接收器连接器 连接器 来源 kafka connect redis source使用redis发布/订阅订阅redis通道/模式(包括),并将接收到的消息写入kafka。 有关更多信息,请参见。 下沉 kafka...

      1. 高级-项目实战-日志收集系统kafka库实战 2. 高级-etcd、contex、kafka消费实例、logagent 3. 实战-商品秒杀架构设计与开发 4. 实战-商品秒杀开发与接入层实现 总共18课时,网上收集的资料,只共用于学习,不...

      kafka-connect-ui 这是kafka connect的网络工具,用于设置和管理多个连接集群的连接器。现场演示与docker独立运行docker run --rm -it -p 8000:8000 \ -e "connect_url=...

      kafka connect jdbc连接器 kafka-connect-jdbc是一个用于与任何兼容jdbc的数据库之间加载数据。 可以在找到该连接器的文档。发展要构建开发版本,您需要kafka的最新版本以及一系列上游confluent项目,您必须从其相应...

      kafka-connect-mqtt 此仓库包含用于apache kafka的mqtt源和接收器连接器。 已通过kafka 2 进行了测试。 使用源连接器,您可以订阅mqtt主题,并将这些消息写到kafka主题。 接收器连接器以相反的方式工作。 笔记: ...

      kafka streams是kafka提供的一个用于构建流式处理程序的java库,它与storm、spark等流式处理框架不同,是一个仅依赖于kafka的java库,而不是一个流式处理框架。除kafka之外,kafka streams不需要额外的流式处理集群...

      flink搭配kafka,构建流式采集框架,提供了docker部署方式脚本和k8s多副本方式部署脚本

      kafka从入门基础,深入理解、核心设计,包括:kafka结构中的broker\topic\partition\producer\consumer等

      您可以通过使用kafka9-connect-mongodb分支将此连接器用于kafka9。 用于kafka connect的mongodb接收器连接器提供了从kafka主题或主题集到mongodb集合或多个集合的简单,连续的链接。 连接器使用kafka消息,重命名...

      the jdbc source and sink connectors allow you to exchange data between relational databases and kafka. the jdbc source connector allows you to import data from any relational database with a jdbc ...

      介绍通过安装该kafka connect连接器提供了监视目录的文件和在将新文件写入输入目录时读取数据的功能。 输入文件中的每个记录将根据用户提供的模式进行转换。 csvrecordprocessor支持读取csv或tsv文件。 它可以将csv...

      k8s-kafka docker容器,用于通过环境中的配置设置在ubuntu容器(java7)中运行kafka (0.8.1.1)。 环境 这些环境变量的名称经过精心选择,以与kubernetes(主要是gke)为定义的服务提供的内容兼容。 kafka_server...

      kafka监控工具kafkaoffsetmnitor 没有响应,需要修改offsetapp目录index.html ,进行本地化。压缩包文件就是所需要的文件。

    global site tag (gtag.js) - google analytics
    网站地图