Spark Streaming 基本概念

介紹

Spark Streaming架構圖

The micro-batch architecture of Spark Streaming

Execution of Spark Streaming within Spark’s components

JAVA代碼示例

執行方式

1:修改log4j的日誌級別爲error,否則會打印太多的日誌

2:將以下兩個類導出爲一個jar

3: nc -lk ip port

4: 使用spark-submit提交任務

spark-submit  --class com.spark.streaming.SimpleDemo test.jar

JAVA代碼

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;


/**
 * DateTime: 2015年6月18日 下午5:08:20
 *
 */
public class JavaCustomReceiver extends Receiver<String> {

    String host = null;
    int port = -1;
    Socket socket = null;
    BufferedReader reader = null;


    public JavaCustomReceiver(String host_, int port_) {
        super(StorageLevel.MEMORY_ONLY());
        host = host_;
        port = port_;
    }


    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread() {
            @Override
            public void run() {
                receive();
            }
        }.start();
    }


    public void onStop() {
        try {
            if (socket != null) {
                socket.close();
            }
            if (reader != null) {
                reader.close();
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }


    /** Create a socket connection and receive data until receiver is stopped */
    private void receive() {
        String userInput = null;
        try {
            // connect to the server
            socket = new Socket(host, port);
            reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            // Until stopped or connection broken continue reading
            while (!isStopped() && (userInput = reader.readLine()) != null) {
                System.out.println("Received data '" + userInput + "'");
                store(userInput);
            }
            reader.close();
            socket.close();

            // Restart in an attempt to connect again when server is active
            // again
            restart("Trying to connect again");
        }
        catch (ConnectException ce) {
            // restart if could not connect to server
            restart("Could not connect", ce);
        }
        catch (Throwable t) {
            // restart if there is any other error
            restart("Error receiving data", t);
        }
    }

}
相關文章
相關標籤/搜索