Exporting big-data reports with Hadoop HDFS and the open-source Excel library POI (Part 1)

Keywords

Java, PHP, HDFS, RocketMQ, Excel, POI, reports

Background

On the business side, almost every company has report-export jobs. For small volumes, writing an output stream or a string to the response is enough; just set the appropriate Content-Type on the response. But when a large amount of data has to be exported, especially for reports with complex business logic such as orders, the export must honor all kinds of filters and permissions, so the data processing alone is already laborious. Worse, the requested range is not a day or two but half a month or a whole month of data; even at a small company that can reach well over a hundred thousand rows.

function generateExcel($filename, $header, array &$data)
{
    generateDownHeader($filename);

    $rs = '<table><tr>';
    if (is_string($header)) {
        $header = explode(',', $header);
    }
    foreach ($header as $v) {
        $rs .= '<th>'.$v.'</th>';
    }
    $rs .= '</tr>';
    foreach ($data as $coll) {
        $rs .= '<tr>';
        foreach ($coll as $v) {
            if (AppHelper::isDouble($v)) {
                $rs .= '<td style="vnd.ms-excel.numberformat:@">'.$v.'</td>';
            } else {
                $rs .= '<td>'.$v.'</td>';
            }
        }
        $rs .= '</tr>';
    }

    $rs .= '</table>';

    echo $rs;
    exit;
}

function generateDownHeader($filename)
{
    header("Content-Type: application/force-download");
    header("Content-Type: application/octet-stream");
    header("Content-Type: application/download");
    header('Content-Disposition:inline;filename="'.$filename.'"');
    header("Content-Transfer-Encoding: binary");
    header("Last-Modified: " . gmdate("D, d M Y H:i:s") . " GMT");
    header("Cache-Control: must-revalidate, post-check=0, pre-check=0");
    header("Pragma: no-cache");
}

For those hundred-odd thousand rows, the plain approach shown above is probably not feasible (I have not tried every alternative). In PHP the data is usually pulled from an API with curl, and both nginx and PHP's curl commonly time out at around 30 seconds, so the export has to move roughly 10k rows per 30s window. On a powerful multi-core server you can parallelize the requests with curl_multi; on a weaker server that approach may even take longer.

Here is the curl wrapper I use to fetch data from the API:

function curl($url, $option = null, $method = 'POST', $getCode = false, $header = [])
{
    $curl = curl_init ();
    curl_setopt($curl, CURLOPT_URL, $url);
    curl_setopt($curl, CURLOPT_TIMEOUT, 30);
    if (!array_key_exists('Content-Type', $header)) {
        $header['Content-Type'] = 'application/json;charset=UTF-8';
    }
    $headers = [];
    if ($header) {
        foreach ($header as $k=>$v) {
            $headers[] = $k.': '.$v;
        }
    }
    curl_setopt($curl, CURLOPT_HTTPHEADER, $headers);
    if ($option) {
        if (is_array($option)) {
            $option = json_encode($option);
        }
        curl_setopt($curl, CURLOPT_POSTFIELDS, $option);
    }
    curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($curl, CURLOPT_CUSTOMREQUEST, $method);
    $result = curl_exec($curl);
    if ($getCode) {
        $curl_code = curl_getinfo($curl, CURLINFO_HTTP_CODE);
        $message = self::isJson($result) ? json_decode($result, true) : $result;
        $result = ['code' => $curl_code];
        if (isset($message['exception']) && count($message) == 1) {
            $result['exception'] = $message['exception'];
            $result['result'] = null;
        } else {
            $result['result'] = $message;
        }
    }
    curl_close($curl);
    return $result;
}

Because of the data volume, I later switched to parallel requests with curl_multi:

function curlMulti(array $urls, $options = null, $method = 'POST',  $getCode = false, $header = []) 
{
    $mh = curl_multi_init();
    // add the curl handles to the multi session
    $handles = $contents = [];
    foreach ($urls as $key => $url) {
        $handles[$key] = curl_init($url);
        curl_setopt($handles[$key], CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handles[$key], CURLOPT_TIMEOUT, 30);
        curl_setopt($handles[$key], CURLOPT_CUSTOMREQUEST, $method);

        if (!array_key_exists('Content-Type', $header)) {
            $header['Content-Type'] = 'application/json;charset=utf-8';
        }
        $headers = [];
        if ($header) {
            foreach ($header as $k => $val) {
                $headers[] = $k.': '.$val;
            }
        }
        curl_setopt($handles[$key], CURLOPT_HTTPHEADER, $headers);
        if ($options) {
            if (is_array($options)) {
                $options = json_encode($options);
            }
            curl_setopt($handles[$key], CURLOPT_POSTFIELDS, $options);
        }
        curl_multi_add_handle($mh, $handles[$key]);
    }
    // run the batch of handles
    /*$active = null;
    do{
        $mrc = curl_multi_exec($mh, $active);
    } while ($mrc == CURLM_CALL_MULTI_PERFORM);

    while ($active and $mrc == CURLM_OK) {
        if (curl_multi_select($mh) === -1) {
            usleep(100);
            do {
                $mrc = curl_multi_exec($mh, $active);
            }while($mrc == CURLM_CALL_MULTI_PERFORM);
        }
    }// collect the batch contents
    $errors = [];
    foreach ($handles as $k => $ch) {
        $errors[$k] = curl_error($ch);
        $content = curl_multi_getcontent($ch);
        if ($getCode) {
            $content = curl_errno($ch) == 0 && self::isJson($content)? json_decode($content,true) : [];
        }
        $contents = array_merge($contents,$content);

    }
    $info = curl_multi_info_read($mh);*/
    $output = $errors = $infos = [];
    do {
        while (($execrun =  curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM);
        if ($execrun != CURLM_OK)
            break;
        while ($done = curl_multi_info_read($mh)) {
            $info= curl_getinfo($done['handle']);
            $infos['http_code'][] = $info['http_code'];
            $result['code'] = $info['http_code'];
            $infos['url'][] = $info['url'];
            $errors[] = curl_error($done['handle']);
            $output = self::isJson(curl_multi_getcontent($done['handle'])) ?
                array_merge($output, json_decode(curl_multi_getcontent($done['handle']),true)) : $output;
            if ($running)
                curl_multi_select($mh, 30);
        }
    } while ($running);

    $result['result'] = $output;
    $result['exception'] = $errors;
    $result['info'] = $infos;
    foreach ($handles as $ch) {
        curl_multi_remove_handle($mh, $ch);
    }
    curl_multi_close($mh);
    return $result;
}

A block of the code above is commented out. In principle it should behave the same as the loop below it, and in testing the final outcome was indeed the same; by "outcome" I do not mean the results the parallel requests return. Since the requests run concurrently, whichever handle wins the race returns first, so the results come back scrambled and the exported Excel rows follow no particular order: you cannot know which request will finish first. That is problem one. Second, exports lost data to varying degrees: with, say, 500 rows per request, verification sometimes found only 300-odd rows had actually come back, leaving the data inconsistent. Third, with enough data nginx still timed out with a 504.
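
None of this is unique to PHP. As an illustration of the ordering fix only (not the solution the article eventually adopts), here is a minimal Java sketch: submit each page to a thread pool and read the futures back in submission order, so a slow page can neither shuffle the rows nor vanish silently. `fetchPage` is a hypothetical stand-in for the real HTTP call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class OrderedFetch {
    // Stand-in for the real paged HTTP request.
    static String fetchPage(int page) {
        return "rows-of-page-" + page;
    }

    public static List<String> fetchAll(int pages, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int p = 1; p <= pages; p++) {
                final int page = p;
                tasks.add(() -> fetchPage(page));
            }
            // invokeAll returns futures in the same order the tasks were
            // submitted, no matter which request finished first.
            List<Future<String>> futures = pool.invokeAll(tasks, 30, TimeUnit.SECONDS);
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // throws if a page failed: no silent row loss
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchAll(4, 2));
    }
}
```

Keying each result by its page index (here implicitly, via future order) is what the commented-out PHP loop lacks: it merges whatever `curl_multi_info_read` hands back, in completion order.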

PS: Why didn't I use the phpexcel package in PHP? Simple: testing showed phpexcel eats far too much memory and the machine couldn't take it, so it was dropped.

A first solution

Since PHP's parallel-curl approach couldn't solve the problem, another way was needed. The most reliable one, and the one everybody thinks of, is a queue: push the export request onto a queue, return to the client immediately with a message that the job is being processed, let a consumer do the actual export, and finally report the result back to the client.

PHP has plenty of queue options, such as Swoole, Workerman and Gearman. I chose Gearman because it is convenient; Swoole used to be in our project but was removed at some point, I don't know why.

A demo of the Gearman worker (server side):

<?php
/**
 * Created by PhpStorm.
 * User: zhoujunwen
 * Date: 16/7/12
 * Time: 16:54
 */

namespace console\controllers;

use Yii;
use common\extensions\AppHelper;
use yii\console\Controller;

class ExportController extends Controller
{
    public function actionExport()
    {
        $worker = new \GearmanWorker();
        $worker->addServer();
        $worker->addFunction('export', function (\GearmanJob $job) {
            $workload = $job->workload();

            if (($data = $this->parseJson($workload)) == false) {
                return AppHelper::encodeJson(['code' => '-1', 'result' => null, 'exception' => '參數錯誤']);
            }
            $user = isset($data['user']) && !empty($data['user']) ? $data['user'] : 'guest';
            $path = dirname(Yii::$app->basePath) . '/backend/downloads/' . sha1($user) . '/' . date('Y-m-d') . '/';
            $filename = isset($data['filename']) && !empty($data['filename']) ? $data['filename'] : date('Y-m-d') . '-order.xls';
            $rs = $this->getData($data['type']['data'], $data['type']['count'], $data['api'], $data['params']);
            $this->writeExcel($path, $filename, $rs, $data['header']);
            return 200;
        });
        // loop indefinitely; gearman handles this internally, so it will not peg the CPU and die
        while ($worker->work()) {
            if ($worker->returnCode() !== GEARMAN_SUCCESS) {
                echo 'error' . PHP_EOL;
            }
        }
    }

    public function parseJson($str)
    {
        $data = json_decode($str, true);
        return (json_last_error() == JSON_ERROR_NONE) ? $data : false;
    }

    public function writeExcel($path, $filename, $data, $header)
    {
        if ($this->mkDir($path)) {
            $data = $this->assembleData($data);
            $rs = $this->generateExcel($header, $data);
            file_put_contents(rtrim($path, '/') . '/' . $filename, $rs);
        } else {
            echo '目錄不存在,寫文件錯誤!';
        }
        return;

    }

    public function getData($dataApi, $countApi, $api, $params)
    {
        $start = microtime(true);
        $count = AppHelper::getData($api . $countApi . '?' . http_build_query($params));
        echo $api . $countApi . '?' . http_build_query($params).PHP_EOL;
        echo '總條數:' . $count . PHP_EOL;
        $params['perpage'] = 500;
        $times = ceil($count / $params['perpage']);
        $data = [];
        if ($count > 0) {
            for ($i = 0; $i < $times; $i++) {
                $params['page'] = $i + 1;
                $rs = AppHelper::getData($api . $dataApi . '?' . http_build_query($params));
                $data = array_merge($data, $rs);
            }
        }
        $end = microtime(true);
        echo "花費時間:" . ($end - $start) . PHP_EOL;
        return $data;
    }

    public function generateExcel($header, array &$data)
    {

        $rs = '<table><tr>';
        if (is_string($header)) {
            $header = explode(',', $header);
        }
        foreach ($header as $v) {
            $rs .= '<th>' . $v . '</th>';
        }
        $rs .= '</tr>';
        foreach ($data as $coll) {
            $rs .= '<tr>';
            foreach ($coll as $v) {
                if (AppHelper::isDouble($v)) {
                    $rs .= '<td style="vnd.ms-excel.numberformat:@">' . $v . '</td>';
                } else {
                    $rs .= '<td>' . $v . '</td>';
                }
            }
            $rs .= '</tr>';
        }

        $rs .= '</table>';

        unset($data);
        return $rs;
    }

    public function assembleData($rs)
    {
        $users = [];
        if ($rs) {
            $uids = array_column($rs, 'uid');
            $us = Yii::$app->get('db')->createCommand('select uid,gender,adminflag,mobile,type from {{%user}} where uid in (' . implode(',', $uids) . ')')->queryAll();
            if ($us && is_array($us)) {
                foreach ($us as $u) {
                    $users[$u['uid']] = $u;
                }
            }
        }
        $content = [];
        foreach ($rs as $k => $v) {
            $data = AppHelper::decodeJson($v['data'], true);
            $status = '已刪除';
            if ($v['status'] == 0) {
                $status = '已關閉';
            } elseif ($v['status'] == 1) {
                $status = '下單';
            } elseif ($v['status'] == 2) {
                $status = '付款確認中';
            } elseif ($v['status'] == 3) {
                $status = '已付款';
            } elseif ($v['status'] == 4) {
                $status = '已發貨';
            } elseif ($v['status'] == 5) {
                $status = '已確認收貨';
            } elseif ($v['status'] == 6) {
                $status = '已評價';
            } elseif ($v['status'] == 7) {
                $status = '支付價格與訂單價格不一致';
            }
            $refund = '未申請退款';
            if (isset($v['refund'])) {
                if ($v['refund'] == 5) {
                    $refund = '退款已到帳';
                } elseif ($v['refund'] == 4) {
                    $refund = '賣家已確認但需人工處理';
                } elseif ($v['refund'] == 3) {
                    $refund = '贊成退款';
                } elseif ($v['refund'] == 2) {
                    $refund = '拒絕退款';
                } elseif ($v['refund'] == 1) {
                    $refund = '退款申請中';
                } elseif ($v['refund'] == 0) {
                    $refund = '未申請';
                } elseif ($v['refund'] == 6) {
                    $refund = '退貨退款申請中';
                } elseif ($v['refund'] == 7) {
                    $refund = '贊成退貨申請';
                } elseif ($v['refund'] == 8) {
                    $refund = '拒絕退貨申請';
                } elseif ($v['refund'] == 9) {
                    $refund = '買家退貨已發出';
                } elseif ($v['refund'] == 10) {
                    $refund = '賣家確認收貨';
                } elseif ($v['refund'] == 11) {
                    $refund = '收到貨拒絕退款';
                } elseif ($v['refund'] == 12) {
                    $refund = '退貨退款已到帳';
                }
            }
            $gender = '未知';
            if (isset($users[$v['uid']]) && $users[$v['uid']]['gender'] == 1) {
                $gender = '男';
            } else if (isset($users[$v['uid']]) && $users[$v['uid']]['gender'] == 2) {
                $gender = '女';
            }
            $type = '普通用戶';
            if (isset($users[$v['uid']]) && $users[$v['uid']]['adminflag'] == 3) {
                $type = '審覈中的匠人';
            } else if (isset($users[$v['uid']]) && $users[$v['uid']]['adminflag'] == 2) {
                $type = '種子用戶';
            } else if (isset($users[$v['uid']]) && $users[$v['uid']]['adminflag'] == 1) {
                $type = '管理員';
            }
            $itype = '未設置/現貨';
            if (isset($data['type'])) {
                if ($data['type'] == 1) {
                    $itype = '現貨';
                } else if ($data['type'] == 2) {
                    $itype = '定製';
                } else {
                    $itype = '拍賣';
                }
            }
            $utype = isset($users[$v['uid']]['type']) && $users[$v['uid']]['type'] == 4 ? '微信購買註冊' : 'APP內註冊';
            $otype = !$v['otype'] ? 'APP內購買' : '微信購買';
            $paytype = !$v['prepaytype'] ? 'APP內付款' : '微信付款';
            $snapshot = AppHelper::getData(Yii::$app->params['imageServer'] . $v['snapshot']);
            $content[] = [date('Y/m/d H:i:s', floor($v['createtm'] / 1000)), $v['ooid'], isset($snapshot['item']['pid']) ? $snapshot['item']['pid'] : '', $v['iid'], $data['title'], $itype, (isset($v['parentCategory']) ? $v['parentCategory'] . '/' : '') . $v['category'],
                $v['craftsman'], $v['suid'], $v['quantity'], $v['username'], $utype, $v['uid'], $data['address'], $status, $refund, $data['price'], $v['realpay'],
                $otype, $paytype, isset($users[$v['uid']]['mobile']) ? $users[$v['uid']]['mobile'] : '未知', $gender, $type];
        }
        return $content;
    }

    public function mkDir($path)
    {
        if (is_dir($path)) {
            echo '目錄' . $path . '已存在!';
            return true;
        } else {
            $res = mkdir($path, 0777, true);
            echo $res ? '目錄建立成功' : '目錄建立失敗';
            return $res;
        }
    }
}

The Gearman client code:

<?php
...

public function exportExcel($str)
{
    $client = new \GearmanClient();
    $client->addServer('127.0.0.1', 4730);

    $client->setCompleteCallback('completeCallBack');
    $result2 = $client->doBackground('export', $str); // asynchronous; returns only a job handle.

//        $result1 = $client->do('export', 'do'); // do is synchronous: processes the job and returns its result.
//        $result3 = $client->addTask('export', 'addTask'); // queue a task (synchronous?); addTask supports callbacks.
//        $result4 = $client->addTaskBackground('export', 'addTaskBackground'); // queue a background task (asynchronous?).

    $client->runTasks(); // run the queued tasks; the do* family does not need runTasks().
    return $result2;

}
// callback binding; only takes effect for addTask
function completeCallBack($task)
{
    echo 'CompleteCallback!handle result:'.$task->data().'<br/>';
}

PS: to run the code above you need the Gearman service installed on the server or locally, along with the php gearman extension; search for an installation guide.

If your business logic is simple, this is already more than enough to export tens of thousands of rows. My problem, however, was still not solved: my boss didn't want Gearman for the queueing and preferred the processing be done in Java. Fine; I enjoy hopping between technologies to solve a problem, so since this didn't meet the requirement, on to another plan.

RocketMQ + HDFS + POI

Note: the Java projects here are all based on spring + dubbo/dubbox. All configuration and annotations used fall within Spring's usual scope, except for the mapper configuration and annotations.

Three projects:

  • mq project: exposes a REST service for sending messages (@rxl)

  • biz project: exposes dubbo and RESTful interfaces and handles the business logic (@lee)

  • data project: handles the data export

As above, the three projects were written by different engineers. We don't care how each is implemented; we only need to know how to use each piece.

The RESTful interface exposed by mq:

@Path("/message")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
@Component("sendMessageService")
public class SendMessageImpl implements SendMessageService{

    @Resource
    public IProducer producer;

    @PUT
    @Path("send")
    @Consumes({MediaType.APPLICATION_JSON})
    @Override
    public void sendMessage(Message message) {
        System.out.println("message" + message.getMessage());
        producer.send(message.getTopic(),message.getKey(),message.getMessage());
    }
}

From the PHP backend we call this interface with a PUT request, sending the data to be processed to the export consumer. Sending a PUT is easy with curl's request support:

curl_setopt($curl, CURLOPT_CUSTOMREQUEST, 'PUT');

Suppose the mq REST endpoint is http://localhost:8018/mq/message/send. We pass a JSON string whose prototype is an associative array with the keys "topic", "key" and "message". topic is the message topic, consumed by the designated MQ topic; key is the message key: one topic carries many keys, so the consumer (the export side) must dispatch on the key. message holds the concrete parameters, such as which fields to export, the file-upload server address, and so on.

$message = [
    'topic' => 'order_export',
    'key' => 'order_tag_' . $orderNo,
    'message' => [
        'params' => [
            ...
        ],
        'headers' => [
            ...
        ],
        'options' => [
            ...
        ],
    ],

];

The complete request:

http://localhost:8018/mq/message/send?{"topic":"order_export","key":"order_tag_","message":{"params":[],"header":[],"options":[]}}
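
On the consuming side, dispatching on the key can be as simple as a prefix check. A minimal sketch, assuming the `order_tag_` prefix built by the PHP side above; the return values and the fallback branches are illustrative placeholders, not the real data project's API:

```java
public class KeyDispatcher {
    // Route a consumed message to a handler based on topic and key prefix.
    // "order_export" and "order_tag_" match the message built above.
    public static String dispatch(String topic, String key) {
        if (!"order_export".equals(topic)) {
            return "ignored"; // not our topic
        }
        if (key != null && key.startsWith("order_tag_")) {
            // the suffix identifies the concrete export job
            return "export-order:" + key.substring("order_tag_".length());
        }
        return "unknown-key"; // surface unexpected keys instead of dropping them
    }

    public static void main(String[] args) {
        System.out.println(dispatch("order_export", "order_tag_20160812001"));
    }
}
```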

Wrapping POI in a utility class

There are many Excel APIs for Java, but Apache POI is the most convenient and flexible to use (though perhaps only because I haven't tried the others).

HSSF is the POI Project's pure Java implementation of the Excel '97(-2007) file format. XSSF is the POI Project's pure Java implementation of the Excel 2007 OOXML (.xlsx) file format.

HSSF and XSSF provides ways to read spreadsheets create, modify, read and write XLS spreadsheets. They provide:

  • low level structures for those with special needs

  • an eventmodel api for efficient read-only access

  • a full usermodel api for creating, reading and modifying XLS files

Pull the POI packages in via gradle:

// java excel api
compile 'org.apache.poi:poi:3.10.1'
compile 'org.apache.poi:poi-ooxml:3.9'

The POIUtils class:

package cn.test.web.utils;

import cn.test.util.Utils;
import org.apache.commons.io.FilenameUtils;
import org.apache.poi.hssf.record.crypto.Biff8EncryptionKey;
import org.apache.poi.hssf.usermodel.HSSFFont;
import org.apache.poi.hssf.usermodel.HSSFFooter;
import org.apache.poi.hssf.usermodel.HSSFHeader;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.apache.poi.poifs.filesystem.POIFSFileSystem;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellStyle;
import org.apache.poi.ss.usermodel.Font;
import org.apache.poi.ss.usermodel.Footer;
import org.apache.poi.ss.usermodel.Header;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 17:02
 */
public class POIUtils {
    private static final short HEADER_FONT_SIZE = 16; // page-header font size
    private static final short FONT_HEIGHT_IN_POINTS = 14; // heading-row font size

    public static Workbook createWorkbook(String file) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = null;
        switch (ext) {
            case "xls":
                wb = createHSSFWorkbook();
                break;
            case "xlsx":
                wb = createXSSFWorkbook();
                break;
            default:
                wb = createHSSFWorkbook();
        }
        return wb;
    }

    public static Workbook createWorkbookByIS(String file, InputStream inputStream) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = null;
        try {
            switch (ext) {
                case "xls":
                    wb = new HSSFWorkbook(inputStream);
                    break;
                case "xlsx":
                    wb = new XSSFWorkbook(inputStream);
                    break;
                default:
                    wb = new HSSFWorkbook(inputStream);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wb;
    }

    public static Workbook writeFile(Workbook wb, String file) {
        if (wb == null || Utils.isEmpty(file)) {
            return null;
        }
        FileOutputStream out = null;
        try {
            out = new FileOutputStream(file);
            wb.write(out);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return wb;
    }

    public static Workbook createHSSFWorkbook() {
        // create the workbook
        HSSFWorkbook wb = new HSSFWorkbook();
        // add a worksheet (an .xls file generated without a sheet fails to open)
        @SuppressWarnings("unused")
        Sheet sheet = wb.createSheet();
        return wb;
    }

    public static Workbook createXSSFWorkbook() {
        XSSFWorkbook wb = new XSSFWorkbook();
        @SuppressWarnings("unused")
        Sheet sheet = wb.createSheet();
        return wb;
    }

    public static Workbook openWorkbook(String file) {
        FileInputStream in = null;
        Workbook wb = null;

        try {
            in = new FileInputStream(file);
            wb = WorkbookFactory.create(in);
        } catch (InvalidFormatException | IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return wb;
    }

    public static Workbook openEncryptedWorkbook(String file, String password) {
        FileInputStream input = null;
        BufferedInputStream binput = null;
        POIFSFileSystem poifs = null;
        Workbook wb = null;
        try {
            input = new FileInputStream(file);
            binput = new BufferedInputStream(input);
            poifs = new POIFSFileSystem(binput);
            Biff8EncryptionKey.setCurrentUserPassword(password);
            String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
            switch (ext) {
                case "xls":
                    wb = new HSSFWorkbook(poifs);
                    break;
                case "xlsx":
                    wb = new XSSFWorkbook(input);
                    break;
                default:
                    wb = new HSSFWorkbook(poifs);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wb;
    }

    /**
     * Append a sheet; if wb is null and isNew is true, create a new workbook.
     *
     * @param wb
     * @param isNew
     * @param type  workbook type, used when isNew is true (1: xls, 2: xlsx)
     * @return
     */
    public static Workbook appendSheet(Workbook wb, boolean isNew, int type) {
        if (wb != null) {
            Sheet sheet = wb.createSheet();
        } else if (isNew) {
            if (type == 1) {
                wb = new HSSFWorkbook();
                wb.createSheet();
            } else {
                wb = new XSSFWorkbook();
                wb.createSheet();
            }
        }
        return wb;
    }


    public static Workbook setSheetName(Workbook wb, int index, String sheetName) {
        if (wb != null && wb.getSheetAt(index) != null) {
            wb.setSheetName(index, sheetName);
        }
        return wb;
    }

    public static Workbook removeSheet(Workbook wb, int index) {
        if (wb != null && wb.getSheetAt(index) != null) {
            wb.removeSheetAt(index);
        }
        return wb;
    }

    public static Workbook insert(Workbook wb, String sheetName, int row, int start,
                                  List<?> columns) {
        if (row == 0 || wb == null) return wb;
        for (int i = start; i < (row + start); i++) {
            Row rows = wb.getSheet(sheetName).createRow(i);
            if (columns != null && columns.size() > 0) {
                for (int j = 0; j < columns.size(); j++) {
                    Cell ceil = rows.createCell(j);
                    ceil.setCellValue(String.valueOf(columns.get(j)));
                }
            }
        }
        return wb;
    }

    /**
     * Set the sheet's header row.
     *
     * @param wb
     * @param sheetName
     * @param columns   e.g. ["Country","Event","Year"]
     * @return
     */
    public static Workbook setHeader(Workbook wb, String sheetName, List<?> columns) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        return setHeaderStyle(insert(wb, sheetName, 1, 0, columns), sheetName);

    }

    /**
     * Insert data rows.
     *
     * @param wb        Workbook
     * @param sheetName sheet name; defaults to the first sheet
     * @param start     starting row
     * @param data      nested lists, e.g. [["China","Olympics",2008],["London","Olympics",2012]]
     * @return
     */
    public static Workbook setData(Workbook wb, String sheetName, int start,
                                   List<?> data) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        if (data != null && data.size() > 0) {
            if (data instanceof List) {
                int s = start;
                for (Object columns : data) {
                    insert(wb, sheetName, data.size() - (s - 1), s, (List<?>) columns);
                    s++;
                }
            }
        }
        return wb;
    }

    /**
     * Remove a row.
     *
     * @param wb
     * @param sheetName sheet name
     * @param row       row number
     * @return
     */
    public static Workbook delRow(Workbook wb, String sheetName, int row) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Row r = wb.getSheet(sheetName).getRow(row);
        wb.getSheet(sheetName).removeRow(r);
        return wb;
    }

    /**
     * Move rows.
     *
     * @param wb
     * @param sheetName
     * @param start     first row to move
     * @param end       last row to move
     * @param step      number of rows to shift by; negative moves rows up.
     *                  moveRow(wb,null,2,3,5): moves rows 2-3 down past row 5
     *                  moveRow(wb,null,2,3,-1): moves rows 2-3 up one row
     * @return
     */
    public static Workbook moveRow(Workbook wb, String sheetName, int start, int end, int step) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        wb.getSheet(sheetName).shiftRows(start, end, step);
        return wb;
    }

    public static Workbook setHeaderStyle(Workbook wb, String sheetName) {
        Font font = wb.createFont();
        CellStyle style = wb.createCellStyle();
        font.setBoldweight(HSSFFont.BOLDWEIGHT_BOLD);
        font.setFontHeightInPoints(FONT_HEIGHT_IN_POINTS);
        font.setFontName("黑體");
        style.setFont(font);
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        int row = wb.getSheet(sheetName).getFirstRowNum();
        int cell = wb.getSheet(sheetName).getRow(row).getLastCellNum();
        for (int i = 0; i < cell; i++) {
            wb.getSheet(sheetName).getRow(row).getCell(i).setCellStyle(style);
        }
        return wb;
    }

    public static Workbook setHeaderOutline(Workbook wb, String sheetName, String title) {
        if (wb == null) return null;
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Header header = wb.getSheet(sheetName).getHeader();
        header.setLeft(HSSFHeader.startUnderline() +
                HSSFHeader.font("宋體", "Italic") +
                "打雞血的口號!" +  // a pep slogan, e.g. your company motto
                HSSFHeader.endUnderline());
        header.setCenter(HSSFHeader.fontSize(HEADER_FONT_SIZE) +
                HSSFHeader.startDoubleUnderline() +
                HSSFHeader.startBold() +
                title +
                HSSFHeader.endBold() +
                HSSFHeader.endDoubleUnderline());
        header.setRight("時間:" + HSSFHeader.date() + " " + HSSFHeader.time());
        return wb;
    }

    public static Workbook setFooter(Workbook wb, String sheetName, String copyright) {
        if (wb == null) return null;
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Footer footer = wb.getSheet(sheetName).getFooter();
        if (Utils.isEmpty(copyright)) {
            copyright = "中華人民共和國"; // copyright holder: your company or app name
        }
        footer.setLeft("Copyright @ " + copyright);
        footer.setCenter("Page:" + HSSFFooter.page() + " / " + HSSFFooter.numPages());
        footer.setRight("File:" + HSSFFooter.file());
        return wb;
    }

    public static String create(String sheetNm, String file, List<?> header, List<?> data, String title, String copyright) {
        Workbook wb = createWorkbook(file);
        if (Utils.isEmpty(sheetNm)) {
            sheetNm = wb.getSheetAt(0).getSheetName();
        }
        setHeaderOutline(wb, sheetNm, title);
        setHeader(wb, sheetNm, header);
        setData(wb, sheetNm, 1, data);
        setFooter(wb, sheetNm, copyright);
        writeFile(wb, file);
        if (wb != null) {
            return file;
        }
        return null;
    }

    public static String getSystemFileCharset() {
        Properties pro = System.getProperties();
        return pro.getProperty("file.encoding");
    }
    // TODO: add further settings later

}

Wrapping HDFS in a utility class

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has much in common with existing distributed file systems, but the differences are just as significant: HDFS is highly fault-tolerant, is designed for deployment on low-cost machines, and provides high-throughput access to application data, which suits applications with very large data sets. HDFS relaxes a few POSIX requirements to allow streaming access to file system data. It was originally built as infrastructure for the Apache Nutch search engine project and is part of the Apache Hadoop Core project.

Pull hadoop/hdfs in via gradle:

// jersey
    compile 'com.sun.jersey:jersey-core:1.19.1'
    compile 'com.sun.jersey:jersey-server:1.19.1'
    compile 'com.sun.jersey:jersey-client:1.19.1'
    compile 'com.sun.jersey:jersey-json:1.19.1'

    // hadoop
    compile ('org.apache.hadoop:hadoop-common:2.7.2') {
        exclude(module: 'jersey')
        exclude(module: 'contribs')
    }
    compile ('org.apache.hadoop:hadoop-hdfs:2.7.2') {
        exclude(module: 'jersey')
        exclude(module: 'contribs')
    }
    compile ('org.apache.hadoop:hadoop-client:2.7.2') {
        exclude(module: 'jersey')
        exclude(module: 'contribs')
    }

The HDFSUtils class:

package cn.test.web.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.poi.ss.usermodel.Workbook;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 17:41
 */
public class HDFSUtils {
    private static FileSystem fs = null;

    public static FileSystem getFileSystem(Configuration conf) throws IOException,
            URISyntaxException {
        fs = FileSystem.get(conf);
        //fs = FileSystem.newInstance(conf);
        return fs;
    }

    /**
     * Check whether a path exists.
     *
     * @param conf
     * @param path
     * @return
     * @throws IOException
     */
    public static boolean exits(Configuration conf, String path) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        return fs.exists(new Path(path));
    }

    /**
     * Create a file from a byte array.
     *
     * @param conf
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public static void createFile(Configuration conf, String filePath, byte[] contents)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.write(contents, 0, contents.length);
        outputStream.hflush();
        outputStream.close();
        fs.close();
    }

    /**
     * Create a file from a string (note the argument order: content first, then path).
     *
     * @param conf
     * @param filePath
     * @param fileContent
     * @throws IOException
     */
    public static void createFile(Configuration conf, String fileContent, String filePath)
            throws IOException, URISyntaxException {
        createFile(conf, filePath, fileContent.getBytes());
    }

    /**
     * Upload a local file to HDFS.
     *
     * @param conf
     * @param localFilePath
     * @param remoteFilePath
     * @throws IOException
     */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        fs.copyFromLocalFile(true, true, localPath, remotePath);
        fs.close();
    }

    /**
     * Delete a file or directory.
     *
     * @param conf
     * @param remoteFilePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath, boolean recursive)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        boolean result = fs.delete(new Path(remoteFilePath), recursive);
        fs.close();
        return result;
    }

    /**
     * Delete a file or directory (cascades into subdirectories).
     *
     * @param conf
     * @param remoteFilePath
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath)
            throws IOException, URISyntaxException {
        return deleteFile(conf, remoteFilePath, true);
    }

    /**
     * Rename a file.
     *
     * @param conf
     * @param oldFileName
     * @param newFileName
     * @return
     * @throws IOException
     */
    public static boolean renameFile(Configuration conf, String oldFileName, String newFileName)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path oldPath = new Path(oldFileName);
        Path newPath = new Path(newFileName);
        boolean result = fs.rename(oldPath, newPath);
        fs.close();
        return result;
    }

    /**
     * Create a directory.
     *
     * @param conf
     * @param dirName
     * @return
     * @throws IOException
     */
    public static boolean createDirectory(Configuration conf, String dirName)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path dir = new Path(dirName);
        boolean result = fs.mkdirs(dir);
        fs.close();
        return result;
    }

    /**
     * List all files under a path (directories excluded).
     *
     * @param fs
     * @param basePath
     * @param recursive
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, String basePath, boolean recursive)
            throws IOException {

        RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator = fs.listFiles(new Path(basePath), recursive);

        return fileStatusRemoteIterator;
    }

    /**
     * List files under a path (non-recursive).
     *
     * @param conf
     * @param basePath
     * @return
     * @throws IOException
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(Configuration conf, String basePath)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        // Note: do not close fs here - the iterator fetches entries lazily
        // and needs the FileSystem to stay open while the caller iterates.
        return fs.listFiles(new Path(basePath), false);
    }

    /**
     * List the files and subdirectories of a directory (non-recursive).
     *
     * @param conf
     * @param dirPath
     * @return
     * @throws IOException
     */
    public static FileStatus[] listStatus(Configuration conf, String dirPath) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath));
        fs.close();
        return fileStatuses;
    }


    /**
     * Read an Excel file and write its content into the given OutputStream.
     *
     * @param conf     configuration
     * @param filePath file path
     * @param os       output stream
     * @throws IOException
     */
    public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        try (FSDataInputStream inputStream = fs.open(path)) {
            // try-with-resources closes the stream; no explicit close() needed
            Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream);
            wb.write(os);
        } finally {
            fs.close();
        }
    }

    /**
     * Read a file's content and return it as a UTF-8 string.
     *
     * @param conf
     * @param filePath
     * @return
     * @throws IOException
     * @throws URISyntaxException
     */
    public static String readFile(Configuration conf, String filePath) throws IOException,
            URISyntaxException {
        String fileContent = null;
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        InputStream inputStream = null;
        ByteArrayOutputStream outputStream = null;
        try {
            inputStream = fs.open(path);
            outputStream = new ByteArrayOutputStream(inputStream.available());
            IOUtils.copyBytes(inputStream, outputStream, conf);
            byte[] lens = outputStream.toByteArray();
            fileContent = new String(lens, "UTF-8");
        } finally {
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
            fs.close();
        }
        return fileContent;
    }
}

For HDFS I also wrote two separate classes: HDFSFileUploader and Configuration. As the names suggest, the former handles file uploads and the latter holds the HDFS configuration.

HDFSFileUploader

package cn.test.web.utils.hadoop;

import cn.test.common.log.Log;
import cn.test.common.log.LogFactory;
import cn.test.common.util.Utils;
import cn.test.web.utils.HDFSUtils;
import org.apache.commons.lang.NullArgumentException;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 5:42 PM
 */
public class HDFSFileUploader {
    public static final byte FROM_LOCAL_COPY = 1; // upload a file from the local disk
    public static final byte FROM_CONTENT_WRITE = 2; // write a string or bytes as a new file

    private static final Log LOGGER = LogFactory.getLog(HDFSFileUploader.class);
    private static final String HDFS_SCHEMA = "hdfs://";
    private static final String SEPARATOR = "/";
    private static final String SUFFIX_PREFIX = ".";

    private static final int BUFFER_SIZE = 1024;
    private static final Configuration CONF = new Configuration();


    /**
     * Upload binary content using the default domain and a random file name.
     *
     * @param path
     * @param suffix
     * @param contents
     * @return
     */
    public static String upload(String path, String suffix, byte[] contents) {
        return upload(null, path, suffix, contents);
    }

    /**
     * Upload binary content with a random file name.
     *
     * @param domain
     * @param path
     * @param suffix
     * @param contents
     * @return
     */
    public static String upload(String domain, String path, String suffix, byte[] contents) {
        return upload(domain, path, null, suffix, contents);
    }

    /**
     * Upload binary content with an explicit file name (stream upload only).
     *
     * @param domain
     * @param path
     * @param filename
     * @param suffix
     * @param content
     * @return
     */
    public static String upload(String domain, String path, String filename, String suffix,
                                final byte[] content) {
        // note: new String(content) assumes the bytes are text
        return upload(domain, path, filename, suffix, new String(content), FROM_CONTENT_WRITE);
    }

    /**
     * Upload a file using the default domain and a random file name.
     *
     * @param path
     * @param suffix
     * @param src
     * @return
     */
    public static String upload(String path, String suffix, String src, byte fromLocal) {
        return upload(null, path, suffix, src, fromLocal);
    }

    /**
     * Upload a file to the given directory on the given domain, with a random file name.
     *
     * @param domain domain, e.g. 10.25.126.28:9000
     * @param path   file path, e.g. /usr/local/com.hd.test/2016-08-08/
     * @param suffix file suffix, e.g. .xsl or xsl
     * @param src    file content (string) or a local file path
     * @return String the full file name
     */
    public static String upload(String domain, String path, String suffix, String src, byte
            fromLocal) {
        return upload(domain, path, null, suffix, src, fromLocal);
    }

    /**
     * Upload a file with explicit domain, path, file name and suffix.
     *
     * @param domain   domain
     * @param path     path
     * @param filename file name
     * @param suffix   suffix
     * @param src      content or local path
     * @return
     */
    public static String upload(String domain, String path, String filename, String suffix, String
            src, byte fromLocal) {
        String filePath = getRealAddr(domain, path, suffix, filename);
        LOGGER.debug("target file path: {}", filePath);
        try {
            switch (fromLocal) {
                case FROM_LOCAL_COPY:
                    HDFSUtils.copyFromLocalFile(CONF, src, filePath);
                    break;
                case FROM_CONTENT_WRITE:
                    HDFSUtils.createFile(CONF, src, filePath);
                    break;
            }
            return filePath;
        } catch (IOException | URISyntaxException e) {
            LOGGER.warn("file upload failed: {}", e.getMessage());
        }
        return null;
    }

    /**
     * Build the full file path.
     *
     * @param domain   domain
     * @param path     directory path
     * @param suffix   suffix
     * @param filename file name
     * @return
     */
    private static String getRealAddr(String domain, String path, String suffix, String filename) {
        if (!Utils.isEmpty(domain) && !domain.startsWith(HDFS_SCHEMA)) {
            domain = HDFS_SCHEMA + domain;
        } else {
            domain = "";
        }
        path = getPath(path);
        filename = getFilename(filename, suffix);
        return String.format("%s%s%s", domain, path, filename);

    }

    /**
     * Normalize the directory path (ensure leading and trailing separators).
     *
     * @param path
     * @return
     */
    private static String getPath(String path) {
        if (Utils.isEmpty(path)) {
            throw new NullArgumentException("path is null");
        }
        if (!path.startsWith(SEPARATOR)) {
            path = SEPARATOR + path;
        }
        if (!path.endsWith(SEPARATOR)) {
            path = path + SEPARATOR;
        }
        return path;
    }

    /**
     * Build the file name, appending the suffix when needed.
     *
     * @param filename
     * @param suffix
     * @return
     */
    private static String getFilename(String filename, String suffix) {
        if (Utils.isEmpty(filename)) {
            filename = generateFilename();
        }
        if (!Utils.isEmpty(suffix)) {
            filename = suffix.equals(SEPARATOR) ? filename : (filename.endsWith(suffix) ?
                    filename : ((filename.endsWith(SUFFIX_PREFIX)
                    || suffix.startsWith(SUFFIX_PREFIX)) ? filename + suffix
                    : filename + SUFFIX_PREFIX + suffix));
        }
        return filename;
    }

    /**
     * Generate a random file name.
     *
     * @return
     */
     */
    private static String generateFilename() {
        return getUuid(false);
    }

    /**
     * Generate a UUID-based name.
     *
     * @param isNeedHyphen whether to keep the hyphens
     * @return
     */
    public static String getUuid(boolean isNeedHyphen) {
        UUID uuid = UUID.randomUUID();
        String str = uuid.toString();
        if (!isNeedHyphen) { // the original condition was inverted: strip hyphens only when they are NOT wanted
            str = str.replaceAll("-", "");
        }
        return str;
    }

    public static void setConfResource(final Configuration config) {
        CONF.addResource(config);
    }
}

HDFSFileUploader exposes a family of upload methods for different inputs - binary content, strings, and local files copied into HDFS - plus helpers such as the UUID-based file-name generator.
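The nested ternary inside getFilename is hard to follow. A flattened, behavior-equivalent sketch of its suffix rules (FileNames is a made-up class name for illustration):

```java
// Hypothetical flattened rewrite of HDFSFileUploader.getFilename's suffix logic.
// Rules reproduced from the original ternary chain:
//  - a "/" suffix means "no suffix": leave the name alone;
//  - if the name already ends with the suffix, leave it alone;
//  - otherwise append the suffix, inserting a "." unless one side already has one.
class FileNames {
    private static final String SEPARATOR = "/";
    private static final String SUFFIX_PREFIX = ".";

    static String withSuffix(String filename, String suffix) {
        if (suffix == null || suffix.isEmpty() || suffix.equals(SEPARATOR)) {
            return filename;
        }
        if (filename.endsWith(suffix)) {
            return filename;
        }
        if (filename.endsWith(SUFFIX_PREFIX) || suffix.startsWith(SUFFIX_PREFIX)) {
            return filename + suffix;
        }
        return filename + SUFFIX_PREFIX + suffix;
    }
}
```

Early returns replace the ternary chain, which makes each rule independently readable and testable.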

Configuration

package cn.test.web.utils.hadoop;

import cn.test.web.utils.CommonUtils;
import org.apache.commons.io.FilenameUtils;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/9
 * Time 9:30 AM
 * Recommended usage:
 * <bean id="hadoopConfig" class="cn.test.util.hadoop.Configuration">
 * <property name="resources">
 * <list>
 * <value>classpath:/spring/core-site.xml</value>
 * </list>
 * </property>
 * </bean>
 * Then inject hadoopConfig wherever it is needed:
 *
 * @Resource private Configuration hadoopConfig;
 */
public class Configuration extends org.apache.hadoop.conf.Configuration {
    private Resource[] resources;

    public void setResources(List<String> filenames) throws IOException {
        List<Resource> resources = new ArrayList<>();
        if (filenames != null && filenames.size() > 0) {
            for (String filename : filenames) {
                filename = filename.trim();
                String realName = getFileName(filename);
                String ext = FilenameUtils.getExtension(realName);
                if ("xml".equals(ext)) { // null-safe extension check
                    PathMatchingResourcePatternResolver pathMatchingResourcePatternResolver =
                            new PathMatchingResourcePatternResolver();
                    try {
                        Resource[] resourceList = pathMatchingResourcePatternResolver.getResources(filename);
                        Collections.addAll(resources, resourceList);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        for (Resource resource : resources) {
            this.addResource(resource.getURL());
        }
    }

    private String getFileName(String fileName) {
        return CommonUtils.getFileName(fileName);
    }
}

This class is simple: it extends Hadoop's org.apache.hadoop.conf.Configuration so that Hadoop configuration files can be specified flexibly in the Spring context, relying on Configuration's addResource(URL url) method. Here is the Spring XML configuration:

<!-- Hadoop configuration -->
    <bean id="hadoopConfig" class="cn.test.web.utils.hadoop.Configuration">
        <property name="resources">
            <list>
                <value>classpath:META-INF/hadoop/*.xml</value>
            </list>
        </property>
    </bean>

Order export processing (the MQ consumer)

package cn.test.web.mq.consumer;
... // many imports omitted

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/9
 * Time 2:14 PM
 */
public class OrderExportHandler implements IMessageHandler<String, String> {

    private static final Log LOGGER = LogFactory.getLog(OrderExportHandler.class);
    private static final int MUL_SEC = 1000;
    private static final Gson GSON = new Gson();
    
    @Value("${image_server}") 
    private String imageServer;
    @Autowired
    private DataManager manager;
 
    @Override
    public void handle(final String key, final String message) {
        LOGGER.debug("message: {}", message);
        Pattern p = Pattern.compile("-");
        String[] skey = p.split(key);
        if (skey.length < 3) {
            return;
        }
        int res = insert(skey[1], skey[0], skey[2]);
        LOGGER.debug("inserted id: {}", res);
        if (res > 0) {
            // the record was inserted; run the export logic
            Map data = manager.parseData(message);
            List<?> header = null;
            List<?> content = null;
            List<Order> orders = null;

            DataExportLog log = new DataExportLog();
            log.setDelid(res);
            log.setUid(Integer.valueOf(skey[2]));

            if (data.containsKey("params")) {
                LOGGER.debug("params:{}", data.get("params"));
                orders = manager.getOrders(data.get("params"));
                LOGGER.debug("number of rows to export: {}", orders.size());
            }
            if (orders == null || orders.size() == 0) {
                log.setStatus((byte) 4);
            } else if (data.containsKey("header") && (data.get("header") instanceof Map)) {
                Object obj = data.get("header");
                Map<String, List> map = (obj instanceof Map) ?
                        manager.parseHeader((Map<String, String>) obj) : null;

                if (map != null && map.size() > 0) {
                    if (map.containsKey("header")) {
                        header = getHeader(map.get("header"));
                    }
                    if (map.containsKey("key")) {
                        content = getContent(orders, map.get("key"));
                    }
                }
                // call the HDFS interface to upload the file
                if (!Utils.isEmpty(header) || !Utils.isEmpty(content)) {
                    // generate the Excel file
                    String fName = getFilename(data);
                    String localFile = manager.writeExecelFile(fName, header, content, null, null);
                    String file = manager.copyFileFromLocal(skey[0], localFile);

                    if (Utils.isEmpty(localFile) || Utils.isEmpty(file)) {
                        log.setStatus((byte) 3);
                    } else {
                        log.setStatus((byte) 1);
                        log.setLink(file);
                    }
                    LOGGER.info("local temp file: {}", localFile);
                    LOGGER.info("file uploaded to the Hadoop server: {}", file);
                }

            }
            update(log);
        }
    }
    
    // TODO
    // data processing: this calls the biz project's Dubbo interfaces;
    // the concrete operations are not shown here
    
}

The order-export logic lives in the class above and in DataManager; the data itself comes from the biz project's Dubbo interfaces, whose business logic is out of scope here.

Here is the code for manager.writeExecelFile(fName, header, content, null, null) and manager.copyFileFromLocal(skey[0], localFile):

public String writeExecelFile(String filename, List<?> header, List<?> datas, String title, String copyright) {
    SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd");
    String date = sd.format(new Date());
    if (Utils.isEmpty(filename)) {
        filename = HDFSFileUploader.getUuid(true) + this.ext;
    }
    String filePath = this.tmpDir + "/" + date + "/" + filename;
    filePath = filePath.replaceAll("//", "/");
    File f = new File(CommonUtils.getFilePath(filePath));
    if (!f.exists()) {
        f.mkdirs(); // mkdirs, not mkdir: the date directory may be several levels deep
    }
    if (Utils.isEmpty(title)) {
        title = DEFAULT_TITLE;
    }
    if (Utils.isEmpty(copyright)) {
        copyright = this.copyright;
    }
    return POIUtils.create(null, filePath, header, datas, title, copyright);
}

writeExecelFile calls POI's create method; at this point the temporary file exists.
One more thing worth mentioning: values such as the temp path, the HDFS upload path and the copyright string are best kept configurable. Spring's org.springframework.beans.factory.config.PropertyPlaceholderConfigurer handles this; we only need the annotations below plus matching entries in a properties file:

@Value("${hdfs_upload_dir}")
private String uploadDir;

@Value("${file_tmp_dir}")
private String tmpDir;

@Value("${copyright}")
private String copyright;

@Value("${default_file_ext}")
private String ext;
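The matching properties entries might look like this (all values are illustrative, not the real production configuration):

```properties
# illustrative values only
hdfs_upload_dir=/cn/test/order/
file_tmp_dir=/tmp/export/
copyright=test-data
default_file_ext=.xls
```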

Now look at the copyFileFromLocal method:

/**
     * Copy a local file into HDFS.
     *
     * @param type
     * @param file
     * @return
     */
    public String copyFileFromLocal(String type, String file) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String date = format.format(new Date());
        String path = this.uploadDir + type + '/' + date + '/';
        HDFSFileUploader.setConfResource(hadoopConfig);
        return HDFSFileUploader.upload(path, this.ext, file, HDFSFileUploader.FROM_LOCAL_COPY);
    }

This method calls HDFSFileUploader.upload, shown in the wrapper class above. Note that the Hadoop configuration is injected via HDFSFileUploader.setConfResource(hadoopConfig), and the Hadoop Configuration itself is injected into the DataManager class like this:

@Resource
private Configuration hadoopConfig;

At this point the generated Excel file has been uploaded to the specified HDFS path, which can be verified with the Hadoop client:

hadoop fs -ls /cn/test/order/  # the upload path

Order export (download)

For the download, the Java backend exposes a REST interface directly. The third-party PHP HDFS package phdfs (on GitHub) was not pleasant to use - it failed at compile time.

Here is what the interface looks like:

package cn.test.web.impl;

import cn.test.common.log.Log;
import cn.test.common.log.LogFactory;
import cn.test.util.Utils;
import cn.test.web.manager.DataManager;
import cn.test.web.service.DownloadService;
import cn.test.web.utils.CommonUtils;
import com.alibaba.dubbo.rpc.protocol.rest.support.ContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.net.URISyntaxException;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/16
 * Time 5:21 PM
 */
@Path("download")
@Component("downloads")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
public class DownloadServiceImpl implements DownloadService {
    private static final Log LOGGER = LogFactory.getLog(DownloadServiceImpl.class);
    @Autowired
    private DataManager manager;
    @Override
    @GET
    @Path("order")
    public void down(@Context HttpServletResponse response, @QueryParam("url") String url,
                             @QueryParam("uid") Integer uid) {
        LOGGER.debug("download url: {}", url);
        if (Utils.isEmpty(url)) {
            return;
        }
        String filename = CommonUtils.getFileName(url);
        // set the response headers; only the last setContentType call takes
        // effect, so the earlier octet-stream call was dead code and is dropped
        response.setContentType("application/vnd.ms-excel;charset=gb2312");
        response.setHeader("Content-Disposition", "attachment;filename=" + filename);
        try {
            // read the file and stream it into the response
            manager.readFile(url, response.getOutputStream());
            response.flushBuffer();
        } catch (IOException | URISyntaxException e) {
            LOGGER.error(e.getMessage());
        }
    }
}
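One caveat, not from the original code but worth flagging: a raw filename in Content-Disposition breaks for non-ASCII names. A minimal sketch of percent-encoding the filename (ContentDisposition is a made-up helper name), using only the JDK:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Sketch: build a Content-Disposition value whose filename survives
// non-ASCII characters. URLEncoder encodes spaces as '+', which the
// header grammar does not allow, so they are rewritten to %20.
class ContentDisposition {
    static String attachment(String filename) {
        try {
            String encoded = URLEncoder.encode(filename, "UTF-8").replace("+", "%20");
            // the plain filename= is a fallback; filename*= carries the RFC 5987 form
            return "attachment;filename=\"" + encoded + "\";filename*=UTF-8''" + encoded;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

The helper could replace the string concatenation in the setHeader call above.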

The PHP page only needs a hyperlink. One refinement: in production all interfaces are reached over the internal network, so the backend IP must not leak into the a tag. An nginx proxy makes the download reachable at downloads/order?url=xxx&uid=xxx:

location /downloads/ {
    proxy_pass http://127.0.0.1:8086/presentation/download/;
}

Pitfalls encountered

Fetching data from the biz Dubbo interface with multiple threads

public List<Order> getOrders(Object params) {
        OrderSearch search = null;
        if (params != null && (params instanceof Map)) {
            System.out.println("params:" + params);
            search = GSON.fromJson(GSON.toJson(params), OrderSearch.class);
            System.out.println("title:" + search.getTitle());
        } else {
            search = new OrderSearch();
        }
        int count = orderService.searchCount(search);
        int cycleTimes = (int) Math.ceil(count * 1.0 / TIMES_IN_SIGNEL_PROCESSOR);
        LOGGER.debug("total rows: {}, outer loop iterations: {}", count, cycleTimes);
        // collect the results of all the concurrent tasks
        List<Order> orders = new ArrayList<>();
        int page = 0;
        for (int j = 0; j < cycleTimes; j++) {
            int signel = (count > TIMES_IN_SIGNEL_PROCESSOR) ? TIMES_IN_SIGNEL_PROCESSOR : count;
            count = count - signel;
            int poolNum = (int) Math.ceil(signel * 1.0 / LIMIT);
            LOGGER.debug("thread pool size: {}", poolNum);
            // create a thread pool for this batch
            ExecutorService pool = Executors.newFixedThreadPool(poolNum);
            // create the result-bearing tasks
            List<Future> list = new ArrayList<Future>();
            for (int i = 0; i < poolNum; i++) {
                Callable c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search);
                // submit the task and keep its Future
                Future f = pool.submit(c);
                list.add(f);
            }
            // shut down the pool; already-submitted tasks still finish
            pool.shutdown();
            try {
                Thread.sleep(THREAD_SLEEP);
            } catch (InterruptedException e) {
                LOGGER.debug("interrupted while sleeping: {}", e.getMessage());
            }
            for (Future f : list) {
                // read the task's return value from the Future
                try {
                    orders.addAll((Collection<? extends Order>) f.get());
                    LOGGER.debug(">>> thread {} returned {} rows", f.toString(),
                            ((Collection<? extends Order>) f.get()).size());
                } catch (InterruptedException | ExecutionException e) {
                    LOGGER.warn("calling OrderService.search failed: {}", e.getMessage());
                    return null;
                }
            }

        }

        return orders;
    }

This method calls the Dubbo interface from multiple threads and returns the order data. After wiring up Callable c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search); I noticed that every call returned the data for the last filter that had been set: whether the page passed in was 1 or 2, if the last value set was 5, then 5 was what took effect. Where was the bug? Logging showed that when OrderExportCallable receives an object parameter, it is passed by reference - whichever thread modifies it changes the property values that every other thread sees. With the cause found, the fix was obvious.
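The pitfall can be reproduced without Dubbo at all; a minimal sketch (Search is a made-up stand-in for OrderSearch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Demonstrates the reference-passing pitfall: every task holds the SAME
// Search object, so whoever mutates it last wins. Taking a snapshot of
// the needed state isolates each task.
class SharedStateDemo {
    static class Search { int page; }

    // Buggy variant: tasks share one Search instance.
    static List<Integer> shared() throws Exception {
        Search search = new Search();
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            search.page = i;                      // mutates the shared object
            final Search s = search;
            tasks.add(() -> s.page);
        }
        List<Integer> pages = new ArrayList<>();
        for (Callable<Integer> t : tasks) pages.add(t.call());
        return pages;                             // last write wins: [3, 3, 3]
    }

    // Fixed variant: each task copies the state it needs.
    static List<Integer> copied() throws Exception {
        Search search = new Search();
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            search.page = i;
            final int page = search.page;         // snapshot, not a reference
            tasks.add(() -> page);
        }
        List<Integer> pages = new ArrayList<>();
        for (Callable<Integer> t : tasks) pages.add(t.call());
        return pages;                             // [1, 2, 3]
    }
}
```

The fixed Callable below applies the same idea with BeanUtils.copyProperties.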

/**
 * Process the data in worker threads
 */
class OrderExportCallable implements Callable<List<Order>> {
    private static final int THREAD_SLEEP = 1000;

    private String taskNum;
    private int page;
    private int limit;
    private OrderService orderService;
    private OrderSearch orderSearch;

    OrderExportCallable(String taskNum, int page, int limit, OrderService orderService,
                        OrderSearch orderSearch) {
        this.taskNum = taskNum;
        this.page = page;
        this.limit = limit;
        this.orderService = orderService;
        this.orderSearch = orderSearch;
    }


    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    @Override
    public List<Order> call() throws Exception {
        System.out.println(">>> task " + taskNum + " started");
        Thread.sleep(THREAD_SLEEP);
        OrderSearch os = new OrderSearch();
        BeanUtils.copyProperties(orderSearch, os); // copy, so each task mutates its own object
        os.setPage(this.page);
        os.setLimit(this.limit);
        return orderService.search(os);
    }
}

The fix is to create a new object inside OrderExportCallable and copy the incoming object's properties into it. Future was chosen because it delivers each task's result when it completes; to keep the data ordered, the Future objects are added to a list and their results are consumed in submission order.
If the data volume exceeds what one round of concurrent requests should fetch, we wait for the results and then create new threads. At 500 rows per request, 10,000 rows would otherwise need 20 threads at once, which is wasteful, so the 10,000 rows are fetched in two rounds of 10 concurrent threads each - hence the two nested for loops when creating threads.
PS: an obvious optimization here is to keep the worker threads alive between rounds instead of shutting the pool down each time, so threads are created only once.
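The batching idea (and the PS about keeping threads alive) can be sketched with a single reused pool and invokeAll; BatchedFetcher and fetchPage are made-up stand-ins for the real DataManager and orderService.search:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: one fixed pool reused for every batch instead of a new pool
// per batch. invokeAll blocks until a batch finishes, and the returned
// Future list preserves submission order, so results stay in page order.
class BatchedFetcher {
    static List<String> fetchAll(int totalPages, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<String> all = new ArrayList<>();
        try {
            for (int start = 1; start <= totalPages; start += threads) {
                List<Callable<String>> batch = new ArrayList<>();
                for (int p = start; p <= Math.min(start + threads - 1, totalPages); p++) {
                    final int page = p;
                    batch.add(() -> fetchPage(page));   // stand-in for orderService.search
                }
                for (Future<String> f : pool.invokeAll(batch)) {
                    all.add(f.get());                   // ordered: page 1, 2, 3, ...
                }
            }
        } finally {
            pool.shutdown();                            // threads die only once, at the end
        }
        return all;
    }

    private static String fetchPage(int page) {
        return "page-" + page;                          // dummy payload
    }
}
```

invokeAll also removes the need for the fixed Thread.sleep between submission and collection.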

Excel export stream problems

My first idea was to read the Excel file as a stream, convert it to a string and return it from the REST interface to the front end. That failed: no matter what I tried, the downloaded Excel was garbled.

Problem: garbled content when reading Excel from HDFS
I tried all the usual stream types without success. Just as I was despairing, my lead @hsj solved it. See this line in the REST interface?

manager.readFile(url, response.getOutputStream());

Yes - pass in the response's output stream. But that alone does not solve the problem; read on:

public void readFile(String filename, OutputStream os) throws IOException, URISyntaxException {
    if (HDFSUtils.exits(hadoopConfig, filename)) {
        HDFSUtils.readFile(hadoopConfig, filename, os);
    }
}

which calls the HDFS utility method shown earlier:

/**
     * Read an Excel file and write its content into the given OutputStream.
     *
     * @param conf     configuration
     * @param filePath file path
     * @param os       output stream
     * @throws IOException
     */
    public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        try (FSDataInputStream inputStream = fs.open(path)) {
            // try-with-resources closes the stream; no explicit close() needed
            Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream);
            wb.write(os);
        } finally {
            fs.close();
        }
    }

which in turn calls the POI utility method:

public static Workbook createWorkbookByIS(String file, InputStream inputStream) {
    String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
    Workbook wb = null;
    try {
        switch (ext) {
            case "xls":
                wb = new HSSFWorkbook(inputStream);
                break;
            case "xlsx":
                wb = new XSSFWorkbook(inputStream);
                break;
            default:
                wb = new HSSFWorkbook(inputStream);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return wb;
}

So createWorkbookByIS does one thing: based on the file extension, it creates a Workbook from the input stream. readFile then calls the Workbook's write method, which writes the data into the output stream and hence into the response to the request. The REST interface also sets the response content type: response.setContentType("application/vnd.ms-excel;charset=gb2312");

POI freezing on large exports (caused by running out of memory)

public static Workbook createWorkbook(String file) {
    // the extension-based HSSF/XSSF switch below was replaced by SXSSF
    /*
    String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
    switch (ext) {
        case "xls":
            return createHSSFWorkbook();
        case "xlsx":
            return createXSSFWorkbook();
        default:
            return createHSSFWorkbook();
    }
    */
    return createSXSSFWorkbook(MEM_ROW);
}

public static Workbook createSXSSFWorkbook(int memRow) {
    Workbook wb = new SXSSFWorkbook(memRow); // keep at most memRow rows in memory
    wb.createSheet();
    return wb;
}

Create the workbook with SXSSFWorkbook instead.
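SXSSFWorkbook keeps only the last MEM_ROW rows in memory and flushes older rows to a temp file. Conceptually it behaves like the sliding window below - a plain-JDK illustration of the idea, not POI code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration of SXSSF's row window: at most `windowSize` rows stay
// in memory; older rows are flushed (here: appended to a list that
// stands in for the temp file on disk).
class RowWindow {
    private final int windowSize;
    private final Deque<String> inMemory = new ArrayDeque<>();
    final List<String> flushed = new ArrayList<>();

    RowWindow(int windowSize) {
        this.windowSize = windowSize;
    }

    void addRow(String row) {
        inMemory.addLast(row);
        if (inMemory.size() > windowSize) {
            flushed.add(inMemory.removeFirst()); // oldest row leaves memory
        }
    }

    int rowsInMemory() {
        return inMemory.size();
    }
}
```

With a window of, say, 100 rows, an export never holds more than 100 rows on the heap at once, which is why switching createWorkbook to SXSSFWorkbook(MEM_ROW) stops the out-of-memory freezes.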

java.lang.OutOfMemoryError: GC overhead limit exceeded

Related reading: notes on fixing "java.lang.OutOfMemoryError: GC overhead limit exceeded"; solutions for "java.lang.OutOfMemoryError: Java heap space".

org.apache.poi.poifs.filesystem.OfficeXmlFileException: The supplied data appears to be in the Office 2007+ XML. You are calling the part of POI that deals with OLE2 Office Documents. You need to call a different part of POI to process this data (eg XSSF instead of HSSF)

Related reading: parsing Excel data in Java, compatible with both the 2003 and 2007 formats. The fix is to try XSSF first and fall back to HSSF:

public static Workbook createWorkbookByIS(String file, InputStream inputStream) {
    Workbook wb = null;
    try {
        wb = new XSSFWorkbook(inputStream); // try the 2007+ XML format first
    } catch (Exception e) {
        try {
            // note: re-reading the stream only works if it supports
            // mark/reset; otherwise the stream must be reopened here
            wb = new HSSFWorkbook(inputStream); // fall back to the OLE2 (2003) format
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }
    return wb;
}

Open problems

PS: this article does not fully solve the big-data export problem. With exports in the tens of thousands of rows, the CPU is pegged and memory fills up (4 cores, 32 GB). Even worse, exporting 40k+ rows takes 3 hours - and that is not the real pain point: the download itself is dreadfully slow, since 40k rows can produce as much as 1 GB of file.

The optimization has only just begun - see the next part, 《基於haddop的HDFS和Excel開源庫POI導出大數據報表(二)》.

If you found this useful, please bookmark it; if you have better suggestions, please leave a comment. And if you read this far without objection, a like would be welcome encouragement for the writing!
