SOA
服務治理--最主要就是服務之間的治理(治理基本上要作成運行時治理)java
Dubbo使用條件
必需要與spring集成node
Dubbo的通訊協議(消費者調用生產者)
netty和mina實現web
手寫一個相似於dubbo SOA須要解決的問題
一、爲何消費者可以調用到生產者的服務層
二、生產者怎麼把內容註冊到註冊中心的,而後消費者是如何獲取這個信息的
三、dubbo框架是如何跟spring進行整合的,消費者獲取的代理實例是如何建立redis
手寫SOA框架算法
1.新建Maven工程,引入Spring的依賴包spring
2.在resources包下,新建META-INF包,新增soa.xsd, spring.handlers, spring.schemasjson
soa.xsdpromise
<?xml version="1.0" encoding="UTF-8"?> <xsd:schema xmlns="http://www.youierdong.com/schema/soa" xmlns:xsd="http://www.w3.org/2001/XMLSchema" targetNamespace="http://www.youierdong.com/schema/soa" elementFormDefault="qualified" attributeFormDefault="unqualified"> <!-- 註冊標籤 --> <xsd:element name="registry"> <xsd:complexType> <xsd:attribute name="protocol" type="xsd:string"></xsd:attribute> <xsd:attribute name="address" type="xsd:string"></xsd:attribute> </xsd:complexType> </xsd:element> <!--reference標籤--> <xsd:element name="reference"> <xsd:complexType> <xsd:attribute name="id" type="xsd:string"></xsd:attribute> <xsd:attribute name="interface" type="xsd:string"></xsd:attribute> <xsd:attribute name="loadbalance" type="xsd:string"></xsd:attribute> <xsd:attribute name="protocol" type="xsd:string"></xsd:attribute> <xsd:attribute name="cluster" type="xsd:string"></xsd:attribute> <xsd:attribute name="retries" type="xsd:string"></xsd:attribute> </xsd:complexType> </xsd:element> <!-- 協議標籤--> <xsd:element name="protocol"> <xsd:complexType> <xsd:attribute name="name" type="xsd:string"></xsd:attribute> <xsd:attribute name="port" type="xsd:string"></xsd:attribute> <xsd:attribute name="host" type="xsd:string"></xsd:attribute> </xsd:complexType> </xsd:element> <!--service標籤--> <xsd:element name="service"> <xsd:complexType> <xsd:attribute name="interface" type="xsd:string"></xsd:attribute> <xsd:attribute name="ref" type="xsd:string"></xsd:attribute> <xsd:attribute name="protocol" type="xsd:string"></xsd:attribute> </xsd:complexType> </xsd:element> </xsd:schema>
spring.handlers緩存
http\://www.youierdong.com/schema/soa=com.youi.erdong.spring.parse.SOANamespaceHandlers 注意:www.youierdong.com/schema/soa與xsd的配置對應
spring.schemasapp
http\://www.youierdong.com/schema/soa.xsd=META-INF/soa.xsd
3新建SOANamespaceHandlers (spring標籤解析類)
import org.springframework.beans.factory.xml.NamespaceHandlerSupport; /** * spring標籤解析類 繼承NamespaceHandlerSupport * * @author Administrator * */ public class SOANamespaceHandlers extends NamespaceHandlerSupport { /** * 註冊標籤 */ public void init() { this.registerBeanDefinitionParser("registry", new RegistryBeanDefinitionParse(Registry.class)); this.registerBeanDefinitionParser("reference", new ReferenceBeanDefinitionParse(Reference.class)); this.registerBeanDefinitionParser("protocol", new ProtocolBeanDefinitionParse(Protocol.class)); this.registerBeanDefinitionParser("service", new ServiceBeanDefinitionParse(Service.class)); } }
其餘bean解析類
import org.springframework.beans.factory.config.BeanDefinition; /** * bean初始化轉換 Protocol標籤解析類 * * @author Administrator */ public class ProtocolBeanDefinitionParse implements BeanDefinitionParser { // Protocol private Class<?> beanClass; public ProtocolBeanDefinitionParse(Class<?> beanClass) { this.beanClass = beanClass; } // parse轉換方法 解析soa.xsd的Protocol標籤 獲取BeanDefinition public BeanDefinition parse(Element element, ParserContext parserContext) { RootBeanDefinition beanDefinition = new RootBeanDefinition(); beanDefinition.setBeanClass(beanClass); beanDefinition.setLazyInit(false); String name = element.getAttribute("name"); String port = element.getAttribute("port"); String host = element.getAttribute("host"); if (name == null || name.length() == 0) { throw new RuntimeException("Protocol name 不能爲空"); } if (port == null || port.length() == 0) { throw new RuntimeException("Protocol port 不能爲空"); } if (host == null || host.length() == 0) { throw new RuntimeException("Protocol host 不能爲空"); } beanDefinition.getPropertyValues().add("name", name); beanDefinition.getPropertyValues().add("port", port); beanDefinition.getPropertyValues().add("host", host); // 註冊 parserContext.getRegistry().registerBeanDefinition( "Protocol" + host + port, beanDefinition); return beanDefinition; } }
/** * bean初始化轉換 Reference標籤解析類 * * @author Administrator */ public class ReferenceBeanDefinitionParse implements BeanDefinitionParser { // Reference private Class<?> beanClass; public ReferenceBeanDefinitionParse(Class<?> beanClass) { this.beanClass = beanClass; } // parse轉換方法 解析soa.xsd的Reference標籤 獲取BeanDefinition public BeanDefinition parse(Element element, ParserContext parserContext) { RootBeanDefinition beanDefinition = new RootBeanDefinition(); beanDefinition.setBeanClass(beanClass); beanDefinition.setLazyInit(false); String id = element.getAttribute("id"); String intf = element.getAttribute("interface"); String loadbalance = element.getAttribute("loadbalance"); String protocol = element.getAttribute("protocol"); if (id == null || id.length() == 0) { throw new RuntimeException("Reference id 不能爲空"); } if (intf == null || intf.length() == 0) { throw new RuntimeException("Reference intf 不能爲空"); } if (loadbalance == null || loadbalance.length() == 0) { throw new RuntimeException("Reference loadbalance 不能爲空"); } if (protocol == null || protocol.length() == 0) { throw new RuntimeException("Reference protocol 不能爲空"); } beanDefinition.getPropertyValues().add("id", id); beanDefinition.getPropertyValues().add("intf", intf); beanDefinition.getPropertyValues().add("loadbalance", loadbalance); beanDefinition.getPropertyValues().add("protocol", protocol); // 註冊 parserContext.getRegistry().registerBeanDefinition("Reference" + id, beanDefinition); return beanDefinition; } }
/** * bean初始化轉換 Registry標籤解析類 * * @author Administrator */ public class RegistryBeanDefinitionParse implements BeanDefinitionParser { // Registry private Class<?> beanClass; public RegistryBeanDefinitionParse(Class<?> beanClass) { this.beanClass = beanClass; } // parse轉換方法 解析soa.xsd的registry標籤 獲取BeanDefinition public BeanDefinition parse(Element element, ParserContext parserContext) { RootBeanDefinition beanDefinition = new RootBeanDefinition(); // spring會把beanClass實例化 BeanDefinitionNames?? beanDefinition.setBeanClass(beanClass); beanDefinition.setLazyInit(false); String protocol = element.getAttribute("protocol"); String address = element.getAttribute("address"); if (protocol == null || address.length() == 0) { throw new RuntimeException("Registry protocol 不能爲空"); } if (address == null || address.length() == 0) { throw new RuntimeException("Registry address 不能爲空"); } beanDefinition.getPropertyValues().add("protocol", protocol); beanDefinition.getPropertyValues().add("address", address); // 註冊 parserContext.getRegistry().registerBeanDefinition( "Registry" + address, beanDefinition); return beanDefinition; } }
/** * bean初始化轉換 Service標籤解析類 * * @author Administrator */ public class ServiceBeanDefinitionParse implements BeanDefinitionParser { // Service private Class<?> beanClass; public ServiceBeanDefinitionParse(Class<?> beanClass) { this.beanClass = beanClass; } // parse轉換方法 解析soa.xsd的Service標籤 獲取BeanDefinition public BeanDefinition parse(Element element, ParserContext parserContext) { RootBeanDefinition beanDefinition = new RootBeanDefinition(); beanDefinition.setBeanClass(beanClass); beanDefinition.setLazyInit(false); String intf = element.getAttribute("interface"); String ref = element.getAttribute("ref"); String protocol = element.getAttribute("protocol"); if (intf == null || intf.length() == 0) { throw new RuntimeException("Service intf 不能爲空"); } if (ref == null || ref.length() == 0) { throw new RuntimeException("Service ref 不能爲空"); } /** * if (protocol == null || protocol.length() == 0) { throw new * RuntimeException("Service protocol 不能爲空"); } */ beanDefinition.getPropertyValues().add("intf", intf); beanDefinition.getPropertyValues().add("ref", ref); beanDefinition.getPropertyValues().add("protocol", protocol); // 註冊 parserContext.getRegistry().registerBeanDefinition( "Service" + ref + intf, beanDefinition); return beanDefinition; } }
其餘bean實體類
public class Protocol { private String name; private String port; private String host; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPort() { return port; } public void setPort(String port) { this.port = port; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } }
public class Reference { private String id; private String intf; private String loadbalance; private String protocol; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getIntf() { return intf; } public void setIntf(String intf) { this.intf = intf; } public String getLoadbalance() { return loadbalance; } public void setLoadbalance(String loadbalance) { this.loadbalance = loadbalance; } public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } }
public class Registry { private String protocol; private String address; public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } }
public class Service { private String intf; private String ref; private String protocol; public String getIntf() { return intf; } public void setIntf(String intf) { this.intf = intf; } public String getRef() { return ref; } public void setRef(String ref) { this.ref = ref; } public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } }
其餘service類
public interface UserService { public String eat(String param); }
public class UserServiceImpl implements UserService { public String eat(String param) { // TODO Auto-generated method stub return null; } }
測試的xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:erdong="http://www.youierdong.com/schema/soa" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.youierdong.com/schema/soa http://www.youierdong.com/schema/soa.xsd" default-lazy-init="true"> <bean id="userServiceImpl" class="com.youi.erdong.test.service.UserServiceImpl"></bean> <erdong:service interface="com.youi.erdong.test.service.UserService" ref="userServiceImpl"></erdong:service> <erdong:registry protocol="redis" address="127.0.0.1:6379"></erdong:registry> <erdong:reference id="testServiceImpl3" interface="com.youi.erdong.test.service.UserService" loadbalance="random" protocol="http"></erdong:reference> <erdong:protocol name="http" port="27017" host="127.0.0.1"></erdong:protocol> </beans>
運行測試類
public class Mytest { public static void main(String[] args) { ApplicationContext app = new ClassPathXmlApplicationContext( "mytest.xml"); } }
4 打包後,新建消費者工程引用。
消費者獲取的代理實例是如何建立?
序列化對象
import java.io.Serializable; public class Protocol implements Serializable, InitializingBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> { // 序列化id private static final long serialVersionUID = -323182438989154754L; private String name; private String port; private String host; private String contextpath; public void afterPropertiesSet() throws Exception { // TODO Auto-generated method stub if (name.equalsIgnoreCase("rmi")) { RmiUtil rmiUtil = new RmiUtil(); rmiUtil.startRmiServer(host, port, "erdongrmi"); } } // spring啓動後的事件 public void onApplicationEvent(ContextRefreshedEvent event) { if (!ContextRefreshedEvent.class.getName().equals( event.getClass().getName())) { return; } if (!"netty".equalsIgnoreCase(name)) { return; } new Thread(new Runnable() { public void run() { try { NettyUtil.startServer(port); } catch (Exception e) { e.printStackTrace(); } } }).start(); } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPort() { return port; } public void setPort(String port) { this.port = port; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getContextpath() { return contextpath; } public void setContextpath(String contextpath) { this.contextpath = contextpath; } public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // TODO Auto-generated method stub } }
public class Registry implements Serializable { // 序列化id private static final long serialVersionUID = 1672531801363254807L; private String protocol; private String address; public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } }
/** * InitializingBean SpringIOC注入後調用 * */ public class Service implements Serializable, InitializingBean, ApplicationContextAware { // 序列化id private static final long serialVersionUID = -2814888066547175285L; private String intf; private String ref; private String protocol; public static ApplicationContext application; // InitializingBean 實現方法 public void afterPropertiesSet() throws Exception { // 註冊生產者的服務 BaseRegistryDelegate.registry(ref, application); } // ApplicationContextAware 實現方法 用來獲取Spring上下文 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // TODO Auto-generated method stub this.application = applicationContext; } public String getIntf() { return intf; } public void setIntf(String intf) { this.intf = intf; } public String getRef() { return ref; } public void setRef(String ref) { this.ref = ref; } public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } public static ApplicationContext getApplication() { return application; } public void setApplication(ApplicationContext application) { this.application = application; } }
代理實例
/** * 代理實例接口FactoryBean 獲取Spring上下文 ApplicationContextAware */ public class Reference implements Serializable, FactoryBean, ApplicationContextAware, InitializingBean { // 序列化id private static final long serialVersionUID = 6496428948999332452L; private String id; private String intf; private String loadbalance; private String protocol; private String cluster; private String retries; private Invoke invoke;// 調用者 private ApplicationContext application; private static Map<String, Invoke> invokes = new HashMap<String, Invoke>(); private List<String> registryInfo = new ArrayList<String>(); private static Map<String, LoadBalance> loadBalance = new HashMap<String, LoadBalance>(); private static Map<String, Cluster> clusters = new HashMap<String, Cluster>(); /** * 註冊遠程調用協議 */ static { invokes.put("http", new HttpInvoke()); invokes.put("rmi", new RmiInvoke()); invokes.put("netty", new NettyInvoke()); loadBalance.put("random", new RandomLoadBalance()); loadBalance.put("roundrobin", new RoundRobinLoadBalance()); clusters.put("failover", new FailoverClusterInvoke()); clusters.put("failfast", new FailfastClusterInvoke()); clusters.put("failsafe", new FailsafeClusterInvoke()); } public Reference() { System.out.println("66666666666666666666"); } /** ApplicationContextAware的實現方法 */ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.application = applicationContext; } /** * 拿到一個實例,這個方法是Spring初始化時getBean方法裏調用的 * ApplicationContext.getBean("id")時就會調用getObject(),其返回值會交給Spring進行管理 * 在getObject()方法裏返回的是intf接口的代理對象 */ public Object getObject() throws Exception { System.out.println("返回intf的代理對象"); if (protocol != null && protocol.length() > 0) { invoke = invokes.get(protocol); } else { // Protocol的實例在spring容器中 Protocol protocol = application.getBean(Protocol.class); if (protocol != null) { invoke = invokes.get(protocol.getName()); } else { invoke = invokes.get("http"); } } // 返回代理對象 return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[] { Class.forName(intf) }, new InvokeInvocationHandler(invoke, this)); } public Class getObjectType() { if (intf != null && intf.length() > 0) { try { return Class.forName(intf); } catch (Exception e) { // TODO: handle exception } } return null; } /** 返回的代理實例是不是單例 */ public boolean isSingleton() { // TODO Auto-generated method stub return false; } // InitializingBean實現的方法 public void afterPropertiesSet() throws Exception { // 消費者得到生產者全部註冊信息 registryInfo = BaseRegistryDelegate.getRegistry(id, application); } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getIntf() { return intf; } public void setIntf(String intf) { this.intf = intf; } public String getLoadbalance() { return loadbalance; } public void setLoadbalance(String loadbalance) { this.loadbalance = loadbalance; } public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } public List<String> getRegistryInfo() { return registryInfo; } public void setRegistryInfo(List<String> registryInfo) { this.registryInfo = registryInfo; } public static Map<String, LoadBalance> getLoadBalance() { return loadBalance; } public static void setLoadBalance(Map<String, LoadBalance> loadBalance) { Reference.loadBalance = loadBalance; } public String getCluster() { return cluster; } public void setCluster(String cluster) { this.cluster = cluster; } public String getRetries() { return retries; } public void setRetries(String retries) { this.retries = retries; } public static Map<String, Cluster> getClusters() { return clusters; } public static void setClusters(Map<String, Cluster> clusters) { Reference.clusters = clusters; } }
調用策略
/** * 採用策略模式進行rpc調用 返回json Stringj進行通訊 * * @author Administrator * */ public interface Invoke { public String invoke(Invocation invocation); }
public class HttpInvoke implements Invoke { public String invoke(Invocation invocation) { Reference reference = invocation.getReference(); List<String> registryInfo = reference.getRegistryInfo(); // 負載均衡算法 String loadbalance = reference.getLoadbalance(); LoadBalance loadBalanceBean = reference.getLoadBalance().get( loadbalance); NodeInfo nodeInfo = loadBalanceBean.doSelect(registryInfo); // 調用遠程的生產者是傳輸json字符串 // 根據serviceId調用生產者spring容器中的service實例 // 根據methodName和methodtype獲取利用反射調用method方法 JSONObject sendParam = new JSONObject(); sendParam.put("methodName", invocation.getMethod().getName()); sendParam.put("methodParams", invocation.getObjs()); sendParam.put("serviceId", reference.getId()); sendParam.put("paramTypes", invocation.getMethod().getParameterTypes()); String url = "http://" + nodeInfo.getHost() + ":" + nodeInfo.getPort() + nodeInfo.getContextpath(); // 調用生產者的服務 String result = HttpRequest.sendPost(url, sendParam.toJSONString()); return result; } }
public class NettyInvoke implements Invoke { public String invoke(Invocation invocation) { Reference reference = invocation.getReference(); List<String> registryInfo = reference.getRegistryInfo(); // 負載均衡算法 String loadbalance = reference.getLoadbalance(); LoadBalance loadBalanceBean = reference.getLoadBalance().get( loadbalance); NodeInfo nodeInfo = loadBalanceBean.doSelect(registryInfo); // 調用遠程的生產者是傳輸json字符串 // 根據serviceId調用生產者spring容器中的service實例 // 根據methodName和methodtype獲取利用反射調用method方法 JSONObject sendParam = new JSONObject(); sendParam.put("methodName", invocation.getMethod().getName()); sendParam.put("methodParams", invocation.getObjs()); sendParam.put("serviceId", reference.getId()); sendParam.put("paramTypes", invocation.getMethod().getParameterTypes()); try { return NettyUtil.sendMsg(nodeInfo.getHost(), nodeInfo.getPort(), sendParam.toJSONString()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } }
/** * RMI通訊協議 * * @author Administrator * */ public class RmiInvoke implements Invoke { public String invoke(Invocation invocation) { Reference reference = invocation.getReference(); List<String> registryInfo = reference.getRegistryInfo(); // 負載均衡算法 String loadbalance = reference.getLoadbalance(); LoadBalance loadBalanceBean = reference.getLoadBalance().get( loadbalance); NodeInfo nodeInfo = loadBalanceBean.doSelect(registryInfo); // 調用遠程的生產者是傳輸json字符串 // 根據serviceId調用生產者spring容器中的service實例 // 根據methodName和methodtype獲取利用反射調用method方法 JSONObject sendParam = new JSONObject(); sendParam.put("methodName", invocation.getMethod().getName()); sendParam.put("methodParams", invocation.getObjs()); sendParam.put("serviceId", reference.getId()); sendParam.put("paramTypes", invocation.getMethod().getParameterTypes()); RmiUtil rmi = new RmiUtil(); SOARmi soaRmi = rmi.startRmiClient(nodeInfo, "erdongrmi"); try { return soaRmi.invoke(sendParam.toString()); } catch (Exception e) { // TODO: handle exception } return null; } }
/** * 封裝InvokeInvocationHandler的invoke方法裏的三個參數Object , Method , Object[] * * @author Administrator * */ public class Invocation { private Method method; private Object[] objs; private Invoke invoke; private Reference reference; public Method getMethod() { return method; } public void setMethod(Method method) { this.method = method; } public Object[] getObjs() { return objs; } public void setObjs(Object[] objs) { this.objs = objs; } public Reference getReference() { return reference; } public void setReference(Reference reference) { this.reference = reference; } public Invoke getInvoke() { return invoke; } public void setInvoke(Invoke invoke) { this.invoke = invoke; } }
動做代理
/** * jdk動做代理 InvokeInvocationHandler 是一個advice(通知、加強),它進行了rpc(http、rmi、netty)的遠程調用 * * @author Administrator * */ public class InvokeInvocationHandler implements InvocationHandler { private Invoke invoke; private Reference reference; public InvokeInvocationHandler(Invoke invoke, Reference reference) { this.invoke = invoke; this.reference = reference; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 這裏最終要調用多個遠程的生產者 生產者在啓動的時候要在註冊中心(redis)註冊信息 System.out.println("已經獲取到了代理實例 InvokeInvocationHandler invoke"); Invocation invocation = new Invocation(); invocation.setMethod(method); invocation.setObjs(args); invocation.setReference(reference); invocation.setInvoke(invoke); // String result = invoke.invoke(invocation); Cluster cluster = reference.getClusters().get(reference.getCluster()); String result = cluster.invoke(invocation); return result; } }
生產者信息註冊 RedisApi是Rdis工具類
public interface BaseRegistry { public boolean registry(String param, ApplicationContext application); public List<String> getRegistry(String id, ApplicationContext application); }
/** * redis註冊中心處理類 * */ public class RedisRegistry implements BaseRegistry { public boolean registry(String ref, ApplicationContext application) { try { Protocol protocol = application.getBean(Protocol.class); Map<String, Service> services = application .getBeansOfType(Service.class); Registry registry = application.getBean(Registry.class); RedisApi.createJedisPool(registry.getAddress()); for (Map.Entry<String, Service> entry : services.entrySet()) { if (entry.getValue().getRef().equals(ref)) { JSONObject jo = new JSONObject(); jo.put("protocol", JSONObject.toJSONString(protocol)); jo.put("services", JSONObject.toJSONString(entry.getValue())); JSONObject ipport = new JSONObject(); ipport.put(protocol.getHost() + ":" + protocol.getPort(), jo); lpush(ipport, ref); } } return true; } catch (Exception e) { // TODO: handle exception } return false; } /** * [ { "127.0.0.1:27017":{ * "protocol":"{"host":"127.0.0.1","name":"http","port":"27017"}", * "services" * :"{"intf":"com.youi.erdong.test.service.UserService4","protocol":" * http","ref":"userServiceImpl4"}" },{ "127.0.0.1:27017":{ * "protocol":"{"host":"127.0.0.1","name":"http","port":"27017"}", * "services" * :"{"intf":"com.youi.erdong.test.service.UserService1","protocol":" * http","ref":"userServiceImpl1"}" } } ] */ // 數據加入到redis private void lpush(JSONObject ipport, String key) { if (RedisApi.exists(key)) { Set<String> keys = ipport.keySet(); String ipportStr = ""; for (String ks : keys) { ipportStr = ks; } // 去重 boolean isold = false; List<String> registryInfo = RedisApi.lrange(key); List<String> newRegistry = new ArrayList<String>(); for (String node : registryInfo) { JSONObject jo = JSONObject.parseObject(node); if (jo.containsKey(ipportStr)) { newRegistry.add(ipport.toJSONString()); isold = true; } else { newRegistry.add(node); } } if (isold) { if (newRegistry.size() > 0) { RedisApi.del(key); String[] newReStr = new String[newRegistry.size()]; for (int i = 0; i < newReStr.length; i++) { newReStr[i] = newRegistry.get(i); } RedisApi.lpush(key, newReStr); } } else { RedisApi.lpush(key, ipport.toJSONString()); } } else { RedisApi.lpush(key, ipport.toJSONString()); } } public List<String> getRegistry(String id, ApplicationContext application) { try { Registry registry = application.getBean(Registry.class); RedisApi.createJedisPool(registry.getAddress()); if (RedisApi.exists(id)) { // 獲取list return RedisApi.lrange(id); } } catch (Exception e) { // TODO: handle exception } return null; } }
/** * 委託者類 註冊內容委託類 */ public class BaseRegistryDelegate { public static void registry(String ref, ApplicationContext application) { // 獲取註冊信息 Registry registry = application.getBean(Registry.class); String protocol = registry.getProtocol(); BaseRegistry baseRegistry = Registry.registryMap.get(protocol); baseRegistry.registry(ref, application); } public static List<String> getRegistry(String id, ApplicationContext application) { Registry registry = application.getBean(Registry.class); String protocol = registry.getProtocol(); BaseRegistry registryBean = Registry.registryMap.get(protocol); return registryBean.getRegistry(id, application); } }
負載均衡算法
public interface LoadBalance { NodeInfo doSelect(List<String> registryInfo); }
public class NodeInfo { private String host; private String port; private String contextpath; public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getPort() { return port; } public void setPort(String port) { this.port = port; } public String getContextpath() { return contextpath; } public void setContextpath(String contextpath) { this.contextpath = contextpath; } }
/** * 負載均衡隨機算法 */ public class RandomLoadBalance implements LoadBalance { public NodeInfo doSelect(List<String> registryInfo) { Random random = new Random(); int index = random.nextInt(registryInfo.size()); String registry = registryInfo.get(index); JSONObject registryJO = (JSONObject) JSONObject.parse(registry); Collection values = registryJO.values(); JSONObject node = new JSONObject(); for (Object value : values) { node = JSONObject.parseObject(value.toString()); } JSONObject protocol = node.getJSONObject("protocol"); NodeInfo nodeInfo = new NodeInfo(); nodeInfo.setHost(protocol.get("host") != null ? protocol .getString("host") : ""); nodeInfo.setPort(protocol.get("port") != null ? protocol .getString("port") : ""); nodeInfo.setContextpath(protocol.get("contextpath") != null ? protocol .getString("contextpath") : ""); return nodeInfo; } }
/*** * 輪詢負載均衡算法 */ public class RoundRobinLoadBalance implements LoadBalance { private static Integer index = 0; public NodeInfo doSelect(List<String> registryInfo) { synchronized (index) { if (index >= registryInfo.size()) { index = 0; } String registry = registryInfo.get(index); index++; JSONObject registryJO = (JSONObject) JSONObject.parse(registry); Collection values = registryJO.values(); JSONObject node = new JSONObject(); for (Object value : values) { node = JSONObject.parseObject(value.toString()); } JSONObject protocol = node.getJSONObject("protocol"); NodeInfo nodeInfo = new NodeInfo(); nodeInfo.setHost(protocol.get("host") != null ? protocol .getString("host") : ""); nodeInfo.setPort(protocol.get("port") != null ? protocol .getString("port") : ""); nodeInfo.setContextpath(protocol.get("contextpath") != null ? protocol .getString("contextpath") : ""); return nodeInfo; } } }
非web工程的http接收方法
/** * 這個是soa框架中給生產者接收請求用的servlet,這個必須是採用http協議才能掉獲得 * * @author Administrator * */ public class DispatcherServlet extends HttpServlet { private static final long serialVersionUID = 7394893382457783784L; @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { try { JSONObject requestparam = httpProcess(req, resp); String serviceId = requestparam.getString("serviceId"); String methodName = requestparam.getString("methodName"); JSONArray paramTypes = requestparam.getJSONArray("paramTypes"); JSONArray methodParamJa = requestparam.getJSONArray("methodParams"); // Object[] objs = null; if (methodParamJa != null) { objs = new Object[methodParamJa.size()]; int i = 0; for (Object o : methodParamJa) { objs[i++] = o; } } // 獲取spring上下文 ApplicationContext application = Service.getApplication(); Object serviceBean = application.getBean(serviceId); // 須要考慮重載 Method method = getMethod(serviceBean, methodName, paramTypes); if (method != null) { // 反射調用 Object result = method.invoke(serviceBean, objs); resp.getWriter().write(result.toString()); } else { resp.getWriter().write("no such method!!!!"); } } catch (Exception e) { // TODO: handle exception } } private Method getMethod(Object bean, String methodName, JSONArray paramType) { Method[] methods = bean.getClass().getMethods(); List<Method> retmMethod = new ArrayList<Method>(); for (Method method : methods) { // 找到相同方法名的方法 if (methodName.trim().equals(method.getName())) { retmMethod.add(method); } } if (retmMethod.size() == 0) { return retmMethod.get(0); } boolean isSameSize = false; boolean isSameType = false; for (Method method : retmMethod) { Class<?>[] typs = method.getParameterTypes(); if (typs.length == paramType.size()) { isSameSize = true; } if (isSameSize) { continue; } for (int i = 0; i < typs.length; i++) { if (typs[i].toString().contains(paramType.getString(i))) { isSameType = true; } else { isSameType = false; break; } } if (isSameType) { return method; } } return null; } // 獲取請求參數 public static JSONObject httpProcess(HttpServletRequest req, HttpServletResponse resp) throws IOException { StringBuffer sb = new StringBuffer(); InputStream is = req.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(is, "utf-8")); String s = ""; while ((s = br.readLine()) != null) { sb.append(s); } if (sb.toString().length() <= 0) { return null; } return JSONObject.parseObject(sb.toString()); } }
/** * 繼承Remote * * @author Administrator * */ public interface SOARmi extends Remote { public String invoke(String param) throws RemoteException; }
/** * 生產者端 * * @author Administrator * */ public class SOARmiImpl extends UnicastRemoteObject implements SOARmi { private static final long serialVersionUID = 6735305564709334218L; protected SOARmiImpl() throws RemoteException { super(); // TODO Auto-generated constructor stub } public String invoke(String param) throws RemoteException { JSONObject requestparam = JSONObject.parseObject(param); // 要從遠程的生產者的spring容器中拿到對應的serviceid實例 String serviceId = requestparam.getString("serviceId"); String methodName = requestparam.getString("methodName"); JSONArray paramTypes = requestparam.getJSONArray("paramTypes"); // 這個對應的方法參數 JSONArray methodParamJa = requestparam.getJSONArray("methodParams"); // 這個就是反射的參數 Object[] objs = null; if (methodParamJa != null) { objs = new Object[methodParamJa.size()]; int i = 0; for (Object o : methodParamJa) { objs[i++] = o; } } // 拿到spring的上下文 ApplicationContext application = Service.getApplication(); // 服務層的實例 Object serviceBean = application.getBean(serviceId); // 這個方法的獲取,要考慮到這個方法的重載 Method method = getMethod(serviceBean, methodName, paramTypes); if (method != null) { Object result; try { result = method.invoke(serviceBean, objs); return result.toString(); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { return "---------------------------------nosuchmethod-----------------------------"; } return null; } private Method getMethod(Object bean, String methodName, JSONArray paramTypes) { Method[] methods = bean.getClass().getMethods(); List<Method> retMethod = new ArrayList<Method>(); for (Method method : methods) { // 把名字和methodName入參相同的方法加入到list中來 if (methodName.trim().equals(method.getName())) { retMethod.add(method); } } // 若是大小是1就說明相同的方法只有一個 if (retMethod.size() == 1) { return retMethod.get(0); } boolean isSameSize = false; boolean isSameType = false; jack: for (Method method : retMethod) { Class<?>[] types = method.getParameterTypes(); if (types.length == paramTypes.size()) { isSameSize = true; } if (!isSameSize) { continue; } for (int i = 0; i < types.length; i++) { if (types[i].toString().contains(paramTypes.getString(i))) { isSameType = true; } else { isSameType = false; } if (!isSameType) { continue jack; } } if (isSameType) { return method; } } return null; } }
/** * Rmi工具 RMI是底層是socket 不能跨平臺 java底層的協議 * * @author Administrator * */ public class RmiUtil { /** * @Description 啓動rmi服務 * @param @param host * @param @param port * @param @param id 參數 * @return void 返回類型 * @throws */ public void startRmiServer(String host, String port, String id) { try { SOARmi soaRmi = new SOARmiImpl(); LocateRegistry.createRegistry(Integer.valueOf(port)); // rmi://127.0.0.1:1135/fudisfuodsuf id保證bind的惟一 Naming.bind("rmi://" + host + ":" + port + "/" + id, soaRmi); System.out.println("rmi server start !!!"); } catch (RemoteException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MalformedURLException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (AlreadyBoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public SOARmi startRmiClient(NodeInfo nodeinfo, String id) { String host = nodeinfo.getHost(); String port = nodeinfo.getPort(); try { return (SOARmi) Naming.lookup("rmi://" + host + ":" + port + "/" + id); } catch (MalformedURLException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (RemoteException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (NotBoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } }
Netty
/** * Netty 底層NIO無阻塞協議 * * @author Administrator * */ public class NettyUtil { public static void startServer(String port) throws Exception { // BOOS線程 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 工做者線程 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyServerInHandler()); } }).option(ChannelOption.SO_BACKLOG, 128); // 完成同步功能 ChannelFuture f = b.bind(Integer.parseInt(port)).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static String sendMsg(String host, String port, final String sendmsg) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); final StringBuffer resultmsg = new StringBuffer(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new NettyClientInHandler(resultmsg, sendmsg)); } }); // 這個是鏈接服務端,一直在等待着服務端的返回消息,返回的信息封裝到future,能夠監控線程的返回 ChannelFuture f = b.connect(host, Integer.parseInt(port)).channel() .closeFuture().await(); return resultmsg.toString(); } finally { workerGroup.shutdownGracefully(); } } }
public class NettyServerInHandler extends ChannelInboundHandlerAdapter { /* * @see netty的客戶端有消息過來的時候就會調用這個方法 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf result = (ByteBuf) msg; byte[] result1 = new byte[result.readableBytes()]; result.readBytes(result1); String resultStr = new String(result1); System.out.println(resultStr); result.release(); String response = invokeService(resultStr); // 放入緩存 ByteBuf encoded = ctx.alloc().buffer(4 * response.length()); encoded.writeBytes(response.getBytes()); ctx.writeAndFlush(encoded); ctx.close(); } private String invokeService(String param) { JSONObject requestparam = JSONObject.parseObject(param); // 要從遠程的生產者的spring容器中拿到對應的serviceid實例 String serviceId = requestparam.getString("serviceId"); String methodName = requestparam.getString("methodName"); JSONArray paramTypes = requestparam.getJSONArray("paramTypes"); // 這個對應的方法參數 JSONArray methodParamJa = requestparam.getJSONArray("methodParams"); // 這個就是反射的參數 Object[] objs = null; if (methodParamJa != null) { objs = new Object[methodParamJa.size()]; int i = 0; for (Object o : methodParamJa) { objs[i++] = o; } } // 拿到spring的上下文 ApplicationContext application = Service.getApplication(); // 服務層的實例 Object serviceBean = application.getBean(serviceId); // 這個方法的獲取,要考慮到這個方法的重載 Method method = getMethod(serviceBean, methodName, paramTypes); if (method != null) { Object result; try { result = method.invoke(serviceBean, objs); return result.toString(); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { return "---------------------------------nosuchmethod-----------------------------"; } return null; } private Method getMethod(Object bean, String methodName, JSONArray paramTypes) { Method[] methods = bean.getClass().getMethods(); List<Method> retMethod = new ArrayList<Method>(); for (Method method : methods) { // 把名字和methodName入參相同的方法加入到list中來 if (methodName.trim().equals(method.getName())) { retMethod.add(method); } } // 若是大小是1就說明相同的方法只有一個 if (retMethod.size() == 1) { return retMethod.get(0); } boolean isSameSize = false; boolean isSameType = false; jack: for (Method method : retMethod) { Class<?>[] types = method.getParameterTypes(); if (types.length == paramTypes.size()) { isSameSize = true; } if (!isSameSize) { continue; } for (int i = 0; i < types.length; i++) { if (types[i].toString().contains(paramTypes.getString(i))) { isSameType = true; } else { isSameType = false; } if (!isSameType) { continue jack; } } if (isSameType) { return method; } } return null; } @Override public boolean isSharable() { // TODO Auto-generated method stub return super.isSharable(); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.handlerRemoved(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelInactive(ctx); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelReadComplete(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // TODO Auto-generated method stub super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelWritabilityChanged(ctx); } @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.bind(ctx, localAddress, promise); } @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.connect(ctx, remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.disconnect(ctx, promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.close(ctx, promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.deregister(ctx, promise); } @Override public void read(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.read(ctx); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.write(ctx, msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.flush(ctx); } @Override protected Object clone() throws CloneNotSupportedException { // TODO Auto-generated method stub return super.clone(); } @Override public boolean equals(Object obj) { // TODO Auto-generated method stub return super.equals(obj); } @Override protected void finalize() throws Throwable { // TODO Auto-generated method stub super.finalize(); } @Override public int hashCode() { // TODO Auto-generated method stub return super.hashCode(); } @Override public String toString() { // TODO Auto-generated method stub return super.toString(); } }
public class NettyClientInHandler extends ChannelInboundHandlerAdapter { public StringBuffer message;// 接受服務端的消息 public String sendMsg;// 發送給服務端的消息 public NettyClientInHandler(StringBuffer message, String sendMsg) { this.message = message; this.sendMsg = sendMsg; } /* * @see 當咱們鏈接成功之後會觸發這個方法 在這個方法裏面完成消息的發送 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("--------------channelActive-------------"); ByteBuf encoded = ctx.alloc().buffer(4 * sendMsg.length()); encoded.writeBytes(sendMsg.getBytes()); ctx.write(encoded); ctx.flush(); } /* * @see 一旦服務端有消息過來,這個方法會觸發 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("------------------channelRead--------------------"); ByteBuf result = (ByteBuf) msg; byte[] result1 = new byte[result.readableBytes()]; result.readBytes(result1); System.out.println("server response msg:" + new String(result1)); message.append(new String(result1)); result.release(); } @Override public boolean isSharable() { // TODO Auto-generated method stub return super.isSharable(); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.handlerRemoved(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelUnregistered(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelReadComplete(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // TODO Auto-generated method stub super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelWritabilityChanged(ctx); } @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.bind(ctx, localAddress, promise); } @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.connect(ctx, remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.disconnect(ctx, promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.close(ctx, promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.deregister(ctx, promise); } @Override public void read(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.read(ctx); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // TODO Auto-generated method stub super.write(ctx, msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.flush(ctx); } @Override protected Object clone() throws CloneNotSupportedException { // TODO Auto-generated method stub return super.clone(); } @Override public boolean equals(Object obj) { // TODO Auto-generated method stub return super.equals(obj); } @Override protected void finalize() throws Throwable { // TODO Auto-generated method stub super.finalize(); } @Override public int hashCode() { // TODO Auto-generated method stub return super.hashCode(); } @Override public String toString() { // TODO Auto-generated method stub return super.toString(); } }
集羣容錯
public interface Cluster { public String invoke(Invocation invocation) throws Exception; }
/** * @Description 這個若是調用節點異常,直接失敗 * @ClassName FailfastClusterInvoke * @Date 2017年11月18日 下午9:55:23 * @Author dn-jack */ public class FailfastClusterInvoke implements Cluster { public String invoke(Invocation invocation) throws Exception { Invoke invoke = invocation.getInvoke(); try { return invoke.invoke(invocation); } catch (Exception e) { e.printStackTrace(); throw e; } } }
/** * @Description 這個若是調用失敗就自動切換到其餘集羣節點 * @ClassName FailoverClusterInvoke * @Date 2017年11月18日 下午9:37:46 * @Author dn-jack */ public class FailoverClusterInvoke implements Cluster { public String invoke(Invocation invocation) throws Exception { String retries = invocation.getReference().getRetries(); Integer retriint = Integer.parseInt(retries); for (int i = 0; i < retriint; i++) { try { Invoke invoke = invocation.getInvoke(); String result = invoke.invoke(invocation); return result; } catch (Exception e) { e.printStackTrace(); continue; } } throw new RuntimeException("retries" + retries + "所有失敗!!!!"); } }
/** * @Description 調用節點失敗,直接忽略 * @ClassName FailsafeClusterInvoke * @Date 2017年11月18日 下午9:55:49 * @Author dn-jack */ public class FailsafeClusterInvoke implements Cluster { public String invoke(Invocation invocation) throws Exception { Invoke invoke = invocation.getInvoke(); try { return invoke.invoke(invocation); } catch (Exception e) { e.printStackTrace(); return "忽略"; } } }
消息發佈與訂閱
/** * redis的發佈與訂閱,跟咱們的activemq中的topic消息消費機制差很少 是一個廣播形式的消費消息 */ public class RedisServerRegistry extends JedisPubSub { /* * @see 當往頻道其實就是隊列,當往裏面發佈消息的時候,這個方法就會觸發 */ @Override public void onMessage(String channel, String message) { } @Override public void subscribe(String... channels) { // TODO Auto-generated method stub super.subscribe(channels); } }