首页 > 网络 > 云计算 >

基于java和tunnel-sdk的OSS与ODPS之间的数据连通

2017-06-30

基于java和tunnel-sdk的OSS与ODPS之间的数据连通。大数据计算服务(MaxCompute,原名 ODPS)是一种快速、完全托管的 GB TB PB 级数据仓库解决方案。MaxCompute 向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题。

公司做大数据用的是阿里云这个平台,用了几天下来,更加感觉到阿里的强大。先简单介绍一下阿里云平台上的OSS和ODPS(现在已经改名为MaxCompute)。

这里写图片描述

对象存储 OSS

阿里云对象存储服务(Object Storage Service,简称 OSS),是阿里云提供的海量、安全、低成本、高可靠的云存储服务。您可以通过调用 API,在任何应用、任何时间、任何地点上传和下载数据,也可以通过 Web 控制台对数据进行简单的管理。OSS 适合存放任意类型的文件,适合各种网站、开发企业及开发者使用。

简单来说,我认为OSS就相当于一个网盘,存一些企业的历史数据。

MaxCompute

大数据计算服务(MaxCompute,原名 ODPS)是一种快速、完全托管的 GB/TB/PB 级数据仓库解决方案。MaxCompute 向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。

数据通道

TUNNEL:提供高并发的离线数据上传下载服务。用户可以使用 Tunnel 服务向 MaxCompute 批量上传或下载数据。MaxCompute Tunnel 仅提供 Java 编程接口供用户使用。

计算及分析任务

1、MaxCompute 只能以表的形式存储数据,并对外提供了 SQL 查询功能。

2、UDF,MaxCompute提供了很多内建函数,也允许自定义函数来满足需求。

3、MapReduce

4、Graph:MaxCompute 提供的 Graph 功能是一套面向迭代的图计算处理框架。

简单介绍后,这里直接切入正题。

在企业里你要去做一件事,首先是因为你有需求。

这里的需求在于公司将某个表的近3个月的历史信息存在了OSS上,为了导入ODPS的表中来做数据分析,所以需要开发一个java程序来实现OSS与ODPS之间的数据连通。

实现OSS和ODPS之间数据连同的方式有好几种,最简单的可以通过阿里云平台自带的功能来实现,这里之所以需要开发一个java程序来实现是为了开发完整的java程序后可以通过定时任务来每日自动执行,因为这张表以后是每天都在做一次这样的更新操作的。

在OSS上存储的数据是按dt=”天”来存储的。

其中CommonConstants类是一个静态配置类,主要保存了一些参数,配置自己公司的即可。

第一步 将OSS的按 表名/dt=”天”的形式保存到本地

public class LoadFromOSS implements Callable{

private static FileUtil fileUtil=new FileUtil();

private static OSSClient ossClient=null;

private static Set roomSet=null;

//获得前一天的日期

private static String preDateString = fileUtil.getPreDate();

/*

找到当天的所有房间号

*/

public static Set startLoad() {

roomSet=new HashSet();

String key = "example/".concat(preDateString).concat("/");

final int maxKeys = 200;

String nextMarker = null;

ObjectListing objectListing = null;

int i = 0;

do {

objectListing = ossClient.listObjects(new ListObjectsRequest(CommonConstants.bucketName).withPrefix(key).withMarker(nextMarker).withMaxKeys(maxKeys));

List sums = objectListing.getObjectSummaries();

for (OSSObjectSummary s : sums) {

//获得匹配到的object的目录路径

System.out.println(s.getKey());

String[] subtitleArr = s.getKey().split("/");

String roomId = null;

//提取roomId

if (subtitleArr.length == 3) {

roomId = subtitleArr[2].trim();

}

//将roomId存入roomSet,以便下载时使用

if(!StringUtils.isEmpty(roomId)){

roomSet.add(roomId);

}

}

nextMarker = objectListing.getNextMarker();

} while (objectListing.isTruncated());

return roomSet;

}

public static void LoadFromOss(Set roomId){

//如果本地没有当天目录则创建一个

if(!fileUtil.isExists(CommonConstants.rootdir_del+preDateString)){

try {

fileUtil.createDir(CommonConstants.rootdir_del+preDateString);

} catch (IOException e) {

e.printStackTrace();

}

}

int count=0;

for(String room:roomId){

String key="subtitles/".concat(preDateString).concat("/").concat(room);

System.out.println(++count);

ossClient.getObject(new GetObjectRequest(CommonConstants.bucketName, key), new File(CommonConstants.rootdir_del+preDateString+"/"+room));

}

ossClient.shutdown();

}

@Override

public Object call() throws Exception {

System.out.println("开始下载OSS文件到本地");

Date date1=new Date();

ossClient = new OSSClient(CommonConstants.endPoint, CommonConstants.accessKeyId, CommonConstants.accessKeySecret);

try {

Set set =startLoad();

System.out.println(set.size());

LoadFromOss(set);

} catch (OSSException oe) {

oe.printStackTrace();

} catch (ClientException ce) {

ce.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

} finally {

ossClient.shutdown();

}

Date date2=new Date();

Thread.sleep(5000);

System.out.println("任务完成,耗时:"+(date2.getTime()-date1.getTime())/1000+"秒");

return "ok";

}

}

第二步 将保存到本地的文件按需求进行简单处理

第二步针对不同的需求可以将文件里的内容做对应的处理,例如原本文件中只有两列,我希望增加一列(增加一个属性)等,这里省略。

第三步 将处理后的文件上传到ODPS上

针对表的每个分区插入相应的记录。

public class UpLoadToODPS implements Callable {

private static FileUtil fileUtil=new FileUtil();

static String preDate=fileUtil.getPreDate();

private static String partition = "";

@Override

public Object call() throws Exception {

System.out.println("开始上传处理后的文件到ODPS");

Date date1=new Date();

Account account = new AliyunAccount(CommonConstants.accessId, CommonConstants.accessKey);

Odps odps = new Odps(account);

odps.setEndpoint(CommonConstants.odpsUrl);

odps.setDefaultProject(CommonConstants.project);

Table t = odps.tables().get(CommonConstants.table);

//获取当天的日期

partition="dt="+preDate;

try {

if(!t.hasPartition(new PartitionSpec(partition))){

t.createPartition(new PartitionSpec(partition));

}

} catch (OdpsException e) {

e.printStackTrace();

}

try {

TableTunnel tunnel = new TableTunnel(odps);

tunnel.setEndpoint(CommonConstants.tunnelUrl);

PartitionSpec partitionSpec = new PartitionSpec(partition);

TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(CommonConstants.project,

CommonConstants.table, partitionSpec);

System.out.println("Session Status is : "

+ uploadSession.getStatus().toString());

RecordWriter recordWriter = uploadSession.openRecordWriter(0);

Record record = uploadSession.newRecord();

File file=new File(CommonConstants.rootdir_delc+preDate);

File[] flist=file.listFiles();

BufferedReader br=null;

for(File f:flist){

br=fileUtil.getBR(f);

String line=null;

while((line=br.readLine())!=null){

String[] fields=line.split("_");

record.setString(0,fields[0]);

record.setString(1,fields[1]);

recordWriter.write(record);

}

br.close();

}

recordWriter.close();

uploadSession.commit(new Long[]{0L});

System.out.println("upload success!"+ file.getName());

} catch (TunnelException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

Date date2=new Date();

Thread.sleep(5000);

System.out.println("任务完成,耗时:"+(date2.getTime()-date1.getTime())/1000+"秒");

return "ok";

}

}

相关文章
最新文章
热点推荐