這一節主要講解osmp的服務發現與路由。osmp通過osmp-http組件對外發布了一個cxf的restful服務,所有的請求都通過這個restful接口接收,解析請求後再調用osgi的服務,完成請求後返回到前端。
request->osmp-http的restful接口->解析請求->osgi服務發現->服務路由->調用服務->返回->組裝返回參數->返回
osmp通過osmp-service組件來對服務進行統一管理,主要功能包括服務的監聽、bundle的監聽、服務容器管理、服務註冊到zookeeper等功能。
在osgi裏可以通過ServiceTracker 服務跟蹤器來跟蹤某一類接口服務的新增、修改、刪除等操作,通過BundleContext.addBundleListener()註冊 bundle監聽器(BundleListener)來監聽bundle各生命週期的事件,比如bundle的安裝、卸載、更新、啓動、停止等事件。
osmp爲了便於對服務的統一管理,要求所有的服務都必須實現 osmp-intf-define裏定義的BaseDataService接口。這樣我們就可以通過服務跟蹤器跟蹤到BaseDataService接口的服務新增、修改、刪除等事件!
具體代碼可以參照osmp-service裏的 ServiceWatcher.java類:
/* * Project: OSMP * FileName: ServiceWatcher.java * version: V1.0 */ package com.osmp.service.watch; import java.util.Date; import java.util.UUID; import org.osgi.framework.BundleContext; import org.osgi.framework.BundleEvent; import org.osgi.framework.BundleListener; import org.osgi.framework.ServiceReference; import org.osgi.util.tracker.ServiceTracker; import org.osgi.util.tracker.ServiceTrackerCustomizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.osgi.context.BundleContextAware; import org.springframework.util.Assert; import com.osmp.intf.define.config.FrameConst; import com.osmp.intf.define.interceptor.ServiceInterceptor; import com.osmp.intf.define.service.BaseDataService; import com.osmp.intf.define.service.ZookeeperService; import com.osmp.service.bean.DataServiceInfo; import com.osmp.service.bean.InterceptorInfo; import com.osmp.service.factory.ServiceFactoryImpl; import com.osmp.service.manager.ServiceStateManager; import com.osmp.service.registration.ServiceContainer; import com.osmp.service.util.ServiceUtil; import com.osmp.utils.net.RequestInfoHelper; /** * * Description:服務註冊、註銷、監聽 * @author: wangkaiping * @date: 2016年8月9日 上午10:27:15上午10:51:30 */ public class ServiceWatcher implements BundleContextAware, InitializingBean, DisposableBean { private Logger logger = LoggerFactory.getLogger(ServiceWatcher.class); private ServiceTracker dataServiceTracker; private ServiceTracker serviceInterceptorTracker; private ServiceStateManager serviceStateManager; private ServiceFactoryImpl serviceFactoryImpl; private BundleContext context; private ZookeeperService zookeeper; private final static String NODE_CHANGE = "/osmp/nodechange"; @Override public void setBundleContext(BundleContext context) { this.context = context; } public void setServiceFactoryImpl(ServiceFactoryImpl serviceFactoryImpl) { 
this.serviceFactoryImpl = serviceFactoryImpl; } public void setServiceStateManager(ServiceStateManager serviceStateManager) { this.serviceStateManager = serviceStateManager; } public void setZookeeper(ZookeeperService zookeeper) { this.zookeeper = zookeeper; } @Override public void destroy() throws Exception { if (dataServiceTracker != null) { dataServiceTracker.close(); } if (serviceInterceptorTracker != null) { serviceInterceptorTracker.close(); } logger.info("服務監聽結束"); } @Override public void afterPropertiesSet() throws Exception { Assert.notNull(context); Assert.notNull(serviceStateManager); Assert.notNull(serviceFactoryImpl); dataServiceTracker = new ServiceTracker(context, BaseDataService.class.getName(), new DataServiceTrackerCustomizer()); serviceInterceptorTracker = new ServiceTracker(context, ServiceInterceptor.class.getName(), new ServiceInterceptorTrackerCustomizer()); dataServiceTracker.open(true); serviceInterceptorTracker.open(true); context.addBundleListener(new BundleListener() { @Override public void bundleChanged(BundleEvent event) { if (event.getType() == BundleEvent.UNINSTALLED) { String name = event.getBundle().getSymbolicName(); try { logger.info("uninstall bundle " + name); zookeeper.deleteNodeByBundle(name); ServiceWatcher.this.nodeUpdate(); } catch (Exception e) { logger.error( "zookeeper delete service by bundle, bundle name : " + name, e); } } } }); logger.info("服務監聽啓動"); } // dataService監聽 private class DataServiceTrackerCustomizer implements ServiceTrackerCustomizer { @Override public Object addingService(ServiceReference reference) { BaseDataService bsService = (BaseDataService) context.getService(reference); String bundleName = reference.getBundle().getSymbolicName(); String bundleVersion = reference.getBundle().getVersion().toString(); Object name = reference.getProperty(FrameConst.SERVICE_NAME); if (name == null || "".equals(name)) { logger.error("組件" + bundleName + "(" + bundleVersion + ") dataService服務name未設置"); return bsService; 
} Object mark = reference.getProperty(FrameConst.SERVICE_MARK); ServiceContainer.getInstance().putDataService(bundleName, bundleVersion, name.toString(), bsService); DataServiceInfo info = new DataServiceInfo(); info.setBundle(bundleName); info.setVersion(bundleVersion); info.setName(name.toString()); info.setState(1); info.setUpdateTime(new Date()); info.setMark(mark == null ? "" : mark.toString()); serviceStateManager.updateDataService(info); String path = ZookeeperService.ROOT_PATH + ZookeeperService.SERVICE + "/" + RequestInfoHelper.getLocalIp() + "/"; logger.debug("register service path: " + path + " bundle : " + bundleName + " to zookeeper "); ServiceWatcher.this.registerService(path, info); return bsService; } @Override public void modifiedService(ServiceReference reference, Object service) { } @Override public void removedService(ServiceReference reference, Object service) { String bundleName = reference.getBundle().getSymbolicName(); String bundleVersion = reference.getBundle().getVersion() .toString(); Object name = reference.getProperty(FrameConst.SERVICE_NAME); if (name == null || "".equals(name)) { logger.error("組件" + bundleName + "(" + bundleVersion + ") dataService服務name未設置"); return; } ServiceContainer.getInstance().removeDataService(bundleName, bundleVersion, name.toString()); Object mark = reference.getProperty(FrameConst.SERVICE_MARK); DataServiceInfo info = new DataServiceInfo(); info.setBundle(bundleName); info.setVersion(bundleVersion); info.setName(name.toString()); info.setState(0); info.setMark(mark == null ? 
"" : mark.toString()); info.setUpdateTime(new Date()); serviceStateManager.updateDataService(info); System.out.println("===============remove service bundleName:" + bundleName + " name: " + name.toString() + " mark: " + mark.toString()); String path = ZookeeperService.ROOT_PATH + ZookeeperService.SERVICE + "/" + RequestInfoHelper.getLocalIp() + "/"; ServiceWatcher.this.unRegisterService(path, info); } } // ServiceInterceptor監聽 private class ServiceInterceptorTrackerCustomizer implements ServiceTrackerCustomizer { @Override public Object addingService(ServiceReference reference) { ServiceInterceptor sicpt = (ServiceInterceptor) context.getService(reference); String bundleName = reference.getBundle().getSymbolicName(); String bundleVersion = reference.getBundle().getVersion().toString(); Object name = reference.getProperty(FrameConst.SERVICE_NAME); if (name == null || "".equals(name)) { logger.error("組件" + bundleName + "(" + bundleVersion + ") serviceInterceptor服務name未設置"); return sicpt; } Object mark = reference.getProperty(FrameConst.SERVICE_MARK); ServiceContainer.getInstance().putInterceptor( ServiceUtil.generateServiceName(bundleName, bundleVersion, name.toString()), sicpt); InterceptorInfo info = new InterceptorInfo(); info.setBundle(bundleName); info.setVersion(bundleVersion); info.setName(name.toString()); info.setState(1); info.setUpdateTime(new Date()); info.setMark(mark == null ? 
"" : mark.toString()); serviceStateManager.updateServiceInterceptor(info); return sicpt; } @Override public void modifiedService(ServiceReference reference, Object service) { } @Override public void removedService(ServiceReference reference, Object service) { String bundleName = reference.getBundle().getSymbolicName(); String bundleVersion = reference.getBundle().getVersion().toString(); Object name = reference.getProperty(FrameConst.SERVICE_NAME); if (name == null || "".equals(name)) { logger.error("組件" + bundleName + "(" + bundleVersion + ") serviceInterceptor服務name未設置"); return; } Object mark = reference.getProperty(FrameConst.SERVICE_MARK); ServiceContainer.getInstance().removeInterceptor( ServiceUtil.generateServiceName(bundleName, bundleVersion, name.toString())); InterceptorInfo info = new InterceptorInfo(); info.setBundle(bundleName); info.setVersion(bundleVersion); info.setName(name.toString()); info.setState(0); info.setMark(mark == null ? "" : mark.toString()); info.setUpdateTime(new Date()); serviceStateManager.updateServiceInterceptor(info); } } /** * 向zookeeper註冊服務 * * @param path * @param ds */ public void registerService(String path, DataServiceInfo ds) { String bundle = ds.getBundle(); String cname = ds.getMark(); String name = ds.getName(); String version = ds.getVersion(); String status = String.valueOf(ds.getState()); try { if (!zookeeper.exists(path + name)) { zookeeper.createNode(path + name); } if (!zookeeper.exists(path + name + "/bundle")) { zookeeper.createNode(path + name + "/bundle", bundle); } else { zookeeper.setNodeData(path + name + "/bundle", bundle); } if (!zookeeper.exists(path + name + "/cname")) { zookeeper.createNode(path + name + "/cname", cname); } else { zookeeper.setNodeData(path + name + "/cname", cname); } if (!zookeeper.exists(path + name + "/version")) { zookeeper.createNode(path + name + "/version", version); } else { zookeeper.setNodeData(path + name + "/version", version); } if (!zookeeper.exists(path + name + 
"/status")) { zookeeper.createNode(path + name + "/status", status); } else { zookeeper.setNodeData(path + name + "/status", status); } this.nodeUpdate(); } catch (Exception e) { e.printStackTrace(); logger.error("zookeeper register service fail, service name : " + name, e); } } /** * 卸載服務 * * @param path * @param ds */ public void unRegisterService(String path, DataServiceInfo ds) { String name = ds.getName(); try { if (zookeeper.exists(path + name + "/status")) { zookeeper.setNodeData(path + name + "/status", String.valueOf(ds.getState())); this.nodeUpdate(); } } catch (Exception e) { e.printStackTrace(); logger.error("zookeeper unRegister service fail, service name : " + name, e); } } /** * 更新節點狀態變化 */ public void nodeUpdate(){ String data = UUID.randomUUID().toString(); try { if(zookeeper.exists(NODE_CHANGE)){ zookeeper.setNodeData(NODE_CHANGE, data); }else{ zookeeper.createNode(NODE_CHANGE); zookeeper.setNodeData(NODE_CHANGE, data); } } catch (Exception e) { logger.error("更新節點變化狀態錯誤", e); } } }
- ServiceWatcher 實現 BundleContextAware接口,將BundleContext 設置進來。
- 在ServiceWatcher初始化的時候 實例化了兩個服務跟蹤器 (ServiceTracker)分別用來跟蹤 BaseDataService、ServiceInterceptor 兩類接口的服務。
- 在ServiceWatcher初始化的時候 給BundleContext新增Bundle監聽器,用來監聽bundle的事件。
- ServiceTracker需要傳遞一個ServiceTrackerCustomizer實例來具體執行監聽的事件,在這裏我們通過DataServiceTrackerCustomizer來具體執行BaseDataService接口服務跟蹤操作。
- 當BaseDataService 服務被發布到osgi容器裏的時候,會自動調用DataServiceTrackerCustomizer.addingService(ServiceReference reference) 方法,通過BundleContext.getService(ServiceReference reference),我們可以獲取到當前被發布到osgi容器裏的服務。
- 通過獲取bundleName、bundleVersion、之前發佈服務時定義的tag標籤(name)組成唯一的key(bundleName+bundleVersion+name),以獲取到的service爲value,將此保存到服務容器(Map)裏。
注:osmp-service裏將監聽到的服務同時保存到數據庫裏和註冊到zookeeper的部分請暫時忽略,稍後在講解osmp註冊時會作詳細說明。
服務發現和路由:
osmp-http接收到請求後解析服務名稱,通過服務查詢此服務是否綁定了攔截器,如果綁定了攔截器,則先執行攔截器鏈,攔截器如果執行失敗則直接返回,如果攔截器執行成功則通過服務名稱獲取服務,獲取服務成功後直接執行服務的execute方法,並將結果返回!
這裏講得簡單點兒,具體osmp封裝了一層代理實現,有興趣的可以直接查看osmp-http源碼。
至此osmp的服務發現和服務調用功能就講到這裏!