日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

avro使用详解

發(fā)布時間:2023/12/4 综合教程 32 生活家
生活随笔 收集整理的這篇文章主要介紹了 avro使用详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、avro的介紹
1、概括
avro是一個數(shù)據(jù)序列化系統(tǒng),它提供

豐富的數(shù)據(jù)結(jié)構(gòu)
快速可壓縮的二進(jìn)制數(shù)據(jù)形式
存儲持久數(shù)據(jù)的文件容器
遠(yuǎn)程過程調(diào)用RPC
簡單的動態(tài)語言結(jié)合功能
2、類型
?

二、avro在hadoop的使用
1、模式確定
例如:{"namespace": "example.avro",
? ? ? ? "type": "record",
? ? ? ? "name": "User",
? ? ? ? "fields": [
? ? ? ? {"name": "name", "type": "string"},
? ? ? ? {"name": "favorite_number", ?"type": ["int", "null"]},
? ? ? ? {"name": "favorite_color", "type": ["string", "null"]}
? ? ? ? ]
? ? }

其中namespace是包名,name是類名

2、text數(shù)據(jù)作為輸入
2.1 無插件的序列化
//創(chuàng)建數(shù)據(jù)記錄
Schema schema = new Schema.Parser().parse(new File("user.avsc"));
GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
// Leave favorite color null

GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");

//序列化
// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("users.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

//反序列化
// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
? ? // Reuse user object by passing it to next(). This ?saves us from
? ? // allocating and garbage collecting many objects for ? files with
? ? // many items.
? ? user = dataFileReader.next(user);
? ? System.out.println(user);
}

2.2有插件的序列化
2.2.1 插件導(dǎo)入
<plugin>
? <groupId>org.apache.avro</groupId>
? <artifactId>avro-maven-plugin</artifactId>
? <version>1.8.2</version>
? <executions>
? ? <execution>
? ? ? <phase>generate-sources</phase>
? ? ? <goals>
? ? ? ? <goal>schema</goal>
? ? ? </goals>
? ? ? <configuration>
? ? ? ? <sourceDirectory>${project.basedir}/../</sourceDirectory>
? ? ? ? <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
? ? ? </configuration>
? ? </execution>
? </executions>
</plugin>

2.2.2 編譯schema文件
注意schema文件放在指定的文件中?
?
在idea中編譯此文件,使之在目錄中生成class文件

2.2.3 常規(guī)使用
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("users.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

//序列化
// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
? ? // Reuse user object by passing it to next(). This saves us from
? ? // allocating and garbage collecting many objects for files with
? ? // many items.
? ? user = dataFileReader.next(user);
? ? System.out.println(user);
}

3、例子(使用的是有插件的方式)
MapReduceColorCount:

package example;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import example.avro.User;

public class MapReduceColorCount extends Configured implements Tool {

? public static class ColorCountMapper extends
? Mapper<AvroKey<User>, NullWritable, Text, IntWritable> {

? ? @Override
? ? public void map(AvroKey<User> key, NullWritable value, Context context)
? ? ? ? throws IOException, InterruptedException {

? ? ? CharSequence color = key.datum().getFavoriteColor();
? ? ? if (color == null) {
? ? ? ? color = "none";
? ? ? }
? ? ? context.write(new Text(color.toString()), new IntWritable(1));
? ? ? }
? ? }

? ? public static class ColorCountReducer extends
? Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {

? ? @Override
? ? public void reduce(Text key, Iterable<IntWritable> values,
? ? ? ? Context context) throws IOException, InterruptedException {

? ? ? int sum = 0;
? ? ? for (IntWritable value : values) {
? ? ? ? sum += value.get();
? ? ? }
? ? ? context.write(new AvroKey<CharSequence>(key.toString()), new AvroValue<Integer>(sum));
? ? }
? ? }

? public int run(String[] args) throws Exception {
? ? if (args.length != 2) {
? ? ? System.err.println("Usage: MapReduceColorCount <input path> <output path>");
? ? ? return -1;
? ? }

? ? Job job = new Job(getConf());
? ? job.setJarByClass(MapReduceColorCount.class);
? ? job.setJobName("Color Count");

? ? FileInputFormat.setInputPaths(job, new Path(args[0]));
? ? FileOutputFormat.setOutputPath(job, new Path(args[1]));

? ? job.setInputFormatClass(AvroKeyInputFormat.class);
? ? job.setMapperClass(ColorCountMapper.class);
? ? AvroJob.setInputKeySchema(job, User.getClassSchema());
? ? job.setMapOutputKeyClass(Text.class);
? ? job.setMapOutputValueClass(IntWritable.class);

? ? job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
? ? job.setReducerClass(ColorCountReducer.class);
? ? AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
? ? AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

? ? return (job.waitForCompletion(true) ? 0 : 1);
? }

? public static void main(String[] args) throws Exception {
? ? int res = ToolRunner.run(new MapReduceColorCount(), args);
? ? System.exit(res);
? }
}

注意:當(dāng)采用不用插件的方式時,map的代碼如下?
@Override?
public void map(AvroKey key, NullWritable value, Context context)throws IOException,InterruptedException {}?
由于代碼并不知道AvroKey的schema,所以要在main中使用AvroJob.setDataModelClass(job,GenericData.class);指定數(shù)據(jù)的schema。
?

總結(jié)

以上是生活随笔為你收集整理的avro使用详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。