Confluent Cloud

This tutorial describes how to configure a YugabyteDB connector to publish changes to Confluent Cloud, and assumes some familiarity with Docker and YAML files.

Architecture of YugabyteDB to Confluent Cloud pipeline

Configure a Confluent Cloud cluster

Set up a Confluent Cloud cluster on the provider of your choice. For more information, refer to Manage Kafka Clusters on Confluent Cloud in the Confluent documentation.

Download the credentials

Create an API key for your Confluent cluster and save it. Refer to Use API Keys to Control Access in Confluent Cloud for instructions.
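The API key and secret are used later in this tutorial: in the Docker Compose file, the API key replaces `CCLOUD_USER` and the API secret replaces `CCLOUD_PASSWORD` in each `SASL_JAAS_CONFIG` value. The following is a minimal sketch of how that string could be assembled. The `jaas_config` helper name is hypothetical and is not part of any Confluent tooling.

```shell
#!/usr/bin/env bash

# Hypothetical helper: print the SASL/PLAIN JAAS line used by the
# CONNECT_*_SASL_JAAS_CONFIG variables in the Docker Compose file.
#   $1 = Confluent Cloud API key    (stands in for CCLOUD_USER)
#   $2 = Confluent Cloud API secret (stands in for CCLOUD_PASSWORD)
jaas_config() {
  local api_key="$1"
  local api_secret="$2"
  printf "org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';" \
    "$api_key" "$api_secret"
}
```

For example, `jaas_config "$MY_API_KEY" "$MY_API_SECRET"` (both variable names are placeholders) prints a value that can be pasted into the Compose file. Avoid committing the resulting file to version control, because it contains the secret in plain text.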

Create a custom Kafka Connect image with the YugabyteDB connector

To create a Kafka Connect image with the YugabyteDB connector, start with the Confluent Server Docker Image for Kafka Connect.

  1. Create a directory to store all related files.

    mkdir kafka-connect-ccloud && cd kafka-connect-ccloud
    
  2. Download the YugabyteDB connector JAR file.

    curl -so debezium-connector-yugabytedb-1.9.5.y.22.jar https://github.com/yugabyte/debezium-connector-yugabytedb/releases/download/v1.9.5.y.22/debezium-connector-yugabytedb-1.9.5.y.22.jar
    
  3. Create a Dockerfile with the following contents:

    FROM confluentinc/cp-server-connect:7.4.0
    ADD debezium-connector-yugabytedb-1.9.5.y.22.jar /usr/share/java/kafka/
    USER 1001
    
  4. To build the image, execute the following command:

    docker build . -t custom-connect:latest
    
  5. Create a docker-compose.yaml file with the following contents:

    version: '3'
    services:
      kafka-connect-ccloud:
        image: custom-connect:latest
        container_name: kafka-connect-ccloud
        ports:
          - 8083:8083
        environment:
          CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
          CONNECT_CUB_KAFKA_TIMEOUT: 300
          CONNECT_BOOTSTRAP_SERVERS: "CCLOUD-BROKER-ENDPOINT.confluent.cloud:9092"
          CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-ccloud'
          CONNECT_REST_PORT: 8083
          CONNECT_GROUP_ID: kafka-connect-group-01-v04
          CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-configs
          CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offsets
          CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
          CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
          CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/usr/share/java/kafka/'
          # Confluent Cloud config
          CONNECT_REQUEST_TIMEOUT_MS: "20000"
          CONNECT_RETRY_BACKOFF_MS: "500"
          CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
          CONNECT_SASL_MECHANISM: "PLAIN"
          CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
          CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='CCLOUD_USER' password='CCLOUD_PASSWORD';"
          #
          CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
          CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
          CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
          CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='CCLOUD_USER' password='CCLOUD_PASSWORD';"
          CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
          CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
          #
          CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
          CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
          CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
          CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='CCLOUD_USER' password='CCLOUD_PASSWORD';"
          CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
          CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
        command:
          - bash
          - -c
          - |
            echo "Launching Kafka Connect worker"
            /etc/confluent/docker/run &
            #
            echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
            while : ; do
                curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
                echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
                if [ $$curl_status -eq 200 ] ; then
                    break
                fi
                sleep 5
            done
            #
            sleep infinity        
    

    Using Schema Registry

    If your configuration also requires Schema Registry, add the following environment variables to the Docker Compose file:

    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://SCHEMA-REGISTRY-CCLOUD-ENDPOINT.confluent.cloud"
    CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://SCHEMA-REGISTRY-CCLOUD-ENDPOINT.confluent.cloud"
    CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
    

    Using authentication and authorization

  6. Start the Kafka Connect cluster:

    docker-compose up
    
  7. Deploy the connector. For more information, refer to Deployment.
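
Before deploying, it can help to confirm that the worker actually loaded the connector JAR from the plugin path. Kafka Connect lists its installed plugins at the `/connector-plugins` REST endpoint. The following is a minimal sketch of a check on that output. The `has_yugabyte_plugin` function name is hypothetical, and the connector class name is an assumption based on the connector's package naming, so verify it against the actual endpoint output.

```shell
#!/usr/bin/env bash

# Hypothetical helper: read the JSON returned by the Kafka Connect
# /connector-plugins endpoint on stdin and succeed only if the
# YugabyteDB connector class is listed.
# Assumption: the class name below matches the installed connector version.
has_yugabyte_plugin() {
  grep -qF 'io.debezium.connector.yugabytedb.YugabyteDBConnector'
}
```

With the worker from the Compose file listening on port 8083, a check could look like `curl -s http://localhost:8083/connector-plugins | has_yugabyte_plugin && echo "plugin loaded"`. If the check fails, review the `ADD` path in the Dockerfile and the `CONNECT_PLUGIN_PATH` value.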