在一些大型企事業單位,工做中存在各類各樣的工做文檔,技術文檔,規範等等。這些文檔以word,xls,ppt,wps,pdf,txt存在。在此項目以前,文檔的分享主要靠單位內部人員的互相發送。沒有一個統一的平臺對企業現存的各類文檔進行統一管理。DocCloud項目提供了統一的文檔管理平臺。用戶能夠將文檔上傳至平臺,全部其餘用戶能夠在線查看此文檔。同時知足搜索文檔,分享,收藏等等一系列需求。在實踐中,有百度文庫,doc88,豆丁等公網項目。可是沒有一個專門爲企業用戶服務的一個文檔管理平臺。css
1.文檔的統一存儲html
2.文檔的檢索前端
3.文檔的在線預覽java
4.文檔分享mysql
5.文檔推薦jquery
6.文檔上傳下載linux
7.用戶的註冊,登陸nginx
8.文檔權限管理git
HDFS+LibreOffice6.0+solr+nginx+flume+hive+springboot+jpa+js+html+cssweb
文檔存儲: HDFS
文件存儲:1.本地(linux)-web服務器 優缺點:內存小,可是存儲方便
2.ftp服務器(搭建一個存儲文件的集羣)優缺點:文檔存儲內存夠,可是不能容錯
3.hdfs (hadoop集羣)優缺點:可擴展、容錯、分佈式存儲
文檔格式轉換: LibreOffice6.0
由於存儲的內容不是純文本,就是傳統的io流不能用、須要變成純文本文件(txt)
doc、docx、ppt---→html---→txt
使用LibreOffice6.0,不適用word的緣由:1.沒有接口(不開源)2.不能再linux上運行
進程間通訊:hadoop ipc
全文檢索: solr
日誌記錄服務器:ngnix
web日誌採集:flume
日誌分析:hive
webMvc:springboot
持久層框架:jpa
單元測試:junit4
前端:css+html+js+jquery+bootstrap
版本管理:svn
依賴管理:maven
開發環境:idea
部署環境:linux
數據庫:mysql
1.文檔的上傳下載
a.用戶在前端點擊上傳按鈕
b.在本地選擇上傳文檔
c.開始上傳
b.服務端校驗文件後綴是否符合文檔格式。
容許格式:doc,docx,ppt,pptx,xls,xlsx,pdf,txt
目的:避免上傳不能轉碼的文檔如:exe,zip,….
e.校驗文檔大小,容許128兆如下的文檔上傳。
128M:爲了使文檔在hdfs是一個塊的形式保存。
f.計算文檔的md5值,判斷文檔是否在文庫中已經存在,若是存在,告知用戶已經存在。
g.不存在,則上傳至hdfs,同時數據庫中保存用戶上傳文檔信息。
數據保存在hdfs上,元數據保存在數據庫mysql
2.上傳成功之後須要提交文檔轉換任務(主要功能以下)
1>轉換成html
2>轉換成pdf提取縮略圖,頁數
3>提取文本 創建索引
如下代碼沒有涉及前端、只是後臺測試(使用Postman測試),部分參數都沒有從session中獲取,都是隨機生成的
沒有軟件的附上資源(下載雙擊安裝就能夠):連接:https://pan.baidu.com/s/1SibrDOB4GwkX4L0iw3nYTA
提取碼:bisn
1、功能一:上傳文件/2018.10.29
日誌配置:
在類名上添加註解
@Slf4j
直接在類中使用log記錄日誌
文件上傳:
關鍵註解:
@RequestParam("file") MultipartFile file
獲取上傳文件名:
file.getOriginalFilename()
在DocCloud中建立model---->doccloudweb(模塊名要小寫)
在module中選擇Spring Initializr
在建立module時,選擇pom中的依賴以下,選完以後一路下一步
core下選擇DevTools、LomBok
web下選擇web
sql下選擇JPA、mysql
NoSQL下選solr
<dependencies>
<!--數據相關依賴-持久層-->
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-starter-data-jpa</artifactId>-->
<!--</dependency>-->
<!--全文檢索-->
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-starter-data-solr</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--熱部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<!--數據庫鏈接-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--單元測試-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
</dependencies>
<!--編譯、打jar包-->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<!--上傳文件到hdfs上-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
Doc:記錄文件屬性
使用jpa---->java+persistence+api
JPA是Java Persistence API的簡稱,中文名Java持久層API,是JDK 5.0註解或XML描述對象-關係表的映射關係,並將運行期的實體對象持久化到數據庫中
#數據源配置
spring.datasource.name=root
spring.datasource.password=123
#true:表示展現sql語句
spring.jpa.show-sql=true
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/doc_cloud
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://master2:9000</value>
</property>
<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
</property>
</configuration>
import com.zhiyou100.doccloudweb.service.DocService;
import com.zhiyou100.doccloudweb.util.HdfsUtil;
import com.zhiyou100.doccloudweb.util.MD5Util;
import com.zhiyou100.doccloudweb.entity.Doc;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import java.util.Random;
@Controller //表示是controller層--業務層
@RequestMapping("/doc")
@Slf4j
public class DocController {
@Autowired
private DocService docService;
//定義合法的文件後綴類型
public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};
//定義文件最大大小
public static final int DOC_MAX_SIZE = 128*1024*1024;
//定義文件保存到hdfs上的根目錄
public static final String HOME="hdfs://192.168.228.13:9000/doccloud";
@RequestMapping("/upload")
@ResponseBody
public String upload(@RequestParam("file") MultipartFile file){
//判斷是不是文件
if (file.isEmpty()){
return "file is empty";
}
//獲取文件名
String filename = file.getOriginalFilename();
//以點分割-獲取文件後綴
String[] strings = filename.split("\\.");
if (strings.length==1){
return "file does not has suffix";
}
String suffix = strings[1];
log.info("doc suffix is {}",suffix);
//1.判斷文件後綴是否合法
boolean flag = isSuffixLegal(suffix);
if (!flag){
return "file is illegal";
}
try {
//2.判斷文件大小是否合法
byte[] bytes = file.getBytes();
log.info("file size is {}",bytes.length);
if (bytes.length>DOC_MAX_SIZE){
return "file is large,file Max size:"+DOC_MAX_SIZE;
}
//3.計算文檔的MD5值
String md5 = getMD5(bytes);
log.info("file is md5 {} ",md5);
//用戶上傳文件,保存到數據庫
//1.校驗數據庫中的md5值,判斷數據庫中是否存在
Optional<Doc> doc = docService.findByMd5(md5);
if (doc.isPresent()){
//2.若是存在,更新
// 2.1獲取文件對象
Doc docEntity = doc.get();
//2.2設置文件更新的人
docEntity.setUserId(new Random().nextInt());
//2.3保存到數據庫
docService.save(docEntity);
}else {
//3.若是不存在,將文件元數據保存到數據庫,將數據保存到hdfs
//3.1保存數據到hdfs
//3.1.1生成文件保存路徑:HOME+當前時間
String date = getDate();
String dst = HOME+"/"+date+"/"+file.getOriginalFilename()+"/";
log.info("file dst {}",dst);
//3.1.2上傳文件
HdfsUtil.upload(bytes,file.getOriginalFilename(),dst);
//3.2將元數據保存到數據庫
//3.2.1建立一個文件對象
Doc docEntity = new Doc();
//3.2.2設置做者
docEntity.setUserId(new Random().nextInt());
//3.2.3設置備註
docEntity.setDocComment("hadoop");
//3.2.4設置文件路徑
docEntity.setDocDir(dst);
//3.2.5設置文件名
docEntity.setDocName(filename);
//3.2.6設置文件大小
docEntity.setDocSize(bytes.length);
//3.2.7設置文件權限
docEntity.setDocPermission("1");
//3.2.8設置文件類型(後綴)
docEntity.setDocType(suffix);
//3.2.9設置文件狀態
docEntity.setDocStatus("upload");
//3.2.10設置文件的md5值--保證文件的惟一性
docEntity.setMd5(md5);
//3.2.11設置文件創做時間
docEntity.setDocCreateTime(new Date());
//3.2.12保存元數據
docService.save(docEntity);
}
} catch (IOException e) {
e.printStackTrace();
}
return "upload success";
}
/**
* 獲取當前是時間,用於文件的保存路徑
* @return
*/
private String getDate() {
Date date = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
return simpleDateFormat.format(date);
}
/**
* 計算字節數組的MD5值
* @param bytes
* @return
*/
private String getMD5(byte[] bytes) {
return MD5Util.getMD5String(bytes);
}
/**
* 判斷文件後綴是否合法
* @param suffix
* @return
*/
private boolean isSuffixLegal(String suffix) {
for (String docsuffix :
DOC_SUFFIXS) {
if (suffix.equals(docsuffix)){
return true;
}
}
return false;
}
}
import com.zhiyou100.doccloudweb.dao.DocRepository;
import com.zhiyou100.doccloudweb.entity.Doc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class DocService {
@Autowired
private DocRepository docRepository;
//經過id獲取文件對象
public Optional<Doc> findById(int id) {
return docRepository.findById(id);
}
//經過MD5獲取文件對象
public Optional<Doc> findByMd5(String md5) {
return docRepository.findByMd5(md5);
}
//保存文件對象到數據庫
public void save(Doc docEntity) {
docRepository.save(docEntity);
}
}
//定義合法的文件後綴類型
public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};
//定義文件最大大小
public static final int DOC_MAX_SIZE = 128*1024*1024;
@RequestMapping("/doclist")
@ResponseBody
Doc doList(){
Optional<Doc> id = docService.findById(1);
return null;
}
import com.zhiyou100.doccloudweb.entity.Doc;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
//Doc:表示定義的實體類,Integer:表示主鍵類型
public interface DocRepository extends JpaRepository<Doc,Integer> {
//利用反射機制自動識別
Optional<Doc> findByMd5(String md5);
}
import lombok.Data;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.persistence.*;
import java.util.Date;
/**
* 文件屬性
*/
@Entity
@Table(name = "doc") //映射到數據庫中的表
@Data //get/set
public class Doc {
@Id //主鍵
//告訴框架id生成策略(怎麼生成)GenerationType.IDENTITY:表示自動生成
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
@Column(name = "md5")//若是數據庫字段與entity中字段名同樣,則不用加此註解
private String md5;
@Column(name = "doc_name")
private String docName;
@Column(name = "doc_type")
private String docType;
@Column(name = "doc_status")
private String docStatus;
@Column(name = "doc_size")
private int docSize;
@Column(name = "doc_dir")
private String docDir;
@Column(name = "user_id")
private int userId;
@Column(name = "doc_create_time")
private Date docCreateTime;
@Column(name = "doc_comment")
private String docComment;
@Column(name = "doc_permission")
private String docPermission;
}
7.工具類
HdfsUtil
import com.google.common.io.Resources; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; /* *@ClassName:HdfsUtil @Description:TODO @Author: @Date:2018/10/29 17:17 @Version:v1.0 */ public class HdfsUtil { //文檔上傳工具類 public static void upload(byte[] src, String docName, String dst) throws IOException { //加載配置文件 Configuration coreSiteConf = new Configuration(); coreSiteConf.addResource(Resources.getResource("core-site.xml")); //獲取文件系統客戶端對象 FileSystem fileSystem = FileSystem.get(coreSiteConf); FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName)); fsDataOutputStream.write(src); fsDataOutputStream.close(); fileSystem.close(); } }
MD5Util
import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; public class MD5Util { protected static char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; protected static MessageDigest messagedigest = null; /** * MessageDigest初始化 * * @author */ static { try { messagedigest = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { System.err.println("MD5FileUtil messagedigest初始化失敗"); e.printStackTrace(); } } /** * 對文件進行MD5加密 * * @author */ public static String getFileMD5String(File file) throws IOException { FileInputStream in = new FileInputStream(file); FileChannel ch = in.getChannel(); MappedByteBuffer byteBuffer = ch.map(FileChannel.MapMode.READ_ONLY, 0, file.length()); messagedigest.update(byteBuffer); return bufferToHex(messagedigest.digest()); } /** * 對字符串進行MD5加密 * * @author */ public static String getMD5String(String s) { return getMD5String(s.getBytes()); } /** * 對byte類型的數組進行MD5加密 * * @author */ public static String getMD5String(byte[] bytes) { messagedigest.update(bytes); return bufferToHex(messagedigest.digest()); } private static String bufferToHex(byte bytes[]) { return bufferToHex(bytes, 0, bytes.length); } private static String bufferToHex(byte bytes[], int m, int n) { StringBuffer stringbuffer = new StringBuffer(2 * n); int k = m + n; for (int l = m; l < k; l++) { char c0 = hexDigits[(bytes[l] & 0xf0) >> 4]; char c1 = hexDigits[bytes[l] & 0xf]; stringbuffer.append(c0); stringbuffer.append(c1); } return stringbuffer.toString(); } }
2、功能:文檔轉換
上傳成功之後須要提交文檔轉換任務(主要功能以下)
1>轉換成html
2>轉換成pdf提取縮略圖,頁數
3>提取文本 創建索引
1.定義一個docjob對象,用於封裝任務信息
2.實現writable接口。由於要經過hadoop ipc序列化實現文檔轉換守護進程
該進程的做用是完成存放在本節點文檔的轉換,索引的任務。
1.文檔轉換成htm!經過runtime.exec執行命令來實現
2.經過hadoop ipc來接受任務
hadoop ipc
hadoop ipc是一套hadoop自帶的成熟的rpc框架,性能高,穩定性性強。
server:
a.服務端定義接口b.定義按口的實現類
c用hadoop ipc暴露服務
client;
經過rpc.geproxy來調用服務崗的接口。
1.新建模塊--docservicedeamon文件轉換守護進程
2.pom文件中的依賴
<dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.5</version> </dependency> <!--ipc通訊模塊--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.5</version> </dependency> <!--註解、--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.20</version> </dependency> <!--berklydb數據庫依賴--> <!-- https://mvnrepository.com/artifact/com.sleepycat/je --> <dependency> <groupId>com.sleepycat</groupId> <artifactId>je</artifactId> <version>5.0.73</version> </dependency> <!--hdfs文件上傳與下載--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.5</version> </dependency>
3.配置core-site.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://master2:9000</value> </property> <property> <name>fs.hdfs.impl</name> <value>org.apache.hadoop.hdfs.DistributedFileSystem</value> </property> </configuration>
4.類一---DocJob
import lombok.Data; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; /** *此方法用於封裝任務信息 */ @Data public class DocJob implements Writable,Serializable { private static final long serialVersionUID = 12345678L; //任務id private int id; //任務名 private String name; //任務類型 private DocJobType jobType; //提交者 private int userId; //提交時間 private long submitTime; //完成時間 private long finishTime; //任務狀態 private JobStatus jobStatus; //任務重試次數 private int retryTime; //文檔輸入路徑 private String input; //任務輸出路徑 private String output; //文件名 private String fileName; public void write(DataOutput out) throws IOException { out.writeInt(id); out.writeUTF(name); out.writeUTF(jobType.name()); out.writeInt(userId); out.writeLong(finishTime); out.writeLong(submitTime); out.writeUTF(jobStatus.name()); out.writeInt(retryTime); out.writeUTF(input); out.writeUTF(output); out.writeUTF(fileName); } public void readFields(DataInput in) throws IOException { id= in.readInt(); name=in.readUTF(); jobType=DocJobType.valueOf(in.readUTF()); userId=in.readInt(); finishTime=in.readLong(); submitTime=in.readLong(); jobStatus=JobStatus.valueOf(in.readUTF()); retryTime=in.readInt(); input=in.readUTF(); output=in.readUTF(); fileName=in.readUTF(); } }
5.類2、DocJobType
/** * 項目的類型:文檔轉換、定義索引... */ public enum DocJobType { DOC_JOB_CONVERT,DOC_JOB_CREATE_INDEX,DOC_JOB_UPDATE_INDEX }
6.類3、JobStatus
/** * 文檔狀態:準備、提交、運行、失敗、完成 */ public enum JobStatus { PREPARE,SUBMIT,RUNNING,FAILED,SUCCEED, }
7類4、.JobDaemonService
/** * 服務端 * 1.定義接口繼承VersionedProtocol */ public interface JobDaemonService extends VersionedProtocol { //定義通訊間的暗號 long versionID=1L; //定義提交方法 void submitDocJob(DocJob job); }
8.類5、JobDaemonServiceImpl
import com.zhiyou100.doccloud.utils.BdbPersistentQueue; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.ipc.ProtocolSignature; import java.io.File; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 2.定義接口的實現類 * 實現Runnable接口:是爲了使用多線程處理 */ @Slf4j public class JobDaemonServiceImpl implements JobDaemonService,Runnable{ //定義將hdfs下載到本地的目錄的根路徑 private static final String WORK_DIR="/tmp/docjobdaemon/"; //定義持久化對象 public BdbPersistentQueue<DocJob> queue; //定義線程池--多線程並行處理 private ExecutorService pool = Executors.newFixedThreadPool(4); //定義一個標準-讓線程運行 private boolean flag = true; //構造方法:用於建立berkly數據庫目錄,並初始化持久化隊列 public JobDaemonServiceImpl(){ //建立工做目錄--本地保存路徑 File workDir = new File(WORK_DIR + "/" + "bdb/"); if (!workDir.exists()){ //若是不存在將建立 workDir.mkdirs(); System.out.println(workDir.getAbsolutePath()); } //初始化持久化隊列 queue = new BdbPersistentQueue<DocJob>(WORK_DIR+"/"+"bdb/", "docjob", DocJob.class); } public void submitDocJob(DocJob job) { System.out.println(job); //將任務保存在序列化隊列中,1.保證任務不丟失 2.併發控制,內存溢出 log.info("receive job {}",job); queue.offer(job); } public long getProtocolVersion(String s, long l) throws IOException { return versionID; } public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException { return null; } @Override public void run() { while (flag){ //將任務從序列化隊列中取出任務,poll:每取出一個就從磁盤中移除一個 DocJob docJob = queue.poll(); //判斷docjob中否爲空 if (docJob==null){ //爲空,等待5000毫秒 try { Thread.sleep(5000); System.out.println("waiting for docjob"); } catch (InterruptedException e) { e.printStackTrace(); } }else { pool.submit(new DocJobHandler(docJob)); } } } }
9.類6、Main
import com.zhiyou100.doccloud.job.JobDaemonService; import com.zhiyou100.doccloud.job.JobDaemonServiceImpl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import java.io.IOException; /** * 守護進程--項目的入口類 * 3.服務端:暴露端口 */ public class Main { public static void main(String[] args) throws IOException { //建立服務端接口實現類對象 JobDaemonServiceImpl instance = new JobDaemonServiceImpl(); //開啓線程 new Thread(instance).start(); // 建立一個RPC builder RPC.Builder builder = new RPC.Builder(new Configuration()); //指定RPC Server的參數 builder.setBindAddress("localhost"); builder.setPort(7788); //將本身的程序部署到server上 builder.setProtocol(JobDaemonService.class); builder.setInstance(instance); //建立Server RPC.Server server = builder.build(); //啓動服務 server.start(); } }
10.類七--DocJobHandler
import com.zhiyou100.doccloud.utils.HdfsUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; import java.io.File; import java.io.IOException; import java.util.UUID; @Slf4j public class DocJobHandler implements Runnable { private DocJob docJob; public DocJobHandler(DocJob docJob) { this.docJob = docJob; log.info("start to deal job {}",docJob); } /** *將文件衝hdfs上下載到本地,再將文件格式轉化成HTML,最終上傳到hdfs上 */ @Override public void run() { //1.將hdfs上的文件下載到本地 //1.1獲取文件的下載路徑(在hdfs上的位置) String input = docJob.getInput(); //1.2建立目標路徑(下載到本地的路徑) String tmpWorkDirPath = "/tmp/docjobdaemon/" + UUID.randomUUID().toString() + "/"; File tmpWorkDir = new File(tmpWorkDirPath); tmpWorkDir.mkdirs(); System.out.println("tmpWorkDirPath: "+tmpWorkDirPath); //1.3下載文件到臨時目錄 try { HdfsUtil.copyToLocal(input,tmpWorkDirPath); log.info("download file to {}",tmpWorkDirPath); //step1:將下載到本地的文件格式轉化成HTML String command = "D:\\soft\\LibreOffice_6.0.6\\program\\soffice --headless --invisible --convert-to html " + docJob.getFileName(); Process process = Runtime.getRuntime().exec(command, null, tmpWorkDir); //結果信息 System.out.println(IOUtils.toString(process.getInputStream())); //錯誤信息 System.out.println(IOUtils.toString(process.getErrorStream())); //step2 轉換成pdf //step3 提取頁碼 //step4 提取首頁縮略圖 //step5 利用solr創建索引 //step6 上傳結果 //step7 清理臨時目錄 //step8 任務成功回調 } catch (IOException e) { e.printStackTrace(); } } }
將hdfs上的文件下載到本地--HdfsUtil
import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.AbstractQueue; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; import com.sleepycat.bind.EntryBinding; import com.sleepycat.bind.serial.SerialBinding; import com.sleepycat.bind.serial.StoredClassCatalog; import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.collections.StoredMap; import com.sleepycat.collections.StoredSortedMap; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.DatabaseExistsException; import com.sleepycat.je.DatabaseNotFoundException; import com.sleepycat.je.EnvironmentConfig; /** * 持久化隊列,基於BDB實現,也繼承Queue,以及能夠序列化.但不等同於Queue的時,再也不使用後須要關閉 * 相比通常的內存Queue,插入和獲取值須要多消耗必定的時間 * 這裏爲何是繼承AbstractQueue而不是實現Queue接口,是由於只要實現offer,peek,poll幾個方法便可, * 其餘如remove,addAll,AbstractQueue會基於這幾個方法去實現 * * @contributor * @param <E> */ public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements Serializable { private static final long serialVersionUID = 3427799316155220967L; private transient BdbEnvironment dbEnv; // 數據庫環境,無需序列化 private transient Database queueDb; // 數據庫,用於保存值,使得支持隊列持久化,無需序列化 private transient StoredMap<Long,E> queueMap; // 持久化Map,Key爲指針位置,Value爲值,無需序列化 private transient String dbDir; // 數據庫所在目錄 private transient String dbName; // 數據庫名字 //AtomicLong:元子類型,線程安全 //i++線程不安全 private AtomicLong headIndex; // 頭部指針 private AtomicLong tailIndex; // 尾部指針 private transient E peekItem=null; // 當前獲取的值 /** * 構造函數,傳入BDB數據庫 * * @param db * @param valueClass * @param classCatalog */ public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){ this.queueDb=db; this.dbName=db.getDatabaseName(); headIndex=new AtomicLong(0); tailIndex=new AtomicLong(0); bindDatabase(queueDb,valueClass,classCatalog); } /** * 構造函數,傳入BDB數據庫位置和名字,本身建立數據庫 * * @param dbDir * @param dbName * @param valueClass */ public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){ //headIndex=new AtomicLong(0); //tailIndex=new AtomicLong(0); this.dbDir=dbDir; this.dbName=dbName; createAndBindDatabase(dbDir,dbName,valueClass); } /** * 綁定數據庫 * * @param db * @param valueClass * @param classCatalog */ public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){ EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass); if(valueBinding == null) { valueBinding = new SerialBinding<E>(classCatalog, valueClass); // 序列化綁定 } queueDb = db; queueMap = new StoredSortedMap<Long,E>( db, // db TupleBinding.getPrimitiveBinding(Long.class), //Key 序列化類型 valueBinding, // Value true); // allow write //todo Long firstKey = ((StoredSortedMap<Long, E>) queueMap).firstKey(); Long lastKey = ((StoredSortedMap<Long, E>) queueMap).lastKey(); headIndex=new AtomicLong(firstKey == null ? 0 : firstKey); tailIndex=new AtomicLong(lastKey==null?0:lastKey+1); } /** * 建立以及綁定數據庫 * * @param dbDir * @param dbName * @param valueClass * @throws DatabaseNotFoundException * @throws DatabaseExistsException * @throws DatabaseException * @throws IllegalArgumentException */ private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException, DatabaseExistsException,DatabaseException,IllegalArgumentException{ File envFile = null; EnvironmentConfig envConfig = null; DatabaseConfig dbConfig = null; Database db=null; try { // 數據庫位置 envFile = new File(dbDir); // 數據庫環境配置 envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); //不支持事務 envConfig.setTransactional(false); // 數據庫配置 dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(false); //是否要延遲寫 dbConfig.setDeferredWrite(true); // 建立環境 dbEnv = new BdbEnvironment(envFile, envConfig); // 打開數據庫 db = dbEnv.openDatabase(null, dbName, dbConfig); // 綁定數據庫 bindDatabase(db,valueClass,dbEnv.getClassCatalog()); } catch (DatabaseNotFoundException e) { throw e; } catch (DatabaseExistsException e) { throw e; } catch (DatabaseException e) { throw e; } catch (IllegalArgumentException e) { throw e; } } /** * 值遍歷器 */ @Override public Iterator<E> iterator() { return queueMap.values().iterator(); } /** * 大小 */ @Override public int size() { synchronized(tailIndex){ synchronized(headIndex){ return (int)(tailIndex.get()-headIndex.get()); } } } /** * 插入值 */ @Override public boolean offer(E e) { synchronized(tailIndex){ queueMap.put(tailIndex.getAndIncrement(), e);// 從尾部插入 //將數據不保存在緩衝區,直接存入磁盤 dbEnv.sync(); } return true; } /** * 獲取值,從頭部獲取 */ @Override public E peek() { synchronized(headIndex){ if(peekItem!=null){ return peekItem; } E headItem=null; while(headItem==null&&headIndex.get()<tailIndex.get()){ // 沒有超出範圍 headItem=queueMap.get(headIndex.get()); if(headItem!=null){ peekItem=headItem; continue; } headIndex.incrementAndGet(); // 頭部指針後移 } return headItem; } } /** * 移出元素,移出頭部元素 */ @Override public E poll() { synchronized(headIndex){ E headItem=peek(); if(headItem!=null){ queueMap.remove(headIndex.getAndIncrement()); //從磁盤上移除 dbEnv.sync(); peekItem=null; return headItem; } } return null; } /** * 關閉,也就是關閉所是用的BDB數據庫但不關閉數據庫環境 */ public void close(){ try { if(queueDb!=null){ queueDb.sync(); queueDb.close(); } } catch (DatabaseException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (UnsupportedOperationException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 清理,會清空數據庫,而且刪掉數據庫所在目錄,慎用.若是想保留數據,請調用close() */ @Override public void clear() { try { close(); if(dbEnv!=null&&queueDb!=null){ dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName); dbEnv.close(); } } catch (DatabaseNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (DatabaseException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ try { if(this.dbDir!=null){ FileUtils.deleteDirectory(new File(this.dbDir)); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
import java.io.File; import com.sleepycat.bind.serial.StoredClassCatalog; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; /** * BDB數據庫環境,能夠緩存StoredClassCatalog並共享 * * @contributor */ public class BdbEnvironment extends Environment { StoredClassCatalog classCatalog; Database classCatalogDB; /** * Constructor * * @param envHome 數據庫環境目錄 * @param envConfig config options 數據庫換記念館配置 * @throws DatabaseException */ public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException { super(envHome, envConfig); } /** * 返回StoredClassCatalog * @return the cached class catalog */ public StoredClassCatalog getClassCatalog() { if(classCatalog == null) { DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); try { //事務、數據庫名、配置項 classCatalogDB = openDatabase(null, "classCatalog", dbConfig); classCatalog = new StoredClassCatalog(classCatalogDB); } catch (DatabaseException e) { // TODO Auto-generated catch block throw new RuntimeException(e); } } return classCatalog; } @Override public synchronized void close() throws DatabaseException { if(classCatalogDB!=null) { classCatalogDB.close(); } super.close(); } }
客戶端--doccloudweb
1.將上面客戶端的DocJob、DocJobType、JobDeamonService、JobStatus類複製到客戶端
2.將DocController中接着添加
//上傳成功之後須要提交文檔轉換任務 //轉換成html, submitDocJob(docEntity,new Random().nextInt());
/** * 提交任務到集羣上運行--文檔轉換任務 * @param docEntity * @param userId */ private void submitDocJob(Doc docEntity, int userId) throws IOException { //建立一個文檔轉換任務對象 DocJob docJob = new DocJob(); //1.設置提交者 docJob.setUserId(userId); //2.設置任務名 docJob.setName("doc convent"); //3.任務的狀態 docJob.setJobStatus(JobStatus.SUBMIT); //4.設置任務類型 docJob.setJobType(DocJobType.DOC_JOB_CONVERT); //5.設置提交時間 docJob.setSubmitTime(System.nanoTime()); //6.設置輸入路徑 docJob.setInput(docEntity.getDocDir()+"/"+docEntity.getDocName()); //7.設置輸出路徑 docJob.setOutput(docEntity.getDocDir()); //8.設置重試次數 docJob.setRetryTime(4); //9.設置文件名 docJob.setFileName(docEntity.getDocName()); //todo 將job元數據保存到數據庫 //獲取動態代理對象 JobDaemonService jobDaemonService = RPC.getProxy(JobDaemonService.class, 1L, new InetSocketAddress("localhost", 7788), new Configuration()); //提交任務到服務器(hdfs上) log.info("submit job:{}",docJob); jobDaemonService.submitDocJob(docJob); }
將上傳到hdfs上的文件下載到本地,將下載的文件轉化爲HTML(經過runtime調用exec來執行命令)並保存到本地(客戶端提交任務到服務器)經過hadoop IPC來接受任務。將任務保存在序列化隊列中,1.保證任務不丟失 2.併發控制,內存溢出
---------------------------<待更>-------------------------