presto 轉換靜態catlog爲動態catlog

  近年來,基於hadoop的sql框架層出不窮,presto也是其中的一員.從2012年發展至今,依然保持年輕的活力(版本迭代依然很快),presto的相關介紹,咱們就不贅述了,相信看官多對presto有或多或少的瞭解,詳細的一些說明能夠看官網(https://prestodb.io)的說明.java

  presto自身功能和思想富有先進性,雖然因爲是內存計算,穩定性方面還有很大提高空間,但總體依然在adhoc方面有很好的競爭力.其中在catalog加載的方式上來講比較的固化,官方並無作出動態的方案出來,致使在添加catalog後必須重啓整個集羣才能夠將新添加的catalog數據源添加到presto中,這無疑在實際的生產環境中很不友好.尤爲是在一些中臺項目中,須要動態規劃的東西很是多.這種模式的catalog添加方式顯然不能知足咱們的開發須要.mysql

  所以,在環境的加持下,對presto的加載catlog的方式的源碼進行了改造,使其具備熱動態添加的功能.咱們採用了外部數據庫做爲他的catlog資源庫,對其進行熱加載sql

 (1)添加restful API請求接口.數據庫

  爲了使框架自己具備添加catalog的功能,須要使其自己具備Api訪問接口的方式來來對catalog的資源進行調整的功能api

1.新增CatalogResource類來實現api的請求接口
 2.新增TimiCatalogStoreConfig類來實現與數據庫交互的持久層
  3.新增TimiCatalogStore類來替換本來的catlog加載類
   4.新增CatalogInfo類來實現對catalog Model信息的解析

#1 CatalogResource
@Path("/presto/catalog")
public class CatalogResource{

 
@GET
@Path("test")
public Response test()
{
return Response.ok("Hello world").build();
}
}

  在ServerMainModule類中setup方法,最後一行添加jaxrsBinder(binder).bind(CatalogResource.class);將添加的請求類添加進來,而後啓動主服務,並確認所開啓的presto的請求接口地址,默認端口是:8080請求http://localhost:8080/presto/catalog/test服務器

 返回 "Hello world" 則表示restful API 接口添加成功.restful

#2 TimiCatalogStoreConfig 類中主要實現了讀取數據庫鏈接配置,以及具體執行的catalog執行動做,並使用jaxrsBinder(binder).bind(TimiCatalogStoreConfig.class);注入到項目啓動的容器中.並將Announcer,disabledCatalogs,ConnectorManager注入到類中.具體實現框架

public class TimiCatalogStoreConfig {

    private final Announcer announcer;
    private static final Logger log = Logger.get(TimiCatalogStoreConfig.class);
    private final Set<String> disabledCatalogs;
    private final ConnectorManager connectorManager;
public TimiCatalogStoreConfig(Announcer announcer,Set<String> disabledCatalogs,ConnectorManager connectorManager ) { this.announcer = announcer; this.disabledCatalogs = ImmutableSet.copyOf(disabledCatalogs); this.connectorManager = connectorManager; } }

  而後就是實現對catlog增刪查改動做,並將操做的結構實現到ConnectorManager中,ide

首先將Server中的CatalogStore替換成咱們自定義實現的TimiCatalogStore並注入相關類oop

 @Inject
    public TimiCatalogStore(ConnectorManager connectorManager, Announcer announcer, StaticCatalogStoreConfig config,TimiCatalogStoreConfig catalogStoreConfig) {
        this(connectorManager,
                announcer,
                config.getCatalogConfigurationDir(),
                firstNonNull(config.getDisabledCatalogs(), ImmutableList.of()),
                catalogStoreConfig
        );
    }

  而後實現loadCatalogs方法,首次調用的時候使用load();方法加載mysql中存儲的全部catlog,而後使用ScheduledExecutorService定時的方式從mysql中提取有變化的catlog加載到presto的ConnectorManager中.

public static void updateConnectorIdAnnouncement(Announcer announcer, CatalogName connectorId)
    {
        //
        // This code was copied from PrestoServer, and is a hack that should be removed when the connectorId property is removed
        //
        // get existing announcement
        ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements());
        // update connectorIds property
        Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties());
        String property = nullToEmpty(properties.get("connectorIds"));
        Set<String> connectorIds = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property));
        connectorIds.add(connectorId.toString());
        properties.put("connectorIds", Joiner.on(',').join(connectorIds));
        // update announcement
        announcer.removeServiceAnnouncement(announcement.getId());
        announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build());
        announcer.forceAnnounce();
    }

  在這裏咱們設定的1分鐘從mysql庫充更新一次catalog列表

        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                reload();
            }
        }, 60, 60, TimeUnit.SECONDS);

  調用reload方法定時讀取

    public void reload() {

        try{
            //獲取最新的catalogs
            Map<String, CatalogInfo> catalogInfos = catalogStoreConfig.load();

            catalogInfos.forEach(
                    (key, catalogInfo) -> {
                        if (!catalogInfoMap.containsKey(key)) {
                            //相同--catlog
                            try {
                                System.out.println("添加數據源"+JSON.toJSONString(catalogInfo));
//                                log.info("添加數據源:{}",JSON.toJSONString(catalogInfos.get(key)));
                                CatalogName catalogName = loadCatalog(catalogInfo);
                                updateConnectorIdAnnouncement(announcer,catalogName);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }

                        } else {
                            //不一樣catlog
                            if (!JSON.toJSONString(catalogInfoMap.get(key)).equals(JSON.toJSONString(catalogInfo))){
                                connectorManager.dropConnection(catalogInfo.getCatalogName());
                                try {
                                    System.out.println("添加數據源"+JSON.toJSONString(catalogInfo));
                                    CatalogName catalogName  = loadCatalog(catalogInfo);
                                    updateConnectorIdAnnouncement(announcer,catalogName);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }

                        }
                    }
            );
            catalogInfoMap.putAll(catalogInfos);

        }catch (Exception e){
            e.printStackTrace();
        }
    }

  從mysql庫中取出來的catlog信息和對現有的catlog進行對比,若是是不一樣的catlog就添加到presto中,重複的catlog不添加,刪除的catlog就從現有的catlog管理器中刪除,以此來達到動態添加catlog的動做,不須要重啓presto服務器.

相關文章
相關標籤/搜索