藥品監管系統架構揭祕:海量溯源數據存儲與查詢

前言

在剛剛過去的2018年,「毒疫苗」事件再次觸及了大衆的敏感神經,由於十年前的「毒奶粉」事件還歷歷在目。咱們急需建立一個全國性的藥品(食品)監控追蹤體系。與此同時,近年來隨着國家對醫藥行業的大力支持,中國的醫療事業也出現了跨越式的發展,大量的新型藥品上市,極大的豐富了患者和消費者的選擇範圍。大量的藥品在市面上流通,產生了大量的狀態數據,且這類數據在爆發式的增加。如何高效的存儲和溯源藥品狀態數據已經成爲一個行業難題。傳統方案經常採用好比MySQL數據庫分庫分表的方式,可是這個方案在開發、運維、可擴展性都有很多弊端。數據庫

業界開始愈來愈多的使用分佈式的NoSQL方案來解決大數據的問題。好比阿里健康基於表格存儲(Tablestore)推出了「碼上放心」 藥品監管碼查詢功能,解決了大衆的藥品查詢需求。這僅僅是第一步,創建一個完善全國性藥品追蹤體系是一個艱鉅而漫長的任務。借用網上的一句話,最終咱們要實現藥品的:「來源可查,去向可追,責任可究」。併發

圖1 碼上放心 溯源截圖運維

在整個藥品監管體系中,藥品自己的管理和藥品軌跡溯源是藥品監管體系的兩大核心功能,本篇文章主要是介紹使用表格存儲的Timestream模型快速高效的實現這兩類功能。async

核心需求

藥品元數據

藥品的元數據是指藥品在上市以前的在國家藥品監督管理局(CFDA)備案信息,記錄了藥品名稱、分類、成分、批次、臨牀一期、二期、N期測試數據、自研或進口等詳細信息,多達幾十個字段。分佈式

圖2 藥品元數據ide

用戶會經過頁面或者APP的方式瀏覽和查詢藥品信息,這須要應用提供多種組合的查詢方式,好比:性能

  1. 按照藥品名稱查詢:好比查詢「阿莫西林」爲關鍵字的藥品列表。
  2. 按照生產企業名稱查詢:好比以前的疫苗事件,咱們能夠查詢生產企業爲「長春長生生物科技股份有限公司」的藥品列表。
  3. 按照時間維度,查詢一個時間範圍的數據:好比查詢某個藥企在2017年~2018年生產的抗生素批次。
  4. 按照某個地域或者範圍查詢:好比患者能夠經過頁面,搜索本身附近5千米內特定感冒藥。又好比,咱們在面對天然災害時,咱們可使用Geo功能,查詢最近範圍的應急藥品,緊急調往災區。

上面只是列舉的一些典型查詢場景,藥品備案信息中擁有大量的字段,使用者會從多個查詢維度查詢數據。所以在保證性能的前提下,提供豐富的查詢功能成爲元數據管理的主要技術難點。測試

狀態數據

藥品的狀態數據是指藥品在生產、流經過程中產生的狀態數據,好比藥品的原材料流通、藥企生產藥品過程當中的狀態、運輸過程的軌跡、醫院藥店存儲和使用數據等。fetch

圖3 常見狀態數據大數據

藥品流通會產生大量的狀態數據,這些數據須要持續的記錄下來,後續才能夠作到真正的藥品溯源。咱們先來羅列一下藥品狀態數據:

  1. 藥企的狀態數據:這裏主要指藥品依賴的原材料溯源信息和生產過程的環境數據。這些數據幫助企業監控藥品生產狀態,幫助藥監局審計藥品生產過程,在溯源過程當中,結合元數據信息,可讓用戶對藥品有一個更全面的瞭解。
  2. 運輸的軌跡狀態數據:這個主要指藥品的運輸的產生的軌跡、存儲容器高溫低溫異常事件。「軌跡溯源」能夠基於這些數據實現。
  3. 藥店、醫院的庫存數據:這個主要指藥品在相關的醫藥機構流轉和庫存信息等,好比上面的「附近藥品」查詢就能夠基於這個數據實現。

從上面的數據來源可知,一盒簡單的藥品在到送到患者手上以前,會有大量的流通環節,每一個環節都會產生大量的狀態數據。同時,中國市場藥品的規模在萬億人民幣級別,而且伴隨每一年有將近一成的增加,是全球第二大醫藥市場。要知足如此巨大的規模下的狀態數據的存儲,極高的寫入吞吐、海量存儲規模、可控的存儲成本成爲必需要解決的問題。

解決方案

圖4 MySQL分庫分表 vs Tablestore

從對藥品元數據管理和狀態數據溯源的總結可知,要知足以上的功能和性能需求,單機已經沒法知足要求,須要使用分佈式的方案。通常傳統的方案會採用MySQL分庫分表的方案,可是這個方案在實際生產和運維中面臨很多問題,好比:

  1. 擴容不方便,須要作數據的從新分佈。
  2. 分佈鍵變動很麻煩,分佈鍵須要謹慎選擇。
  3. SQL限制多,功能缺失多,沒法充分發揮MySQL自身的優點。
  4. 傳統的關係模型新增字段須要極大的成本,嚴重阻礙用戶業務的擴展。
  5. 因爲單個節點是孤立的節點,須要提供主備來保障數據的可靠性。沒法像分佈式的NoSQL同樣實現自動的故障恢復,須要一個DBA來及時維護庫的狀態。
  6. 沒法提供靈活的多字段查詢,只能依賴二級索引和全表掃描Fliter實現多維查詢功能,效率相對較低。
  7. 沒法作到計算和存儲分離,用戶很難作到計算和存儲均衡匹配,致使資源浪費。
  8. 沒法原生支持Geo查詢。

總結來看,從理論上能知足以上的功能需求,可是要想真正在生產中使用和維護好這套存儲系統,只能說「想愛你並不容易」。在這種大數據的OLTP的場景下,業界通常選用分佈式的NoSQL方案。所以咱們推薦使用Tablestore一站式的解決以上問題。Tablestore是一款阿里自研的分佈式NoSQL服務,提供多元索引支持豐富的查詢需求,支撐超大規模的併發訪問和低延遲的性能,能夠很好的解決藥品元數據管理和溯源的需求。

Timestream

Timestream是表格存儲推出的最新數據模型,這個模型針對時序數據、軌跡數據、溯源數據,定義了一套簡單清晰易用的API,細節能夠參考《Tablestore Timestream:爲海量時序數據存儲設計的全新數據模型》。

在咱們列舉的藥品監管場景中,藥品的元數據能夠很是簡單的抽象爲Timestream的元數據(Meta),狀態數據抽象爲Timestream的Data數據。本文做爲一個實戰文章,所以使用Timestream模型來快速高效的實現以上兩個功能。

從上面的Timestream介紹文章可知,Timestream擁有幾個核心概念,分別是:Name, Tag, Attribute, Timestamp, Point(Fields)。咱們羅列一個表格,展現怎麼將藥品的相關數據映射到Timestream的模型中,如圖所示:

圖5 模型轉換圖

  • 分類(Name)+標識符(Tag): 這兩個字段惟一決定一個藥品數據。
  • 元數據(Attribute):  藥品的相關屬性,當藥品在登記在案時這個數據被持久化存儲。
  • 最新狀態數據(Attribute):  如標題,藥品最新的狀態,好比上面的‘地點’信息,咱們能夠建立Geo的索引,用戶地理信息的查詢。
  • 時間(Timestamp): 狀態數據的發生時間。
  • 軌跡、狀態: 具體的狀態數據,上面只是兩個示例,實際上能夠支持很是多的字段。

接下來咱們經過一個能夠運行的Demo,向你們展現怎麼使用Timestream API實現元數據管理和溯源功能。

功能實現(Java)

功能列表

寫入

  1. 藥品元數據持久化,將藥品的相關元數據信息存儲到Tablestore中。
  2. 藥品運輸軌跡持久化,主要是運輸和流轉的軌跡,藥品的實時狀態等,並將Location(位置)做爲Geo索引,方便後期的Geo查詢。

查詢

  1. 基本的藥品詳細信息查詢,主要是根據用戶輸入條件,顯示藥品的元數據。
  2. 藥品的防僞鑑定,結合生產日期,運輸軌跡、銷售狀態和查詢用戶等數據對藥品實行防僞鑑定。
  3. 查詢指定地點範圍內的特定藥品。
  4. 藥品軌跡重放

依賴

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>4.11.2</version>
</dependency>

Meta表的建立

對於一些固定且有特殊索引需求的字段,咱們在建立Meta表的時候須要單獨指定,好比「生產日期」、地理信息、狀態數據等。

考慮到後面的擴展需求,咱們增長一個擴展字段,「extension」,用於存儲未定義的元數據。

如下示例只是給了部分元數據字段,用戶能夠根據本身的需求設置更多的索引字段。

public void createMetaTable() {
    List<AttributeIndexSchema> index = new ArrayList<AttributeIndexSchema>();
    index.add(new AttributeIndexSchema("produced_date", AttributeIndexSchema.Type.LONG));
    index.add(new AttributeIndexSchema("period_of_validity", AttributeIndexSchema.Type.LONG));
    index.add(new AttributeIndexSchema("loc", AttributeIndexSchema.Type.GEO_POINT));
    index.add(new AttributeIndexSchema("links", AttributeIndexSchema.Type.KEYWORD));
    index.add(new AttributeIndexSchema("status", AttributeIndexSchema.Type.KEYWORD));
    index.add(new AttributeIndexSchema("extension", AttributeIndexSchema.Type.KEYWORD).setIsArray(true));
    db.createMetaTable(index);
}

Data表的建立

這個比較簡單,只須要設定表名便可。由於咱們是Schema Free的體系,不須要預先指定列,在寫入的時候指定便可。

public void createDataTable() {
    db.createDataTable(conf.getDataTableName());
}

錄入藥品元數據和狀態數據

元數據導入,咱們將一個本地的csv文件中的數據導入到數據庫中

public void importMeta() throws IOException {
    TimestreamMetaTable metaTable = db.metaTable();
    String [] fileHeader = {"分類", "名稱", "監管號", "受理號", "生產日期", "有效日期", "註冊分類", "申請類型", "企業名稱", "任務類型"};
    String csvFile = conf.getMetaFile();
    CSVFormat format = CSVFormat.DEFAULT.withHeader(fileHeader).withIgnoreHeaderCase().withTrim();
    Reader reader = Files.newBufferedReader(Paths.get(csvFile));
    CSVParser csvParser = new CSVParser(reader, format);
    for (CSVRecord r : csvParser.getRecords()) {
        TimestreamIdentifier identifier = new TimestreamIdentifier.Builder(r.get("分類"))
                .addTag("名稱", r.get("名稱"))
                .addTag("監管號", r.get("監管號"))
                .build();
        TimestreamMeta meta = new TimestreamMeta(identifier);

        meta.addAttribute("produced_date", r.get("生產日期"));
        meta.addAttribute("period_of_validity", r.get("有效日期"));

        List<String> extension = new ArrayList();
        extension.add("受理號=" + r.get("受理號"));
        extension.add("註冊分類=" + r.get("註冊分類"));
        extension.add("申請類型=" + r.get("申請類型"));
        extension.add("企業名稱=" + r.get("企業名稱"));
        extension.add("任務類型=" + r.get("任務類型"));
        meta.addAttribute("extension", new Gson().toJson(extension));

        metaTable.put(meta);
        System.out.println(meta.toString());
    }
}

狀態數據導入,這裏loc, links,status在Meta和Data都存儲了一次,Meta表中存儲主要是作後續的索引查詢,Data表中存儲主要是作

public void importData() throws Exception {
    TimestreamMetaTable metaTable = db.metaTable();
    TimestreamDataTable dataTable = db.dataTable(conf.getDataTableName());

    String [] fileHeader = {"分類", "名稱", "監管號", "生產日期", "位置", "環節", "狀態"};
    String csvFile = conf.getDataFile();
    CSVFormat format = CSVFormat.DEFAULT.withHeader(fileHeader).withIgnoreHeaderCase().withTrim();
    Reader reader = Files.newBufferedReader(Paths.get(csvFile));
    CSVParser csvParser = new CSVParser(reader, format);
    for (CSVRecord r : csvParser.getRecords()) {
        TimestreamIdentifier identifier = new TimestreamIdentifier.Builder(r.get("分類"))
                .addTag("名稱", r.get("名稱"))
                .addTag("監管號", r.get("監管號"))
                .build();

        TimestreamMeta meta = new TimestreamMeta(identifier);

        String loc = toLocationString(r.get("位置"));
        String links = r.get("環節");
        String status = r.get("狀態");

        meta.addAttribute("loc", loc);
        meta.addAttribute("links", links);
        meta.addAttribute("status", status);
        metaTable.update(meta);

        Point point = new Point.Builder(this.getTimestamp(r, "生產日期"), TimeUnit.MILLISECONDS)
                .addField("loc", loc)
                .addField("links", links)
                .addField("status", status)
                .build();
        dataTable.asyncWrite(identifier, point);
        System.out.println(point.toString());
    }
    dataTable.flush();
}

 

多維度查詢藥品溯源信息

1. 基本的藥品詳細信息查詢,主要是根據用戶輸入條件,顯示藥品的元數據。咱們這裏根據藥品分類、藥品名稱、生產企業來查詢藥品。

Filter filter = and(
        Name.equal("中藥"),                                    
        Tag.equal("名稱", "複方阿膠"),                      
        Attribute.in("extension", new String[]{"企業名稱=山東****也有限公司"})
);

Iterator<TimestreamMeta> iter = metaTable.filter(filter).fetchAll();
while (iter.hasNext()) {
    TimestreamMeta m = iter.next();
    System.out.println(m);
}

2. 藥品的防僞鑑定,結合生產日期,運輸軌跡、銷售狀態和查詢用戶等數據對藥品實行防僞鑑定。咱們這裏輸入名稱和藥品監管碼。

Filter filter = and(
        Name.equal("中藥"),                                 
        Tag.equal("名稱", "複方阿膠"),                         
        Tag.equal("監管號", "8160000000000019")               
);
Iterator<TimestreamMeta> iter = metaTable.filter(filter).selectAttributes("status").fetchAll();
while (iter.hasNext()) {
    TimestreamMeta m = iter.next();
    System.out.println(m.getAttributeAsString("status"));
}
// 從查詢的結果來看,藥品處於召回中,有使用風險

3. 查詢指定地點範圍內的特定藥品。好比查詢使用者5KM範圍的「阿莫西林」。

Filter filter = and(
        Name.equal("化藥"),
        Tag.prefix("名稱", "阿莫西林"),
        Attribute.inGeoDistance("loc", "31.6533906593,103.8427768645", 5 * 1000)
);
Iterator<TimestreamMeta> iter = metaTable.filter(filter).fetchAll();
while (iter.hasNext()) {
    TimestreamMeta m = iter.next();
    System.out.println(m);
}

4. 藥品軌跡重放,遍歷指定藥品的一個軌跡溯源信息。

TimestreamIdentifier identifier = new TimestreamIdentifier.Builder("化藥")
        .addTag("名稱", "阿莫西林")
        .addTag("監管號", "8150000000000000")
        .build();

Iterator<Point> iter = dataTable.get(identifier).select("loc").fetchAll();
while (iter.hasNext()) {
    Point p = iter.next();
    System.out.println(p);
}


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索