首页 > 网络 > 云计算 >

kafka增加SSL认证的Producer客户端代码示例

2017-08-09

kafka增加SSL认证的Producer客户端代码示例。

kafka增加SSL认证的Producer客户端代码示例

package com.kafka.safe.ssl;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.*;

import java.util.*;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

public class SslProducerTest {

//public static final String TOPIC_NAME = "kafka-cluster";

public static final String TOPIC_NAME = "bpu_gateway_router";

private static final String CONTENT = "1704197100,9800100,4321,192.168.76.202,iaucap,2017-06-28 13:33:32";

public static void main(String[] args) throws KafkaException

{

Properties props = new Properties();

props.put("bootstrap.servers", "192.168.76.202:9093");

//props.put("acks", "all");

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("security.protocol", "SSL");

props.put("ssl.truststore.location", "E:\\kafka\\safe\\client.truststore.jks");

props.put("ssl.truststore.password", "test1234");

props.put("ssl.keystore.location", "E:\\kafka\\safe\\client.keystore.jks");

props.put("ssl.keystore.password", "pdas202");

props.put("ssl.key.password", "pdas202");

Producer producer = new KafkaProducer(props);

Runnable runnable = new Runnable()

{

Integer times = 0;

public void run()

{

producer.send(new ProducerRecord(TOPIC_NAME,Integer.toString(times),CONTENT));

}

};

ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

service.scheduleAtFixedRate(runnable, 0,100,TimeUnit.MICROSECONDS);

System.out.println("-----------------");

}

}

相关文章
最新文章
热点推荐