Hadoop集羣(第10期)_MapReduce與MySQL交互

二、MapReduce與MySQL交互

  MapReduce技術推出後,曾遭到關係數據庫研究者的挑剔和批評,認爲MapReduce不具有有相似於關係數據庫中的結構化數據存儲和處理能力。爲此,Google和MapReduce社區進行了不少努力。一方面,他們設計了相似於關係數據中結構化數據表的技術(Google的BigTable,Hadoop的HBase)提供一些粗粒度的結構化數據存儲和處理能力;另外一方面,爲了加強與關係數據庫的集成能力,Hadoop MapReduce提供了相應的訪問關係數據庫庫的編程接口。java

  MapReduce與MySQL交互的總體架構以下圖所示。mysql

 

圖2-1整個環境的架構sql

  具體到MapReduce框架讀/寫數據庫,有2個主要的程序分別是 DBInputFormatDBOutputFormat,DBInputFormat 對應的是SQL語句select,而DBOutputFormat 對應的是 Inster/update,使用DBInputFormat和DBOutputForma時候須要實現InputFormat這個抽象類,這個抽象類含有getSplits()和createRecordReader()抽象方法,在DBInputFormat類中由 protected String getCountQuery() 方法傳入結果集的個數,getSplits()方法再肯定輸入的切分原則,利用SQL中的 LIMIT 和 OFFSET 進行切分得到數據集的範圍 ,請參考DBInputFormat源碼中public InputSplit[] getSplits(JobConf job, int chunks) throws IOException的方法,在DBInputFormat源碼中createRecordReader()則能夠按必定格式讀取相應數據。數據庫

      1)創建關係數據庫鏈接apache

  • DBConfiguration:提供數據庫配置和建立鏈接的接口。

      DBConfiguration類中提供了一個靜態方法建立數據庫鏈接:編程

 

public static void configureDB(Job job,String driverClass,String dbUrl,String userName,String Password)安全

 

      其中,job爲當前準備執行的做業,driverClasss爲數據庫廠商提供的訪問其數據庫的驅動程序,dbUrl爲運行數據庫的主機的地址,userName和password分別爲數據庫提供訪問地用戶名和相應的訪問密碼。服務器

      2)相應的從關係數據庫查詢和讀取數據的接口架構

  • DBInputFormat:提供從數據庫讀取數據的格式。
  • DBRecordReader:提供讀取數據記錄的接口。

  3)相應的向關係數據庫直接輸出結果的編程接口app

  • DBOutputFormat:提供向數據庫輸出數據的格式。
  • DBRecordWrite:提供數據庫寫入數據記錄的接口。

  數據庫鏈接完成後,便可完成從MapReduce程序向關係數據庫寫入數據的操做。爲了告知數據庫將寫入哪一個表中的哪些字段,DBOutputFormat中提供了一個靜態方法來指定須要寫入的數據表和字段:

 

public static void setOutput(Job job,String tableName,String ... fieldName)

 

      其中,tableName指定即將寫入的數據表,後續參數將指定哪些字段數據將寫入該表。

2.1 從數據庫中輸入數據

      雖然Hadoop容許從數據庫中直接讀取數據記錄做爲MapReduce的輸入,但處理效率較低,並且大量頻繁地從MapReduce程序中查詢讀取關係數據庫可能會大大增長數據庫訪問負載,所以DBInputFormat僅適合讀取小量數據記錄計算和應用不適合數據倉庫聯機數據分析大量數據讀取處理

      讀取大量數據記錄一個更好的解決辦法是:用數據庫中的Dump工具將大量待分析數據輸出文本數據文件,並上載到HDFS中進行處理。

 

      1)首先建立要讀入的數據

  • Windows環境

  首先建立數據庫"school",使用下面命令進行:

 

create database school;

 

      而後經過如下幾句話,把咱們事先準備好的sql語句(student.sql事先放到了D盤目錄)導入到剛建立的"school"數據庫中。用到的命令以下:

 

use school;

source d:\student.sql

 

      "student.sql"中的內容以下所示:

 

DROP TABLE IF EXISTS `school`.`student`;

 

CREATE TABLE `school`.`student` (

`id` int(11) NOT NULL default '0',

`name` varchar(20) default NULL,

`sex` varchar(10) default NULL,

`age` int(10) default NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

INSERT INTO `student` VALUES ('201201', '張三', '男', '21');

INSERT INTO `student` VALUES ('201202', '李四', '男', '22');

INSERT INTO `student` VALUES ('201203', '王五', '女', '20');

INSERT INTO `student` VALUES ('201204', '趙六', '男', '21');

INSERT INTO `student` VALUES ('201205', '小紅', '女', '19');

INSERT INTO `student` VALUES ('201206', '小明', '男', '22');

 

      執行結果以下所示:

 

 

      查詢剛纔建立的數據庫表"student"的內容。

 

 

      結果發現顯示是亂碼,記得我當時是設置的UTF-8,怎麼就出現亂碼了呢?其實咱們使用的操做系統的系統爲中文,且它的默認編碼是gbk,而MySQL的編碼有兩種,它們分別是:

  【client】:客戶端的字符集。客戶端默認字符集。當客戶端向服務器發送請求時,請求以該字符集進行編碼。

  【mysqld】:服務器字符集,默認狀況下所採用的。

 

      找到安裝MySQL目錄,好比咱們的安裝目錄爲:

 

E:\HadoopWorkPlat\MySQL Server 5.5

 

      從中找到"my.ini"配置文件,最終發現my.ini裏的2個character_set把client改爲gbk,把server改爲utf8就能夠了。

    【client】端:

 

[client]

port=3306

[mysql]

default-character-set=gbk

 

    【mysqld】端:

 

[mysqld]

# The default character set that will be used when a new schema or table is

# created and no character set is defined

character-set-server=utf8

 

      按照上面修改完以後,重啓MySQL服務。

 

 

      此時在Windows下面的數據庫表已經準備完成了。

 

  • Linux環境

  首先經過"FlashFXP"把咱們剛纔的"student.sql"上傳到"/home/hadoop"目錄下面,而後按照上面的語句建立"school"數據庫。

 

  

      查看咱們上傳的"student.sql"內容:

 

  

      建立"school"數據庫,並導入"student.sql"語句。

 

  

 

      顯示數據庫"school"中的表"student"信息。

 

  

     顯示錶"student"中的內容。

 

  

 

      到此爲止在"Windows"和"Linux"兩種環境下面都建立了表"student"表,並初始化了值。下面就開始經過MapReduce讀取MySQL庫中表"student"的信息。

      2)使MySQL能遠程鏈接

      MySQL默認是容許別的機器進行遠程訪問地,爲了使Hadoop集羣能訪問MySQL數據庫,因此進行下面操做。

  • 用MySQL用戶"root"登陸。

 

mysql -u root -p

 

  • 使用下面語句進行受權,賦予任何主機訪問數據的權限。

 

GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'hadoop' WITH GRANT OPTION;

 

  • 刷新,使之當即生效。

 

FLUSH PRIVILEGES;

 

      執行結果以下圖。

      Windows下面:

 

  

      Linux下面:

 

  

      到目前爲止,若是鏈接Win7上面的MySQL數據庫還不行,你們還應該記得前面在Linux下面關掉了防火牆可是咱們在Win7下對防火牆並無作任何處理,若是不對防火牆作處理,即便執行了上面的遠程受權,仍然不能鏈接。下面是設置Win7上面的防火牆,使遠程機器能經過3306端口訪問MySQL數據庫。

      解決方案:只要在'入站規則'上創建一個3306端口便可。

  執行順序控制面板à管理工具à高級安全的Windows防火牆à入站規則

  而後新建規則à選擇'端口'à在'特定本地端口'上輸入一個'3306' à選擇'容許鏈接'=>選擇'域'、'專用'、'公用'=>給個名稱,如:MySqlInput

 

      3)對JDBC的Jar包處理

      由於程序雖然用Eclipse編譯運行但最終要提交到Hadoop集羣上,因此JDBC的jar必須放到Hadoop集羣中。有兩種方式:

      (1)在每一個節點下的${HADOOP_HOME}/lib下添加該包,重啓集羣,通常是比較原始的方法。

      咱們的Hadoop安裝包在"/usr/hadoop",因此把Jar放到"/usr/hadoop/lib"下面,而後重啓,記得是Hadoop集羣中全部的節點都要放,由於執行分佈式是程序是在每一個節點本地機器上進行。

      (2)在Hadoop集羣的分佈式文件系統中建立"/lib"文件夾,並把咱們的的JDBC的jar包上傳上去,而後在主程序添加以下語句,就能保證Hadoop集羣中全部的節點都能使用這個jar包。由於這個jar包放在了HDFS上,而不是本地系統,這個要理解清楚。

 

DistributedCache.addFileToClassPath(new Path("/lib/mysql-connector-java-5.1.18-bin.jar"), conf);

 

      咱們用的JDBC的jar以下所示:

 

mysql-connector-java-5.1.18-bin.jar

 

      經過Eclipse下面的DFS Locations進行建立"/lib"文件夾,並上傳JDBC的jar包。執行結果以下:

  

      備註咱們這裏採用第二種方式

      4)源程序代碼以下所示

 

package com.hebut.mr;

 

import java.io.IOException;

import java.io.DataInput;

import java.io.DataOutput;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

 

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.lib.IdentityReducer;

import org.apache.hadoop.mapred.lib.db.DBWritable;

import org.apache.hadoop.mapred.lib.db.DBInputFormat;

import org.apache.hadoop.mapred.lib.db.DBConfiguration;

 

public class ReadDB {

 

    public static class Map extends MapReduceBase implements

            Mapper<LongWritable, StudentRecord, LongWritable, Text> {

 

        // 實現map函數

        public void map(LongWritable key, StudentRecord value,

        OutputCollector<LongWritable, Text> collector, Reporter reporter)

                throws IOException {

            collector.collect(new LongWritable(value.id),

                    new Text(value.toString()));

        }

 

    }

 

    public static class StudentRecord implements Writable, DBWritable {

        public int id;

        public String name;

        public String sex;

        public int age;

 

        @Override

        public void readFields(DataInput in) throws IOException {

            this.id = in.readInt();

            this.name = Text.readString(in);

            this.sex = Text.readString(in);

            this.age = in.readInt();

        }

 

        @Override

        public void write(DataOutput out) throws IOException {

            out.writeInt(this.id);

            Text.writeString(out, this.name);

            Text.writeString(out, this.sex);

            out.writeInt(this.age);

        }

 

        @Override

        public void readFields(ResultSet result) throws SQLException {

            this.id = result.getInt(1);

            this.name = result.getString(2);

            this.sex = result.getString(3);

            this.age = result.getInt(4);

        }

 

        @Override

        public void write(PreparedStatement stmt) throws SQLException {

            stmt.setInt(1, this.id);

            stmt.setString(2, this.name);

            stmt.setString(3, this.sex);

            stmt.setInt(4, this.age);

        }

 

        @Override

        public String toString() {

            return new String("學號:" + this.id + "_姓名:" + this.name

                    + "_性別:"+ this.sex + "_年齡:" + this.age);

        }

    }

 

    public static void main(String[] args) throws Exception {

 

        JobConf conf = new JobConf(ReadDB.class);

 

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

 

        // 很是重要,值得關注

        DistributedCache.addFileToClassPath(new Path(

         "/lib/mysql-connector-java-5.1.18-bin.jar"), conf);

 

        // 設置輸入類型

        conf.setInputFormat(DBInputFormat.class);

 

        // 設置輸出類型

        conf.setOutputKeyClass(LongWritable.class);

        conf.setOutputValueClass(Text.class);

 

        // 設置Map和Reduce類

        conf.setMapperClass(Map.class);

        conf.setReducerClass(IdentityReducer.class);

 

        // 設置輸出目錄

        FileOutputFormat.setOutputPath(conf, new Path("rdb_out"));

 

        // 創建數據庫鏈接

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",

            "jdbc:mysql://192.168.1.24:3306/school", "root", "hadoop");

 

        // 讀取"student"表中的數據

        String[] fields = { "id", "name", "sex", "age" };

        DBInputFormat.setInput(conf, StudentRecord.class, "student", null,"id", fields);

 

        JobClient.runJob(conf);

    }

}

 

      備註:因爲Hadoop1.0.0新的API對關係型數據庫暫不支持,只能用舊的API進行,因此下面的"向數據庫中輸出數據"也是如此。

 

      5)運行結果以下所示

      通過上面的設置後,已經經過鏈接Win7和Linux上的MySQL數據庫,執行結果都同樣。惟獨變得就是代碼中"DBConfiguration.configureDB"中MySQL數據庫所在機器的IP地址。

 

 

2.2 向數據庫中輸出數據

      基於數據倉庫數據分析挖掘輸出結果的數據量通常不會太大,於是可能適合直接向數據庫寫入。咱們這裏嘗試與"WordCount"程序相結合,把單詞統計的結果存入到關係型數據庫中。

      1)建立寫入的數據庫表

      咱們還使用剛纔建立的數據庫"school",只是在裏添加一個新的表"wordcount",仍是使用下面語句執行:

 

use school;

source sql腳本全路徑

 

      下面是要建立的"wordcount"表的sql腳本。

 

DROP TABLE IF EXISTS `school`.`wordcount`;

 

CREATE TABLE `school`.`wordcount` (

`id` int(11) NOT NULL auto_increment,

`word` varchar(20) default NULL,

`number` int(11) default NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

      執行效果以下所示:

  • Windows環境

  • Linux環境

 

      2)程序源代碼以下所示

 

package com.hebut.mr;

 

import java.io.IOException;

import java.io.DataInput;

import java.io.DataOutput;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.Iterator;

import java.util.StringTokenizer;

 

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

import org.apache.hadoop.mapred.lib.db.DBWritable;

import org.apache.hadoop.mapred.lib.db.DBConfiguration;

 

public class WriteDB {

    // Map處理過程

    public static class Map extends MapReduceBase implements

            Mapper<Object, Text, Text, IntWritable> {

 

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

 

        @Override

        public void map(Object key, Text value,

            OutputCollector<Text, IntWritable> output, Reporter reporter)

                throws IOException {

            String line = value.toString();

            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {

                word.set(tokenizer.nextToken());

                output.collect(word, one);

            }

        }

    }

 

    // Combine處理過程

    public static class Combine extends MapReduceBase implements

            Reducer<Text, IntWritable, Text, IntWritable> {

 

        @Override

        public void reduce(Text key, Iterator<IntWritable> values,

            OutputCollector<Text, IntWritable> output, Reporter reporter)

                throws IOException {

            int sum = 0;

            while (values.hasNext()) {

                sum += values.next().get();

            }

            output.collect(key, new IntWritable(sum));

        }

    }

 

    // Reduce處理過程

    public static class Reduce extends MapReduceBase implements

            Reducer<Text, IntWritable, WordRecord, Text> {

 

        @Override

        public void reduce(Text key, Iterator<IntWritable> values,

            OutputCollector<WordRecord, Text> collector, Reporter reporter)

                throws IOException {

 

            int sum = 0;

            while (values.hasNext()) {

                sum += values.next().get();

            }

 

            WordRecord wordcount = new WordRecord();

            wordcount.word = key.toString();

            wordcount.number = sum;

 

            collector.collect(wordcount, new Text());

        }

    }

 

    public static class WordRecord implements Writable, DBWritable {

        public String word;

        public int number;

 

        @Override

        public void readFields(DataInput in) throws IOException {

            this.word = Text.readString(in);

            this.number = in.readInt();

        }

 

        @Override

        public void write(DataOutput out) throws IOException {

            Text.writeString(out, this.word);

            out.writeInt(this.number);

        }

 

        @Override

        public void readFields(ResultSet result) throws SQLException {

            this.word = result.getString(1);

            this.number = result.getInt(2);

        }

 

        @Override

        public void write(PreparedStatement stmt) throws SQLException {

            stmt.setString(1, this.word);

            stmt.setInt(2, this.number);

        }

    }

 

    public static void main(String[] args) throws Exception {

 

        JobConf conf = new JobConf(WriteDB.class);

 

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

 

        DistributedCache.addFileToClassPath(new Path(

                "/lib/mysql-connector-java-5.1.18-bin.jar"), conf);

 

        // 設置輸入輸出類型

        conf.setInputFormat(TextInputFormat.class);

        conf.setOutputFormat(DBOutputFormat.class);

        // 不加這兩句,通不過,可是網上給的例子沒有這兩句。

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

 

        // 設置Map和Reduce類

        conf.setMapperClass(Map.class);

        conf.setCombinerClass(Combine.class);

        conf.setReducerClass(Reduce.class);

 

        // 設置輸如目錄

        FileInputFormat.setInputPaths(conf, new Path("wdb_in"));

 

        // 創建數據庫鏈接

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",

            "jdbc:mysql://192.168.1.24:3306/school", "root", "hadoop");

 

        // 寫入"wordcount"表中的數據

        String[] fields = { "word", "number" };

        DBOutputFormat.setOutput(conf, "wordcount", fields);

 

        JobClient.runJob(conf);

    }

}

 

      3)運行結果以下所示

  • Windows環境

  測試數據:

(1)file1.txt

 

hello word

hello hadoop

 

    (2)file2.txt

 

蝦皮 hadoop

蝦皮 word

軟件 軟件

 

      運行結果:

 

      咱們發現上圖中出現了"?",後來查找原來是由於個人測試數據時在Windows用記事本寫的而後保存爲"UTF-8",在保存時爲了區分編碼,自動在前面加了一個"BOM",可是不會顯示任何結果。然而咱們的代碼把它識別爲"?"進行處理。這就出現了上面的結果,若是咱們在每一個要處理的文件前面的第一行加一個空格,結果就成以下顯示:

 

      接着又作了一個測試,在Linux上面用下面命令建立了一個文件,並寫上中文內容。結果顯示並無出現"?",並且網上說不一樣的記事本軟件(EmEditor、UE)保存爲"UTF-8"就沒有這個問題。通過修改以後的Map類,就可以正常識別了。

 

    // Map處理過程

    public static class Map extends MapReduceBase implements

            Mapper<Object, Text, Text, IntWritable> {

 

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

 

        @Override

        public void map(Object key, Text value,

            OutputCollector<Text, IntWritable> output, Reporter reporter)

                throws IOException {

            String line = value.toString();

           

            //處理記事本UTF-8的BOM問題

            if (line.getBytes().length > 0) {

                if ((int) line.charAt(0) == 65279) {

                    line = line.substring(1);

                }

            }

           

            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {

                word.set(tokenizer.nextToken());

                output.collect(word, one);

            }

        }

    }

 

      處理以後的結果:

 

 

      從上圖中得知,咱們的問題已經解決了,所以,在編輯、更改任何文本文件時,請務必使用不會亂加BOM的編輯器。Linux下的編輯器應該都沒有這個問題。Windows下,請勿使用記事本等編輯器。推薦的編輯器是: Editplus 2.12版本以上; EmEditor; UltraEdit(須要取消'添加BOM'的相關選項); Dreamweaver(須要取消'添加BOM'的相關選項) 等。

  對於已經添加了BOM的文件,要取消的話,能夠用以上編輯器另存一次。(Editplus須要先另存爲gb,再另存爲UTF-8。) DW解決辦法以下: 用DW打開指定文件,按Ctrl+Jà標題/編碼à編碼選擇"UTF-8",去掉"包括Unicode簽名(BOM)"勾選à保存/另存爲,便可。

    國外有一個牛人已經把這個問題解決了,使用"UnicodeInputStream"、"UnicodeReader"。

    地址:http://koti.mbnet.fi/akini/java/unicodereader/

    示例:Java讀帶有BOM的UTF-8文件亂碼緣由及解決方法

    代碼:http://download.csdn.net/detail/xia520pi/4146123

 

  • Linux環境

  測試數據:

    (1)file1.txt

 

MapReduce is simple

 

    (2)file2.txt

 

MapReduce is powerful is simple

 

    (3)file2.txt

 

Hello MapReduce bye MapReduce

 

      運行結果:

 

 

      到目前爲止,MapReduce與關係型數據庫交互已經結束,從結果中得知,目前新版的API還不能很好的支持關係型數據庫的操做,上面兩個例子都是使用的舊版的API。關於更多的MySQL操做,具體參考"Hadoop集羣_第10期副刊_經常使用MySQL數據庫命令_V1.0"。

 

      本期歷時五天,終於完成,期間遇到的關鍵問題以下:

 

  • MySQL的JDBC的jar存放問題。
  • Win7對MySQL防火牆的設置。
  • Linux中MySQL變動目錄不能啓動。
  • MapReduce處理帶BOM的UTF-8問題。
  • 設置MySQL能夠遠程訪問。
  • MySQL處理中文亂碼問題。

 

  從這幾天對MapReduce的瞭解,發現其實Hadoop對關係型數據庫的處理還不是很強,主要是Hadoop和關係型數據作的事不是同一類型,各有所特長。下面幾期咱們將對Hadoop裏的HBase和Hive進行全面瞭解。

相關文章
相關標籤/搜索