Java網絡編程(3):使用 UDP 探測局域網內特定類型的機器

記得之前咱們使用相似「快牙」這些文件分享工具的時候,一開始就是先在 手機A 上建立一個「房間」,而後鏈接上 手機A WiFi 熱點的其餘手機(即這些手機處於一個局域網內)就能夠發現到這個房間並加入到這個房間裏面,而後就能夠互相分享文件了。那沒有創建鏈接的狀況下,「發現房間」這個功能是怎麼實現的呢?
首先,既然 手機A 處於局域網中,那麼根據 手機A 當前在局域網的 IP 地址和子網掩碼,就能夠得到這個局域網內全部機器的 IP 地址 的範圍。若是在沒有創建鏈接的狀況下,手機A 就能夠給這個範圍內的每一個 IP 地址都發送一個消息 —— 那麼若是某個 IP 地址的機器(設爲 手機B)會對這個消息作出迴應,便說明 手機B手機A 的「本身人」,那麼 手機A 即可以告訴 手機B 它在當前的局域網建了一個「房間」,房間號是個啥,而後 手機B 能夠選擇是否加入到這個「房間」。java

  1. Java網絡編程(1)中,咱們已經知道可使用 NetworkInterface 來得到機器在局域網內 IP 地址;
  2. Java網絡編程(2)中,咱們知道使用 UDP,即可以在不創建鏈接的狀況下,直接向某個 IP 地址發送消息;
  3. 若是每次都是遍歷這個局域網內全部的 IP 地址,並使用 UDP 向每一個 IP 發送消息,那樣就有點麻煩了。事實上,咱們可使用廣播。每一個局域網都有一個對應的廣播地址,向廣播地址發送的數據包經過網關設備(好比路由器)時,網關設備會向局域網的每臺設備發送一份該數據包的副本。經過 IP 和子網掩碼計算廣播地址的方法簡單的形容就是 (IP地址)|(~子網掩碼)—— 將子網掩碼按位取反再和IP地址進行或運算,好比當前機器在局域網內的地址爲 192.168.1.3,子網掩碼爲 255.255.255.0(取反後爲 0.0.0.255),那麼廣播地址爲 192.168.1.255。廣播也是在不創建鏈接的狀況下就發送數據,因此廣播不能經過 TCP 實現,只能是 UDP。在 Java 中,經過 UDP 進行廣播和單播(即只向一個 IP 地址發送數據包)的程序幾乎沒有區別,只是地址由一個特定的單播地址(如 192.168.1.3)變爲了其對應的廣播地址(192.168.1.255)。

如今讓咱們來實現下面的功能:
一、Broadcaster 建立一個房間,並每隔 1 秒向局域網廣播一個特定的消息;
二、同一個局域網的 Device 若是收到了 3 次這個特定的消息,以後便向 Broadcaster 發送加入房間的消息;
三、Broadcaster 收到 Device 請求加入房間的消息後,將 Device 加入房間。git

首先定義發送者類和接收者類,他們都實現了 Runnable,分別能夠用來發送和接收:github

Sender.java編程

import java.io.IOException;
import java.net.*;

public class Sender implements Runnable {

    private static final byte[] EMPTY_DATA = new byte[0];

    private final DatagramSocket socket;
    private final SocketAddress broadcastAddress;
    private final long sendingInterval; // unit is ms

    public Sender(DatagramSocket socket,
            SocketAddress broadcastAddress, int sendingInterval) {
        this.socket = socket;
        this.broadcastAddress = broadcastAddress;
        this.sendingInterval = sendingInterval;
    }

    @Override
    public void run() {
        while (true) {
            byte[] data = getNextData();
            if (data == null || data.length == 0) {
                break;
            }

            DatagramPacket outPacket = new DatagramPacket(
                    data, data.length, broadcastAddress);
            try {
                socket.send(outPacket);
                System.out.println("Sender: Data has been sent");

                Thread.sleep(sendingInterval);
            } catch (IOException | InterruptedException ex) {
                System.err.println("Sender: Error occurred while sending packet");
                break;
            }

        }
        System.out.println("Sender: Thread is end");
    }

    /**
     * 得到下一次發送的數據<br>
     * 子類須要重寫這個方法,返回下一次要發送的數據
     *
     * @return 下一次發送的數據
     */
    public byte[] getNextData() {
        return EMPTY_DATA;
    }
}

Receiver.javasegmentfault

import java.io.IOException;
import java.net.*;

public class Receiver implements Runnable {

    private final int BUF_SIZE = 512;

    private final DatagramSocket socket;

    public Receiver(DatagramSocket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        byte[] inData = new byte[BUF_SIZE];
        DatagramPacket inPacket = new DatagramPacket(inData, inData.length);

        while (true) {
            try {
                socket.receive(inPacket);
                if (!handlePacket(inPacket)) {
                    break;
                }
            } catch (IOException ex) {
                System.out.println("Receiver: Socket was closed.");
                break;
            }
        }
        System.out.println("Receiver: Thread is end");
    }

    /**
     * 處理接收到的數據報<br>
     * 子類須要重寫這個方法,處理接收到的數據包,並返回是否繼續接收
     *
     * @param packet 接收到的數據報
     * @return 是否須要繼續接收
     */
    public boolean handlePacket(DatagramPacket packet) {
        return false;
    }
}

而後咱們定義 Device 和 Broadcaster:網絡

Device.javasocket

import java.io.IOException;
import java.net.*;

public class Device {

    private static final int DEFAULT_LISTENING_PORT = 10000;

    private final InetAddress address;
    private final int port;

    private DatagramSocket socket;

    public Device(int port) throws IOException {
        this.port = port;
        this.address = InetAddress.getLocalHost();
    }

    public Device(InetAddress address, int port) {
        this.address = address;
        this.port = port;
    }

    public void start() throws SocketException, InterruptedException {
        System.out.println("Device has been started...");
        InetAddress lanAddr = LANAddressTool.getLANAddressOnWindows();
        if (lanAddr != null) {
            System.out.println("Device: LAN Address: " + lanAddr.getHostAddress());
        }

        socket = new DatagramSocket(port);
        Receiver receiver = new Receiver(socket) {
            int recvCount = 0;

            @Override
            public boolean handlePacket(DatagramPacket packet) {
                String recvMsg = new String(packet.getData(), 0, packet.getLength());
                if ("ROOM".equals(recvMsg)) {
                    System.out.printf("Device: Received msg '%s'\n", recvMsg);
                    recvCount++;
                    if (recvCount == 3) {
                        byte[] data = "JOIN".getBytes();
                        DatagramPacket respMsg = new DatagramPacket(
                                data, data.length, packet.getSocketAddress()); // 此時 packet 包含了發送者地址和監聽端口
                        try {
                            socket.send(respMsg);
                            System.out.println("Device: Sent response 'JOIN'");
                        } catch (IOException ex) {
                            ex.printStackTrace(System.err);
                        }
                        return false; // 中止接收
                    }
                }
                return true;
            }
        };

        Thread deviceThread = new Thread(receiver);
        deviceThread.start(); // 啓動接收數據包的線程
        deviceThread.join();

        close();

        System.out.println("Device has been closed.");
    }

    public void close() {
        if (socket != null) {
            socket.close();
        }
    }

    @Override
    public String toString() {
        return "Device {" + "address=" + address + ", port=" + port + '}';
    }

    public static void main(String[] args) throws Exception {
        Device device = new Device(DEFAULT_LISTENING_PORT);
        device.start();
    }
}

Broadcaster.javaide

import java.net.*;

public class Broadcaster {

    private static final int DEFAULT_BROADCAST_PORT = 10000;

    private final InetAddress bcAddr;
    private final int bcPort;

    private DatagramSocket socket;

    public Broadcaster(InetAddress broadcastAddress, int broadcastPort) {
        this.bcAddr = broadcastAddress;
        this.bcPort = broadcastPort;
    }

    public void start() throws SocketException, InterruptedException {
        System.out.println("Broadcaster has been started...");

        final Room room = new Room("Test");
        System.out.printf("Broadcaster: Created room '%s'\n\n", room.getName());

        socket = new DatagramSocket();
        SocketAddress bcSocketAddr = new InetSocketAddress(bcAddr, bcPort);

        Sender sender = new Sender(socket, bcSocketAddr, 1000) {// 每隔 1000ms 廣播一次
            final byte[] DATA = "ROOM".getBytes();

            @Override
            public byte[] getNextData() {
                return DATA;
            }
        };

        Receiver recver = new Receiver(socket) {

            @Override
            public boolean handlePacket(DatagramPacket packet) {
                String recvMsg = new String(packet.getData(), 0, packet.getLength());
                if ("JOIN".equals(recvMsg)) {
                    Device device = new Device(packet.getAddress(), packet.getPort());
                    room.addDevice(device);
                    room.listDevices();
                }
                return true; // 一直接收
            }
        };

        Thread senderThread = new Thread(sender);
        Thread recverThread = new Thread(recver);
        senderThread.start(); // 啓動發送(廣播)數據包的線程
        recverThread.start(); // 啓動接收數據包的線程

        senderThread.join();
        recverThread.join();

        close();
    }

    public void close() {
        if (socket != null) {
            socket.close();
        }
    }

    public static void main(String[] args) throws Exception {
        InetAddress bcAddr = LANAddressTool.getLANBroadcastAddressOnWindows();

        if (bcAddr != null) {
            System.out.println("Broadcast Address: " + bcAddr.getHostAddress());
            Broadcaster broadcaster = new Broadcaster(bcAddr, DEFAULT_BROADCAST_PORT);
            broadcaster.start();
        } else {
            System.out.println("Please check your LAN~");
        }
    }
}

Room.java工具

import java.util.*;

public class Room {

    private final String name;
    private final List<Device> devices;

    public Room(String name) {
        this.name = name;
        this.devices = new ArrayList<>();
    }

    public boolean addDevice(Device device) {
        return devices.add(device);
    }

    public String getName() {
        return name;
    }

    public void listDevices() {
        System.out.printf("Room (%s), current devices:\n", name);
        for (Device device : devices) {
            System.out.println(device);
        }
    }
}

(完整的 Demo 能夠訪問:https://github.com/mizhoux/LA...this

咱們將這個 Demo 打包成 jar,而後開始運行:
一、首先咱們在本機上啓動 Broadcaster:
啓動 Broadcaster

二、咱們將本機做爲一個 Device 啓動:
將本機做爲一個 Device 啓動

能夠看到此時 Broadcaster 建立的房間已經有了一個 Device:
建立的房間已經有了一個 Device

三、咱們啓動局域網內的另一臺設備:
啓動局域網內的另一臺設備

此時 Broadcaster 建立的房間便有兩個 Device:
建立的房間已經有了兩個 Device

四、再啓動局域網內的一臺設備:
再啓動局域網內的一臺設備

此時房間裏則有三個 Device:
房間裏已經有了三個 Device

由於 UDP 在不須要創建鏈接的基礎上就能夠發送消息,因此它能夠方便的用來探測局域網內特定類型的機器 —— 這是個頗有用的功能 —— 又好比一個集羣當中可能會忽然有機器宕機,爲了檢測這一事件的發生,就須要集羣 master機器 每隔必定的時間向每臺機器發送若干心跳檢測包,若是有回覆說明機器正常,不然說明該機器出現了故障,此時不須要鏈接並且高效的 UDP 就十分適合這種場合。固然,咱們始終仍是要考慮到 UDP 是不可靠的協議,它並不能代替 TCP —— 永遠須要根據環境,來選擇最合適的技術。

相關文章
相關標籤/搜索