首页 > 网络 > 云计算 >

flume+kafka+storm整合

2017-03-20

flume+kafka+storm整合,flume,kafka, storm 的安装在下面三篇文章:flume:1 6 0kafka:注意这里最好下载scala2 10版本的kafka,因为scala2 10版本的兼容性比较好和2 11版本差别太大。

一、安装

flume,kafka, storm 的安装在下面三篇文章:
flume:1.6.0
kafka:注意这里最好下载scala2.10版本的kafka,因为scala2.10版本的兼容性比较好和2.11版本差别太大
这里写图片描述

flume学习01—安装
kafka安装部署
storm安装部署

二、各个部分调试

三、 Storm获取数据流程

3.1、首先来了解Strom-kafka

Strom-kafka的官网介绍项目
注意:可能使用浏览器的问题, 导致在IE上只能看到部分,换成其他浏览器就好了。

介绍Storm核心Spout 和Trident spout的实现,用户消费从 Apache Kafka 0.8.x获取的数据。

3.1.1、Spouts

We support both Trident and core Storm spouts.为了两种Spout实现,Strom使用一个BrokerHost interface 跟踪Kafka broker主机到分区映射和kafkaConfig控制一些Kafka相关的参数。

3.1.2 BrokerHosts

为了初始化您的Kafka spout/emitter,您需要创建一个标记BrokerHosts接口的实例。 目前,支持以下两种实现:

ZkHosts

如果你想动态跟踪Kafka broker到分区映射(partition mapping), 你应该使用ZkHosts。 这个类使用Kafka的ZooKeeper entries 来跟踪brokerHost - >分区映射。 您可以通过调用下面方法实例化对象:

   public ZkHosts(String brokerZkStr, String brokerZkPath)
   public ZkHosts(String brokerZkStr)

其中:

brokerZkStr**只为**ip:post(eg. localhost:2181), brokerZkPath: the root directory under which all the topics and partition information is stored, 默认为 /brokers 。 默认情况下,代理分区映射(borker-partition mapping)每60秒从ZooKeeper刷新。 如果要更改它,您应该将host.refreshFrezqSecs设置为您选择的值。
实现如:
ZkHosts zkHosts = new ZkHosts("192.168.57.4:2181,192.168.57.5:2181,192.168.57.6:2181");

StaticHosts

这是一个可选的实现,其中broker - >分区信息是静态的。 为了构造这个类的实例,您需要首先构造一个GlobalPartitionInformation的实例。

Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
    Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
    partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
    partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
    partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
    StaticHosts hosts = new StaticHosts(partitionInfo);

KafkaConfig

为创建KafkaSpout所需的第二个对象是KafkaConfig
创建KafkaConfig

 public KafkaConfig(BrokerHosts hosts, String topic)
 public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

BrokerHosts可以是如上所述的BrokerHosts接口的任何实现。 topic是Kafka topic的名称。 ClientId是可选的, 用作ZooKeeper路径的一部分,其中存储了spout的当前消耗偏移量。

目前有2个KafkaConfig的扩展。

Spoutconfig

Spoutconfig是KafkaConfig的扩展,它支持使用ZooKeeper连接信息的其他字段,并用于控制特定于KafkaSpout的行为。 Zkroot将用作root来存储消费的偏移量。 ID应该唯一标识您的spout。

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);

实现:

SpoutConfig spoutConfig 
    = new SpoutConfig(
        zkHosts, 
        topic, 
        "/test", // 偏移量offset的根目录
        "test");// ID应该唯一标识您的spout

除了这些参数,SpoutConfig包含以下字段控制KafkaSpout的行为:

    spoutConfig.forceFromStart = false; // 不从头开始消费,保证spout出现故障, 重启之后,能够从kafka的原来位置处理, 而不是从开始位置处理,kafka的偏移量,周期性的写入zookeeper中, 
    // setting for how often to save the current Kafka offset to ZooKeeper
    public long stateUpdateIntervalMs = 2000;

    // Retry strategy for failed messages
    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();

    // Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
    // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
    // Initial delay between successive retries
    public long retryInitialDelayMs = 0;
    public double retryDelayMultiplier = 1.0;

    // Maximum delay between successive retries    
    public long retryDelayMaxMs = 60 * 1000;
    // Failed message will be retried infinitely if retryLimit is less than zero. 
    public int retryLimit = -1;    

Core KafkaSpout only accepts an instance of SpoutConfig.

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TridentKafkaConfig

TridentKafkaConfig is another extension of KafkaConfig. TridentKafkaEmitter only accepts TridentKafkaConfig.

KafkaConfig类还有一堆公共变量,用于控制应用程序的行为。 这里是默认值:

    public int fetchSizeBytes = 1024 * 1024;
    public int socketTimeoutMs = 10000;
    public int fetchMaxWait = 10000;
    public int bufferSizeBytes = 1024 * 1024;
    public MultiScheme scheme = new RawMultiScheme();
    public boolean ignoreZkOffsets = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    public long maxOffsetBehind = Long.MAX_VALUE;
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60;

Most of them are self explanatory except MultiScheme.

MultiScheme

MultiScheme是一个接口,指示如何将从Kafka中消耗的ByteBuffer转换为成Storm中的tuple。 它还控制输出字段的命名。

  public Iterable> deserialize(ByteBuffer ser);
  public Fields getOutputFields();

默认的RawMultiScheme只接受ByteBuffer,并返回一个带有ByteBuffer的tuple,ByteBuffer转换为byte []。 outputField的名称为“bytes”。 还有一些可选实现,如SchemeAsMultiScheme和KeyValueSchemeAsMultiScheme,它们可以将ByteBuffer转换为String。
//从Kafka中取出的byte[],该如何反序列化
如在整合项目中实现:使用SchemeAsMultiScheme

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 定义输出为String

还有SchemeAsMultiScheme,MessageMetadataSchemeAsMultiScheme的扩展,它具有一个附加的反序列化方法,除了与消息关联的分区和偏移之外,还接受消息ByteBuffer。

public Iterable> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)

这对于从Kafka topic上的任意处 auditing/replaying 消息非常有用,可以保存离散流的每个消息的分区和偏移量,而不是保留整个消息。

四、KafakSpout的具体实现

        TopologyBuilder builder = new TopologyBuilder();
        // config kafka spout
        String topic = "testflume";
        //第一步创建Zkhosts
        ZkHosts zkHosts = new ZkHosts("192.168.57.4:2181,192.168.57.5:2181,192.168.57.6:2181");
        //第二步创建SpoutConfig, 为了设置各种参数
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, 
                topic,    //kafka的topic名称
                "/test",  // 偏移量offset的根目录
                "test");  // kafka的唯一表示。
        //设置zkserver的信息, 可选的, 应为在上面的ZkHosts中已经设置了zookeeper的主机和端口号。
        List zkServers = new ArrayList();
        System.out.println(zkHosts.brokerZkStr);
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        //设置kafka的消费模式, 是否从头开始。
        spoutConfig.forceFromStart = false; // 不从头开始消费
        spoutConfig.socketTimeoutMs = 60 * 1000;  //与Kafka broker的连接的socket超时时间
        //从Kafka中取出的byte[],该如何反序列化
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 定义输出为String
        //KafkaSpout之接收一个参数SpoutConfig.
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // set kafka spout
        builder.setSpout("kafka_spout", kafkaSpout, 3);
相关文章
最新文章
热点推荐