osgi實戰項目(osmp)一步一步玩轉osgi之服務發現與服務路由(5)

這一節主要講解osmp的服務發現與路由。osmp通過osmp-http組件對外發布了一個cxf的restful服務,所有的請求都由這個restful接口接收,解析請求後再調用osgi的服務,完成請求後將結果返回到前端。

 

request -> osmp-http的restful接口 -> 解析請求 -> osgi服務發現 -> 服務路由 -> 調用服務 -> 返回 -> 組裝返回參數 -> 返回

 

osmp通過osmp-service組件來對服務進行統一管理,主要功能包括服務的監聽、bundle的監聽、服務容器管理、服務註冊到zookeeper等。

 

在osgi裏可以通過ServiceTracker(服務跟蹤器)來跟蹤某一類接口服務的新增、修改、刪除等操作,通過BundleContext.addBundleListener()註冊bundle監聽器(BundleListener)來監聽bundle各生命週期的事件,比如bundle的安裝、卸載、更新、啓動、停止等事件。

 

osmp爲了便於對服務的統一管理,要求所有的服務都必須實現osmp-intf-define裏定義的BaseDataService接口。這樣我們就可以通過服務跟蹤器跟蹤到BaseDataService接口服務的新增、修改、刪除等事件!

具體代碼可以參照osmp-service裏的 ServiceWatcher.java 類:

 

/*   
 * Project: OSMP
 * FileName: ServiceWatcher.java
 * version: V1.0
 */
package com.osmp.service.watch;

import java.util.Date;
import java.util.UUID;

import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.BundleListener;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.osgi.context.BundleContextAware;
import org.springframework.util.Assert;

import com.osmp.intf.define.config.FrameConst;
import com.osmp.intf.define.interceptor.ServiceInterceptor;
import com.osmp.intf.define.service.BaseDataService;
import com.osmp.intf.define.service.ZookeeperService;
import com.osmp.service.bean.DataServiceInfo;
import com.osmp.service.bean.InterceptorInfo;
import com.osmp.service.factory.ServiceFactoryImpl;
import com.osmp.service.manager.ServiceStateManager;
import com.osmp.service.registration.ServiceContainer;
import com.osmp.service.util.ServiceUtil;
import com.osmp.utils.net.RequestInfoHelper;

/**
 * Watches OSGi service registrations and bundle events for OSMP.
 *
 * <p>On initialization it opens one {@link ServiceTracker} for {@link BaseDataService} and one for
 * {@link ServiceInterceptor}. Services that appear or disappear are mirrored into the in-process
 * {@link ServiceContainer}, reported to the {@link ServiceStateManager}, and (data services only)
 * published to ZooKeeper. A bundle listener additionally removes the ZooKeeper nodes of a bundle
 * when that bundle is uninstalled.
 *
 * @author wangkaiping
 */
public class ServiceWatcher implements BundleContextAware, InitializingBean, DisposableBean {
	private static final Logger logger = LoggerFactory.getLogger(ServiceWatcher.class);

	/** ZooKeeper node whose data is rewritten every time the set of services changes. */
	private static final String NODE_CHANGE = "/osmp/nodechange";

	// State values written to the info beans: 1 when a service is added, 0 when it is removed.
	private static final int STATE_REGISTERED = 1;
	private static final int STATE_UNREGISTERED = 0;

	private ServiceTracker dataServiceTracker;
	private ServiceTracker serviceInterceptorTracker;
	/** Kept so that the listener can be removed again in {@link #destroy()}. */
	private BundleListener bundleListener;

	private ServiceStateManager serviceStateManager;
	private ServiceFactoryImpl serviceFactoryImpl;
	private BundleContext context;
	private ZookeeperService zookeeper;

	@Override
	public void setBundleContext(BundleContext context) {
		this.context = context;
	}

	public void setServiceFactoryImpl(ServiceFactoryImpl serviceFactoryImpl) {
		this.serviceFactoryImpl = serviceFactoryImpl;
	}

	public void setServiceStateManager(ServiceStateManager serviceStateManager) {
		this.serviceStateManager = serviceStateManager;
	}

	public void setZookeeper(ZookeeperService zookeeper) {
		this.zookeeper = zookeeper;
	}

	/**
	 * Stops watching: removes the bundle listener and closes both service trackers.
	 */
	@Override
	public void destroy() throws Exception {
		if (bundleListener != null) {
			try {
				context.removeBundleListener(bundleListener);
			} catch (IllegalStateException e) {
				// Context already invalid: the framework has dropped the listener on its own.
				logger.debug("bundle context no longer valid, bundle listener already removed", e);
			}
			bundleListener = null;
		}
		if (dataServiceTracker != null) {
			dataServiceTracker.close();
		}
		if (serviceInterceptorTracker != null) {
			serviceInterceptorTracker.close();
		}

		logger.info("服務監聽結束");
	}

	/**
	 * Validates the injected collaborators, opens both service trackers and registers the
	 * bundle listener.
	 *
	 * @throws IllegalArgumentException if the bundle context, the state manager or the service
	 *         factory has not been injected
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		Assert.notNull(context, "bundleContext must be set");
		Assert.notNull(serviceStateManager, "serviceStateManager must be set");
		Assert.notNull(serviceFactoryImpl, "serviceFactoryImpl must be set");
		dataServiceTracker = new ServiceTracker(context, BaseDataService.class.getName(),
				new DataServiceTrackerCustomizer());
		serviceInterceptorTracker = new ServiceTracker(context, ServiceInterceptor.class.getName(),
				new ServiceInterceptorTrackerCustomizer());

		// open(true): track all matching services, not only class-compatible ones.
		dataServiceTracker.open(true);
		serviceInterceptorTracker.open(true);

		bundleListener = new BundleListener() {

			@Override
			public void bundleChanged(BundleEvent event) {
				if (event.getType() != BundleEvent.UNINSTALLED) {
					return;
				}
				String name = event.getBundle().getSymbolicName();
				try {
					logger.info("uninstall bundle " + name);
					zookeeper.deleteNodeByBundle(name);
					ServiceWatcher.this.nodeUpdate();
				} catch (Exception e) {
					logger.error("zookeeper delete service by bundle, bundle name : " + name, e);
				}
			}
		};
		context.addBundleListener(bundleListener);

		logger.info("服務監聽啓動");
	}

	/** Tracker callback for {@link BaseDataService} registrations. */
	private class DataServiceTrackerCustomizer implements ServiceTrackerCustomizer {
		@Override
		public Object addingService(ServiceReference reference) {
			BaseDataService bsService = (BaseDataService) context.getService(reference);
			if (bsService == null) {
				// Service vanished before we could obtain it; returning null tells the tracker
				// not to track it, so nothing must be published for it either.
				return null;
			}
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion().toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (isMissing(name)) {
				logger.error("組件" + bundleName + "(" + bundleVersion
						+ ") dataService服務name未設置");
				return bsService;
			}
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			ServiceContainer.getInstance().putDataService(bundleName, bundleVersion, name.toString(), bsService);
			DataServiceInfo info = newDataServiceInfo(bundleName, bundleVersion, name, mark, STATE_REGISTERED);
			serviceStateManager.updateDataService(info);
			String path = localServicePath();
			logger.debug("register service path: {} bundle : {} to zookeeper ", path, bundleName);
			ServiceWatcher.this.registerService(path, info);
			return bsService;
		}

		@Override
		public void modifiedService(ServiceReference reference, Object service) {
			// Property changes are intentionally ignored.
		}

		@Override
		public void removedService(ServiceReference reference, Object service) {
			try {
				String bundleName = reference.getBundle().getSymbolicName();
				String bundleVersion = reference.getBundle().getVersion().toString();
				Object name = reference.getProperty(FrameConst.SERVICE_NAME);
				if (isMissing(name)) {
					logger.error("組件" + bundleName + "(" + bundleVersion
							+ ") dataService服務name未設置");
					return;
				}
				ServiceContainer.getInstance().removeDataService(bundleName, bundleVersion, name.toString());
				Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
				DataServiceInfo info = newDataServiceInfo(bundleName, bundleVersion, name, mark,
						STATE_UNREGISTERED);
				serviceStateManager.updateDataService(info);
				// info.getMark() is already null-safe; the mark property is optional.
				logger.info("===============remove service bundleName:{} name: {} mark: {}",
						new Object[] { bundleName, name, info.getMark() });
				ServiceWatcher.this.unRegisterService(localServicePath(), info);
			} finally {
				// Balance the getService() call made in addingService().
				releaseService(reference);
			}
		}

	}

	/** Tracker callback for {@link ServiceInterceptor} registrations. */
	private class ServiceInterceptorTrackerCustomizer implements ServiceTrackerCustomizer {
		@Override
		public Object addingService(ServiceReference reference) {
			ServiceInterceptor sicpt = (ServiceInterceptor) context.getService(reference);
			if (sicpt == null) {
				// Service vanished before we could obtain it; do not track or publish it.
				return null;
			}
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion().toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (isMissing(name)) {
				logger.error("組件" + bundleName + "(" + bundleVersion
						+ ") serviceInterceptor服務name未設置");
				return sicpt;
			}
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			ServiceContainer.getInstance().putInterceptor(
					ServiceUtil.generateServiceName(bundleName, bundleVersion, name.toString()), sicpt);
			serviceStateManager.updateServiceInterceptor(
					newInterceptorInfo(bundleName, bundleVersion, name, mark, STATE_REGISTERED));
			return sicpt;
		}

		@Override
		public void modifiedService(ServiceReference reference, Object service) {
			// Property changes are intentionally ignored.
		}

		@Override
		public void removedService(ServiceReference reference, Object service) {
			try {
				String bundleName = reference.getBundle().getSymbolicName();
				String bundleVersion = reference.getBundle().getVersion().toString();
				Object name = reference.getProperty(FrameConst.SERVICE_NAME);
				if (isMissing(name)) {
					logger.error("組件" + bundleName + "(" + bundleVersion
							+ ") serviceInterceptor服務name未設置");
					return;
				}
				Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
				ServiceContainer.getInstance().removeInterceptor(
						ServiceUtil.generateServiceName(bundleName, bundleVersion, name.toString()));
				serviceStateManager.updateServiceInterceptor(
						newInterceptorInfo(bundleName, bundleVersion, name, mark, STATE_UNREGISTERED));
			} finally {
				// Balance the getService() call made in addingService().
				releaseService(reference);
			}
		}

	}

	/**
	 * Publishes a data service to ZooKeeper.
	 *
	 * <p>Ensures the node {@code path + name} exists and writes the children {@code bundle},
	 * {@code cname}, {@code version} and {@code status}, then signals the change through
	 * {@link #nodeUpdate()}. Failures are logged, not propagated.
	 *
	 * @param path parent path of the service node, ending with {@code /}
	 * @param ds   description of the service to publish
	 */
	public void registerService(String path, DataServiceInfo ds) {
		String name = ds.getName();
		String servicePath = path + name;
		try {
			if (!zookeeper.exists(servicePath)) {
				zookeeper.createNode(servicePath);
			}
			writeNode(servicePath + "/bundle", ds.getBundle());
			writeNode(servicePath + "/cname", ds.getMark());
			writeNode(servicePath + "/version", ds.getVersion());
			writeNode(servicePath + "/status", String.valueOf(ds.getState()));
			this.nodeUpdate();
		} catch (Exception e) {
			logger.error("zookeeper register service fail, service name : "
					+ name, e);
		}
	}

	/**
	 * Marks a data service as removed in ZooKeeper.
	 *
	 * <p>Only the {@code status} child is rewritten, and only if it already exists; the service
	 * node itself is left in place. Failures are logged, not propagated.
	 *
	 * @param path parent path of the service node, ending with {@code /}
	 * @param ds   description of the service; its state is written as the new status
	 */
	public void unRegisterService(String path, DataServiceInfo ds) {
		String name = ds.getName();
		String statusPath = path + name + "/status";
		try {
			if (zookeeper.exists(statusPath)) {
				zookeeper.setNodeData(statusPath, String.valueOf(ds.getState()));
				this.nodeUpdate();
			}
		} catch (Exception e) {
			logger.error("zookeeper unRegister service fail, service name : "
					+ name, e);
		}
	}

	/**
	 * Signals that the set of services changed by writing a fresh random UUID into the
	 * {@code /osmp/nodechange} node, creating the node first if necessary. Failures are logged,
	 * not propagated.
	 */
	public void nodeUpdate() {
		String data = UUID.randomUUID().toString();
		try {
			if (!zookeeper.exists(NODE_CHANGE)) {
				zookeeper.createNode(NODE_CHANGE);
			}
			zookeeper.setNodeData(NODE_CHANGE, data);
		} catch (Exception e) {
			logger.error("更新節點變化狀態錯誤", e);
		}
	}

	/** Creates the node with the given value, or overwrites its data if it already exists. */
	private void writeNode(String nodePath, String value) throws Exception {
		if (zookeeper.exists(nodePath)) {
			zookeeper.setNodeData(nodePath, value);
		} else {
			zookeeper.createNode(nodePath, value);
		}
	}

	/** Returns the ZooKeeper parent path under which this host's services are published. */
	private static String localServicePath() {
		return ZookeeperService.ROOT_PATH + ZookeeperService.SERVICE + "/"
				+ RequestInfoHelper.getLocalIp() + "/";
	}

	/** Tells whether the service-name property is absent or the empty string. */
	private static boolean isMissing(Object name) {
		return name == null || "".equals(name);
	}

	/** Builds the state record for a data service; a missing mark becomes the empty string. */
	private static DataServiceInfo newDataServiceInfo(String bundle, String version, Object name,
			Object mark, int state) {
		DataServiceInfo info = new DataServiceInfo();
		info.setBundle(bundle);
		info.setVersion(version);
		info.setName(name.toString());
		info.setState(state);
		info.setUpdateTime(new Date());
		info.setMark(mark == null ? "" : mark.toString());
		return info;
	}

	/** Builds the state record for an interceptor; a missing mark becomes the empty string. */
	private static InterceptorInfo newInterceptorInfo(String bundle, String version, Object name,
			Object mark, int state) {
		InterceptorInfo info = new InterceptorInfo();
		info.setBundle(bundle);
		info.setVersion(version);
		info.setName(name.toString());
		info.setState(state);
		info.setUpdateTime(new Date());
		info.setMark(mark == null ? "" : mark.toString());
		return info;
	}

	/** Releases a service obtained through {@code context.getService(reference)}. */
	private void releaseService(ServiceReference reference) {
		try {
			context.ungetService(reference);
		} catch (IllegalStateException e) {
			// Bundle context is no longer valid; nothing left for us to release.
			logger.debug("bundle context no longer valid, skip ungetService", e);
		}
	}

}

 

  1. ServiceWatcher 實現 BundleContextAware接口,將BundleContext 設置進來。
  2. 在ServiceWatcher初始化的時候 實例化了兩個服務跟蹤器 (ServiceTracker)分別用來跟蹤 BaseDataService、ServiceInterceptor 兩類接口的服務。
  3. 在ServiceWatcher初始化的時候,給BundleContext新增Bundle監聽器,用來監聽bundle的事件。
  4. ServiceTracker需要傳遞一個ServiceTrackerCustomizer實例來具體執行監聽的事件,在這裏我們通過DataServiceTrackerCustomizer來具體執行BaseDataService接口服務的跟蹤操作。
  5. BaseDataService 服務被發佈到osgi容器裏的時候,會自動調用DataServiceTrackerCustomizer.addingService(ServiceReference reference) 方法,通過BundleContext.getService(ServiceReference reference),我們可以獲取到當前被發佈到osgi容器裏的服務。
  6. 通過獲取bundleName、bundleVersion以及發佈服務時定義的tag標籤(name),組成唯一的key(bundleName+bundleVersion+name),以獲取到的service作爲value,將其保存到服務容器(Map)裏。

注:osmp-service裏將監聽到的服務同時保存到數據庫並註冊到zookeeper的部分請暫時忽略,稍後講解osmp註冊時會作詳細說明。

 

服務發現和路由:

 

osmp-http接收到請求後解析服務名稱,通過服務名稱查詢此服務是否綁定了攔截器。如果綁定了攔截器,則先執行攔截器鏈:攔截器如果執行失敗則直接返回;如果攔截器執行成功,則通過服務名稱獲取服務,獲取服務成功後直接執行服務的execute方法,並將結果返回!

 

這裏講得簡單點兒,實際上osmp還封裝了一層代理實現,有興趣的可以直接查看osmp-http源碼。

 

至此osmp的服務發現和服務調用功能就講到這裏!

相關文章
相關標籤/搜索