Floodlight 源碼解讀:FloodlightProvider

FloodlightProvider

  1. 處理交換機之間的鏈接並將 OpenFlow 的消息轉化成其餘模塊能夠監聽的時間
  2. 決定某些特定的 OpenFLow 消息(即 PacketIn,FlowRemove,PortStatus 等)被分派到該偵聽消息的模塊的順序,模塊能夠決定容許該消息進入下一個監聽對象或者中止處理消息

FloodlightProvider 如何工做?

FloodlightProvider 使用 Netty 庫來處理到交換機的線程和鏈接。
每一個 OpenFlow 消息將經過一個 Netty 的線程進行處理,並執行與全部模塊的消息相關聯的全部邏輯
其餘模塊也能夠註冊相似交換機鏈接或斷開和端口狀態通知特定時間。
爲了使模塊註冊爲基於 OpenFlow 消息的,必須實現 IOFMessageListener 接口java

要監聽 OpenFlow 消息,要先向 FloodlightProvider 註冊
調用 IFloodlightProviderService(具體由 Controller 類實現)的 addOFMessageListener 方法進行註冊訂閱
核心工做是在 ListenerDispatcher 類來完成。
每次增長觀察者都會判斷是不是終結點(也就是不被其餘的 Listener 所依賴),由於最終肯定這些觀察者順序的時候就是由這些終結點開始往前進行 DFS 遍歷獲得node

Controller中實現 IFloodlightProviderService 的方法

@Override
    public synchronized void addOFMessageListener(OFType type, IOFMessageListener listener) {
        //先判斷與type對應的 ListenerDispatcher對象是否存在
        ListenerDispatcher<OFType, IOFMessageListener> ldd =
            messageListeners.get(type);
        if (ldd == null) {
            ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
            messageListeners.put(type, ldd);
        }
      //註冊監聽type這個消息;
        ldd.addListener(type, listener);
    }

ListenerDispatcher 維護這些觀察者,有依賴關係

volatile List<T> listeners = new ArrayList<T>();

//每一個OF msg都有惟一的ListenerDispatcher對象,觀察者存在listeners鏈表中緩存

private boolean ispre(U type, T l1, T l2) {
        return (l2.isCallbackOrderingPrereq(type, l1.getName()) ||
                l1.isCallbackOrderingPostreq(type, l2.getName()));
    }

返回兩個傳入的監聽器的順序安全

public void addListener(U type, T listener) {
        List<T> newlisteners = new ArrayList<T>();
        if (listeners != null)
            newlisteners.addAll(listeners);
        newlisteners.add(listener);
        // Find nodes without outgoing edges
        // 查找沒有出邊的節點
        List<T> terminals = new ArrayList<T>();
        for (T i : newlisteners) {
            boolean isterm = true;
            for (T j : newlisteners) {
                if (ispre(type, i, j)) {
                    //兩個都不關心先後順序的時候
                    isterm = false;
                    break;
                }
            }
            if (isterm) {
                //關乎有先後順序的監聽模塊存入
                terminals.add(i);
            }
        }
        if (terminals.size() == 0) {
            logger.error("No listener dependency solution: " +
                         "No listeners without incoming dependencies");
            listeners = newlisteners;
            return;
        }
        // visit depth-first traversing in the opposite order from
        // the dependencies.  Note we will not generally detect cycles
        /**
         * 以相反順序訪問深度優先遍歷依賴。 注意咱們一般不會檢測週期
         */
        HashSet<T> visited = new HashSet<T>();
        List<T> ordering = new ArrayList<T>();
        for (T term : terminals) {
            //進行排序
            visit(newlisteners, type, visited, ordering, term);
        }
        listeners = ordering;
    }

private void visit(List<T> newlisteners, U type, HashSet<T> visited,
                       List<T> ordering, T listener) {
        if (!visited.contains(listener)) {
            visited.add(listener);
            for (T i : newlisteners) {
                if (ispre(type, i, listener)) {
                    visit(newlisteners, type, visited, ordering, i);
                }
            }
            ordering.add(listener);
            //
        }
    }

監聽器具備的方法

public interface IListener<T>

    public enum Command {
        CONTINUE, STOP
    }
狀態值,用來判斷是否繼續執行

    public String getName(); 
    //用來判斷 name 的這個模塊是否要在當前對象以前執行
    public boolean isCallbackOrderingPrereq(T type, String name);
    //用來判斷 name 的這個模塊是否要在當前對象以後執行
    public boolean isCallbackOrderingPostreq(T type, String name);

IOFMessageListener接口繼承了 IListener 接口,同時定義了 receive 方法

    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx);
返回 CONTINUE 或者 STOP,繼續看每一個繼承這個接口的模塊的重寫

查看繼承了 IOFMessageListener 的Type Hierarchy

TopologyManager 模塊的IOFMessageListener 重寫的方法:網絡

@Override
    public String getName() {
        return MODULE_NAME; //此處爲 topology,每一個模塊都有本身的 MODULE_NAME
    }

    @Override
    public boolean isCallbackOrderingPrereq(OFType type, String name) {
        //今後處能夠看出,在執行這個模塊以前,須要先執行 MODULE_NAME 爲 linkiscovery 的模塊
        return "linkdiscovery".equals(name);
    }

    @Override
    public boolean isCallbackOrderingPostreq(OFType type, String name) {
        return false;
    }

    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
        case PACKET_IN:
            ctrIncoming.increment();//計數器,加一
            //調用這裏的執行方法
            return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
        default:
            break;
        }

        return Command.CONTINUE;
    }

經過 Type Hierarchy 能夠找到Packet-In消息處理順序的幾個模塊
數據結構

FloodlightContextStore 數據結構

  • FloodlightContextStore 表明的是一種緩存模型(利用的是ConcurrentHashMap,線程安全的 HashMap)
  • 裏面存儲的是上下文相關的對象,可以根據相應的key獲得具體的 Object
  • 存在的意義是Floodlight中註冊監聽某個事件的listener能夠在被調用的時候直接從中取出上下文信息(context information)

基本數據結構,這是一個上下文對象,Floodlight代碼監聽器能夠註冊它,稍後能夠檢索與事件相關聯的上下文信息app

public class FloodlightContext {
        protected ConcurrentHashMap<String, Object> storage =
                new ConcurrentHashMap<String, Object>();

        public ConcurrentHashMap<String, Object> getStorage() {
            return storage;
        }
    }

建立了一個 HashMap storage,tcp

public class FloodlightContextStore<V> {
        
        @SuppressWarnings("unchecked")
        public V get(FloodlightContext bc, String key) {
            return (V)bc.storage.get(key);
        }
        
        public void put(FloodlightContext bc, String key, V value) {
            bc.storage.put(key, value);
        }
        
        public void remove(FloodlightContext bc, String key) {
            bc.storage.remove(key);
        }
    }

一個FloodlightContextStore對象,可用於PACKET-IN有效內容,消息對象是Ethernet類型ide

public static final FloodlightContextStore<Ethernet> bcStore =
            new FloodlightContextStore<Ethernet>();

LinkDiscoveryManager 模塊

  • 連接發現服務負責發現和維護 OpenFlow 網絡中的網絡鏈接的狀態

IOFMessageListener 的 receive 方法oop

@Override
    public Command receive(IOFSwitch sw, OFMessage msg,
            FloodlightContext cntx) {
        switch (msg.getType()) {
        case PACKET_IN:
            ctrIncoming.increment();
            return this.handlePacketIn(sw.getId(), (OFPacketIn) msg,
                    cntx);
        default:
            break;
        }
        return Command.CONTINUE;
    }

主要使用了 handlePacketIn()方法

protected Command handlePacketIn(DatapathId sw, OFPacketIn pi,
            FloodlightContext cntx) {
        //提取 Packet-In 的有效分組內容
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        OFPort inPort = (pi.getVersion().compareTo(OFVersion.OF_12) < 0 ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT));
        if (eth.getPayload() instanceof BSN) {
            BSN bsn = (BSN) eth.getPayload();
            if (bsn == null) return Command.STOP;
            if (bsn.getPayload() == null) return Command.STOP;
            // It could be a packet other than BSN LLDP, therefore
            // continue with the regular processing.
            // 它能夠是除BSN LLDP以外的分組,所以繼續進行常規處理。
            if (bsn.getPayload() instanceof LLDP == false)
                return Command.CONTINUE;
            return handleLldp((LLDP) bsn.getPayload(), sw, inPort, false, cntx);
        } else if (eth.getPayload() instanceof LLDP) {
            return handleLldp((LLDP) eth.getPayload(), sw, inPort, true, cntx);
        } else if (eth.getEtherType().getValue() < 1536 && eth.getEtherType().getValue() >= 17) {
            long destMac = eth.getDestinationMACAddress().getLong();
            if ((destMac & LINK_LOCAL_MASK) == LINK_LOCAL_VALUE) {
                ctrLinkLocalDrops.increment();
                if (log.isTraceEnabled()) {
                    log.trace("Ignoring packet addressed to 802.1D/Q "
                            + "reserved address.");
                }
                return Command.STOP;
            }
        } else if (eth.getEtherType().getValue() < 17) {
            log.error("Received invalid ethertype of {}.", eth.getEtherType());
            return Command.STOP;
        }

        if (ignorePacketInFromSource(eth.getSourceMACAddress())) {
            ctrIgnoreSrcMacDrops.increment();
            return Command.STOP;
        }

        // If packet-in is from a quarantine port, stop processing.
        NodePortTuple npt = new NodePortTuple(sw, inPort);
        if (quarantineQueue.contains(npt)) {
            ctrQuarantineDrops.increment();
            return Command.STOP;
        }

        return Command.CONTINUE;
    }

TopolopyManager

  • 爲控制器維護拓撲信息,以及在網絡中尋找路由
IOFMessageListener 的 receive 方法

    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
        case PACKET_IN:
            ctrIncoming.increment();
            return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
        default:
            break;
        }

        return Command.CONTINUE;
    }
主要使用了processPacketInMessage()方法

    protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) {
        // get the packet-in switch.
        Ethernet eth =
                IFloodlightProviderService.bcStore.
                get(cntx,IFloodlightProviderService.CONTEXT_PI_PAYLOAD);

        if (eth.getPayload() instanceof BSN) {
            BSN bsn = (BSN) eth.getPayload();
            if (bsn == null) return Command.STOP;
            if (bsn.getPayload() == null) return Command.STOP;

            // 可能不是 BSN LLDP,繼續常規處理
            if (bsn.getPayload() instanceof LLDP == false)
                return Command.CONTINUE;

            doFloodBDDP(sw.getId(), pi, cntx);
            return Command.STOP;
        } else {
            return dropFilter(sw.getId(), pi, cntx);
        }
    }

DeviceManagerImpl

  • DeviceManager基於在網絡中看到的MAC地址建立設備
  • 它跟蹤映射到設備的任何網絡地址及其在網絡中的位置

設備管理器經過 PACKET-IN 消息請求瞭解設備,經過 PACKET-IN 消息獲取信息,根據實體如何創建進行分類。默認狀況下,entity classifies 使用 MAC 地址和 VLAN 來識別設備。這兩個屬性定義一個獨一無二的設備。設備管理器將瞭解其餘屬性,如 IP 地址。
信息中的一個重要的部分是設備的鏈接點,若是一個交換機接受到一個 PACKET-IN 消息,則交換機將會建立一個鏈接點,設備也會根據時間清空鏈接點,IP 地址,以及設備自己,最近看到的時間戳是用來保持清空過程的控制

IOFMessageListener 的 receive 方法

@Override
    public Command receive(IOFSwitch sw, OFMessage msg,
            FloodlightContext cntx) {
        switch (msg.getType()) {
        case PACKET_IN:
            cntIncoming.increment();
            return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
        default:
            break;
        }
        return Command.CONTINUE;
    }

主要使用了processPacketInMessage()方法

protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) {
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        OFPort inPort = (pi.getVersion().compareTo(OFVersion.OF_12) < 0 ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT));
        // Extract source entity information
        Entity srcEntity = getSourceEntityFromPacket(eth, sw.getId(), inPort);
        if (srcEntity == null) {
            cntInvalidSource.increment();
            return Command.STOP;
        }

        // Learn from ARP packet for special VRRP settings.
        // In VRRP settings, the source MAC address and sender MAC
        // addresses can be different.  In such cases, we need to learn
        // the IP to MAC mapping of the VRRP IP address.  The source
        // entity will not have that information.  Hence, a separate call
        // to learn devices in such cases.
        learnDeviceFromArpResponseData(eth, sw.getId(), inPort);

        // Learn/lookup device information
        Device srcDevice = learnDeviceByEntity(srcEntity);
        if (srcDevice == null) {
            cntNoSource.increment();
            return Command.STOP;
        }

        // Store the source device in the context
        fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice);

        // Find the device matching the destination from the entity
        // classes of the source.
        if (eth.getDestinationMACAddress().getLong() == 0) {
            cntInvalidDest.increment();
            return Command.STOP;
        }
        Entity dstEntity = getDestEntityFromPacket(eth);
        Device dstDevice = null;
        if (dstEntity != null) {
            dstDevice = findDestByEntity(srcDevice.getEntityClass(), dstEntity);
            if (dstDevice != null)
                fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice);
            else
                cntNoDest.increment();
        } else {
            cntNoDest.increment();
        }

        if (logger.isTraceEnabled()) {
            logger.trace("Received PI: {} on switch {}, port {} *** eth={}" +
                    " *** srcDev={} *** dstDev={} *** ",
                    new Object[] { pi, sw.getId().toString(), inPort, eth,
                    srcDevice, dstDevice });
        }

        snoopDHCPClientName(eth, srcDevice);

        return Command.CONTINUE;
    }

vitualNetworkFilter

  • 虛擬網絡過濾器模塊是基於虛擬化網絡的數據鏈路層,它容許你在獨立的數據鏈路層上建立多個邏輯鏈路
  • 如果使用 floodlightdefault.properties 則沒有這個模塊

如何工做

在 Floodlight 啓動時,沒有虛擬網絡建立,這時主機之間不能相互通訊。
一旦用戶建立虛擬網絡,則主機就可以被添加。
在 PACKET-IN 消息轉發實現前,模塊將啓動。
一旦,一條 PACKET-IN 消息被接受,模塊將查看源 MAC 地址和目的 MAC 地址,若是2個 MAC 地址是同一個虛擬網絡,模塊將返回 Command.CONINUE消息,而且繼續處理流。若是MAC 地址不在同一個虛擬網絡則返回 Command.STOP 消息,並丟棄包

限制

  • 必須在同一個物理數據鏈路層中
  • 每一個虛擬網絡只能擁有一個網關()【一個網關可被多個虛擬網絡共享】
  • 多播和廣播沒有被隔離
  • 容許全部的 DHCP 路徑

配置

該模塊可用於 OpenStack 的部署
包含此模塊的默認配置文件位置:
src/main/resources/neutron.properties

IOFMessageListener 的 receive 方法

@Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
        case PACKET_IN:
            return processPacketIn(sw, (OFPacketIn)msg, cntx);
        default:
            break;
        }
        log.warn("Received unexpected message {}", msg);
        return Command.CONTINUE;
    }

主要使用了processPacketIn()方法

protected Command processPacketIn(IOFSwitch sw, OFPacketIn msg, FloodlightContext cntx) {
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        Command ret = Command.STOP;
        String srcNetwork = macToGuid.get(eth.getSourceMACAddress());
        // If the host is on an unknown network we deny it.
        // We make exceptions for ARP and DHCP.
        if (eth.isBroadcast() || eth.isMulticast() || isDefaultGateway(eth) || isDhcpPacket(eth)) {
            ret = Command.CONTINUE;
        } else if (srcNetwork == null) {
            log.trace("Blocking traffic from host {} because it is not attached to any network.",
                    eth.getSourceMACAddress().toString());
            ret = Command.STOP;
        } else if (oneSameNetwork(eth.getSourceMACAddress(), eth.getDestinationMACAddress())) {
            // if they are on the same network continue
            ret = Command.CONTINUE;
        }

        if (log.isTraceEnabled())
            log.trace("Results for flow between {} and {} is {}",
                    new Object[] {eth.getSourceMACAddress(), eth.getDestinationMACAddress(), ret});
        /*
         * TODO - figure out how to still detect gateways while using
         * drop mods
        if (ret == Command.STOP) {
            if (!(eth.getPayload() instanceof ARP))
                doDropFlow(sw, msg, cntx);
        }
         */
        return ret;
    }

LoadBalancer

IOFMessageListener 的 receive 方法

@Override
    public net.floodlightcontroller.core.IListener.Command
            receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                return processPacketIn(sw, (OFPacketIn)msg, cntx);
            default:
                break;
        }
        log.warn("Received unexpected message {}", msg);
        return Command.CONTINUE;
    }

主要使用了processPacketIn()方法

private net.floodlightcontroller.core.IListener.Command processPacketIn(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) {
        
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx, IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        IPacket pkt = eth.getPayload(); 
 
        if (eth.isBroadcast() || eth.isMulticast()) {
            // handle ARP for VIP
            if (pkt instanceof ARP) {
                // retrieve arp to determine target IP address                                                       
                ARP arpRequest = (ARP) eth.getPayload();

                IPv4Address targetProtocolAddress = arpRequest.getTargetProtocolAddress();

                if (vipIpToId.containsKey(targetProtocolAddress.getInt())) {
                    String vipId = vipIpToId.get(targetProtocolAddress.getInt());
                    vipProxyArpReply(sw, pi, cntx, vipId);
                    return Command.STOP;
                }
            }
        } else {
            // currently only load balance IPv4 packets - no-op for other traffic 
            if (pkt instanceof IPv4) {
                IPv4 ip_pkt = (IPv4) pkt;
                
                // If match Vip and port, check pool and choose member
                int destIpAddress = ip_pkt.getDestinationAddress().getInt();
                
                if (vipIpToId.containsKey(destIpAddress)){
                    IPClient client = new IPClient();
                    client.ipAddress = ip_pkt.getSourceAddress();
                    client.nw_proto = ip_pkt.getProtocol();
                    if (ip_pkt.getPayload() instanceof TCP) {
                        TCP tcp_pkt = (TCP) ip_pkt.getPayload();
                        client.srcPort = tcp_pkt.getSourcePort();
                        client.targetPort = tcp_pkt.getDestinationPort();
                    }
                    if (ip_pkt.getPayload() instanceof UDP) {
                        UDP udp_pkt = (UDP) ip_pkt.getPayload();
                        client.srcPort = udp_pkt.getSourcePort();
                        client.targetPort = udp_pkt.getDestinationPort();
                    }
                    if (ip_pkt.getPayload() instanceof ICMP) {
                        client.srcPort = TransportPort.of(8); 
                        client.targetPort = TransportPort.of(0); 
                    }
                    
                    LBVip vip = vips.get(vipIpToId.get(destIpAddress));
                    if (vip == null)            // fix dereference violations           
                        return Command.CONTINUE;
                    LBPool pool = pools.get(vip.pickPool(client));
                    if (pool == null)            // fix dereference violations
                        return Command.CONTINUE;
                    LBMember member = members.get(pool.pickMember(client));
                    if(member == null)            //fix dereference violations
                        return Command.CONTINUE;
                    
                    // for chosen member, check device manager and find and push routes, in both directions                    
                    pushBidirectionalVipRoutes(sw, pi, cntx, client, member);
                   
                    // packet out based on table rule
                    pushPacket(pkt, sw, pi.getBufferId(), (pi.getVersion().compareTo(OFVersion.OF_12) < 0) ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT), OFPort.TABLE,
                                cntx, true);

                    return Command.STOP;
                }
            }
        }
        // bypass non-load-balanced traffic for normal processing (forwarding)
        return Command.CONTINUE;
    }

ForwardingBase

IOFMessageListener 的 receive 方法

@Override
    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
        switch (msg.getType()) {
        case PACKET_IN:
            IRoutingDecision decision = null;
            if (cntx != null) {
                decision = RoutingDecision.rtStore.get(cntx, IRoutingDecision.CONTEXT_DECISION);
            }

            return this.processPacketInMessage(sw, (OFPacketIn) msg, decision, cntx);
        default:
            break;
        }
        return Command.CONTINUE;
    }

主要使用了processPacketInMessage()方法

public abstract Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, 
            IRoutingDecision decision, FloodlightContext cntx);

全部繼承了 ForwardingBase 的子類Forwarding重寫了這個方法,實現具體的操做

@Override
    public Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, IRoutingDecision decision, FloodlightContext cntx) {
        Ethernet eth = IFloodlightProviderService.bcStore.get(cntx, IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
        // We found a routing decision (i.e. Firewall is enabled... it's the only thing that makes RoutingDecisions)
        if (decision != null) {
            if (log.isTraceEnabled()) {
                log.trace("Forwarding decision={} was made for PacketIn={}", decision.getRoutingAction().toString(), pi);
            }

            switch(decision.getRoutingAction()) {
            case NONE:
                // don't do anything
                return Command.CONTINUE;
            case FORWARD_OR_FLOOD:
            case FORWARD:
                doForwardFlow(sw, pi, decision, cntx, false);
                return Command.CONTINUE;
            case MULTICAST:
                // treat as broadcast
                doFlood(sw, pi, decision, cntx);
                return Command.CONTINUE;
            case DROP:
                doDropFlow(sw, pi, decision, cntx);
                return Command.CONTINUE;
            default:
                log.error("Unexpected decision made for this packet-in={}", pi, decision.getRoutingAction());
                return Command.CONTINUE;
            }
        } else { // No routing decision was found. Forward to destination or flood if bcast or mcast.
            if (log.isTraceEnabled()) {
                log.trace("No decision was made for PacketIn={}, forwarding", pi);
            }

            if (eth.isBroadcast() || eth.isMulticast()) {
                doFlood(sw, pi, decision, cntx);
            } else {
                doForwardFlow(sw, pi, decision, cntx, false);
            }
        }

        return Command.CONTINUE;
    }

PACKET-IN

相關文章
相關標籤/搜索