Spark Streaming架構圖
the micro-batch architecture of Spark Streaming
Execution of Spark Streaming within Spark's components
1: 修改log4j的日誌級別爲error,否則會打印太多的日誌
2: 將以下兩個類導出爲一個jar
3: nc -lk ip port
4: 使用spark-submit提交任務
spark-submit --class com.spark.streaming.SimpleDemo test.jar
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.ConnectException; import java.net.Socket; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; /** * DateTime: 2015年6月18日 下午5:08:20 * */ public class JavaCustomReceiver extends Receiver<String> { String host = null; int port = -1; Socket socket = null; BufferedReader reader = null; public JavaCustomReceiver(String host_, int port_) { super(StorageLevel.MEMORY_ONLY()); host = host_; port = port_; } public void onStart() { // Start the thread that receives data over a connection new Thread() { @Override public void run() { receive(); } }.start(); } public void onStop() { try { if (socket != null) { socket.close(); } if (reader != null) { reader.close(); } } catch (IOException e) { e.printStackTrace(); } } /** Create a socket connection and receive data until receiver is stopped */ private void receive() { String userInput = null; try { // connect to the server socket = new Socket(host, port); reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); // Until stopped or connection broken continue reading while (!isStopped() && (userInput = reader.readLine()) != null) { System.out.println("Received data '" + userInput + "'"); store(userInput); } reader.close(); socket.close(); // Restart in an attempt to connect again when server is active // again restart("Trying to connect again"); } catch (ConnectException ce) { // restart if could not connect to server restart("Could not connect", ce); } catch (Throwable t) { // restart if there is any other error restart("Error receiving data", t); } } }