使用騰訊雲 SCF 雲函數壓縮 COS 對象存儲文件

歡迎你們前往騰訊雲技術社區,獲取更多騰訊海量技術實踐乾貨哦~node

做者:騰訊雲Serverless團隊git

在使用騰訊雲 COS 對象存儲的過程當中,咱們常常有想要把整個 Bucket 打包下載的需求,可是 COS 並無提供整個 Bucket 打包下載的能力。這時,咱們能夠利用騰訊雲的 SCF 無服務器雲函數,完成 COS Bucket 的打包,並從新保存壓縮後的文件到 COS 中,而後經過 COS 提供的文件訪問連接下載文件。github

可是在使用 SCF 雲函數進行 COS Bucket 打包的過程當中,偶爾會碰到這樣的問題:我指望將某個 COS Bucket 內的文件所有下載下來而後打包壓縮,把壓縮文件再上傳到 COS 中進行備份;可是在這個過程當中,COS Bucket 內的文件可能數量多體積大,而 SCF 雲函數的運行環境,實際只有 512MB 的 /tmp 目錄是能夠讀寫的。這樣算上下載的文件,和生成的 ZIP 包,可能僅支持必定體積的文件處理,知足不了所需。怎麼辦?算法

在這種狀況下,可能有的同窗會想到使用內存,將內存轉變爲文件系統,即內存文件系統,或者直接讀取文件並放置在內存中,或者在內存中生成文件。這種方法能解決一部分問題,但同時也帶來了些其餘問題:緩存

  1. SCF 雲函數的內存配置也是有上限的,目前上限是 1.5GB。
  2. SCF 雲函數的收費方式是按配置內存*運行時間。若是使用配置大內存的方法,實際是在爲可能偶爾碰到的極端狀況支付沒必要要的費用,不符合咱們使用 SCF 雲函數就是要精簡費用的目的。

咱們在這裏嘗試了一種流式文件處理的方式,經過單個文件壓縮後數據當即提交 COS 寫的方法,一次處理一個文件,使得被壓縮文件無需在 SCF 的緩存空間內堆積,壓縮文件也無需放在緩存或內存中,而是直接寫入 COS。在這裏,咱們實際利用了兩種特性:ZIP 文件的數據結構特性和 COS 的分片上傳特性。服務器

zip 文件的數據結構

在官方文檔中給出的 zip 文件格式以下:數據結構

Overall .ZIP file format:

    [local file header 1]
    [file data 1]
    [data descriptor 1]
    . 
    .
    .
    [local file header n]
    [file data n]
    [data descriptor n]
    [archive decryption header] (EFS)
    [archive extra data record] (EFS)
    [central directory]
    [zip64 end of central directory record]
    [zip64 end of central directory locator] 
    [end of central directory record]

能夠看到,實際的 zip 文件格式基本是[文件頭+文件數據+數據描述符]{此處可重複n次}+核心目錄+目錄結束標識組成的,壓縮文件的文件數據和壓縮數據是在文件頭部,相關的目錄結構,zip文件信息存儲在文件尾部。這樣的結構,爲咱們後續 COS 分片上傳寫入帶來了方便,能夠先寫入壓縮數據內容,再寫入最終文件信息。多線程

COS 分片上傳

COS 分片上傳按照以下操做便可進行:app

  1. 初始化分片上傳:經過初始化動做,獲取到這次上傳的惟一標識ID。此ID須要保存在本地並在後續上傳分片時使用。
  2. 上傳分片:經過初始化時獲取到的ID,配合文件分片的順序編號,依次上傳文件分片,獲取到每一個分片的ETag;COS 會經過 ID 和分片順序編號,拼接文件。
  3. 結束上傳:經過初始化時獲取到的ID,結合分片的順序編號和ETag,通知 COS 分片上傳已經完成,能夠進行拼接。

在上傳過程當中,還隨時能夠查詢已上傳分片,或結束取消分片上傳。less

文件壓縮處理流程設計

利用 zip 文件數據結構中文件壓縮數據在前目錄和額外標識在後的特性,和 COS 支持分片上傳的特性,咱們能夠利用流式文件處理方式來依次處理文件,而且作處處理完成一個文件壓縮就上傳處理後的壓縮數據分片。這種處理流程能夠簡化爲以下說明:

  1. 初始化 zip 文件數據結構,並將數據結構保存在內存中。
  2. 初始化 COS 分片上傳文件,保存好分片上傳 ID。
  3. 下載要放入壓縮包的文件至本地,使用 zip 算法,生成壓縮文件的數據內容並保存在內存中,並根據目錄格式,更新zip數據格式中的目錄標識。
  4. 將壓縮後的文件數據使用 COS 上傳分片,上傳至 COS 中。
  5. 清理刪除下載至本地的需壓縮文件。
  6. 根據須要,重複 3~5 步驟,增長壓縮包內的文件。
  7. 在壓縮文件處理完成後,使用分片上傳,將內存中的 zip 文件數據結構最後的目錄結構部分上傳至 COS。
  8. 通知 COS 結束上傳,完成最終 zip 文件的自動拼接。

在這個處理流程中,一次只處理一個文件,對本地緩存和內存使用都只這一個文件的佔用,相比下載所有文件再處理,大大減少了本地緩存佔用和內存佔用,這種狀況下,使用少許緩存和內存就能夠完成 COS 中大量文件的壓縮打包處理。

使用SCF進行 COS 文件壓縮處理實現

流式壓縮文件庫 archiver

咱們這裏使用 node.js 開發語言來實現 COS 文件壓縮處理。咱們這裏使用了 cos-nodejs-sdk-v5 sdk 和 archiver 模塊。其中 archiver 模塊是實現zip和tar包壓縮的流式處理庫,可以經過 append 輸入欲壓縮文件,經過 stream 輸出壓縮後的文件流。archiver的簡單用法以下:

// require modules
var fs = require('fs');
var archiver = require('archiver');

// create a file to stream archive data to.
var output = fs.createWriteStream(__dirname + '/example.zip');
var archive = archiver('zip', {
    zlib: { level: 9 } // Sets the compression level.
});

// pipe archive data to the file
archive.pipe(output);

// append a file from stream
var file1 = __dirname + '/file1.txt';
archive.append(fs.createReadStream(file1), { name: 'file1.txt' });

// append a file
archive.file('file1.txt', { name: 'file4.txt' });

// finalize the archive (ie we are done appending files but streams have to finish yet)
// 'close', 'end' or 'finish' may be fired right after calling this method so register to them beforehand
archive.finalize();

archiver 將會在每次 append 文件的時候,將文件的壓縮數據輸出到 pipe 指定的輸出流上。所以,咱們在這裏能夠經過實現咱們自身的 WriteStream,獲取到 archiver 的寫請求,並把寫入內容轉移到 COS 模塊的分片上傳能力上。在這裏,咱們實現的 WriteStream 爲:

var Writable = require('stream').Writable;
var util = require('util');

module.exports = TempWriteStream;

let handlerBuffer;

function TempWriteStream(options) {
  if (!(this instanceof TempWriteStream))
    return new TempWriteStream(options);
  if (!options) options = {};
  options.objectMode = true;
  handlerBuffer = options.handlerBuffer;
  Writable.call(this, options);
}

util.inherits(TempWriteStream, Writable);

TempWriteStream.prototype._write = function write(doc, encoding, next) {
  handlerBuffer(doc);
  process.nextTick(next)
};

經過集成 nodejs 中的 Writable stream,咱們能夠將寫操做轉移到咱們所需的 handle 上去,handle 能夠對接 COS 的分片上傳功能。

COS 分片上傳

COS 分片上傳功能的實現以下,咱們將其封裝爲 Upload 模塊:

const cos = require('./cos')

let Duplex = require('stream').Duplex;
function bufferToStream(buffer) {
    let stream = new Duplex();
    stream.push(buffer);
    stream.push(null);
    return stream;
}

// 大於4M上傳
const sliceSize = 4 * 1024 * 1024

function Upload(cosParams) {
    this.cosParams = cosParams;
    this.partNumber = 1;
    this.uploadedSize = 0;
    this.Parts = []
    this.tempSize = 0;
    this.tempBuffer = new Buffer('')
}

Upload.prototype.init = function (next) {
    const _this = this;
    cos.multipartInit(this.cosParams, function (err, data) {
        _this.UploadId = data.UploadId
        next()
    });
}
Upload.prototype.upload = function(partNumber, buffer) {
    const _this = this;
    const params = Object.assign({
        Body: bufferToStream(buffer),
        PartNumber: partNumber,
        UploadId: this.UploadId,
        ContentLength: buffer.length
    }, this.cosParams);
    cos.multipartUpload(params, function (err, data) {
        if (err) {
            console.log(err)
        } else {
            _this.afterUpload(data.ETag, buffer, partNumber)
        }
    });
}


Upload.prototype.sendData = function (buffer) {
    this.tempSize += buffer.length;
    if (this.tempSize >= sliceSize) {
        this.upload(this.partNumber, Buffer.concat([this.tempBuffer, buffer]))
        this.partNumber++;
        this.tempSize = 0;
        this.tempBuffer = new Buffer('')
    } else {
        this.tempBuffer = Buffer.concat([this.tempBuffer, buffer]);
    }
}

Upload.prototype.afterUpload = function (etag, buffer, partNumber) {
    this.uploadedSize += buffer.length
    this.Parts.push({ ETag: etag, PartNumber: partNumber })
    if (this.uploadedSize == this.total) {
        this.complete();
    }
}

Upload.prototype.complete = function () {
    this.Parts.sort((part1, part2) => {
        return part1.PartNumber - part2.PartNumber
    });
    const params = Object.assign({
        UploadId: this.UploadId,
        Parts: this.Parts,
    }, this.cosParams);
    cos.multipartComplete(params, function (err, data) {
        if (err) {
            console.log(err)
        } else {
            console.log('Success!')
        }
    });
}

Upload.prototype.sendDataFinish = function (total) {
    this.total = total;
    this.upload(this.partNumber, this.tempBuffer);
}

module.exports = Upload;

對於 COS 自己已經提供的 SDK,咱們在其基礎上封裝了相關查詢,分片上傳初始化,分片上傳等功能以下:

const COS = require('cos-nodejs-sdk-v5');

const cos = new COS({
    AppId: '125xxxx227',
    SecretId: 'AKIDutrojxxxxxxx5898Lmciu',
    SecretKey: '96VJ5tnlxxxxxxxl5To6Md2',
});
const getObject = (event, callback) => {
    const Bucket = event.Bucket;
    const Key = event.Key;
    const Region = event.Region
    const params = {
        Region,
        Bucket,
        Key
    };
    cos.getObject(params, function (err, data) {
        if (err) {
            const message = `Error getting object ${Key} from bucket ${Bucket}.`;
            callback(message);
        } else {
            callback(null, data);
        }
    });
};

const multipartUpload = (config, callback) => {
    cos.multipartUpload(config, function (err, data) {
        if (err) {
            console.log(err);
        }
        callback && callback(err, data);
    });
};

const multipartInit = (config, callback) => {
    cos.multipartInit(config, function (err, data) {
        if (err) {
            console.log(err);
        }
        callback && callback(err, data);
    });
};

const multipartComplete = (config, callback) => {
    cos.multipartComplete(config, function (err, data) {
        if (err) {
            console.log(err);
        }
        callback && callback(err, data);
    });
};

const getBucket = (config, callback) => {
    cos.getBucket(config, function (err, data) {
        if (err) {
            console.log(err);
        }
        callback && callback(err, data);
    });
};


module.exports = {
    getObject,
    multipartUpload,
    multipartInit,
    multipartComplete,
    getBucket
};

在具體使用時,須要將文件中 COS 相關登陸信息的APPId,SecretId,SecretKey等替換爲自身可用的真實內容。

功能入口實現函數

咱們在最終入口函數 index.js 中使用各個組件來完成最終的目錄檢索,文件壓縮打包上傳。在這裏,咱們利用函數入參來肯定要訪問的 bucket 名稱和所屬地域,指望壓縮的文件夾和最終壓縮後文件名。雲函數入口函數仍然爲 main_handler。

// require modules
const fs = require('fs');
const archiver = require('archiver');

const cos = require('./cos');

const Upload = require('./Upload')

const TempWriteStream = require('./TempWriteStream')

const start = new Date();

const getDirFileList = (region, bucket, dir, next) => {
    const cosParams = {
        Bucket: bucket,
        Region: region,
    }
    const params = Object.assign({ Prefix: dir }, cosParams);

    cos.getBucket(params, function (err, data) {
        if (err) {
            console.log(err)
        } else {
            let fileList = [];
            data.Contents.forEach(function (item) {
                if (!item.Key.endsWith('/')) {
                    fileList.push(item.Key)
                }
            });
            next && next(fileList)
        }
    })
}

const handler = (region, bucket, source, target) => {

    const cosParams = {
        Bucket: bucket,
        Region: region,
    }
    const multipartUpload = new Upload(Object.assign({ Key: target}, cosParams));

    const output = TempWriteStream({ handlerBuffer: multipartUpload.sendData.bind(multipartUpload) })

    var archive = archiver('zip', {
        zlib: { level: 9 } // Sets the compression level.
    });
    output.on('finish', function () {
        multipartUpload.sendDataFinish(archive.pointer());
    });

    output.on('error', function (error) {
        console.log(error);
    });

    archive.on('error', function (err) {
        console.log(err)
    });

    archive.pipe(output);

    multipartUpload.init(function () {
        getDirFileList(region, bucket, source, function(fileList) {
            let count = 0;
            const total = fileList.length;
            for (let fileName of fileList) {
                ((fileName) => {
                    let getParams = Object.assign({ Key: fileName }, cosParams)
                    cos.getObject(getParams, (err, data) => {
                        if (err) {
                            console.log(err)
                            return
                        }
                        var buffer = data.Body;
                        console.log("download file "+fileName);
                        archive.append(buffer, { name: fileName.split('/').pop() });
                        console.log("zip file "+fileName);
                        count++;
                        if (count == total) {
                            console.log("finish zip "+count+" files")
                            archive.finalize();
                        }
                    })
                })(fileName)
            }
        })
    })
}

exports.main_handler = (event, context, callback) => {
    var region = event["region"];
    var bucket = event["bucket"];
    var source = event["source"];
    var zipfile = event["zipfile"];
    //handler('ap-guangzhou', 'testzip', 'pic/', 'pic.zip');
    handler(region, bucket, source, zipfile)
}

測試及輸出

最終咱們將如上的代碼文件及相關依賴庫打包爲zip代碼包,建立函數並上傳代碼包。同時咱們準備好一個 COS Bucket命名爲 testzip, 在其中建立 pic 文件夾,並在文件夾中傳入若干文件。經過函數頁面的測試功能,咱們使用以下模版測試函數:

{
"region":"ap-guangzhou",
"bucket":"testzip",
"source":"pic/",
"zipfile":"pic.zip"
}

函數輸出日誌爲:

...
2017-10-13T12:18:18.579Z 9643c683-b010-11e7-a4ea-5254001df6c6 download file pic/DSC_3739.JPG
2017-10-13T12:18:18.579Z 9643c683-b010-11e7-a4ea-5254001df6c6 zip file pic/DSC_3739.JPG
2017-10-13T12:18:18.689Z 9643c683-b010-11e7-a4ea-5254001df6c6 download file pic/DSC_3775.JPG
2017-10-13T12:18:18.690Z 9643c683-b010-11e7-a4ea-5254001df6c6 zip file pic/DSC_3775.JPG
2017-10-13T12:18:18.739Z 9643c683-b010-11e7-a4ea-5254001df6c6 download file pic/DSC_3813.JPG
2017-10-13T12:18:18.739Z 9643c683-b010-11e7-a4ea-5254001df6c6 finish zip 93 files
2017-10-13T12:18:56.887Z 9643c683-b010-11e7-a4ea-5254001df6c6 Success!

能夠看到函數執行成功,並從 COS Bucket 根目錄看到新增長的 pic.zip 文件。

項目源代碼及改進方向

目前項目全部源代碼已經放置在 Github 上,路徑爲 https://github.com/qcloud-scf/demo-scf-compress-cos。能夠經過下載或 git clone 項目,獲取到項目源代碼,根據自身賬號信息,修改 cos 文件內的賬號 APPId、SecretId、SecretKey這些認證信息,而後將根目錄下全部文件打包至 zip 壓縮包後,經過 SCF 建立函數並經過 zip 文件上傳代碼來完成函數建立,根據上面所屬的「測試及輸出」步驟來測試函數的可用性。

函數在此提供的仍然只是個demo代碼,更多的是爲你們帶來一種新的思路及使用騰訊雲 SCF 無服務器雲函數和 COS 對象存儲。基於此思路,Demo自己後續還有不少能夠改進的方法,或根據業務進行變化的思路:

  1. 文件的處理目前仍是下載一個處理一個,其實咱們可使用多線程和隊列來加速處理過程,使用若干線程持續下載文件,使用隊列對已經下載完成待處理的文件進行排隊,而後使用一個壓縮線程從隊列中讀取已下載的文件後進行壓縮上傳處理。這種方式能夠進一步加快大量文件的處理速度,只是須要當心處理好緩存空間被使用佔滿後的等待和文件處理完成後的刪除釋放空間。
  2. 目前 Demo 從入參接受的是單個地域、Bucket、目錄和輸出文件,咱們徹底能夠改造爲從多個地域或Bucket拉取文件,也能夠傳遞指定的文件列表而不是僅一個目錄,同時函數執行觸發可使用 COS 觸發或 CMQ 消息隊列觸發,可以造成更加通用的壓縮處理函數。

後續對於此 Demo 若是有更多疑問,想法,或改進需求,歡迎你們提交 git pr 或 issue。項目地址:https://github.com/qcloud-scf/demo-scf-compress-cos

相關閱讀

使用騰訊雲 CDN 、COS 以及萬象優圖實現HTTP/2樣例
如何利用雲對象存儲 COS 免費託管靜態網站
Serverless 初探

此文已由做者受權騰訊雲技術社區發佈,轉載請註明文章出處
原文連接:https://cloud.tencent.com/community/article/810260

相關文章
相關標籤/搜索