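FloodlightProvider uses the Netty library to manage threads and connections to switches.
Each OpenFlow message is processed by a Netty thread, which runs all the logic that every module associates with that message.
Other modules can also register for specific events such as switch connection or disconnection and port status notifications.
To receive OpenFlow messages, a module must implement the IOFMessageListener interface.
To listen for OpenFlow messages, the module first registers with FloodlightProvider
by calling the addOFMessageListener method of IFloodlightProviderService (implemented by the Controller class).
The core work is done in the ListenerDispatcher class.
Each time a listener is added, the dispatcher works out which listeners are terminal nodes (listeners that no other listener depends on), because the final call order is obtained by a DFS traversal that starts from these terminal nodes and walks backward through the dependencies.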
    @Override
    public synchronized void addOFMessageListener(OFType type, IOFMessageListener listener) {
        // Check whether a ListenerDispatcher already exists for this type
        ListenerDispatcher<OFType, IOFMessageListener> ldd = messageListeners.get(type);
        if (ldd == null) {
            ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
            messageListeners.put(type, ldd);
        }
        // Register the listener for messages of this type
        ldd.addListener(type, listener);
    }
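Each OpenFlow message type has its own ListenerDispatcher object, and its listeners are kept in the listeners list:

    volatile List<T> listeners = new ArrayList<T>();

    private boolean ispre(U type, T l1, T l2) {
        return (l2.isCallbackOrderingPrereq(type, l1.getName()) ||
                l1.isCallbackOrderingPostreq(type, l2.getName()));
    }

ispre reports the relative order of the two listeners passed in: it returns true when l1 must be called before l2.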
    public void addListener(U type, T listener) {
        List<T> newlisteners = new ArrayList<T>();
        if (listeners != null)
            newlisteners.addAll(listeners);
        newlisteners.add(listener);

        // Find nodes without outgoing edges
        List<T> terminals = new ArrayList<T>();
        for (T i : newlisteners) {
            boolean isterm = true;
            for (T j : newlisteners) {
                if (ispre(type, i, j)) {
                    // i must run before j, so i is not a terminal node
                    isterm = false;
                    break;
                }
            }
            if (isterm) {
                // No listener has to run after i: record it as a terminal node
                terminals.add(i);
            }
        }

        if (terminals.size() == 0) {
            logger.error("No listener dependency solution: " +
                         "No listeners without incoming dependencies");
            listeners = newlisteners;
            return;
        }

        // visit depth-first traversing in the opposite order from
        // the dependencies. Note we will not generally detect cycles
        HashSet<T> visited = new HashSet<T>();
        List<T> ordering = new ArrayList<T>();
        for (T term : terminals) {
            // Build the ordering
            visit(newlisteners, type, visited, ordering, term);
        }
        listeners = ordering;
    }
    private void visit(List<T> newlisteners, U type, HashSet<T> visited,
                       List<T> ordering, T listener) {
        if (!visited.contains(listener)) {
            visited.add(listener);
            for (T i : newlisteners) {
                if (ispre(type, i, listener)) {
                    visit(newlisteners, type, visited, ordering, i);
                }
            }
            ordering.add(listener);
        }
    }
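The ordering step can be tried in isolation with a minimal sketch. It is not Floodlight's class: ListenerOrderSketch, the use of plain strings as listeners, and the prereqs map (listener name to the names that must run before it) are all assumptions made for illustration. The traversal itself mirrors the terminal search and the depth-first visit shown above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the ordering logic; listeners are plain names. */
class ListenerOrderSketch {

    /** True if listener a must be called before listener b. */
    static boolean isPre(Map<String, Set<String>> prereqs, String a, String b) {
        return prereqs.getOrDefault(b, Set.of()).contains(a);
    }

    /** Returns the listener names in call order. */
    static List<String> order(List<String> names, Map<String, Set<String>> prereqs) {
        // Terminals: listeners that are not a prerequisite of any other listener.
        List<String> terminals = new ArrayList<>();
        for (String i : names) {
            boolean isTerminal = true;
            for (String j : names) {
                if (isPre(prereqs, i, j)) {
                    isTerminal = false;
                    break;
                }
            }
            if (isTerminal) {
                terminals.add(i);
            }
        }
        Set<String> visited = new HashSet<>();
        List<String> ordering = new ArrayList<>();
        for (String t : terminals) {
            visit(names, prereqs, visited, ordering, t);
        }
        return ordering;
    }

    /** Depth-first: emit every prerequisite of current, then current itself. */
    private static void visit(List<String> names, Map<String, Set<String>> prereqs,
                              Set<String> visited, List<String> ordering, String current) {
        if (!visited.add(current)) {
            return; // already placed in the ordering
        }
        for (String i : names) {
            if (isPre(prereqs, i, current)) {
                visit(names, prereqs, visited, ordering, i);
            }
        }
        ordering.add(current);
    }
}
```

Because a listener is appended only after all of its prerequisites, registration order does not matter. Like the original, the sketch does not detect dependency cycles.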
    public interface IListener<T> {
        // Return status that decides whether processing continues
        public enum Command { CONTINUE, STOP }

        public String getName();

        // Whether the module called name must run before this listener
        public boolean isCallbackOrderingPrereq(T type, String name);

        // Whether the module called name must run after this listener
        public boolean isCallbackOrderingPostreq(T type, String name);
    }

The IOFMessageListener interface extends IListener and also defines the receive method:

    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx);

It returns CONTINUE or STOP. Next, look at how each module that implements this interface overrides it.
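To make the meaning of the return value concrete, here is a minimal sketch of a dispatch loop. It assumes the controller walks the ordered listener list and stops at the first STOP; the text above does not show that loop, so treat this as an illustration. DispatchSketch, its Listener interface (reduced to a single String argument) and the listener helper are hypothetical stand-ins, not Floodlight types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a CONTINUE/STOP listener chain. */
class DispatchSketch {
    enum Command { CONTINUE, STOP }

    /** Simplified stand-in for IOFMessageListener: no switch or context arguments. */
    interface Listener {
        String getName();
        Command receive(String msg);
    }

    /** Calls listeners in order and returns the names that were actually invoked. */
    static List<String> dispatch(List<Listener> ordered, String msg) {
        List<String> invoked = new ArrayList<>();
        for (Listener l : ordered) {
            invoked.add(l.getName());
            if (l.receive(msg) == Command.STOP) {
                break; // later listeners never see this message
            }
        }
        return invoked;
    }

    /** Helper that builds a listener which always returns the given command. */
    static Listener listener(String name, Command result) {
        return new Listener() {
            @Override public String getName() { return name; }
            @Override public Command receive(String msg) { return result; }
        };
    }
}
```

With a chain of three listeners where the second returns STOP, only the first two are invoked.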
The IOFMessageListener methods as overridden by the TopologyManager module:

    @Override
    public String getName() {
        // "topology" here; every module has its own MODULE_NAME
        return MODULE_NAME;
    }

    @Override
    public boolean isCallbackOrderingPrereq(OFType type, String name) {
        // The module whose MODULE_NAME is "linkdiscovery" must run before this one
        return "linkdiscovery".equals(name);
    }

    @Override
    public boolean isCallbackOrderingPostreq(OFType type, String name) {
        return false;
    }

    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                ctrIncoming.increment(); // increment the counter
                // Delegate to the Packet-In handler
                return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
            default:
                break;
        }
        return Command.CONTINUE;
    }
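The Type Hierarchy view shows the modules that take part in Packet-In processing.
Data structures
The basic data structure is a context object. Floodlight listeners can register context information in it and later retrieve the information associated with an event.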
    public class FloodlightContext {
        protected ConcurrentHashMap<String, Object> storage =
                new ConcurrentHashMap<String, Object>();

        public ConcurrentHashMap<String, Object> getStorage() {
            return storage;
        }
    }
It creates a ConcurrentHashMap named storage.
    public class FloodlightContextStore<V> {

        @SuppressWarnings("unchecked")
        public V get(FloodlightContext bc, String key) {
            return (V) bc.storage.get(key);
        }

        public void put(FloodlightContext bc, String key, V value) {
            bc.storage.put(key, value);
        }

        public void remove(FloodlightContext bc, String key) {
            bc.storage.remove(key);
        }
    }
A FloodlightContextStore object that holds the Packet-In payload; the stored value is of type Ethernet:
public static final FloodlightContextStore<Ethernet> bcStore = new FloodlightContextStore<Ethernet>();
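The pattern of an untyped per-event map combined with a typed accessor can be exercised on its own. The sketch below is a minimal re-creation under assumptions: ContextStoreSketch, Context, Store, PAYLOAD_KEY and roundTrip are hypothetical names, and a String stands in for the Ethernet payload.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical re-creation of the context / typed-store pattern. */
class ContextStoreSketch {

    /** Stand-in for FloodlightContext: an untyped per-event map. */
    static class Context {
        final ConcurrentHashMap<String, Object> storage = new ConcurrentHashMap<>();
    }

    /** Stand-in for FloodlightContextStore: typed access to the untyped map. */
    static class Store<V> {
        @SuppressWarnings("unchecked")
        V get(Context c, String key) { return (V) c.storage.get(key); }
        void put(Context c, String key, V value) { c.storage.put(key, value); }
        void remove(Context c, String key) { c.storage.remove(key); }
    }

    static final String PAYLOAD_KEY = "example.payload"; // hypothetical key name
    static final Store<String> PAYLOAD_STORE = new Store<>();

    /** An early listener stores a value; a later listener reads it back. */
    static String roundTrip(String payload) {
        Context ctx = new Context();
        PAYLOAD_STORE.put(ctx, PAYLOAD_KEY, payload);
        return PAYLOAD_STORE.get(ctx, PAYLOAD_KEY);
    }
}
```

The cast lives in one place, so callers get a typed value without casting themselves. The price is that a key written with one value type and read through a store of another type fails only at run time.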
The receive method of IOFMessageListener in the link discovery module:

    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                ctrIncoming.increment();
                return this.handlePacketIn(sw.getId(), (OFPacketIn) msg, cntx);
            default:
                break;
        }
        return Command.CONTINUE;
    }

The work is done mainly by the handlePacketIn() method:
    protected Command handlePacketIn(DatapathId sw, OFPacketIn pi, FloodlightContext cntx) {
        // Extract the Ethernet payload of the Packet-In
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        OFPort inPort = (pi.getVersion().compareTo(OFVersion.OF_12) < 0
                ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT));

        if (eth.getPayload() instanceof BSN) {
            BSN bsn = (BSN) eth.getPayload();
            if (bsn == null) return Command.STOP;
            if (bsn.getPayload() == null) return Command.STOP;
            // It could be a packet other than BSN LLDP, therefore
            // continue with the regular processing.
            if (bsn.getPayload() instanceof LLDP == false)
                return Command.CONTINUE;
            return handleLldp((LLDP) bsn.getPayload(), sw, inPort, false, cntx);
        } else if (eth.getPayload() instanceof LLDP) {
            return handleLldp((LLDP) eth.getPayload(), sw, inPort, true, cntx);
        } else if (eth.getEtherType().getValue() < 1536
                && eth.getEtherType().getValue() >= 17) {
            long destMac = eth.getDestinationMACAddress().getLong();
            if ((destMac & LINK_LOCAL_MASK) == LINK_LOCAL_VALUE) {
                ctrLinkLocalDrops.increment();
                if (log.isTraceEnabled()) {
                    log.trace("Ignoring packet addressed to 802.1D/Q "
                            + "reserved address.");
                }
                return Command.STOP;
            }
        } else if (eth.getEtherType().getValue() < 17) {
            log.error("Received invalid ethertype of {}.", eth.getEtherType());
            return Command.STOP;
        }

        if (ignorePacketInFromSource(eth.getSourceMACAddress())) {
            ctrIgnoreSrcMacDrops.increment();
            return Command.STOP;
        }

        // If packet-in is from a quarantine port, stop processing.
        NodePortTuple npt = new NodePortTuple(sw, inPort);
        if (quarantineQueue.contains(npt)) {
            ctrQuarantineDrops.increment();
            return Command.STOP;
        }

        return Command.CONTINUE;
    }
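The mask comparison in the middle branch can be checked in isolation. The values of LINK_LOCAL_MASK and LINK_LOCAL_VALUE are not shown in the code above, so the constants below are assumptions: they are chosen to cover the IEEE 802.1D reserved range 01:80:C2:00:00:00 through 01:80:C2:00:00:0F that the log message refers to. LinkLocalSketch and its method names are hypothetical.

```java
/** Hypothetical sketch of the reserved-address and EtherType range checks. */
class LinkLocalSketch {
    // Assumed values: the mask keeps the top 44 bits of the 48-bit MAC, so the
    // comparison matches 01:80:C2:00:00:00 through 01:80:C2:00:00:0F.
    static final long LINK_LOCAL_MASK = 0xfffffffffff0L;
    static final long LINK_LOCAL_VALUE = 0x0180c2000000L;

    /** True if the destination MAC falls in the assumed reserved range. */
    static boolean isLinkLocal(long destMac) {
        return (destMac & LINK_LOCAL_MASK) == LINK_LOCAL_VALUE;
    }

    /**
     * Mirrors the range test in handlePacketIn: values from 17 up to 1535
     * in the EtherType field are not protocol identifiers (those start at 1536).
     */
    static boolean isBelowProtocolRange(int etherType) {
        return etherType < 1536 && etherType >= 17;
    }
}
```

For instance, the spanning tree address 01:80:C2:00:00:00 matches, while 01:80:C2:00:00:10 and an ordinary unicast address do not.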
The receive method of IOFMessageListener in TopologyManager:

    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                ctrIncoming.increment();
                return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
            default:
                break;
        }
        return Command.CONTINUE;
    }

The work is done mainly by the processPacketInMessage() method:

    protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) {
        // get the packet-in switch.
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                IFloodlightProviderService.CONTEXT_PI_PAYLOAD);

        if (eth.getPayload() instanceof BSN) {
            BSN bsn = (BSN) eth.getPayload();
            if (bsn == null) return Command.STOP;
            if (bsn.getPayload() == null) return Command.STOP;
            // It may not be a BSN LLDP packet; continue with regular processing
            if (bsn.getPayload() instanceof LLDP == false)
                return Command.CONTINUE;
            doFloodBDDP(sw.getId(), pi, cntx);
            return Command.STOP;
        } else {
            return dropFilter(sw.getId(), pi, cntx);
        }
    }
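The device manager learns about devices through Packet-In messages. Information is extracted from each Packet-In and classified according to how the entity classifier is set up. By default, the entity classifier uses the MAC address and VLAN to identify a device; together these two properties define a unique device. The device manager also learns other attributes, such as IP addresses.
An important part of this information is the device's attachment point. When a Packet-In is received on a switch, an attachment point is created for that switch. The device manager also ages out attachment points, IP addresses, and devices themselves over time, and the last-seen timestamp is what controls the aging process.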
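The two ideas in this description, a device identity made of MAC address plus VLAN and aging driven by a last-seen timestamp, can be sketched as follows. This is an illustration under assumptions, not the device manager's data model: DeviceTableSketch, its string key and its method names are hypothetical, and attachment points and IP addresses are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: devices keyed by MAC + VLAN, aged out by last-seen time. */
class DeviceTableSketch {
    // Key is "mac/vlan"; value is the last time the device was seen, in milliseconds.
    private final Map<String, Long> lastSeen = new HashMap<>();

    private static String key(long mac, int vlan) {
        return mac + "/" + vlan;
    }

    /** Records a sighting; the same MAC on a different VLAN is a different device. */
    void learn(long mac, int vlan, long nowMillis) {
        lastSeen.put(key(mac, vlan), nowMillis);
    }

    /** Removes devices not seen within maxAgeMillis; returns how many were removed. */
    int expire(long nowMillis, long maxAgeMillis) {
        int before = lastSeen.size();
        lastSeen.values().removeIf(seen -> nowMillis - seen > maxAgeMillis);
        return before - lastSeen.size();
    }

    int size() {
        return lastSeen.size();
    }
}
```

Calling learn again for a known MAC and VLAN only refreshes the timestamp, which is what keeps an active device from being aged out.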
The receive method of IOFMessageListener in the device manager:

    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                cntIncoming.increment();
                return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
            default:
                break;
        }
        return Command.CONTINUE;
    }

The work is done mainly by the processPacketInMessage() method:
    protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) {
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        OFPort inPort = (pi.getVersion().compareTo(OFVersion.OF_12) < 0
                ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT));

        // Extract source entity information
        Entity srcEntity = getSourceEntityFromPacket(eth, sw.getId(), inPort);
        if (srcEntity == null) {
            cntInvalidSource.increment();
            return Command.STOP;
        }

        // Learn from ARP packet for special VRRP settings.
        // In VRRP settings, the source MAC address and sender MAC
        // addresses can be different. In such cases, we need to learn
        // the IP to MAC mapping of the VRRP IP address. The source
        // entity will not have that information. Hence, a separate call
        // to learn devices in such cases.
        learnDeviceFromArpResponseData(eth, sw.getId(), inPort);

        // Learn/lookup device information
        Device srcDevice = learnDeviceByEntity(srcEntity);
        if (srcDevice == null) {
            cntNoSource.increment();
            return Command.STOP;
        }

        // Store the source device in the context
        fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice);

        // Find the device matching the destination from the entity
        // classes of the source.
        if (eth.getDestinationMACAddress().getLong() == 0) {
            cntInvalidDest.increment();
            return Command.STOP;
        }
        Entity dstEntity = getDestEntityFromPacket(eth);
        Device dstDevice = null;
        if (dstEntity != null) {
            dstDevice = findDestByEntity(srcDevice.getEntityClass(), dstEntity);
            if (dstDevice != null)
                fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice);
            else
                cntNoDest.increment();
        } else {
            cntNoDest.increment();
        }

        if (logger.isTraceEnabled()) {
            logger.trace("Received PI: {} on switch {}, port {} *** eth={}"
                    + " *** srcDev={} *** dstDev={} *** ",
                    new Object[] { pi, sw.getId().toString(), inPort, eth,
                            srcDevice, dstDevice });
        }

        snoopDHCPClientName(eth, srcDevice);

        return Command.CONTINUE;
    }
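When Floodlight starts, no virtual networks exist, so hosts cannot communicate with each other.
Once the user creates a virtual network, hosts can be added to it.
The module runs in the Packet-In processing chain before the forwarding module.
When a Packet-In message is received, the module looks at the source and destination MAC addresses. If the two MAC addresses are on the same virtual network, the module returns Command.CONTINUE and processing of the flow continues. If they are not on the same virtual network, it returns Command.STOP and the packet is dropped.
This module can be used in OpenStack deployments.
The default configuration file that includes this module is located at: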
src/main/resources/neutron.properties
The receive method of IOFMessageListener in the virtual network filter:

    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                return processPacketIn(sw, (OFPacketIn) msg, cntx);
            default:
                break;
        }
        log.warn("Received unexpected message {}", msg);
        return Command.CONTINUE;
    }

The work is done mainly by the processPacketIn() method:
    protected Command processPacketIn(IOFSwitch sw, OFPacketIn msg, FloodlightContext cntx) {
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        Command ret = Command.STOP;
        String srcNetwork = macToGuid.get(eth.getSourceMACAddress());

        // If the host is on an unknown network we deny it.
        // We make exceptions for ARP and DHCP.
        if (eth.isBroadcast() || eth.isMulticast() || isDefaultGateway(eth) || isDhcpPacket(eth)) {
            ret = Command.CONTINUE;
        } else if (srcNetwork == null) {
            log.trace("Blocking traffic from host {} because it is not attached to any network.",
                    eth.getSourceMACAddress().toString());
            ret = Command.STOP;
        } else if (oneSameNetwork(eth.getSourceMACAddress(), eth.getDestinationMACAddress())) {
            // if they are on the same network continue
            ret = Command.CONTINUE;
        }

        if (log.isTraceEnabled())
            log.trace("Results for flow between {} and {} is {}",
                    new Object[] {eth.getSourceMACAddress(), eth.getDestinationMACAddress(), ret});

        /*
         * TODO - figure out how to still detect gateways while using
         * drop mods
        if (ret == Command.STOP) {
            if (!(eth.getPayload() instanceof ARP))
                doDropFlow(sw, msg, cntx);
        }
        */
        return ret;
    }
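Stripped of the Floodlight types, the decision reduces to a lookup in a MAC-to-network map. The sketch below is a simplification under assumptions: VirtualNetworkSketch and decide are hypothetical names, MAC addresses are plain strings, and a single exempt flag stands in for the broadcast, multicast, default gateway and DHCP checks.

```java
import java.util.Map;

/** Hypothetical sketch of the same-virtual-network decision. */
class VirtualNetworkSketch {
    enum Command { CONTINUE, STOP }

    /**
     * @param macToNetwork maps a host MAC to the ID of its virtual network
     * @param exempt       true for traffic that is always allowed through
     */
    static Command decide(Map<String, String> macToNetwork,
                          String srcMac, String dstMac, boolean exempt) {
        if (exempt) {
            return Command.CONTINUE;
        }
        String srcNet = macToNetwork.get(srcMac);
        if (srcNet == null) {
            return Command.STOP; // source host is not attached to any network
        }
        // Allow only when the destination is on the same network as the source.
        return srcNet.equals(macToNetwork.get(dstMac)) ? Command.CONTINUE : Command.STOP;
    }
}
```

STOP is the default outcome, matching the original where ret starts as Command.STOP and is only upgraded to CONTINUE by an explicit match.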
The receive method of IOFMessageListener in the load balancer:

    @Override
    public net.floodlightcontroller.core.IListener.Command receive(IOFSwitch sw, OFMessage msg,
            FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                return processPacketIn(sw, (OFPacketIn) msg, cntx);
            default:
                break;
        }
        log.warn("Received unexpected message {}", msg);
        return Command.CONTINUE;
    }

The work is done mainly by the processPacketIn() method:
    private net.floodlightcontroller.core.IListener.Command processPacketIn(IOFSwitch sw,
            OFPacketIn pi, FloodlightContext cntx) {
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        IPacket pkt = eth.getPayload();

        if (eth.isBroadcast() || eth.isMulticast()) {
            // handle ARP for VIP
            if (pkt instanceof ARP) {
                // retrieve arp to determine target IP address
                ARP arpRequest = (ARP) eth.getPayload();
                IPv4Address targetProtocolAddress = arpRequest.getTargetProtocolAddress();

                if (vipIpToId.containsKey(targetProtocolAddress.getInt())) {
                    String vipId = vipIpToId.get(targetProtocolAddress.getInt());
                    vipProxyArpReply(sw, pi, cntx, vipId);
                    return Command.STOP;
                }
            }
        } else {
            // currently only load balance IPv4 packets - no-op for other traffic
            if (pkt instanceof IPv4) {
                IPv4 ip_pkt = (IPv4) pkt;

                // If match Vip and port, check pool and choose member
                int destIpAddress = ip_pkt.getDestinationAddress().getInt();

                if (vipIpToId.containsKey(destIpAddress)) {
                    IPClient client = new IPClient();
                    client.ipAddress = ip_pkt.getSourceAddress();
                    client.nw_proto = ip_pkt.getProtocol();
                    if (ip_pkt.getPayload() instanceof TCP) {
                        TCP tcp_pkt = (TCP) ip_pkt.getPayload();
                        client.srcPort = tcp_pkt.getSourcePort();
                        client.targetPort = tcp_pkt.getDestinationPort();
                    }
                    if (ip_pkt.getPayload() instanceof UDP) {
                        UDP udp_pkt = (UDP) ip_pkt.getPayload();
                        client.srcPort = udp_pkt.getSourcePort();
                        client.targetPort = udp_pkt.getDestinationPort();
                    }
                    if (ip_pkt.getPayload() instanceof ICMP) {
                        client.srcPort = TransportPort.of(8);
                        client.targetPort = TransportPort.of(0);
                    }

                    LBVip vip = vips.get(vipIpToId.get(destIpAddress));
                    if (vip == null) // fix dereference violations
                        return Command.CONTINUE;
                    LBPool pool = pools.get(vip.pickPool(client));
                    if (pool == null) // fix dereference violations
                        return Command.CONTINUE;
                    LBMember member = members.get(pool.pickMember(client));
                    if (member == null) // fix dereference violations
                        return Command.CONTINUE;

                    // for chosen member, check device manager and find and push routes,
                    // in both directions
                    pushBidirectionalVipRoutes(sw, pi, cntx, client, member);

                    // packet out based on table rule
                    pushPacket(pkt, sw, pi.getBufferId(),
                            (pi.getVersion().compareTo(OFVersion.OF_12) < 0)
                                    ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT),
                            OFPort.TABLE, cntx, true);

                    return Command.STOP;
                }
            }
        }
        // bypass non-load-balanced traffic for normal processing (forwarding)
        return Command.CONTINUE;
    }
The receive method of IOFMessageListener in ForwardingBase:

    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                IRoutingDecision decision = null;
                if (cntx != null) {
                    decision = RoutingDecision.rtStore.get(cntx, IRoutingDecision.CONTEXT_DECISION);
                }
                return this.processPacketInMessage(sw, (OFPacketIn) msg, decision, cntx);
            default:
                break;
        }
        return Command.CONTINUE;
    }

The work is done mainly by the processPacketInMessage() method, which is abstract here:

    public abstract Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi,
            IRoutingDecision decision, FloodlightContext cntx);

Forwarding, the subclass that extends ForwardingBase, overrides this method with the concrete behavior:
    @Override
    public Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi,
            IRoutingDecision decision, FloodlightContext cntx) {
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                IFloodlightProviderService.CONTEXT_PI_PAYLOAD);

        // We found a routing decision (i.e. Firewall is enabled...
        // it's the only thing that makes RoutingDecisions)
        if (decision != null) {
            if (log.isTraceEnabled()) {
                log.trace("Forwarding decision={} was made for PacketIn={}",
                        decision.getRoutingAction().toString(), pi);
            }

            switch (decision.getRoutingAction()) {
                case NONE:
                    // don't do anything
                    return Command.CONTINUE;
                case FORWARD_OR_FLOOD:
                case FORWARD:
                    doForwardFlow(sw, pi, decision, cntx, false);
                    return Command.CONTINUE;
                case MULTICAST:
                    // treat as broadcast
                    doFlood(sw, pi, decision, cntx);
                    return Command.CONTINUE;
                case DROP:
                    doDropFlow(sw, pi, decision, cntx);
                    return Command.CONTINUE;
                default:
                    log.error("Unexpected decision made for this packet-in={}",
                            pi, decision.getRoutingAction());
                    return Command.CONTINUE;
            }
        } else {
            // No routing decision was found. Forward to destination or flood if bcast or mcast.
            if (log.isTraceEnabled()) {
                log.trace("No decision was made for PacketIn={}, forwarding", pi);
            }

            if (eth.isBroadcast() || eth.isMulticast()) {
                doFlood(sw, pi, decision, cntx);
            } else {
                doForwardFlow(sw, pi, decision, cntx, false);
            }
        }

        return Command.CONTINUE;
    }