玩轉大數據系列之Apache Pig如何與MySQL集成(三)

上篇介紹瞭如何把Pig的結果存儲到Solr中,那麼可能就會有朋友問了,爲何不存到數據庫呢? 不支持仍是? 其實只要咱們願意,咱們能夠存儲它的結果集到任何地方,只須要重寫咱們本身的StoreFunc類便可。

關於如何將Pig分析完的結果存儲到數據庫,在pig的piggy貢獻組織裏,已經有了對應的UDF了,piggybank是非apache官方提供的工具函數,裏面的大部分的UDF都是,其餘公司或着我的在後來使用時貢獻的,這些工具類,雖然沒有正式劃入pig的源碼包裏,可是pig每次發行的時候,都會以擴展庫的形式附帶,編譯後會放在pig根目錄下一個叫contrib的目錄下,
piggybank的地址是
https://cwiki.apache.org/confluence/display/PIG/PiggyBank
,感興趣的朋友們,能夠看一看。

將pig分析完的結果存入到數據庫,也是很是簡單的,須要的條件有:


(1)piggybank.jar的jar包
(2)依賴數據庫的對應的驅動jar


有一點須要注意下,在將結果存儲到數據庫以前,必定要確保有訪問和寫入數據庫的權限,不然任務就會失敗!
散仙在存儲到遠程的MySQL上,就是因爲權限的問題,而寫入失敗了,具體的異常是這樣描述的:

java

Java代碼 複製代碼 收藏代碼mysql

  1. Access denied for user 'root'@'localhost'   sql

Access denied for user 'root'@'localhost'


當出現上面異常的時候,就意味着權限寫入有問題,咱們使用如下的受權方法,來給目標機賦予權限:
(1)容許全部的機器ip訪問
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION; 
(2)容許指定的機器ip訪問:
1. GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY    'mypassword' WITH GRANT OPTION;  


肯定有權限以後,咱們就能夠造一份數據,測試是否能夠將HDFS上的數據存儲到數據庫中,測試數據以下:
數據庫

Java代碼 複製代碼 收藏代碼express

  1. 1,2,3  apache

  2. 1,2,4  app

  3. 2,2,4  less

  4. 3,4,2  ide

  5. 8,2,4  函數

1,2,3
1,2,4
2,2,4
3,4,2
8,2,4


提早在對應的MySQL上,建庫建表建字段,看下散仙測試表的結構:


最後,在來看下咱們的pig腳本是如何定義和使用的:

Java代碼 複製代碼 收藏代碼

  1. --註冊數據庫驅動包和piggybank的jar   

  2. register ./dependfiles/mysql-connector-java-5.1.23-bin.jar;   

  3. register ./dependfiles/piggybank.jar   

  4.   

  5. --爲了能使schemal和數據庫對應起來,建議在這個地方給數據加上列名   

  6. a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ;   

  7.   

  8.   

  9. --過濾出id大於2的數據   

  10.   

  11. a = filter a by id > 2;   

  12.   

  13. --存儲結果到數據庫裏   

  14. STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd',   

  15.     'INSERT into pig(id,name,count) values (?,?,?)');   

  16. ~                                                             

--註冊數據庫驅動包和piggybank的jar
register ./dependfiles/mysql-connector-java-5.1.23-bin.jar;
register ./dependfiles/piggybank.jar

--爲了能使schemal和數據庫對應起來,建議在這個地方給數據加上列名
a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ;


--過濾出id大於2的數據

a = filter a by id > 2;

--存儲結果到數據庫裏
STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd',
    'INSERT into pig(id,name,count) values (?,?,?)');
~



執行成功後,咱們再去查看數據庫發現已經將pig處理後的數據正確的寫入到了數據庫中:



最後,附上DBStore類的源碼:

Java代碼 複製代碼 收藏代碼

  1.  

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.piggybank.storage;

import org.joda.time.DateTime;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

import java.io.IOException;
import java.sql.*;

public class DBStorage extends StoreFunc {
  private final Log log = LogFactory.getLog(getClass());

  private PreparedStatement ps;
  private Connection con;
  private String jdbcURL;
  private String user;
  private String pass;
  private int batchSize;
  private int count = 0;
  private String insertQuery;

  public DBStorage(String driver, String jdbcURL, String insertQuery) {
    this(driver, jdbcURL, null, null, insertQuery, "100");
  }

  public DBStorage(String driver, String jdbcURL, String user, String pass,
      String insertQuery) throws SQLException {
    this(driver, jdbcURL, user, pass, insertQuery, "100");
  }

  public DBStorage(String driver, String jdbcURL, String user, String pass,
      String insertQuery, String batchSize) throws RuntimeException {
    log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX,"
        + insertQuery + ")");
    try {
      Class.forName(driver);
    } catch (ClassNotFoundException e) {
      log.error("can't load DB driver:" + driver, e);
      throw new RuntimeException("Can't load DB Driver", e);
    }
    this.jdbcURL = jdbcURL;
    this.user = user;
    this.pass = pass;
    this.insertQuery = insertQuery;
    this.batchSize = Integer.parseInt(batchSize);
  }

  /**
   * Write the tuple to Database directly here.
   */
  public void putNext(Tuple tuple) throws IOException {
    int sqlPos = 1;
    try {
      int size = tuple.size();
      for (int i = 0; i < size; i++) {
        try {
          Object field = tuple.get(i);

          switch (DataType.findType(field)) {
          case DataType.NULL:
            ps.setNull(sqlPos, java.sql.Types.VARCHAR);
            sqlPos++;
            break;

          case DataType.BOOLEAN:
            ps.setBoolean(sqlPos, (Boolean) field);
            sqlPos++;
            break;

          case DataType.INTEGER:
            ps.setInt(sqlPos, (Integer) field);
            sqlPos++;
            break;

          case DataType.LONG:
            ps.setLong(sqlPos, (Long) field);
            sqlPos++;
            break;

          case DataType.FLOAT:
            ps.setFloat(sqlPos, (Float) field);
            sqlPos++;
            break;

          case DataType.DOUBLE:
            ps.setDouble(sqlPos, (Double) field);
            sqlPos++;
            break;

          case DataType.DATETIME:
            ps.setDate(sqlPos, new Date(((DateTime) field).getMillis()));
            sqlPos++;
            break;

          case DataType.BYTEARRAY:
            byte[] b = ((DataByteArray) field).get();
            ps.setBytes(sqlPos, b);

            sqlPos++;
            break;
          case DataType.CHARARRAY:
            ps.setString(sqlPos, (String) field);
            sqlPos++;
            break;
          case DataType.BYTE:
            ps.setByte(sqlPos, (Byte) field);
            sqlPos++;
            break;

          case DataType.MAP:
          case DataType.TUPLE:
          case DataType.BAG:
            throw new RuntimeException("Cannot store a non-flat tuple "
                + "using DbStorage");

          default:
            throw new RuntimeException("Unknown datatype "
                + DataType.findType(field));

          }

        } catch (ExecException ee) {
          throw new RuntimeException(ee);
        }

      }
      ps.addBatch();
      count++;
      if (count > batchSize) {
        count = 0;
        ps.executeBatch();
        ps.clearBatch();
        ps.clearParameters();
      }
    } catch (SQLException e) {
      try {
        log
            .error("Unable to insert record:" + tuple.toDelimitedString("\t"),
                e);
      } catch (ExecException ee) {
        // do nothing
      }
      if (e.getErrorCode() == 1366) {
        // errors that come due to utf-8 character encoding
        // ignore these kind of errors TODO: Temporary fix - need to find a
        // better way of handling them in the argument statement itself
      } else {
        throw new RuntimeException("JDBC error", e);
      }
    }
  }

  class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> {

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
      // IGNORE
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new OutputCommitter() {

        @Override
        public void abortTask(TaskAttemptContext context) throws IOException {
          try {
            if (ps != null) {
              ps.close();
            }
            if (con != null) {
              con.rollback();
              con.close();
            }
          } catch (SQLException sqe) {
            throw new IOException(sqe);
          }
        }

        @Override
        public void commitTask(TaskAttemptContext context) throws IOException {
          if (ps != null) {
            try {
              ps.executeBatch();
              con.commit();
              ps.close();
              con.close();
              ps = null;
              con = null;
            } catch (SQLException e) {
              log.error("ps.close", e);
              throw new IOException("JDBC Error", e);
            }
          }
        }

        @Override
        public boolean needsTaskCommit(TaskAttemptContext context)
            throws IOException {
          return true;
        }

        @Override
        public void cleanupJob(JobContext context) throws IOException {
          // IGNORE
        }

        @Override
        public void setupJob(JobContext context) throws IOException {
          // IGNORE
        }

        @Override
        public void setupTask(TaskAttemptContext context) throws IOException {
          // IGNORE
        }
      };
    }

    @Override
    public RecordWriter<NullWritable, NullWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException, InterruptedException {
      // We don't use a record writer to write to database
    	return new RecordWriter<NullWritable, NullWritable>() {
    		   	  @Override
    		   	  public void close(TaskAttemptContext context) {
    		   		  // Noop
    		    	  }
    		    	  @Override
    		    	  public void write(NullWritable k, NullWritable v) {
    		    		  // Noop
    		    	  }
    		      };
    }

  }

  @SuppressWarnings("unchecked")
  @Override
  public OutputFormat getOutputFormat()
      throws IOException {
    return new MyDBOutputFormat();
  }

  /**
   * Initialise the database connection and prepared statement here.
   */
  @SuppressWarnings("unchecked")
  @Override
  public void prepareToWrite(RecordWriter writer)
      throws IOException {
    ps = null;
    con = null;
    if (insertQuery == null) {
      throw new IOException("SQL Insert command not specified");
    }
    try {
      if (user == null || pass == null) {
        con = DriverManager.getConnection(jdbcURL);
      } else {
        con = DriverManager.getConnection(jdbcURL, user, pass);
      }
      con.setAutoCommit(false);
      ps = con.prepareStatement(insertQuery);
    } catch (SQLException e) {
      log.error("Unable to connect to JDBC @" + jdbcURL);
      throw new IOException("JDBC Error", e);
    }
    count = 0;
  }

  @Override
  public void setStoreLocation(String location, Job job) throws IOException {
    // IGNORE since we are writing records to DB.
  }
}
相關文章
相關標籤/搜索