首页 > 网络 > 云计算 >

spark程序读写protobuf格式数据(java语言)

2017-04-22

spark程序读写protobuf格式数据(java语言):在spark上,用protobuf替代json格式作为数据序列化存储。

spark程序读写protobuf格式数据(java语言):在spark上,用protobuf替代json格式作为数据序列化存储。

谷歌的protobuf一般用来将复杂数据结构序列化为二进制数组,非常适合网络传输等领域,其效率和空间占用都优于json格式。

这一次,我在用spark做建模时,打算使用protobuf替换原json格式数据,以获得性能提升。在此记录下实现方式,以及如何避过我遇到的坑。

我的环境是spark1.5.0 + java7 + protobuf2.5。

首先,要编写.proto文件以描述数据结构。这里不详细解释,有兴趣的可参见别人写的:
http://www.cnblogs.com/dkblog/archive/2012/03/27/2419010.html

这里放一个proto文件的例子:

// protobufTest.proto
syntax = "proto2";
option java_package = "com.ismartv.recommendv2.test";
option java_outer_classname = "PersonEntity";//生成的数据访问类的类名  
message Person {  
  required string sn = 1;// sn  
  required string name = 2;//必须字段,在后面的使用中必须为该段设置值  
}  

使用命令protoc –java_out=src pathToProto/protobufTest.proto 即可将proto文件所描述的数据类型生成为java类。

接下来只需要编写spark程序,先将Person数据类型由java对象转为protobuf二进制数组输出到hdfs,再由hdfs读取二进制数组数据转换为java对象。完成读写操作。

以protobuf为结构,写java对象到HDFS二进制文件代码:

// 先生成若干个Person对象
JavaRDD numbersRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
JavaRDD persons = numbersRDD.map(new Function() {

    public Person call(Integer x) throws Exception {
        // TODO Auto-generated method stub
        PersonEntity.Person.Builder builder = PersonEntity.Person.newBuilder();
        builder.setSn(x.toString());
        builder.setName(x.toString() + "name");
        PersonEntity.Person person = builder.build();
        return person;
    }
});
// 将JavaRDD 转换为JavaPairRDD
// 最后保存到HDFS
persons.mapToPair(new PairFunction() {

    public Tuple2 call(Person person) throws Exception {
    // 这里new BytesWritable(person.toByteArray()) 是将java对象序列化为protobuf二进制数组
    return new Tuple2(NullWritable.get(), new BytesWritable(person.toByteArray()));
    }
}).saveAsNewAPIHadoopFile("hdfs://nameservice1/test/protobufTest", NullWritable.class, BytesWritable.class, SequenceFileOutputFormat.class);

以protobuf为结构,读HDFS二进制文件到java对象代码:

// 注意要用sequenceFile函数
JavaRDD readperson = sc.sequenceFile("hdfs://nameservice1/test/protobufTest/", NullWritable.class, BytesWritable.class)
.map(new Function, PersonEntity.Person>() {

    public Person call(Tuple2 tuple) throws Exception {
        // 解析byte[]为java对象,注意,一定要用copyBytes()而不是getBytes()
        PersonEntity.Person p3 = PersonEntity.Person.parseFrom(tuple._2.copyBytes());
        return p3;
    }
});
// 看一下结果
List list = readperson.collect();
for(PersonEntity.Person person : list){
    System.out.println(person.toString());
}
相关文章
最新文章
热点推荐