
Mirroring SSL Kafka Clusters with Brooklin

LinkedIn was the birthplace of Apache Kafka. Last year they also open-sourced a project called Brooklin. One of its features is the ability to mirror a Kafka instance. The documentation is sparse and they are selectively responsive in their Gitter chatroom.

Their Kafka mirroring example uses PLAINTEXT communications, which no real instance runs unless it's behind a firewall. After digging through the code, here is the correct way to use Brooklin to mirror a Kafka instance that uses SSL connections:

kafkaTransportProvider - this is the destination server producer

No domain config name is required: Kafka producer property names come directly after brooklin.server.transportProvider.kafkaTransportProvider.

It will look like this in brooklin's server.properties:

brooklin.server.transportProvider.kafkaTransportProvider.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory
brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=kafkaserver:port
brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer

brooklin.server.transportProvider.kafkaTransportProvider.security.protocol=ssl

brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.key.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.endpoint.identification.algorithm=

brooklin.server.transportProvider.kafkaTransportProvider.ssl.enabled.protocols=TLSv1.2
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.type=JKS
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.type=JKS


kafkaMirroringConnector - this is the source server consumer

The Kafka Mirroring Connector requires the "consumer" domain before each Kafka consumer property name, that is, after brooklin.server.connector.kafkaMirroringConnector.

It will look like this in brooklin's server.properties:


brooklin.server.connector.kafkaMirroringConnector.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
brooklin.server.connector.kafkaMirroringConnector.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
brooklin.server.connector.kafkaMirroringConnector.consumer.security.protocol=ssl

brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.location=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.key.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.location=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.endpoint.identification.algorithm=

brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.enabled.protocols=TLSv1.2
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.type=JKS
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.type=JKS
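
An added example (not from the original post or the Brooklin project): a small Python audit of the two property groups above. It assumes, as the post states, that connector settings only reach the Kafka consumer under the "consumer." domain. It flags a side that falls back to Kafka's default PLAINTEXT and a side where an empty ssl.endpoint.identification.algorithm, as in both listings above, turns off broker hostname verification. The sample properties and file paths are constructed.

```python
# Added example: audit the Kafka TLS settings in a Brooklin server.properties.
PRODUCER = "brooklin.server.transportProvider.kafkaTransportProvider."
CONNECTOR = "brooklin.server.connector.kafkaMirroringConnector."
CONSUMER = CONNECTOR + "consumer."  # assumption from the post: domain is required
HOSTNAME_CHECK = "ssl.endpoint.identification.algorithm"


def parse_properties(text):
    """Parse key=value lines, skipping blanks and # / ! comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line[0] in "#!":
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def audit(text):
    """Return a sorted list of (side, finding) tuples."""
    props = parse_properties(text)
    findings = []
    for side, prefix in (("producer", PRODUCER), ("consumer", CONSUMER)):
        client = {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}
        # Kafka clients default to PLAINTEXT when security.protocol is unset.
        protocol = client.get("security.protocol", "PLAINTEXT").upper()
        if protocol not in ("SSL", "SASL_SSL"):
            findings.append((side, "no TLS: security.protocol is " + protocol))
            continue
        # Present but empty means the broker certificate's hostname is not checked.
        if client.get(HOSTNAME_CHECK) == "":
            findings.append((side, "hostname verification disabled"))
    # Connector keys written without the "consumer." domain are reported.
    for key in props:
        if key.startswith(CONNECTOR + "ssl.") or key == CONNECTOR + "security.protocol":
            findings.append(("consumer", "misplaced (missing consumer.): " + key))
    return sorted(findings)


# Constructed sample: producer shaped like the post's listing,
# connector with the "consumer." domain forgotten.
SAMPLE = """
brooklin.server.transportProvider.kafkaTransportProvider.security.protocol=ssl
brooklin.server.transportProvider.kafkaTransportProvider.ssl.endpoint.identification.algorithm=
brooklin.server.connector.kafkaMirroringConnector.security.protocol=ssl
brooklin.server.connector.kafkaMirroringConnector.ssl.truststore.location=/etc/brooklin/src.jks
"""

if __name__ == "__main__":
    for side, finding in audit(SAMPLE):
        print(side + ": " + finding)
```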



Stream creation looks like this:

brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n first-mirroring-stream -s "kafkassl://localhost:9093/first-topic" -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.reuseExistingDestination":"false"}'


This is now an SSL-to-SSL configured Kafka mirror.

HOWEVER, when you query for the description of the stream:

brooklin-rest-client.sh -o READALL -u http://localhost:32311/


You will see a regular Kafka connection string for the destination, not kafkassl:

{
  "name" : "first-mirroring-stream",
  "connectorName" : "kafkaMirroringConnector",
  "transportProviderName" : "kafkaTransportProvider",
  "source" : {
    "connectionString" : "kafkassl://localhost:9093/first-topic"
  },
  "Status" : "READY",
  "destination" : {
    "connectionString" : "kafka://localhost:9095/*"
  },
  "metadata" : {
    "datastreamUUID" : "aa891768-abbe-44d9-8c4e-18615ef31e91",
    "group.id" : "first-mirroring-stream",
    "owner" : "test-user",
    "system.IsConnectorManagedDestination" : "true",
    "system.creation.ms" : "1580139099105",
    "system.destination.KafkaBrokers" : "localhost:9095",
    "system.reuseExistingDestination" : "false",
    "system.taskPrefix" : "first-mirroring-stream"
  }
}


This is just a visual bug, as mirroring works just fine.

This is a basic example of a generic/vanilla topic mirror from Kafka A to B. Things start to break when you try to do something non-trivial, like use Confluent's Schema Registry, but that's a post for another time.

Happy coding!
