近年來,基於hadoop的sql框架層出不窮,presto也是其中的一員.從2012年發展至今,依然保持年輕的活力(版本迭代依然很快),presto的相關介紹,咱們就不贅述了,相信看官多對presto有或多或少的瞭解,詳細的一些說明能夠看官網(https://prestodb.io)的說明.java
presto自身功能和思想富有先進性,雖然因爲是內存計算,穩定性方面還有很大提高空間,但總體依然在adhoc方面有很好的競爭力.其中在catalog加載的方式上來講比較的固化,官方並無作出動態的方案出來,致使在添加catalog後必須重啓整個集羣才能夠將新添加的catalog數據源添加到presto中,這無疑在實際的生產環境中很不友好.尤爲是在一些中臺項目中,須要動態規劃的東西很是多.這種模式的catalog添加方式顯然不能知足咱們的開發須要.mysql
所以,在環境的加持下,對presto的加載catlog的方式的源碼進行了改造,使其具備熱動態添加的功能.咱們採用了外部數據庫做爲他的catlog資源庫,對其進行熱加載sql
(1)添加restful API請求接口.數據庫
爲了使框架自己具備添加catalog的功能,須要使其自己具備Api訪問接口的方式來來對catalog的資源進行調整的功能api
1.新增CatalogResource類來實現api的請求接口
2.新增TimiCatalogStoreConfig類來實現與數據庫交互的持久層
3.新增TimiCatalogStore類來替換本來的catlog加載類
4.新增CatalogInfo類來實現對catalog Model信息的解析
#1 CatalogResource
@Path("/presto/catalog") public class CatalogResource{
@GET
@Path("test")
public Response test()
{
return Response.ok("Hello world").build();
}
}
在ServerMainModule類中setup方法,最後一行添加jaxrsBinder(binder).bind(CatalogResource.class);將添加的請求類添加進來,而後啓動主服務,並確認所開啓的presto的請求接口地址,默認端口是:8080請求http://localhost:8080/presto/catalog/test服務器
返回 "Hello world" 則表示restful API 接口添加成功.restful
#2 TimiCatalogStoreConfig 類中主要實現了讀取數據庫鏈接配置,以及具體執行的catalog執行動做,並使用jaxrsBinder(binder).bind(TimiCatalogStoreConfig.class);注入到項目啓動的容器中.並將Announcer,disabledCatalogs,ConnectorManager注入到類中.具體實現框架
public class TimiCatalogStoreConfig { private final Announcer announcer; private static final Logger log = Logger.get(TimiCatalogStoreConfig.class); private final Set<String> disabledCatalogs; private final ConnectorManager connectorManager;
public TimiCatalogStoreConfig(Announcer announcer,Set<String> disabledCatalogs,ConnectorManager connectorManager ) { this.announcer = announcer; this.disabledCatalogs = ImmutableSet.copyOf(disabledCatalogs); this.connectorManager = connectorManager; } }
而後就是實現對catlog增刪查改動做,並將操做的結構實現到ConnectorManager中,ide
首先將Server中的CatalogStore替換成咱們自定義實現的TimiCatalogStore並注入相關類oop
@Inject public TimiCatalogStore(ConnectorManager connectorManager, Announcer announcer, StaticCatalogStoreConfig config,TimiCatalogStoreConfig catalogStoreConfig) { this(connectorManager, announcer, config.getCatalogConfigurationDir(), firstNonNull(config.getDisabledCatalogs(), ImmutableList.of()), catalogStoreConfig ); }
而後實現loadCatalogs方法,首次調用的時候使用load();方法加載mysql中存儲的全部catlog,而後使用ScheduledExecutorService定時的方式從mysql中提取有變化的catlog加載到presto的ConnectorManager中.
public static void updateConnectorIdAnnouncement(Announcer announcer, CatalogName connectorId) { // // This code was copied from PrestoServer, and is a hack that should be removed when the connectorId property is removed // // get existing announcement ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements()); // update connectorIds property Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties()); String property = nullToEmpty(properties.get("connectorIds")); Set<String> connectorIds = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property)); connectorIds.add(connectorId.toString()); properties.put("connectorIds", Joiner.on(',').join(connectorIds)); // update announcement announcer.removeServiceAnnouncement(announcement.getId()); announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build()); announcer.forceAnnounce(); }
在這裏咱們設定的1分鐘從mysql庫充更新一次catalog列表
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { reload(); } }, 60, 60, TimeUnit.SECONDS);
調用reload方法定時讀取
public void reload() { try{ //獲取最新的catalogs Map<String, CatalogInfo> catalogInfos = catalogStoreConfig.load(); catalogInfos.forEach( (key, catalogInfo) -> { if (!catalogInfoMap.containsKey(key)) { //相同--catlog try { System.out.println("添加數據源"+JSON.toJSONString(catalogInfo)); // log.info("添加數據源:{}",JSON.toJSONString(catalogInfos.get(key))); CatalogName catalogName = loadCatalog(catalogInfo); updateConnectorIdAnnouncement(announcer,catalogName); } catch (Exception e) { e.printStackTrace(); } } else { //不一樣catlog if (!JSON.toJSONString(catalogInfoMap.get(key)).equals(JSON.toJSONString(catalogInfo))){ connectorManager.dropConnection(catalogInfo.getCatalogName()); try { System.out.println("添加數據源"+JSON.toJSONString(catalogInfo)); CatalogName catalogName = loadCatalog(catalogInfo); updateConnectorIdAnnouncement(announcer,catalogName); } catch (Exception e) { e.printStackTrace(); } } } } ); catalogInfoMap.putAll(catalogInfos); }catch (Exception e){ e.printStackTrace(); } }
從mysql庫中取出來的catlog信息和對現有的catlog進行對比,若是是不一樣的catlog就添加到presto中,重複的catlog不添加,刪除的catlog就從現有的catlog管理器中刪除,以此來達到動態添加catlog的動做,不須要重啓presto服務器.