大數據綜合項目DocCloud之需求分析與功能實現詳細(續更)

DocCloud項目需求

項目背景:

在一些大型企事業單位,工做中存在各類各樣的工做文檔,技術文檔,規範等等。這些文檔以word,xls,ppt,wps,pdf,txt存在。在此項目以前,文檔的分享主要靠單位內部人員的互相發送。沒有一個統一的平臺對企業現存的各類文檔進行統一管理。DocCloud項目提供了統一的文檔管理平臺。用戶能夠將文檔上傳至平臺,全部其餘用戶能夠在線查看此文檔。同時知足搜索文檔,分享,收藏等等一系列需求。在實踐中,有百度文庫,doc88,豆丁等公網項目。可是沒有一個專門爲企業用戶服務的一個文檔管理平臺。css

項目需求:

1.文檔的統一存儲html

2.文檔的檢索前端

3.文檔的在線預覽java

4.文檔分享mysql

5.文檔推薦jquery

6.文檔上傳下載linux

7.用戶的註冊,登陸nginx

8.文檔權限管理git

項目架構:

HDFS+LibreOffice6.0+solr+nginx+flume+hive+springboot+jpa+js+html+cssweb

文檔存儲: HDFS

文件存儲:1.本地(linux)-web服務器 優缺點:內存小,可是存儲方便

2.ftp服務器(搭建一個存儲文件的集羣)優缺點:文檔存儲內存夠,可是不能容錯

3.hdfs (hadoop集羣)優缺點:可擴展、容錯、分佈式存儲

文檔格式轉換: LibreOffice6.0

由於存儲的內容不是純文本,就是傳統的io流不能用、須要變成純文本文件(txt)

doc、docx、ppt---→html---→txt

使用LibreOffice6.0,不適用word的緣由:1.沒有接口(不開源)2.不能再linux上運行

進程間通訊:hadoop ipc

全文檢索: solr

日誌記錄服務器:ngnix

web日誌採集:flume

日誌分析:hive

webMvc:springboot

持久層框架:jpa

單元測試:junit4

前端:css+html+js+jquery+bootstrap

版本管理:svn

依賴管理:maven

開發環境:idea

部署環境:linux

數據庫:mysql


 

項目具體設計:

1.文檔的上傳下載

a.用戶在前端點擊上傳按鈕

b.在本地選擇上傳文檔

c.開始上傳

b.服務端校驗文件後綴是否符合文檔格式。

容許格式:doc,docx,ppt,pptx,xls,xlsx,pdf,txt

目的:避免上傳不能轉碼的文檔如:exe,zip,….

e.校驗文檔大小,容許128兆如下的文檔上傳。

128M:爲了使文檔在hdfs是一個塊的形式保存。

f.計算文檔的md5值,判斷文檔是否在文庫中已經存在,若是存在,告知用戶已經存在。

g.不存在,則上傳至hdfs,同時數據庫中保存用戶上傳文檔信息。

數據保存在hdfs上,元數據保存在數據庫mysql

2.上傳成功之後須要提交文檔轉換任務(主要功能以下)

      1>轉換成html

       2>轉換成pdf提取縮略圖,頁數

       3>提取文本 創建索引


如下代碼沒有涉及前端、只是後臺測試(使用Postman測試),部分參數都沒有從session中獲取,都是隨機生成的

沒有軟件的附上資源(下載雙擊安裝就能夠):連接:https://pan.baidu.com/s/1SibrDOB4GwkX4L0iw3nYTA 
提取碼:bisn

1、功能一:上傳文件/2018.10.29

日誌配置:

         在類名上添加註解

         @Slf4j

         直接在類中使用log記錄日誌

文件上傳:

         關鍵註解:

         @RequestParam("file") MultipartFile file

         獲取上傳文件名:

         file.getOriginalFilename()

 

建立一個java項目DocCloud

在DocCloud中建立model---->doccloudweb(模塊名要小寫)

在module中選擇Spring Initializr

在建立module時,選擇pom中的依賴以下,選完以後一路下一步

core下選擇DevTools、LomBok

web下選擇web

sql下選擇JPA、mysql

NoSQL下選solr

 

<dependencies>
    <!--數據相關依賴-持久層-->
    <!--<dependency>-->
        <!--<groupId>org.springframework.boot</groupId>-->
        <!--<artifactId>spring-boot-starter-data-jpa</artifactId>-->
    <!--</dependency>-->
    <!--全文檢索-->
    <!--<dependency>-->
        <!--<groupId>org.springframework.boot</groupId>-->
        <!--<artifactId>spring-boot-starter-data-solr</artifactId>-->
    <!--</dependency>-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--熱部署-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!--數據庫鏈接-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!--單元測試-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.0.6.RELEASE</version>
    </dependency>                   
</dependencies>

<!--編譯、打jar包-->
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

以上是建立項目選擇的依賴,請添加如下外加依賴

<!--上傳文件到hdfs上-->

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>2.7.5</version>

        </dependency>

數據庫建立Doc_cloud

Doc:記錄文件屬性

使用jpa---->java+persistence+api

JPA是Java Persistence API的簡稱,中文名Java持久層API,是JDK 5.0註解或XML描述對象-關係表的映射關係,並將運行期的實體對象持久化到數據庫中

1.配置數據源application.properties

#數據源配置
spring.datasource.name=root
spring.datasource.password=123
#true:表示展現sql語句
spring.jpa.show-sql=true
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/doc_cloud

2..上傳文件到hdfs上用到的core-site.xml

<?xml version="1.0" encoding="UTF-8"?>

<configuration>

    <property>

        <name>fs.defaultFS</name>

        <value>hdfs://master2:9000</value>

    </property>

    <property>

        <name>fs.hdfs.impl</name>

        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>

    </property>

</configuration>

3.編寫controller層-- DocController

import com.zhiyou100.doccloudweb.service.DocService;

import com.zhiyou100.doccloudweb.util.HdfsUtil;

import com.zhiyou100.doccloudweb.util.MD5Util;

import com.zhiyou100.doccloudweb.entity.Doc;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.ResponseBody;

import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Optional;

import java.util.Random;

@Controller  //表示是controller層--業務層

@RequestMapping("/doc")

@Slf4j

public class DocController {

    @Autowired

    private DocService docService;

    //定義合法的文件後綴類型

    public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};

    //定義文件最大大小

    public static final int DOC_MAX_SIZE = 128*1024*1024;

    //定義文件保存到hdfs上的根目錄

    public static final String HOME="hdfs://192.168.228.13:9000/doccloud";

    @RequestMapping("/upload")

    @ResponseBody

    public String upload(@RequestParam("file") MultipartFile file){

        //判斷是不是文件

        if (file.isEmpty()){

            return "file is empty";

        }

        //獲取文件名

        String filename = file.getOriginalFilename();

        //以點分割-獲取文件後綴

        String[] strings = filename.split("\\.");

        if (strings.length==1){

            return "file does not has suffix";

        }

        String suffix = strings[1];

        log.info("doc suffix is {}",suffix);

        //1.判斷文件後綴是否合法

        boolean flag = isSuffixLegal(suffix);

        if (!flag){

            return "file is illegal";

        }

        try {

            //2.判斷文件大小是否合法

            byte[] bytes = file.getBytes();

            log.info("file size is {}",bytes.length);

            if (bytes.length>DOC_MAX_SIZE){

                return "file is large,file Max size:"+DOC_MAX_SIZE;

            }

            //3.計算文檔的MD5值

            String md5 = getMD5(bytes);

            log.info("file is md5 {} ",md5);

            //用戶上傳文件,保存到數據庫

            //1.校驗數據庫中的md5值,判斷數據庫中是否存在

            Optional<Doc> doc = docService.findByMd5(md5);

            if (doc.isPresent()){

                //2.若是存在,更新

                // 2.1獲取文件對象

                Doc docEntity = doc.get();

                //2.2設置文件更新的人

                docEntity.setUserId(new Random().nextInt());

                //2.3保存到數據庫

                docService.save(docEntity);

            }else {

                //3.若是不存在,將文件元數據保存到數據庫,將數據保存到hdfs

                //3.1保存數據到hdfs

                //3.1.1生成文件保存路徑:HOME+當前時間

                String date = getDate();

                String dst = HOME+"/"+date+"/"+file.getOriginalFilename()+"/";

                log.info("file dst {}",dst);

                //3.1.2上傳文件

                HdfsUtil.upload(bytes,file.getOriginalFilename(),dst);

                //3.2將元數據保存到數據庫

                //3.2.1建立一個文件對象

                Doc docEntity = new Doc();

                //3.2.2設置做者

                docEntity.setUserId(new Random().nextInt());

                //3.2.3設置備註

                docEntity.setDocComment("hadoop");

                //3.2.4設置文件路徑

                docEntity.setDocDir(dst);

                //3.2.5設置文件名

                docEntity.setDocName(filename);

                //3.2.6設置文件大小

                docEntity.setDocSize(bytes.length);

                //3.2.7設置文件權限

                docEntity.setDocPermission("1");

                //3.2.8設置文件類型(後綴)

                docEntity.setDocType(suffix);

                //3.2.9設置文件狀態

                docEntity.setDocStatus("upload");

                //3.2.10設置文件的md5值--保證文件的惟一性

                docEntity.setMd5(md5);

                //3.2.11設置文件創做時間

                docEntity.setDocCreateTime(new Date());

                //3.2.12保存元數據

                docService.save(docEntity);

            }

        } catch (IOException e) {

            e.printStackTrace();

        }

        return "upload success";

    }

    /**

     * 獲取當前是時間,用於文件的保存路徑

     * @return

     */

    private String getDate() {

        Date date = new Date();

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");

        return simpleDateFormat.format(date);

    }

    /**

     * 計算字節數組的MD5值

     * @param bytes

     * @return

     */

    private String getMD5(byte[] bytes) {

        return MD5Util.getMD5String(bytes);

    }

    /**

     * 判斷文件後綴是否合法

     * @param suffix

     * @return

     */

    private boolean isSuffixLegal(String suffix) {

        for (String docsuffix :

                DOC_SUFFIXS) {

            if (suffix.equals(docsuffix)){

                return true;

            }

        }

        return false;

    }

}

 

4.編寫業務層service層-- DocService

import com.zhiyou100.doccloudweb.dao.DocRepository;

import com.zhiyou100.doccloudweb.entity.Doc;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.util.Optional;

@Service

public class DocService {

    @Autowired

    private DocRepository docRepository;

    //經過id獲取文件對象

    public Optional<Doc> findById(int id) {

        return docRepository.findById(id);

    }

    //經過MD5獲取文件對象

    public Optional<Doc> findByMd5(String md5) {

        return docRepository.findByMd5(md5);

    }

    //保存文件對象到數據庫

    public void save(Doc docEntity) {

        docRepository.save(docEntity);

    }

}

    //定義合法的文件後綴類型

    public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};

    //定義文件最大大小

    public static final int DOC_MAX_SIZE = 128*1024*1024;

    @RequestMapping("/doclist")

    @ResponseBody

    Doc doList(){

        Optional<Doc> id = docService.findById(1);

        return null;

    }

 

5.dao層—持久層--DocRepository

import com.zhiyou100.doccloudweb.entity.Doc;

import org.springframework.data.jpa.repository.JpaRepository;

import org.springframework.stereotype.Repository;

import java.util.Optional;

@Repository

//Doc:表示定義的實體類,Integer:表示主鍵類型

public interface DocRepository extends JpaRepository<Doc,Integer> {

    //利用反射機制自動識別

    Optional<Doc> findByMd5(String md5);

}

 

6.實體層—Doc

import lombok.Data;

import org.springframework.web.bind.annotation.ResponseBody;

import javax.persistence.*;

import java.util.Date;

/**

 * 文件屬性

 */

@Entity

@Table(name = "doc") //映射到數據庫中的表

@Data //get/set

public class Doc {

 

    @Id //主鍵

    //告訴框架id生成策略(怎麼生成)GenerationType.IDENTITY:表示自動生成

    @GeneratedValue(strategy = GenerationType.IDENTITY)

    private int id;

    @Column(name = "md5")//若是數據庫字段與entity中字段名同樣,則不用加此註解

    private String md5;

    @Column(name = "doc_name")

    private String docName;

    @Column(name = "doc_type")

    private String docType;

    @Column(name = "doc_status")

    private String docStatus;

    @Column(name = "doc_size")

    private int docSize;

    @Column(name = "doc_dir")

    private String docDir;

    @Column(name = "user_id")

    private int userId;

    @Column(name = "doc_create_time")

    private Date docCreateTime;

    @Column(name = "doc_comment")

    private String docComment;

    @Column(name = "doc_permission")

    private String docPermission;

}

7.工具類

HdfsUtil 

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/*
*@ClassName:HdfsUtil
 @Description:TODO
 @Author:
 @Date:2018/10/29 17:17 
 @Version:v1.0
*/
public class HdfsUtil {
    //文檔上傳工具類
    public static void upload(byte[] src, String docName, String dst) throws IOException {
        //加載配置文件
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site.xml"));
        //獲取文件系統客戶端對象
        FileSystem fileSystem = FileSystem.get(coreSiteConf);

        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName));

        fsDataOutputStream.write(src);
        fsDataOutputStream.close();
        fileSystem.close();
    }
}

MD5Util 

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class MD5Util {
    protected static char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
    protected static MessageDigest messagedigest = null;

    /**
     * MessageDigest初始化
     *
     * @author
     */
    static {
        try {
            messagedigest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            System.err.println("MD5FileUtil messagedigest初始化失敗");
            e.printStackTrace();
        }
    }

    /**
     * 對文件進行MD5加密
     *
     * @author
     */
    public static String getFileMD5String(File file) throws IOException {
        FileInputStream in = new FileInputStream(file);
        FileChannel ch = in.getChannel();
        MappedByteBuffer byteBuffer = ch.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
        messagedigest.update(byteBuffer);
        return bufferToHex(messagedigest.digest());
    }

    /**
     * 對字符串進行MD5加密
     *
     * @author
     */
    public static String getMD5String(String s) {
        return getMD5String(s.getBytes());
    }

    /**
     * 對byte類型的數組進行MD5加密
     *
     * @author
     */
    public static String getMD5String(byte[] bytes) {
        messagedigest.update(bytes);
        return bufferToHex(messagedigest.digest());
    }

    private static String bufferToHex(byte bytes[]) {
        return bufferToHex(bytes, 0, bytes.length);
    }

    private static String bufferToHex(byte bytes[], int m, int n) {
        StringBuffer stringbuffer = new StringBuffer(2 * n);
        int k = m + n;
        for (int l = m; l < k; l++) {
            char c0 = hexDigits[(bytes[l] & 0xf0) >> 4];
            char c1 = hexDigits[bytes[l] & 0xf];
            stringbuffer.append(c0);
            stringbuffer.append(c1);
        }
        return stringbuffer.toString();
    }
}

2、功能:文檔轉換

上傳成功之後須要提交文檔轉換任務(主要功能以下)

      1>轉換成html

       2>轉換成pdf提取縮略圖,頁數

       3>提取文本 創建索引

1.定義一個docjob對象,用於封裝任務信息

2.實現writable接口。由於要經過hadoop ipc序列化實現文檔轉換守護進程
該進程的做用是完成存放在本節點文檔的轉換,索引的任務。

1.文檔轉換成htm!經過runtime.exec執行命令來實現

2.經過hadoop ipc來接受任務

hadoop ipc
hadoop ipc是一套hadoop自帶的成熟的rpc框架,性能高,穩定性性強。

server:
a.服務端定義接口b.定義按口的實現類
c用hadoop ipc暴露服務

client;
經過rpc.geproxy來調用服務崗的接口。


下面建立的是服務端的代碼: 

1.新建模塊--docservicedeamon文件轉換守護進程

2.pom文件中的依賴

<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.5</version>
</dependency>
<!--ipc通訊模塊-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.5</version>
</dependency>
<!--註解、-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.20</version>
</dependency>
<!--berklydb數據庫依賴-->
<!-- https://mvnrepository.com/artifact/com.sleepycat/je -->
<dependency>
    <groupId>com.sleepycat</groupId>
    <artifactId>je</artifactId>
    <version>5.0.73</version>
</dependency>
<!--hdfs文件上傳與下載-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.5</version>
</dependency>

 

3.配置core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master2:9000</value>
    </property>
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
    </property>
</configuration>

4.類一---DocJob

import lombok.Data;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

/**
 *此方法用於封裝任務信息
 */
@Data
public class DocJob implements Writable,Serializable {
    private static final long serialVersionUID = 12345678L;
    //任務id
    private int id;
    //任務名
    private String name;
    //任務類型
    private DocJobType jobType;
    //提交者
    private int userId;
    //提交時間
    private long submitTime;
    //完成時間
    private long finishTime;
    //任務狀態
    private JobStatus jobStatus;
    //任務重試次數
    private int retryTime;
    //文檔輸入路徑
    private String input;
    //任務輸出路徑
    private String output;
    //文件名
    private String fileName;
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
        out.writeUTF(jobType.name());
        out.writeInt(userId);
        out.writeLong(finishTime);
        out.writeLong(submitTime);
        out.writeUTF(jobStatus.name());
        out.writeInt(retryTime);
        out.writeUTF(input);
        out.writeUTF(output);
        out.writeUTF(fileName);
    }

    public void readFields(DataInput in) throws IOException {
        id= in.readInt();
        name=in.readUTF();
        jobType=DocJobType.valueOf(in.readUTF());
        userId=in.readInt();
        finishTime=in.readLong();
        submitTime=in.readLong();
        jobStatus=JobStatus.valueOf(in.readUTF());
        retryTime=in.readInt();
        input=in.readUTF();
        output=in.readUTF();
        fileName=in.readUTF();
    }
}

5.類2、DocJobType

/**
 * 項目的類型:文檔轉換、定義索引...
 */
public enum DocJobType {
    DOC_JOB_CONVERT,DOC_JOB_CREATE_INDEX,DOC_JOB_UPDATE_INDEX
}

6.類3、JobStatus

/**
 * 文檔狀態:準備、提交、運行、失敗、完成
 */
public enum JobStatus {
    PREPARE,SUBMIT,RUNNING,FAILED,SUCCEED,
}

7類4、.JobDaemonService

/**
 * 服務端
 * 1.定義接口繼承VersionedProtocol
 */
public interface JobDaemonService extends VersionedProtocol {
    //定義通訊間的暗號
    long versionID=1L;
    //定義提交方法
    void submitDocJob(DocJob job);

}

8.類5、JobDaemonServiceImpl

import com.zhiyou100.doccloud.utils.BdbPersistentQueue;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.ipc.ProtocolSignature;


import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 2.定義接口的實現類
 * 實現Runnable接口:是爲了使用多線程處理
 */
@Slf4j
public class JobDaemonServiceImpl implements JobDaemonService,Runnable{
    //定義將hdfs下載到本地的目錄的根路徑
    private static final String WORK_DIR="/tmp/docjobdaemon/";
    //定義持久化對象
    public  BdbPersistentQueue<DocJob> queue;
    //定義線程池--多線程並行處理
    private ExecutorService pool = Executors.newFixedThreadPool(4);
    //定義一個標準-讓線程運行
    private boolean flag = true;

    //構造方法:用於建立berkly數據庫目錄,並初始化持久化隊列
    public JobDaemonServiceImpl(){
        //建立工做目錄--本地保存路徑
        File workDir = new File(WORK_DIR + "/" + "bdb/");
        if (!workDir.exists()){
            //若是不存在將建立
            workDir.mkdirs();
            System.out.println(workDir.getAbsolutePath());
        }
        //初始化持久化隊列
        queue = new BdbPersistentQueue<DocJob>(WORK_DIR+"/"+"bdb/", "docjob", DocJob.class);
    }

    public void submitDocJob(DocJob job) {
        System.out.println(job);
        //將任務保存在序列化隊列中,1.保證任務不丟失   2.併發控制,內存溢出
        log.info("receive job {}",job);
        queue.offer(job);
    }

    public long getProtocolVersion(String s, long l) throws IOException {
        return versionID;
    }

    public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
        return null;
    }

    @Override
    public void run() {
        while (flag){
            //將任務從序列化隊列中取出任務,poll:每取出一個就從磁盤中移除一個
            DocJob docJob = queue.poll();
            //判斷docjob中否爲空
            if (docJob==null){
                //爲空,等待5000毫秒
                try {
                    Thread.sleep(5000);
                    System.out.println("waiting for docjob");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else {
                pool.submit(new DocJobHandler(docJob));
            }
        }
    }
}

9.類6、Main

import com.zhiyou100.doccloud.job.JobDaemonService;
import com.zhiyou100.doccloud.job.JobDaemonServiceImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

/**
 * 守護進程--項目的入口類
 * 3.服務端:暴露端口
 */
public class Main {
    public static void main(String[] args) throws IOException {
        //建立服務端接口實現類對象
        JobDaemonServiceImpl instance = new JobDaemonServiceImpl();
        //開啓線程
        new Thread(instance).start();

        // 建立一個RPC builder
        RPC.Builder builder = new RPC.Builder(new Configuration());

        //指定RPC Server的參數
        builder.setBindAddress("localhost");
        builder.setPort(7788);

        //將本身的程序部署到server上
        builder.setProtocol(JobDaemonService.class);
        builder.setInstance(instance);

        //建立Server
        RPC.Server server = builder.build();

        //啓動服務
        server.start();
    }
}

10.類七--DocJobHandler

import com.zhiyou100.doccloud.utils.HdfsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;

import java.io.File;
import java.io.IOException;
import java.util.UUID;

@Slf4j
public class DocJobHandler implements Runnable {
    private DocJob docJob;

    public DocJobHandler(DocJob docJob) {
        this.docJob = docJob;
        log.info("start to deal job {}",docJob);
    }

    /**
     *將文件衝hdfs上下載到本地,再將文件格式轉化成HTML,最終上傳到hdfs上
     */
    @Override
    public void run() {
        //1.將hdfs上的文件下載到本地
        //1.1獲取文件的下載路徑(在hdfs上的位置)
        String input = docJob.getInput();
        //1.2建立目標路徑(下載到本地的路徑)
        String tmpWorkDirPath = "/tmp/docjobdaemon/" + UUID.randomUUID().toString() + "/";
        File tmpWorkDir = new File(tmpWorkDirPath);
        tmpWorkDir.mkdirs();
        System.out.println("tmpWorkDirPath: "+tmpWorkDirPath);
        //1.3下載文件到臨時目錄
        try {
            HdfsUtil.copyToLocal(input,tmpWorkDirPath);
            log.info("download file to {}",tmpWorkDirPath);
            //step1:將下載到本地的文件格式轉化成HTML
            String command = "D:\\soft\\LibreOffice_6.0.6\\program\\soffice --headless --invisible --convert-to html " + docJob.getFileName();
            Process process = Runtime.getRuntime().exec(command, null, tmpWorkDir);
            //結果信息
            System.out.println(IOUtils.toString(process.getInputStream()));
            //錯誤信息
            System.out.println(IOUtils.toString(process.getErrorStream()));
            //step2 轉換成pdf
            //step3 提取頁碼
            //step4 提取首頁縮略圖
            //step5 利用solr創建索引
            //step6 上傳結果
            //step7 清理臨時目錄
            //step8 任務成功回調
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

工具類、

將hdfs上的文件下載到本地--HdfsUtil

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

public class HdfsUtil {
    public static final String HOME="hdfs://192.168.228.13:9000/";
    //文檔上傳工具類
    public static void upload(byte[] src, String docName, String dst) throws IOException {
        //加載配置文件
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site.xml"));
        //獲取文件系統客戶端對象
        FileSystem fileSystem = FileSystem.get(coreSiteConf);

        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName));

        fsDataOutputStream.write(src);
        fsDataOutputStream.close();
        fileSystem.close();
    }

    /**
     * 將集羣的問價下載到本地
     * @param dst
     * @param localPath
     * @throws IOException
     */
    public static void copyToLocal(String dst,String localPath) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(Resources.getResource("core-site.xml"));
        FileSystem fs = FileSystem.get(URI.create(dst),conf);
        fs.copyToLocalFile(new Path(dst),new Path(localPath));
        fs.close();
    }
}

持久化隊列,基於BDB實現--BdbPersistentQueue&&BdbEnvironment

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.io.FileUtils;

import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.collections.StoredMap;
import com.sleepycat.collections.StoredSortedMap;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseExistsException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.EnvironmentConfig;
/**
 * 持久化隊列,基於BDB實現,也繼承Queue,以及能夠序列化.但不等同於Queue的時,再也不使用後須要關閉
 * 相比通常的內存Queue,插入和獲取值須要多消耗必定的時間
 * 這裏爲何是繼承AbstractQueue而不是實現Queue接口,是由於只要實現offer,peek,poll幾個方法便可,
 * 其餘如remove,addAll,AbstractQueue會基於這幾個方法去實現
 *
 * @contributor 
 * @param <E>
 */
public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements
        Serializable {
    private static final long serialVersionUID = 3427799316155220967L;
    private transient BdbEnvironment dbEnv;            // 數據庫環境,無需序列化
    private transient Database queueDb;             // 數據庫,用於保存值,使得支持隊列持久化,無需序列化
    private transient StoredMap<Long,E> queueMap;   // 持久化Map,Key爲指針位置,Value爲值,無需序列化
    private transient String dbDir;                 // 數據庫所在目錄
    private transient String dbName;                // 數據庫名字
    //AtomicLong:元子類型,線程安全
    //i++線程不安全
    private AtomicLong headIndex;                   // 頭部指針
    private AtomicLong tailIndex;                   // 尾部指針
    private transient E peekItem=null;              // 當前獲取的值

    /**
     * 構造函數,傳入BDB數據庫
     *
     * @param db
     * @param valueClass
     * @param classCatalog
     */
    public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){
        this.queueDb=db;
        this.dbName=db.getDatabaseName();
        headIndex=new AtomicLong(0);
        tailIndex=new AtomicLong(0);
        bindDatabase(queueDb,valueClass,classCatalog);
    }
    /**
     * 構造函數,傳入BDB數據庫位置和名字,本身建立數據庫
     *
     * @param dbDir
     * @param dbName
     * @param valueClass
     */
    public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){
        //headIndex=new AtomicLong(0);
        //tailIndex=new AtomicLong(0);
        this.dbDir=dbDir;
        this.dbName=dbName;
        createAndBindDatabase(dbDir,dbName,valueClass);
    }
    /**
     * 綁定數據庫
     *
     * @param db
     * @param valueClass
     * @param classCatalog
     */
    public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){
        EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
        if(valueBinding == null) {
            valueBinding = new SerialBinding<E>(classCatalog, valueClass);   // 序列化綁定
        }
        queueDb = db;
        queueMap = new StoredSortedMap<Long,E>(
                db,                                             // db
                TupleBinding.getPrimitiveBinding(Long.class),   //Key 序列化類型
                valueBinding,                                   // Value
                true);                                          // allow write
        //todo
        Long firstKey = ((StoredSortedMap<Long, E>) queueMap).firstKey();
        Long lastKey = ((StoredSortedMap<Long, E>) queueMap).lastKey();

        headIndex=new AtomicLong(firstKey == null ? 0 : firstKey);
        tailIndex=new AtomicLong(lastKey==null?0:lastKey+1);
    }
    /**
     * 建立以及綁定數據庫
     *
     * @param dbDir
     * @param dbName
     * @param valueClass
     * @throws DatabaseNotFoundException
     * @throws DatabaseExistsException
     * @throws DatabaseException
     * @throws IllegalArgumentException
     */
    private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,
            DatabaseExistsException,DatabaseException,IllegalArgumentException{
        File envFile = null;
        EnvironmentConfig envConfig = null;
        DatabaseConfig dbConfig = null;
        Database db=null;

        try {
            // 數據庫位置
            envFile = new File(dbDir);

            // 數據庫環境配置
            envConfig = new EnvironmentConfig();
            envConfig.setAllowCreate(true);
            //不支持事務
            envConfig.setTransactional(false);

            // 數據庫配置
            dbConfig = new DatabaseConfig();
            dbConfig.setAllowCreate(true);
            dbConfig.setTransactional(false);
            //是否要延遲寫
            dbConfig.setDeferredWrite(true);

            // 建立環境
            dbEnv = new BdbEnvironment(envFile, envConfig);
            // 打開數據庫
            db = dbEnv.openDatabase(null, dbName, dbConfig);
            // 綁定數據庫
            bindDatabase(db,valueClass,dbEnv.getClassCatalog());

        } catch (DatabaseNotFoundException e) {
            throw e;
        } catch (DatabaseExistsException e) {
            throw e;
        } catch (DatabaseException e) {
            throw e;
        } catch (IllegalArgumentException e) {
            throw e;
        }


    }

    /**
     * 值遍歷器
     */
    @Override
    public Iterator<E> iterator() {
        return queueMap.values().iterator();
    }
    /**
     * 大小
     */
    @Override
    public int size() {
        synchronized(tailIndex){
            synchronized(headIndex){
                return (int)(tailIndex.get()-headIndex.get());
            }
        }
    }

    /**
     * 插入值
     */
    @Override
    public boolean offer(E e) {
        synchronized(tailIndex){
            queueMap.put(tailIndex.getAndIncrement(), e);// 從尾部插入
            //將數據不保存在緩衝區,直接存入磁盤
            dbEnv.sync();
        }
        return true;
    }

    /**
     * 獲取值,從頭部獲取
     */
    @Override
    public E peek() {
        synchronized(headIndex){
            if(peekItem!=null){
                return peekItem;
            }
            E headItem=null;
            while(headItem==null&&headIndex.get()<tailIndex.get()){ // 沒有超出範圍
                headItem=queueMap.get(headIndex.get());
                if(headItem!=null){
                    peekItem=headItem;
                    continue;
                }
                headIndex.incrementAndGet();    // 頭部指針後移
            }
            return headItem;
        }
    }

    /**
     * 移出元素,移出頭部元素
     */
    @Override
    public E poll() {
        synchronized(headIndex){
            E headItem=peek();
            if(headItem!=null){
                queueMap.remove(headIndex.getAndIncrement());
                //從磁盤上移除
                dbEnv.sync();
                peekItem=null;
                return headItem;
            }
        }
        return null;
    }

    /**
     * 關閉,也就是關閉所是用的BDB數據庫但不關閉數據庫環境
     */
    public void close(){
        try {
            if(queueDb!=null){
                queueDb.sync();
                queueDb.close();
            }
        } catch (DatabaseException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (UnsupportedOperationException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * 清理,會清空數據庫,而且刪掉數據庫所在目錄,慎用.若是想保留數據,請調用close()
     */
    @Override
    public void clear() {
        try {
            close();
            if(dbEnv!=null&&queueDb!=null){
                dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName);
                dbEnv.close();
            }
        } catch (DatabaseNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (DatabaseException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally{
            try {
                if(this.dbDir!=null){
                    FileUtils.deleteDirectory(new File(this.dbDir));
                }

            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

 

import java.io.File;

import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
/**
 * BDB數據庫環境,能夠緩存StoredClassCatalog並共享
 *
 * @contributor 
 */
public class BdbEnvironment extends Environment {
    StoredClassCatalog classCatalog;
    Database classCatalogDB;

    /**
     * Constructor
     *
     * @param envHome 數據庫環境目錄
     * @param envConfig config options  數據庫換記念館配置
     * @throws DatabaseException
     */
    public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
        super(envHome, envConfig);
    }

    /**
     * 返回StoredClassCatalog
     * @return the cached class catalog
     */
    public StoredClassCatalog getClassCatalog() {
        if(classCatalog == null) {
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setAllowCreate(true);
            try {
                //事務、數據庫名、配置項
                classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
                classCatalog = new StoredClassCatalog(classCatalogDB);
            } catch (DatabaseException e) {
                // TODO Auto-generated catch block
                throw new RuntimeException(e);
            }
        }
        return classCatalog;
    }

    @Override
    public synchronized void close() throws DatabaseException {
        if(classCatalogDB!=null) {
            classCatalogDB.close();
        }
        super.close();
    }

}

 

客戶端--doccloudweb

1.將上面客戶端的DocJob、DocJobType、JobDeamonService、JobStatus類複製到客戶端

2.將DocController中接着添加

//上傳成功之後須要提交文檔轉換任務
//轉換成html,
submitDocJob(docEntity,new Random().nextInt());
/**
 * 提交任務到集羣上運行--文檔轉換任務
 * @param docEntity
 * @param userId
 */
private void submitDocJob(Doc docEntity, int userId) throws IOException {
    //建立一個文檔轉換任務對象
    DocJob docJob = new DocJob();
    //1.設置提交者
    docJob.setUserId(userId);
    //2.設置任務名
    docJob.setName("doc convent");
    //3.任務的狀態
    docJob.setJobStatus(JobStatus.SUBMIT);
    //4.設置任務類型
    docJob.setJobType(DocJobType.DOC_JOB_CONVERT);
    //5.設置提交時間
    docJob.setSubmitTime(System.nanoTime());
    //6.設置輸入路徑
    docJob.setInput(docEntity.getDocDir()+"/"+docEntity.getDocName());
    //7.設置輸出路徑
    docJob.setOutput(docEntity.getDocDir());
    //8.設置重試次數
    docJob.setRetryTime(4);
    //9.設置文件名
    docJob.setFileName(docEntity.getDocName());
    //todo 將job元數據保存到數據庫
    //獲取動態代理對象
    JobDaemonService jobDaemonService = RPC.getProxy(JobDaemonService.class, 1L, new InetSocketAddress("localhost", 7788), new Configuration());
    //提交任務到服務器(hdfs上)
    log.info("submit job:{}",docJob);
    jobDaemonService.submitDocJob(docJob);

}

 

將上傳到hdfs上的文件下載到本地,將下載的文件轉化爲HTML(經過runtime調用exec來執行命令)並保存到本地(客戶端提交任務到服務器)經過hadoop IPC來接受任務。將任務保存在序列化隊列中,1.保證任務不丟失 2.併發控制,內存溢出

---------------------------<待更>-------------------------

相關文章
相關標籤/搜索