Syncing data to PostgreSQL with Singer tap-postgres

Singer is a nice open-source ETL solution. Below is a simple demo that syncs data from one PostgreSQL database to another.

The setup is simple: tap-postgres piped into target-postgres.

Environment setup

The test databases run under docker-compose.

  • docker-compose file
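version: "3"
services:
  # source database read by tap-postgres, published on host port 5433
  tap:
    image: postgres:9.6.11
    ports:
    - "5433:5432"
    environment:
    # list-form environment entries use KEY=VALUE, not KEY:VALUE
    - "POSTGRES_PASSWORD=dalong"
  # destination database written by target-postgres, published on host port 5432
  target:
    image: postgres:9.6.11
    ports:
    - "5432:5432"
    environment:
    - "POSTGRES_PASSWORD=dalong"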
 
  • Setting up the tap and target environments

    Singer recommends using Python venv virtual environments, so the tap and the target each get their own.

tap setup

mkdir tap-pg
cd tap-pg
python3 -m venv venv
source venv/bin/activate
pip install tap-postgres

target setup


# run from the project root, not from inside tap-pg
mkdir target-pg
cd target-pg
python3 -m venv venv
source venv/bin/activate
pip install target-postgres
  • Project layout
-rw-r--r-- 1 dalong staff 251B 6 5 14:20 docker-compose.yaml
-rw-r--r-- 1 dalong staff 145B 6 5 14:27 tap-pg.json
-rw-r--r-- 1 dalong staff 143B 6 5 14:27 target-pg.json
  • Start the PostgreSQL databases and load the test data
docker-compose up -d
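The containers can take a few seconds before PostgreSQL accepts connections. Here is a minimal sketch, using only the Python standard library, that waits until the two host ports from the docker-compose file accept TCP connections (5433 for the tap, 5432 for the target). The helper name `wait_for_port` is hypothetical. An open port only shows that something is listening. It is not proof that the database has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once host:port accepts a TCP connection, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Host ports published in docker-compose.yaml (assumed unchanged).
PORTS = {"tap": 5433, "target": 5432}
```

For example, `all(wait_for_port("localhost", p) for p in PORTS.values())` returns True once both containers are reachable.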
 

Import the test data. Note: connect to the PostgreSQL instance on localhost port 5433 (the tap database).

CREATE TABLE userapps (
    id SERIAL PRIMARY KEY,
    username text,
    userappname text
);
INSERT INTO "public"."userapps"("id","username","userappname")
VALUES
(1,E'dalong',E'app'),
(2,E'first',E'login');

Using the tap and target

  • Configure the database connections
    tap: tap-pg.json
 
{
    "host": "localhost",
    "port": 5433,
    "dbname": "postgres",
    "user": "postgres",
    "password": "dalong",
    "schema": "public"
}

target: target-pg.json (target database configuration)

{
    "host": "localhost",
    "port": 5432,
    "dbname": "postgres",
    "user": "postgres",
    "password": "dalong",
    "schema": "copy"
}
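You can also generate the two connection files instead of writing them by hand. The values below mirror tap-pg.json and target-pg.json above. The helper name `write_configs` is hypothetical. In a real setup the password would normally come from the environment rather than from source code.

```python
import json
from pathlib import Path

# Connection settings copied from tap-pg.json and target-pg.json above.
TAP_CONFIG = {
    "host": "localhost",
    "port": 5433,
    "dbname": "postgres",
    "user": "postgres",
    "password": "dalong",
    "schema": "public",
}
# The target shares everything except the port and the destination schema.
TARGET_CONFIG = {**TAP_CONFIG, "port": 5432, "schema": "copy"}


def write_configs(directory: str = ".") -> list:
    """Write tap-pg.json and target-pg.json into `directory`; return the paths written."""
    root = Path(directory)
    root.mkdir(parents=True, exist_ok=True)
    paths = []
    for name, config in (("tap-pg.json", TAP_CONFIG), ("target-pg.json", TARGET_CONFIG)):
        path = root / name
        path.write_text(json.dumps(config, indent=4) + "\n", encoding="utf-8")
        paths.append(path)
    return paths
```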
 
 
  • Tap schema discovery
    Run:
 
./tap-pg/venv/bin/tap-postgres -c tap-pg.json -d > catalog.json
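Before editing the catalog, you may want to see which streams discovery found and which columns each one exposes. This minimal sketch reads catalog.json with the standard library. It relies only on the "streams", "tap_stream_id" and "metadata"/"breadcrumb" fields visible in the catalog in this post. The function names `summarize_catalog` and `load_catalog` are hypothetical.

```python
import json


def summarize_catalog(catalog: dict) -> dict:
    """Map each tap_stream_id to the column names listed in its metadata breadcrumbs."""
    summary = {}
    for stream in catalog.get("streams", []):
        columns = [
            entry["breadcrumb"][1]
            for entry in stream.get("metadata", [])
            # Column-level entries have breadcrumbs like ["properties", "<column>"].
            if len(entry.get("breadcrumb", [])) == 2 and entry["breadcrumb"][0] == "properties"
        ]
        summary[stream["tap_stream_id"]] = columns
    return summary


def load_catalog(path: str = "catalog.json") -> dict:
    """Read the catalog written by tap-postgres discovery."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)
```

With the catalog shown in this post, `summarize_catalog(load_catalog())` maps "postgres-public-userapps" to the columns id, username and userappname.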
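  • Select the tables to sync and the replication method
    Below is a simple demo; adjust it to your own needs. The "selected" and "replication-method" keys in the stream-level metadata (the entry with an empty breadcrumb) were added by hand to the discovered catalog.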
 
{
  "streams": [
    {
      "table_name": "userapps",
      "stream": "userapps",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "table-key-properties": [
              "id"
            ],
            "selected": true,
            "replication-method": "FULL_TABLE",
            "schema-name": "public",
            "database-name": "postgres",
            "row-count": 0,
            "is-view": false
          }
        },
        {
          "breadcrumb": [
            "properties",
            "id"
          ],
          "metadata": {
            "sql-datatype": "integer",
            "inclusion": "automatic",
            "selected-by-default": true
          }
        },
        {
          "breadcrumb": [
            "properties",
            "username"
          ],
          "metadata": {
            "sql-datatype": "text",
            "inclusion": "available",
            "selected-by-default": true
          }
        },
        {
          "breadcrumb": [
            "properties",
            "userappname"
          ],
          "metadata": {
            "sql-datatype": "text",
            "inclusion": "available",
            "selected-by-default": true
          }
        }
      ],
      "tap_stream_id": "postgres-public-userapps",
      "schema": {
        "type": "object",
        "properties": {
          "id": {
            "type": [
              "integer"
            ],
            "minimum": -2147483648,
            "maximum": 2147483647
          },
          "username": {
            "type": [
              "null",
              "string"
            ]
          },
          "userappname": {
            "type": [
              "null",
              "string"
            ]
          }
        },
        "definitions": {
          "sdc_recursive_integer_array": {
            "type": [
              "null",
              "integer",
              "array"
            ],
            "items": {
              "$ref": "#/definitions/sdc_recursive_integer_array"
            }
          },
          "sdc_recursive_number_array": {
            "type": [
              "null",
              "number",
              "array"
            ],
            "items": {
              "$ref": "#/definitions/sdc_recursive_number_array"
            }
          },
          "sdc_recursive_string_array": {
            "type": [
              "null",
              "string",
              "array"
            ],
            "items": {
              "$ref": "#/definitions/sdc_recursive_string_array"
            }
          },
          "sdc_recursive_boolean_array": {
            "type": [
              "null",
              "boolean",
              "array"
            ],
            "items": {
              "$ref": "#/definitions/sdc_recursive_boolean_array"
            }
          },
          "sdc_recursive_timestamp_array": {
            "type": [
              "null",
              "string",
              "array"
            ],
            "format": "date-time",
            "items": {
              "$ref": "#/definitions/sdc_recursive_timestamp_array"
            }
          },
          "sdc_recursive_object_array": {
            "type": [
              "null",
              "object",
              "array"
            ],
            "items": {
              "$ref": "#/definitions/sdc_recursive_object_array"
            }
          }
        }
      }
    }
  ]
}
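The two hand-added keys can also be set with a script, which is handy when discovery is re-run and catalog.json is overwritten. This minimal sketch assumes the catalog layout shown above, where the stream-level metadata entry is the one with an empty breadcrumb. `select_stream` and `update_catalog_file` are hypothetical helpers, not part of tap-postgres. Replication methods other than FULL_TABLE need additional metadata. Check the tap-postgres README before using them.

```python
import json


def select_stream(catalog: dict, table_name: str, method: str = "FULL_TABLE") -> bool:
    """Mark `table_name` as selected with the given replication method.

    Returns True if a matching stream-level metadata entry was found and updated.
    """
    for stream in catalog.get("streams", []):
        if stream.get("table_name") != table_name:
            continue
        for entry in stream.get("metadata", []):
            # The stream-level entry is the one whose breadcrumb is empty.
            if entry.get("breadcrumb") == []:
                metadata = entry.setdefault("metadata", {})
                metadata["selected"] = True
                metadata["replication-method"] = method
                return True
    return False


def update_catalog_file(path: str, table_name: str, method: str = "FULL_TABLE") -> None:
    """Load catalog.json, select one table, and write the file back in place."""
    with open(path, encoding="utf-8") as fh:
        catalog = json.load(fh)
    if not select_stream(catalog, table_name, method):
        raise KeyError(f"stream {table_name!r} not found in {path}")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(catalog, fh, indent=2)
```

For this demo, calling `update_catalog_file("catalog.json", "userapps")` right after discovery would reproduce the manual edit.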
 
 
  • Run the sync
 ./tap-pg/venv/bin/tap-postgres -c tap-pg.json --catalog catalog.json | ./target-pg/venv/bin/target-postgres -c target-pg.json
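You can also drive the shell pipe above from Python, for example from a scheduler. This minimal sketch uses `subprocess`. It starts the tap, feeds its stdout into the target, and returns the target's stdout, which by Singer convention carries the state lines. The venv paths and file names are the ones used in this post. `sync_commands` and `run_sync` are hypothetical helpers. The optional state file assumes the standard Singer `--state` flag.

```python
import subprocess
from pathlib import Path
from typing import List, Optional, Tuple


def sync_commands(root: str = ".", state_file: Optional[str] = None) -> Tuple[List[str], List[str]]:
    """Build the tap and target argument lists used in the shell pipeline above."""
    base = Path(root)
    tap = [str(base / "tap-pg/venv/bin/tap-postgres"),
           "-c", str(base / "tap-pg.json"),
           "--catalog", str(base / "catalog.json")]
    if state_file:
        # Optional: resume from a previously saved state file.
        tap += ["--state", str(base / state_file)]
    target = [str(base / "target-pg/venv/bin/target-postgres"),
              "-c", str(base / "target-pg.json")]
    return tap, target


def run_sync(root: str = ".", state_file: Optional[str] = None) -> str:
    """Run tap | target and return the target's stdout; raise if either process fails."""
    tap_cmd, target_cmd = sync_commands(root, state_file)
    tap = subprocess.Popen(tap_cmd, stdout=subprocess.PIPE)
    target = subprocess.Popen(target_cmd, stdin=tap.stdout, stdout=subprocess.PIPE, text=True)
    tap.stdout.close()  # lets the tap get SIGPIPE if the target exits early
    output, _ = target.communicate()
    if tap.wait() != 0 or target.returncode != 0:
        raise RuntimeError("tap or target exited with a non-zero status")
    return output
```

Logs from both processes still go to stderr (the terminal), because only stdout is piped.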
 

Result

/Users/dalong/mylearning/singer-project/target-pg/venv/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
  """)
INFO Selected streams: ['postgres-public-userapps'] 
INFO No currently_syncing found
INFO Beginning sync of stream(postgres-public-userapps) with sync method(full)
INFO Stream postgres-public-userapps is using full_table replication
INFO Current Server Encoding: UTF8
INFO Current Client Encoding: UTF8
INFO hstore is UNavailable
INFO Beginning new Full Table replication 1559717835286
INFO select SELECT "id" , "userappname" , "username" , xmin::text::bigint
                                      FROM "public"."userapps"
                                     ORDER BY xmin::text ASC with itersize 20000
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 2, "tags": {}}
INFO Table 'userapps' does not exist. Creating... CREATE TABLE copy.userapps ("id" bigint, "userappname" character varying, "username" character varying, PRIMARY KEY ("id"))
INFO Loading 2 rows into 'userapps'
INFO COPY userapps_temp ("id", "userappname", "username") FROM STDIN WITH (FORMAT CSV, ESCAPE '\')
INFO UPDATE 0
INFO INSERT 0 2
{"bookmarks": {"postgres-public-userapps": {"last_replication_method": "FULL_TABLE", "version": 1559717835286, "xmin": null}}, "currently_syncing": null}
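The last line above is the Singer state emitted by target-postgres. This minimal sketch keeps the last line of the target's output that parses as a JSON object with a "bookmarks" key and saves it as state.json. A later run could then pass that file to the tap with `--state`. With FULL_TABLE replication this bookmark only records the table version and an empty xmin, as the log shows. `extract_state` and `save_state` are hypothetical helpers.

```python
import json
from typing import Optional


def extract_state(output: str) -> Optional[dict]:
    """Return the last JSON object with a "bookmarks" key found in the target's output."""
    state = None
    for line in output.splitlines():
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip log lines and blank lines
        try:
            candidate = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(candidate, dict) and "bookmarks" in candidate:
            state = candidate
    return state


def save_state(output: str, path: str = "state.json") -> bool:
    """Write the extracted state to `path`; return False if no state line was found."""
    state = extract_state(output)
    if state is None:
        return False
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(state, fh)
    return True
```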
 
  • View in a database GUI client: [screenshot not preserved]

 

Notes

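The above is only a simple demo. In practice there are many other options, for example dbt, pgloader, plain export/import (dump and restore), other similar ETL tools, or PostgreSQL's own FDWs (foreign data wrappers) and dblink.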

References

https://github.com/rongfengliang/singer-pg2pg
https://github.com/singer-io/tap-postgres
https://www.getdbt.com/
https://github.com/dimitri/pgloader
https://www.postgresql.org/docs/10/contrib-dblink-function.html
