需求
此前公司用MR程序解析json,将结果以text file保存在hive表的目录下。随着数据量增大,text file的性能逐渐跟不上,需要修改代码将文件格式修改成parquet。
实现
以下是以parquet保存结果的demo。
将文本中的每行以空格分隔,第一列作为id(int),第二列作为name(string),直接保存到指定目录。
[root@kudu1 job]# cat test.txt 1 xiaoming 2 hanmeimei 3 kangkang 4 maria 5 yokiko 6 michael
以下代码只有map程序,没有reduce。以parquet格式输出步骤为:
1. 创建parquet的schema信息
parquet的数据类型有INT64, INT32, BOOLEAN, BINARY, FLOAT, DOUBLE, INT96, FIXED_LEN_BYTE_ARRAY,其中INT64对标Java的Long;INT32对标int;BINARY对标String,但是需要指定编码格式。
String writeSchema = "message example {\n optional INT32 id;\n optional binary name (UTF8);\n}";
2. 使用ExampleOutputFormat类配置parquet的schema、压缩格式、输出目录。
// 配置MR的configuration Configuration configuration = new Configuration(this.getConf()); configuration.set("mapreduce.input.fileinputformat.split.minsize","2147483648"); configuration.set("parquet.example.schema",writeSchema);
3. 配置map的value输出格式为org.apache.parquet.example.data.Group。
parquet是列式存储,不同列的同一行表示为一个group。可以理解为一行就是一个group。每次map都是以group格式写入parquet文件的。
job.setMapOutputValueClass(Group.class);//TODO 设置value是parquet的Group
4. 配置job的输出格式
LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class); MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class,Group.class);
5. 在Mapper类中定义用于创建group的工厂类。
//map类中定义 factory属性 private SimpleGroupFactory factory; //setup初始化方法中创建SimpleGroupFactory factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
6.map方法中创建group并赋值,写出。
String[] strings = value.toString().split(" "); //TODO 创建group Group group = factory.newGroup(); //TODO 为group赋值 group.add("id",Integer.valueOf(strings[0])); group.add("name",strings[1]); //TODO 写出 mos.write("output", null, group);
代码
以下为完整代码
package com.zixuan.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.apache.parquet.hadoop.example.ExampleOutputFormat; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageTypeParser; import java.io.IOException; import java.util.Random; public class MrSavedAsParquet extends Configured implements Tool { public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new MrSavedAsParquet(), args); System.exit(ret); } public int run(String[] strings) throws Exception { String inputDir = "/user/hive/warehouse/inputdirTest"; String outputDir = "/user/hive/warehouse/ods.db/usertest"; //为parquet生成schema String writeSchema = "message example {\n optional INT32 id;\n optional binary name (UTF8);\n}"; // 配置MR的configuration Configuration configuration = new Configuration(this.getConf()); configuration.set("mapreduce.input.fileinputformat.split.minsize","2147483648"); configuration.set("parquet.example.schema",writeSchema); Job job = new Job(configuration,"UserTest"); //配置parquet ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(writeSchema)); ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY); ExampleOutputFormat.setOutputPath(job, new Path(outputDir)); //配置Job的基本信息 job.setJarByClass(MrSavedAsParquet.class); job.setMapperClass(MapTest.class); job.setInputFormatClass(TextInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Group.class);//TODO 设置value是parquet的Group MultipleInputs.addInputPath(job,new Path(inputDir), TextInputFormat.class,MapTest.class); job.setNumReduceTasks(0); //TODO 设置输出格式是parquet LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class); MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class,Group.class); FileSystem fileSystem = FileSystem.get(configuration); if ( ! fileSystem.exists(new Path(inputDir))){ System.out.print("input path does not exist!"); return 1; } int ret = job.waitForCompletion(true) ? 0 : 1; return ret; } public static class MapTest extends Mapper<LongWritable, Text, NullWritable, Group> { // 多目录输出 private MultipleOutputs<NullWritable, Group> mos; // 多目录输出 //定义用于创建group的工厂类 private SimpleGroupFactory factory; //初始化,创建mos和factory @Override public void setup(Context context) throws IOException, InterruptedException { mos = new MultipleOutputs<NullWritable,Group >(context);// 初始化mos factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration())); } /* LongWritable key是文件偏移量 Text value是每行的数据 Context context是上下文对象,可以获取conf中的配置项 */ @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] strings = value.toString().split(" "); //TODO 创建group Group group = factory.newGroup(); //TODO 为group赋值 group.add("id",Integer.valueOf(strings[0])); group.add("name",strings[1]); //TODO 写出 mos.write("output", null, group); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); } } }
文件已经保存到/user/hive/warehouse/ods.db/usertest目录下,创建ods.usertest的hive表:
CREATE TABLE ods.usertest ( id int, name string ) stored as parquet;
查询:
hive> select * from ods.usertest; OK 1 xiaoming 2 hanmeimei 3 kangkang 4 maria 5 yokiko 6 michael Time taken: 0.053 seconds, Fetched: 6 row(s)
下一个:Redis 主从同步原理解析
热门文章
- 家养2个月小猫咬出血要紧吗视频(两个月的猫咬出血了需要打针吗)
- 「2月28日」最高速度22M/S,2025年V2ray/SSR/Shadowrocket/Clash每天更新免费订阅地址分享
- JAVA多线程之同步容器&并发容器
- 宠物领养活动广告词大全(宠物领养广告文案)
- 「2月7日」最高速度20.3M/S,2025年V2ray/Clash/SSR/Shadowrocket每天更新免费订阅地址分享
- 动物医院怎么样赚钱快 动物医院怎么样赚钱快一点
- 合肥宠物寄养中心(合肥宠物寄养中心在哪里)
- 宠物医院美团好评回复大全(宠物医院美团评价)
- 宠物粮食创业(宠物粮食生意怎么样)
- 「1月6日」最高速度23M/S,2025年V2ray/Shadowrocket/Clash/SSR每天更新免费订阅地址分享