數據治理的王者Apache Atlas:如何構建自己的API

Apache Atlas是一個優秀的數據治理組件,用於在企業Hadoop集羣上進行數據治理和元數據管理。接下來我們將討論如何構建自己的Java API,這些Java API可以使用Apache Atlas客戶端與Apache Atlas交互,以在其中建立新的實體和類型。

1、Atlas客戶端Maven依賴關係

以下依賴項可添加到pom.xml文件中:

<!-- Apache Atlas artifacts used by the Java examples in this article.
     All four are pinned to the same version (0.7-incubating); keep them in sync when upgrading. -->
<dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-client</artifactId>
      <version>0.7-incubating</version>
  </dependency>
  <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-typesystem</artifactId>
      <version>0.7-incubating</version>
  </dependency>
  <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-notification</artifactId>
      <version>0.7-incubating</version>
  </dependency>
  <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-repository</artifactId>
      <version>0.7-incubating</version>
  </dependency>

2、設置atlas-application.properties

Apache Atlas客戶端使用atlas-application屬性在我們的API和Apache Atlas服務器之間建立連接。這些屬性應放置在resources/atlas-application.properties中:

#########  Security Properties  #########

# SSL config
# TLS is switched off here, matching the plain http:// REST address below.
atlas.enableTLS=false

#########  Server Properties  #########
# Base URL of the Atlas REST endpoint; the Java examples pass the same address to AtlasClient.
atlas.rest.address=http://192.168.5.95:21000

# Kafka / ZooKeeper connection settings.
# NOTE(review): presumably consumed by the Atlas notification (hook) layer - confirm against the Atlas docs.
atlas.hook.demo.kafka.retries=1
atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181
atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092
# Timeouts and intervals below are in milliseconds, as the ".ms" key suffix indicates.
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.connection.timeout.ms=2000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas

3、建立與Atlas服務器的鏈接

要與Apache Atlas服務器建立連接,必須在AtlasClient構造函數中傳遞baseUrl、用戶名和密碼:

// Connect to the Atlas server: the first array holds the base URL(s),
// the second holds the user name followed by the password.
final AtlasClient atlasClient = new AtlasClient(
        new String[]{"http://192.168.5.95:21000"},
        new String[]{"admin", "admin"});

4、關於Type相關的測試類

/**
 * Example test that registers a small set of custom Atlas types (database, column, table, view)
 * through the Atlas client and then checks that the server lists them.
 *
 * <p>NOTE(review): {@code TYPES} also names types ({@code StorageDesc}, {@code JdbcAccess},
 * {@code ETL}, ...) that {@link #createTypeDefinitions()} does not define, and the table type
 * references {@code StorageDesc}. Presumably these must already exist on the target server
 * - confirm before running against a fresh Atlas instance.
 */
public class AtlasTypesTest {

    /** Client bound to the Atlas REST endpoint; the second array is {user name, password}. */
    final AtlasClient atlasClient = new AtlasClient(
            new String[]{"http://192.168.5.95:21000"},
            new String[]{"admin", "admin"});

    static final String DATABASE_TYPE = "DB_Sync";
    static final String COLUMN_TYPE = "Column_Sync";
    static final String TABLE_TYPE = "Table_Sync";
    static final String VIEW_TYPE = "View_Sync";
    public static final String DB_ATTRIBUTE = "db";
    static final String STORAGE_DESC_TYPE = "StorageDesc";
    public static final String COLUMNS_ATTRIBUTE = "columns";
    public static final String INPUT_TABLES_ATTRIBUTE = "inputTables";

    /** Type names that {@link #verifyTypesCreated()} expects the server to list. */
    private static final String[] TYPES =
            {DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE, "JdbcAccess",
                    "ETL", "Metric", "PII", "Fact", "Dimension", "Log Data"};

    /**
     * Builds the type definitions for the database, column, table and view class types.
     *
     * @return a {@code TypesDef} holding the four class types and no enum, struct or trait types
     */
    TypesDef createTypeDefinitions() {
        // Database: "name" is unique and required, every other attribute is optional.
        HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
                .createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null,
                        TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
                        attrDef("description", DataTypes.STRING_TYPE.getName()),
                        attrDef("locationUri", DataTypes.STRING_TYPE.getName()),
                        attrDef("owner", DataTypes.STRING_TYPE.getName()),
                        attrDef("createTime", DataTypes.LONG_TYPE.getName()));

        // Column: three optional string attributes.
        HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
                .createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null,
                        attrDef("name", DataTypes.STRING_TYPE.getName()),
                        attrDef("dataType", DataTypes.STRING_TYPE.getName()),
                        attrDef("comment", DataTypes.STRING_TYPE.getName()));

        // Table: extends "DataSet"; requires a database reference and a composite storage descriptor,
        // and owns its columns as a composite collection.
        HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
                .createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"),
                        new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
                        new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
                        attrDef("owner", DataTypes.STRING_TYPE.getName()),
                        attrDef("createTime", DataTypes.LONG_TYPE.getName()),
                        attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()),
                        attrDef("retention", DataTypes.LONG_TYPE.getName()),
                        attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()),
                        attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()),
                        attrDef("tableType", DataTypes.STRING_TYPE.getName()),
                        attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()),
                        new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE),
                                Multiplicity.COLLECTION, true, null));

        // View: extends "DataSet"; requires a database reference and lists its input tables.
        // Uses the shared attribute-name constants so the names stay in sync with the table type.
        HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
                .createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"),
                        new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
                        new AttributeDefinition(INPUT_TABLES_ATTRIBUTE, DataTypes.arrayTypeName(TABLE_TYPE),
                                Multiplicity.COLLECTION, false, null));

        return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
                ImmutableList.of(),
                ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef));
    }

    /**
     * Serializes the type definitions to JSON, prints the JSON, submits it to Atlas and then
     * verifies the expected types are listed by the server.
     *
     * @throws Exception if serialization, the Atlas call or the verification fails
     */
    private void createTypes() throws Exception {
        TypesDef typesDef = createTypeDefinitions();
        String typesAsJSON = TypesSerialization.toJson(typesDef);
        System.out.println("typesAsJSON = " + typesAsJSON);
        atlasClient.createType(typesAsJSON);
        verifyTypesCreated();
    }

    /**
     * Checks that every name in {@code TYPES} is present in the list of types returned by Atlas.
     *
     * @throws AssertionError if an expected type is missing
     * @throws Exception      if listing the types fails
     */
    private void verifyTypesCreated() throws Exception {
        List<String> types = atlasClient.listTypes();
        for (String type : TYPES) {
            // Explicit check rather than the `assert` keyword, which is skipped when the JVM
            // runs without -ea and would let a missing type go unnoticed.
            if (!types.contains(type)) {
                throw new AssertionError("Type not registered in Atlas: " + type);
            }
        }
    }

    /**
     * Creates an optional, non-composite attribute definition without a reverse attribute.
     *
     * @param name attribute name
     * @param dT   name of the attribute's data type
     * @return the attribute definition
     */
    AttributeDefinition attrDef(String name, String dT) {
        return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
    }

    /**
     * Creates an attribute definition from the given values.
     *
     * @param name                 attribute name
     * @param dT                   name of the attribute's data type
     * @param m                    multiplicity of the attribute
     * @param isComposite          composite flag passed through to {@code AttributeDefinition}
     * @param reverseAttributeName reverse attribute name passed through, may be {@code null}
     * @return the attribute definition
     */
    AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite,
                                String reverseAttributeName) {
        return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName);
    }

    /** Registers the custom types on the Atlas server and verifies them. */
    @Test
    public void createNewTypes() throws Exception {
        createTypes();
    }
}

5、關於Entities相關的測試類

/**
 * Example test that creates database and table entities (with their columns and storage
 * descriptor) on an Atlas server and reads an entity back by GUID.
 *
 * <p>NOTE(review): the type-name constants used here ({@code DATABASE_TYPE}, {@code COLUMN_TYPE},
 * {@code TABLE_TYPE}, {@code VIEW_TYPE}, {@code STORAGE_DESC_TYPE}, {@code INPUT_TABLES_ATTRIBUTE})
 * are not declared in this class; presumably they are the ones from {@code AtlasTypesTest}
 * - confirm they are in scope (same package or static import).
 */
public class AtlasEntitiesTest {

    /** Client bound to the Atlas REST endpoint; the second array is {user name, password}. */
    final AtlasClient atlasClient = new AtlasClient(
            new String[]{"http://192.168.5.95:21000"},
            new String[]{"admin", "admin"});

    /**
     * Submits the given instance to Atlas and returns an {@code Id} for the created entity.
     *
     * <p>The returned id uses the last GUID in the list returned by the server, together with the
     * version and type name of the submitted instance.
     *
     * @param referenceable the instance to create
     * @return the id of the created entity
     * @throws IllegalStateException if Atlas returns no GUID for the entity
     * @throws Exception             if serialization or the Atlas call fails
     */
    private Id createInstance(Referenceable referenceable) throws Exception {
        String typeName = referenceable.getTypeName();
        String entityJSON = InstanceSerialization.toJson(referenceable, true);
        System.out.println("Submitting new entity= " + entityJSON);
        List<String> guids = atlasClient.createEntity(entityJSON);
        System.out.println("created instance for type " + typeName + ", guid: " + guids);
        // Fail with a descriptive message instead of an opaque index/NPE error when nothing was created.
        if (guids == null || guids.isEmpty()) {
            throw new IllegalStateException("Atlas returned no GUID for new entity of type " + typeName);
        }
        return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(), typeName);
    }

    /**
     * Creates a database entity and returns its id.
     *
     * @param name        value of the "name" attribute
     * @param description value of the "description" attribute
     * @param owner       value of the "owner" attribute
     * @param locationUri value of the "locationUri" attribute
     * @param traitNames  trait names attached to the instance
     * @return the id of the created database entity
     * @throws Exception if the entity cannot be created
     */
    Id database(String name, String description, String owner, String locationUri, String... traitNames)
            throws Exception {
        Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
        referenceable.set("name", name);
        referenceable.set("description", description);
        referenceable.set("owner", owner);
        referenceable.set("locationUri", locationUri);
        referenceable.set("createTime", System.currentTimeMillis());

        return createInstance(referenceable);
    }

    /**
     * Builds a column instance. The instance is only built here, not submitted to Atlas.
     *
     * @param name       value of the "name" attribute
     * @param dataType   value of the "dataType" attribute
     * @param comment    value of the "comment" attribute
     * @param traitNames trait names attached to the instance
     * @return the column instance
     * @throws Exception declared by the signature; nothing in this method body throws a checked exception
     */
    Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
        Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
        referenceable.set("name", name);
        referenceable.set("dataType", dataType);
        referenceable.set("comment", comment);

        return referenceable;
    }

    /**
     * Creates a table entity and returns its id.
     *
     * @param name        value of the "name" attribute, also used as the referenceable (qualified) name
     * @param description value of the "description" attribute
     * @param dbId        id of the database the table belongs to
     * @param sd          storage descriptor instance for the table
     * @param owner       value of the "owner" attribute
     * @param tableType   value of the "tableType" attribute
     * @param columns     column instances of the table
     * @param traitNames  trait names attached to the instance
     * @return the id of the created table entity
     * @throws Exception if the entity cannot be created
     */
    Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
             List<Referenceable> columns, String... traitNames) throws Exception {
        Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);
        referenceable.set("name", name);
        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
        referenceable.set("description", description);
        referenceable.set("owner", owner);
        referenceable.set("tableType", tableType);
        referenceable.set("createTime", System.currentTimeMillis());
        referenceable.set("lastAccessTime", System.currentTimeMillis());
        // NOTE(review): "retention" is filled with the current time as a placeholder value - confirm intent.
        referenceable.set("retention", System.currentTimeMillis());
        referenceable.set("db", dbId);
        referenceable.set("sd", sd);
        referenceable.set("columns", columns);

        return createInstance(referenceable);
    }

    /**
     * Creates a view entity and returns its id.
     *
     * @param name        value of the "name" attribute, also used as the referenceable (qualified) name
     * @param dbId        id of the database the view belongs to
     * @param inputTables ids of the tables the view reads from
     * @param traitNames  trait names attached to the instance
     * @return the id of the created view entity
     * @throws Exception if the entity cannot be created
     */
    Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
        Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
        referenceable.set("name", name);
        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
        referenceable.set("db", dbId);

        referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);

        return createInstance(referenceable);
    }

    /**
     * Builds a raw storage descriptor instance. The instance is only built here, not submitted to Atlas.
     *
     * @param location     value of the "location" attribute
     * @param inputFormat  value of the "inputFormat" attribute
     * @param outputFormat value of the "outputFormat" attribute
     * @param compressed   value of the "compressed" attribute
     * @return the storage descriptor instance
     * @throws Exception declared by the signature; nothing in this method body throws a checked exception
     */
    Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
            throws Exception {
        Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
        referenceable.set("location", location);
        referenceable.set("inputFormat", inputFormat);
        referenceable.set("outputFormat", outputFormat);
        referenceable.set("compressed", compressed);

        return referenceable;
    }


    /** Creates one database entity and two table entities (data source and sync folder) in Atlas. */
    @Test
    public void createEntities() throws Exception {
        // Create the database instance.
        Id syncDB = database("sy_sync", "Sync Database", "root", "");
        // Storage descriptor shared by both tables.
        Referenceable sd =
                storageDescriptor("", "TextInputFormat", "TextOutputFormat",
                        true);
        // Build the column instances.
        // 1. Columns of the data source table.
        List<Referenceable> databaseColumns = ImmutableList
                .of(column("id", "long", "id"),
                        column("name", "string", "name"),
                        column("type", "string", "type"),
                        column("url", "string", "url"),
                        column("database_name", "string", "database name"),
                        column("username", "string", "username"),
                        column("password", "string", "password"),
                        column("description", "string", "description"),
                        column("create_time", "string", "create time"),
                        column("update_time", "string", "update time"),
                        column("create_id", "long", "user id"),
                        column("update_id", "long", "user id"));
        // 2. Columns of the sync folder table.
        List<Referenceable> syncFolderColumns = ImmutableList
                .of(column("id", "long", "id"),
                        column("name", "string", "name"),
                        column("description", "string", "description"),
                        column("create_time", "string", "create time"),
                        column("update_time", "string", "update time"),
                        column("create_id", "long", "user id"),
                        column("update_id", "long", "user id"));
        // Create the table instances.
        Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns);
        Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns);
        // TODO: create the view instance - this step is not implemented yet; the two table ids
        // above are kept so they can be passed to view(...) once it is.

    }


    /** Fetches an entity by a hard-coded GUID and prints it as JSON. */
    @Test
    public void getEntity() throws AtlasServiceException {
        Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b");
        System.out.println(InstanceSerialization.toJson(referenceable, true));
    }

}
相關文章
相關標籤/搜索