
Hadoop writing to MySQL: using MapReduce to write a file into a MySQL database

This article shows how to write the contents of a text file into a MySQL table from a MapReduce job. It takes two pieces: a custom record class that implements Hadoop's DBWritable and Writable interfaces, and a driver that routes the job's output through DBOutputFormat instead of the default file output.

Custom record class

package DBOutFormat;

import org.apache.hadoop.io.Writable;
// Use the new-API DBWritable so the class matches
// org.apache.hadoop.mapreduce.lib.db.DBOutputFormat used by the driver below.
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Describes one row of the target table. Despite its name, this is not an
// OutputFormat; it is the key type handed to DBOutputFormat.
public class MysqlDBOutPutFormat implements DBWritable, Writable {

    private String address;
    private String type;
    private String name;
    private String divce;

    public MysqlDBOutPutFormat() {}

    public MysqlDBOutPutFormat(String address, String type, String name, String divce) {
        this.address = address;
        this.type = type;
        this.name = name;
        this.divce = divce;
    }

    // Bind the four fields to the placeholders of the generated INSERT statement.
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, address);
        statement.setString(2, type);
        statement.setString(3, name);
        statement.setString(4, divce);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.address = resultSet.getString(1);
        this.type = resultSet.getString(2);
        this.name = resultSet.getString(3);
        this.divce = resultSet.getString(4);
    }

    // Hadoop serialization, used when records move between tasks.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(address);
        out.writeUTF(type);
        out.writeUTF(name);
        out.writeUTF(divce);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.address = in.readUTF();
        this.type = in.readUTF();
        this.name = in.readUTF();
        this.divce = in.readUTF();
    }
}
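The write(PreparedStatement) method above fills placeholders 1 through 4 in the order address, type, name, divce, which must match the column list the driver passes to DBOutputFormat.setOutput; Hadoop's DBOutputFormat builds an INSERT INTO zyplc (address, type, name, divce) VALUES (?, ?, ?, ?) statement from that list. A minimal sketch of a matching table, assuming all-VARCHAR columns (only the table and column names come from the code; the types are an assumption):

-- Hypothetical schema: names taken from the driver code, types assumed.
CREATE TABLE zyplc (
    address VARCHAR(255),
    type    VARCHAR(255),
    name    VARCHAR(255),
    divce   VARCHAR(255)
);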

MapReduce driver code

package DBOutFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class reduce {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String input = "data1/mysql.txt";

        // Register the JDBC driver and connection details in the job configuration.
        final Configuration co = new Configuration();
        DBConfiguration.configureDB(co,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://11.11.11.2:3306/su?characterEncoding=UTF-8",
                "root",
                "root");

        // Get the Job instance.
        final Job job = Job.getInstance(co);
        job.setJarByClass(reduce.class);

        // Set the mapper and reducer.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Key/value types emitted by the map phase.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types emitted by the reduce phase.
        job.setOutputKeyClass(MysqlDBOutPutFormat.class);
        job.setOutputValueClass(NullWritable.class);

        // Input path. The job's output goes to MySQL, so the default
        // FileOutputFormat cannot be used.
        FileInputFormat.setInputPaths(job, new Path(input));
        job.setOutputFormatClass(DBOutputFormat.class);

        // Target table and columns; DBOutputFormat builds the INSERT from these.
        String[] fields = {"address", "type", "name", "divce"};
        DBOutputFormat.setOutput(job, "zyplc", fields);

        // Submit the job.
        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    // Identity mapper: passes each line through, keyed by its byte offset.
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Splits each comma-separated line into four fields and writes one record per valid row.
    public static class MyReducer extends Reducer<LongWritable, Text, MysqlDBOutPutFormat, NullWritable> {
        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                String[] info = value.toString().split(",");
                if (info.length == 4) {
                    context.write(new MysqlDBOutPutFormat(info[0].trim(), info[1].trim(), info[2].trim(), info[3].trim()),
                            NullWritable.get());
                }
            }
        }
    }
}
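The reducer splits each line on commas and only writes rows that contain exactly four fields, so data1/mysql.txt is expected to hold one comma-separated record per line. A hypothetical sample input (values invented for illustration; the four fields map to address, type, name, divce):

192.168.1.10, switch, core-01, port-24
192.168.1.11, router, edge-02, port-08

One practical note: the MySQL JDBC driver jar (e.g. mysql-connector-java, which provides com.mysql.jdbc.Driver) must be on the task classpath at runtime, otherwise the job fails with a ClassNotFoundException when DBOutputFormat opens the connection.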

Summary

To write MapReduce output into MySQL: implement both DBWritable and Writable in a record class whose write(PreparedStatement) binds one column per placeholder, register the connection with DBConfiguration.configureDB, replace the default file output with DBOutputFormat, and name the target table and columns via DBOutputFormat.setOutput. The reducer then emits one record object per valid input line, and each record becomes one row in the table.