Apache Atlas是一個優秀的數據治理組件,用於企業Hadoop集羣上的數據治理和元數據管理。接下來咱們將討論如何構建本身的Java API,這些Java API可以使用Apache Atlas客戶端與Apache Atlas交互,以在其中建立新的實體和類型。
1、Atlas客戶端Maven依賴關係
如下依賴項可用於pom.xml文件:
<!-- Apache Atlas libraries used by the examples; every artifact is pinned to 0.7-incubating. -->
<dependency>
    <groupId>org.apache.atlas</groupId>
    <artifactId>atlas-client</artifactId>
    <version>0.7-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.atlas</groupId>
    <artifactId>atlas-typesystem</artifactId>
    <version>0.7-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.atlas</groupId>
    <artifactId>atlas-notification</artifactId>
    <version>0.7-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.atlas</groupId>
    <artifactId>atlas-repository</artifactId>
    <version>0.7-incubating</version>
</dependency>
2、設置atlas-application.properties
Apache Atlas客戶端使用atlas-application屬性在咱們的API和Apache Atlas服務器之間創建鏈接。這些屬性應放置在resources/atlas-application.properties中。
#########  Security Properties  #########

# SSL config
atlas.enableTLS=false

#########  Server Properties  #########

# REST endpoint of the Atlas server
atlas.rest.address=http://192.168.5.95:21000

atlas.hook.demo.kafka.retries=1

# Kafka / ZooKeeper connection settings
atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181
atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.connection.timeout.ms=2000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas
3、建立與Atlas服務器的鏈接
要與Apache Atlas Server建立鏈接,必須在AtlasClient構造函數中傳遞baseUrl、用戶名和密碼。
// Connect to the Atlas server: the first array carries the base URL, the second the user name and password.
final String[] atlasBaseUrls = {"http://192.168.5.95:21000"};
final String[] userNameAndPassword = {"admin", "admin"};
final AtlasClient atlasClient = new AtlasClient(atlasBaseUrls, userNameAndPassword);
4、關於Type相關的測試類
public class AtlasTypesTest { final AtlasClient atlasClient = new AtlasClient (new String[]{"http://192.168.5.95:21000"}, new String[]{"admin", "admin"}); static final String DATABASE_TYPE = "DB_Sync"; static final String COLUMN_TYPE = "Column_Sync"; static final String TABLE_TYPE = "Table_Sync"; static final String VIEW_TYPE = "View_Sync"; public static final String DB_ATTRIBUTE = "db"; static final String STORAGE_DESC_TYPE = "StorageDesc"; public static final String COLUMNS_ATTRIBUTE = "columns"; public static final String INPUT_TABLES_ATTRIBUTE = "inputTables"; private static final String[] TYPES = {DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE, "JdbcAccess", "ETL", "Metric", "PII", "Fact", "Dimension", "Log Data"}; /** * 組織定義types * @return */ TypesDef createTypeDefinitions() { HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil .createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null, TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE), attrDef("description", DataTypes.STRING_TYPE.getName()), attrDef("locationUri", DataTypes.STRING_TYPE.getName()), attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName())); HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil .createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE.getName()), attrDef("dataType", DataTypes.STRING_TYPE.getName()), attrDef("comment", DataTypes.STRING_TYPE.getName())); HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil .createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"), new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null), new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null), attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()), attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()), attrDef("retention", 
DataTypes.LONG_TYPE.getName()), attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()), attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()), attrDef("tableType", DataTypes.STRING_TYPE.getName()), attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()), new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE), Multiplicity.COLLECTION, true, null)); HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil .createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"), new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null), new AttributeDefinition("inputTables", DataTypes.arrayTypeName(TABLE_TYPE), Multiplicity.COLLECTION, false, null)); return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(), ImmutableList.of(), ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef)); } private void createTypes() throws Exception { TypesDef typesDef = createTypeDefinitions(); String typesAsJSON = TypesSerialization.toJson(typesDef); System.out.println("typesAsJSON = " + typesAsJSON); atlasClient.createType(typesAsJSON); verifyTypesCreated(); } private void verifyTypesCreated() throws Exception { List<String> types = atlasClient.listTypes(); for (String type : TYPES) { assert types.contains(type); } } AttributeDefinition attrDef(String name, String dT) { return attrDef(name, dT, Multiplicity.OPTIONAL, false, null); } AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite, String reverseAttributeName) { return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName); } @Test public void createNewTypes() throws Exception { createTypes(); } }
5、關於Entities相關的測試類
public class AtlasEntitiesTest { final AtlasClient atlasClient = new AtlasClient (new String[]{"http://192.168.5.95:21000"}, new String[]{"admin", "admin"}); /** * 建立實例並返建立的Id對象 * @param referenceable * @return * @throws Exception */ private Id createInstance(Referenceable referenceable) throws Exception { String typeName = referenceable.getTypeName(); String entityJSON = InstanceSerialization.toJson(referenceable, true); System.out.println("Submitting new entity= " + entityJSON); List<String> guids = atlasClient.createEntity(entityJSON); System.out.println("created instance for type " + typeName + ", guid: " + guids); return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(), referenceable.getTypeName()); } /** * 建立數據庫實例並返建立的數據庫Id對象 * @param name * @param description * @param owner * @param locationUri * @param traitNames * @return * @throws Exception */ Id database(String name, String description, String owner, String locationUri, String... traitNames) throws Exception { Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); referenceable.set("name", name); referenceable.set("description", description); referenceable.set("owner", owner); referenceable.set("locationUri", locationUri); referenceable.set("createTime", System.currentTimeMillis()); return createInstance(referenceable); } /** * 建立列的實例並返建立的列的實例對象 * @param name * @param dataType * @param comment * @param traitNames * @return * @throws Exception */ Referenceable column(String name, String dataType, String comment, String... 
traitNames) throws Exception { Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); referenceable.set("name", name); referenceable.set("dataType", dataType); referenceable.set("comment", comment); return referenceable; } /** * 建立表的實例並返建立的表的Id對象 * @param name * @param description * @param dbId * @param sd * @param owner * @param tableType * @param columns * @param traitNames * @return * @throws Exception */ Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType, List<Referenceable> columns, String... traitNames) throws Exception { Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames); referenceable.set("name", name); referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); referenceable.set("description", description); referenceable.set("owner", owner); referenceable.set("tableType", tableType); referenceable.set("createTime", System.currentTimeMillis()); referenceable.set("lastAccessTime", System.currentTimeMillis()); referenceable.set("retention", System.currentTimeMillis()); referenceable.set("db", dbId); referenceable.set("sd", sd); referenceable.set("columns", columns); return createInstance(referenceable); } /** * 建立視圖的實例並返建立的視圖的Id對象 * @param name * @param dbId * @param inputTables * @param traitNames * @return * @throws Exception */ Id view(String name, Id dbId, List<Id> inputTables, String... 
traitNames) throws Exception { Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames); referenceable.set("name", name); referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); referenceable.set("db", dbId); referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables); return createInstance(referenceable); } /** * 原始存儲描述符 * @param location * @param inputFormat * @param outputFormat * @param compressed * @return * @throws Exception */ Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed) throws Exception { Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE); referenceable.set("location", location); referenceable.set("inputFormat", inputFormat); referenceable.set("outputFormat", outputFormat); referenceable.set("compressed", compressed); return referenceable; } @Test public void createEntities() throws Exception { //建立數據庫實例 Id syncDB = database("sy_sync", "Sync Database", "root", ""); //存儲描述符 Referenceable sd = storageDescriptor("", "TextInputFormat", "TextOutputFormat", true); //建立列實例 //一、數據源 List<Referenceable> databaseColumns = ImmutableList .of(column("id", "long", "id"), column("name", "string", "name"), column("type", "string", "type"), column("url", "string", "url"), column("database_name", "string", "database name"), column("username", "string", "username"), column("password","string","password"), column("description", "string", "description"), column("create_time", "string", "create time"), column("update_time", "string", "update time"), column("create_id", "long", "user id"), column("update_id", "long", "user id")); //二、同步文件夾 List<Referenceable> syncFolderColumns = ImmutableList .of(column("id", "long", "id"), column("name", "string", "name"), column("description", "string", "description"), column("create_time", "string", "create time"), column("update_time", "string", "update time"), column("create_id", "long", "user id"), column("update_id", "long", "user id")); 
//建立表實例 Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns); Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns); //建立視圖實例 } @Test public void getEntity() throws AtlasServiceException { Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b"); System.out.println(InstanceSerialization.toJson(referenceable, true)); } }