Spark Data Monitoring in Practice

Copyright notice: please credit the source when reposting.
Source: http://bigdataer.net/?p=248

1. Overview

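Accuracy, stability and timeliness are what data developers most need to watch; together they are usually called data quality. Guaranteeing data quality often takes up a large share of a data engineer's effort, so a good data monitoring system, or at least a sound monitoring scheme, is essential. This article presents a data monitoring scheme that has been used in production, together with the relevant code.
Computation is done with Spark, and alerts are sent by email. The article covers: building an email API with Spring MVC that can send HTML and file attachments; calling that API from any stage of a Spark job; and using that API from Spark to email result data stored on HDFS.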
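The article does not show the checks that decide when to alert, so purely as an illustration, here is one possible stability check a Spark job could run on its output before calling the email API. The class name `VolumeCheck`, the day-over-day rule, and the choice that an empty output always alerts are assumptions, not the author's method.

```java
/**
 * Hypothetical day-over-day stability check (illustration only, not from the article).
 */
final class VolumeCheck {
    private VolumeCheck() {}

    /**
     * Returns true when an alert should be sent.
     * @param today          today's count (rows, UV, PV, ...)
     * @param yesterday      the previous run's count, used as the baseline
     * @param maxChangeRatio allowed relative change, e.g. 0.2 for 20%
     */
    static boolean shouldAlert(long today, long yesterday, double maxChangeRatio) {
        if (today < 0 || yesterday < 0 || maxChangeRatio < 0) {
            throw new IllegalArgumentException("counts and ratio must be non-negative");
        }
        if (today == 0) {
            return true; // empty output: treat as missing or late data
        }
        if (yesterday == 0) {
            return false; // no baseline to compare against
        }
        double change = Math.abs(today - yesterday) / (double) yesterday;
        return change > maxChangeRatio;
    }
}
```

In a job, `today` and `yesterday` could come from `count()` on two days' outputs; the threshold would need tuning per table.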

2. Architecture

[Figure: architecture diagram]

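Note: a company's internal Hadoop/Spark cluster is usually isolated from the public internet, so sending email directly from a Spark job is not practical. Instead, we build an email service that exposes an intranet endpoint for Spark jobs to call and that can also reach the internet to deliver the mail to users' mailboxes.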

3. An email service built on Spring MVC

3.1 Design goals

(1) Customizable sender nickname, subject, content, etc.
(2) Support for sending HTML and file attachments

3.2 Technical approach

Spring MVC, JavaMail

3.3 Core code

The email-sending utility class EmailSendUtil:

import java.io.File;
import java.util.Date;
import java.util.Properties;

import javax.activation.CommandMap;
import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.activation.MailcapCommandMap;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;

import org.springframework.stereotype.Service;

@Service
public class EmailSendUtil {

	// Placeholders: the sender mailbox and its password on the SMTP server
	private static final String SENDER_ADDRESS = "sender email address";
	private static final String SENDER_PASSWORD = "password";

	public void sendEmail(String nick,String subject,String content,String receivers,File file) throws Exception {
		
		Properties proper = new Properties();
		proper.setProperty("mail.transport.protocol", "smtp");
		proper.setProperty("mail.smtp.auth", "true");
		
		// Authentication: the account and password are passed to Transport.connect() below
		Session session = Session.getInstance(proper);
		MimeMessage msg = new MimeMessage(session);
		
		try {
			// Register the JavaMail content handlers explicitly; without this some containers fail with
			// "no object DCH for MIME type ..." when the message is written
			MailcapCommandMap mc = (MailcapCommandMap) CommandMap.getDefaultCommandMap();
			mc.addMailcap("text/html;; x-java-content-handler=com.sun.mail.handlers.text_html");
			mc.addMailcap("text/xml;; x-java-content-handler=com.sun.mail.handlers.text_xml");
			mc.addMailcap("text/plain;; x-java-content-handler=com.sun.mail.handlers.text_plain");
			mc.addMailcap("multipart/*;; x-java-content-handler=com.sun.mail.handlers.multipart_mixed");
			mc.addMailcap("message/rfc822;; x-java-content-handler=com.sun.mail.handlers.message_rfc822");
			CommandMap.setDefaultCommandMap(mc);
			// Sender: the nickname becomes the display name, encoded as UTF-8
			msg.setFrom(new InternetAddress(SENDER_ADDRESS, nick, "UTF-8"));
			// Recipients (comma-separated addresses)
			msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(receivers));
			// Subject
			msg.setSubject(subject, "UTF-8");
			MimeMultipart msgMimePart = new MimeMultipart("mixed");
			// HTML body
			MimeBodyPart contents = getBodyPart(content);
			msgMimePart.addBodyPart(contents);
			// Optional attachment
			if(file!=null){
				MimeBodyPart attachment = getAttachPart(file);
				msgMimePart.addBodyPart(attachment);
			}
			
			// Message content
			msg.setContent(msgMimePart);
			// Sent date
			msg.setSentDate(new Date());
			msg.saveChanges();
			
			Transport trans=session.getTransport();
			try {
				trans.connect("smtp.exmail.qq.com", SENDER_ADDRESS, SENDER_PASSWORD);
				trans.sendMessage(msg, msg.getRecipients(Message.RecipientType.TO));
			} finally {
				trans.close();
			}
		} catch (Exception e) {
			throw new Exception("email send error:"+e.getMessage(), e);
		}finally{
			// Delete the temporary attachment file
			if(file!=null&&file.exists()){
				file.delete();
			}
		}
	}

	private static MimeBodyPart getBodyPart(String content) throws MessagingException{
		MimeBodyPart body = new MimeBodyPart();
		MimeMultipart mmp = new MimeMultipart("related");
		MimeBodyPart contents = new MimeBodyPart();
		contents.setContent(content, "text/html;charset=utf-8");
		mmp.addBodyPart(contents);
		body.setContent(mmp);
		return body;
	}
	
	private static MimeBodyPart getAttachPart(File file) throws MessagingException{
		MimeBodyPart attach = new MimeBodyPart();
		FileDataSource fds = new FileDataSource(file);
		attach.setDataHandler(new DataHandler(fds));
		attach.setFileName(file.getName());
		return attach;
	}
}
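Mail headers are ASCII-only, so a non-ASCII nickname (for example a Chinese team name) has to be encoded before it can appear in the From header; JavaMail's `InternetAddress` and `MimeUtility` do this automatically. To show what ends up on the wire, here is a minimal sketch of an RFC 2047 "B" encoded-word with a UTF-8 charset. `FromHeader` is a hypothetical name, and unlike JavaMail this sketch never switches to "Q" encoding and does not split names that exceed the 75-character encoded-word limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper showing the shape of an encoded From header. */
final class FromHeader {
    private FromHeader() {}

    /** Encodes text as a single RFC 2047 encoded-word: =?charset?B?base64?= */
    static String encodedWord(String text) {
        String b64 = Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
        return "=?UTF-8?B?" + b64 + "?=";
    }

    /** Display name plus address, as it would appear after "From: ". */
    static String from(String nickname, String address) {
        return encodedWord(nickname) + " <" + address + ">";
    }
}
```

A mail client decodes the encoded-word back to the original nickname and shows it as the sender name.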

The controller class is fairly rough. It exposes two endpoints: one sends HTML only, the other sends mixed-format mail (HTML plus an attachment).

import java.io.File;

import net.bigdataer.api.weixin.utils.EmailSendUtil;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;

@Controller
@RequestMapping("/email_api")
public class EmailSendController extends DmBaseController{

	@Autowired
	private EmailSendUtil est;
	
	// Sends HTML only
	@RequestMapping("/send")
	public @ResponseBody String sendEmail(@RequestParam("nickname") String nickname,@RequestParam("subject") String subject,@RequestParam("content") String content,@RequestParam("receivers") String receivers){
		String result = "{\"status\":\"0\",\"msg\":\"success\"}";
		try{
			est.sendEmail(nickname,subject, content, receivers,null);
		}catch(Exception e){
			result = "{\"status\":\"-1\",\"msg\":\""+e.getMessage()+"\"}";
		}
		return result;
	}
	
	// Sends mixed-format mail (HTML body plus an attachment)
	@RequestMapping("/sendattachment")
	public @ResponseBody String sendAttachment(@RequestParam("nickname") String nickname,@RequestParam("subject") String subject,@RequestParam("content") String content,@RequestParam("receivers") String receivers,@RequestParam("attachment") MultipartFile attachment){
		String result = "{\"status\":\"0\",\"msg\":\"success\"}";
		File file = new File("/opt/soft/tomcat/temp/"+attachment.getOriginalFilename());
		try {
			attachment.transferTo(file); 
			est.sendEmail(nickname,subject, content, receivers,file);
		} catch (Exception e) { 
			result = "{\"status\":\"-1\",\"msg\":\""+e.getMessage()+"\"}";
		}
		
		return result;
	}
	
}
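The controller builds its JSON reply by string concatenation, so an exception message containing a double quote, a backslash or a newline yields invalid JSON, which the caller's JSON parser would then likely reject. A minimal escaping helper, sketched below with the hypothetical name `JsonReply`, keeps the same `{"status":...,"msg":...}` shape:

```java
/** Hypothetical helper for building the controller's reply safely. */
final class JsonReply {
    private JsonReply() {}

    /** Escapes a string for use inside a JSON string literal; null becomes "". */
    static String escape(String s) {
        if (s == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder(s.length() + 16);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters must be written as \\uXXXX
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Same format as the controller: status and msg both as JSON strings. */
    static String reply(int status, String msg) {
        return "{\"status\":\"" + status + "\",\"msg\":\"" + escape(msg) + "\"}";
    }
}
```

In the controller, the error branch would become `result = JsonReply.reply(-1, e.getMessage());`. If Jackson is on the classpath, returning a `Map` from the `@ResponseBody` method and letting Spring serialize it is another option.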

4. Calling the email API from Spark jobs

4.1 A wrapper utility for calling HTTP APIs

This class supports both HTTPS and HTTP, and both GET and POST requests; GET is not used in this example.
It depends on the httpclient jars. The Maven dependencies I use are:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpmime</artifactId>
    <version>4.5.2</version>
</dependency>

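Note: Spark itself also depends on the httpcore package, so your dependency may clash with the version already on the Spark cluster and cause exceptions such as a class not being found (ClassNotFoundException) or a method not existing (NoSuchMethodError). Adjust the versions to your environment. The code is largely the same across versions, so the following can serve as a reference: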

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;

/**
 * HTTP request utility.
 * Wraps POST and GET requests
 * and supports both HTTP and HTTPS.
 * @author liuxuecheng
 *
 */
public class HttpClientUtil { 

	private static final Log logger = LogFactory.getLog(HttpClientUtil.class);
	// Timeout in milliseconds (connect, connection-pool lease and socket read)
	private static final int TIME_OUT = 3000;
	private static CloseableHttpClient client = null;
	
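	// Trusts every certificate (and, with NoopHostnameVerifier below, every host name).
	// This disables HTTPS verification; acceptable only for calls inside a trusted network.
	private static TrustManager  trustManager = new X509TrustManager() { 
	    @Override 
	    public X509Certificate[] getAcceptedIssuers() { 
	        return new X509Certificate[0]; 
	    } 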
		@Override
		public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
		}
		@Override
		public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
		}
	};
	
	static{
		
		try{
		// Request configuration
		RequestConfig config = RequestConfig.custom()
				.setConnectTimeout(TIME_OUT)
				.setConnectionRequestTimeout(TIME_OUT)
				.setSocketTimeout(TIME_OUT)
				.build();
		
		// HTTPS support
		SSLContext context = SSLContext.getInstance("TLS");
		context.init(null, new TrustManager[]{trustManager}, null);
		SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(context, NoopHostnameVerifier.INSTANCE);
		// Register a socket factory for each scheme
		Registry<ConnectionSocketFactory> registry = RegistryBuilder
				.<ConnectionSocketFactory>create()
				.register("http", PlainConnectionSocketFactory.INSTANCE)
				.register("https", scsf)
				.build();
		
		// Pooled connection manager
		PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(registry);
		
		// Build the shared client
		client = HttpClients.custom()
				.setConnectionManager(manager)
				.setDefaultRequestConfig(config)
				.build();
		
		}catch(Exception e){
			logger.error("failed to initialize HttpClient", e);
		}
	}
	
	/**
	 * POST request.
	 * Different content types need different HttpEntity implementations, so the
	 * parameter is the HttpEntity interface and any implementation can be passed in.
	 * @param token bearer token sent in the Authorization header, or null for none
	 * @param url
	 * @param entity
	 * @return
	 * @throws ClientProtocolException
	 * @throws IOException
	 */
	public static JSONObject post(String token,String url,HttpEntity entity) throws ClientProtocolException, IOException{
		HttpPost post = new HttpPost(url);

		if(token !=null){
			post.setHeader("Authorization", "Bearer "+token);
		}
		post.setHeader("Accept-Charset", "UTF-8");
		post.setEntity(entity);
		
		return client.execute(post, handler);
	}
	
	/**
	 * GET request.
	 * @param token
	 * @param url
	 * @param content_type
	 * @param params
	 * @return
	 * @throws ClientProtocolException
	 * @throws IOException
	 * @throws URISyntaxException 
	 */
	public static JSONObject get(String token,String url,String content_type,List<NameValuePair> params) throws ClientProtocolException, IOException, URISyntaxException{
		UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params,"UTF-8");
		entity.setContentType(content_type);
		
		String para = EntityUtils.toString(entity);
		
		HttpGet get = new HttpGet(url);
		if(token !=null){
			get.setHeader("Authorization", "Bearer "+token);
		}
		get.setHeader("Accept-Charset", "UTF-8");

		// For GET, append the encoded parameters to the URL as a query string
		get.setURI(new URI(get.getURI().toString()+"?"+para));
		return client.execute(get, handler);
	}
	
	// Response handler: parses the body as JSON, throwing on an error status or an empty body
	public static ResponseHandler<JSONObject> handler = new ResponseHandler<JSONObject>(){

		@Override
		public JSONObject handleResponse(HttpResponse res) throws ClientProtocolException, IOException {
			StatusLine status = res.getStatusLine();
			HttpEntity entity = res.getEntity();
			
			// Anything outside 2xx is treated as an error
			if(status.getStatusCode()>=300){
				throw new HttpResponseException(status.getStatusCode(),
						status.getReasonPhrase());
			} 
			
			if(entity==null){
				throw new ClientProtocolException("response has no content");
			}
			
			String res_str = EntityUtils.toString(entity);
			
			return JSONObject.parseObject(res_str);
		}
		
	};
}
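When the dependency conflict described before this class shows up on the cluster (a `ClassNotFoundException` or `NoSuchMethodError` from an `org.apache.http` class), the first question is which jar the class was actually loaded from. A small JDK-only diagnostic, with the hypothetical name `WhichJar`, can print that from the driver or inside a task:

```java
import java.security.CodeSource;

/** Hypothetical diagnostic: reports which jar (or directory) a class was loaded from. */
final class WhichJar {
    private WhichJar() {}

    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "bootstrap/JDK"; // JDK classes have no code source
        }
        return src.getLocation().toString();
    }

    static String locationOf(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            return locationOf(Class.forName(className, false, WhichJar.class.getClassLoader()));
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }
}
```

For example, logging `WhichJar.locationOf("org.apache.http.entity.ContentType")` shows whether the httpcore from your jar or the one from the cluster installation was picked up. Common remedies are relocating the HttpComponents packages with the maven-shade-plugin, or the experimental `spark.driver.userClassPathFirst` / `spark.executor.userClassPathFirst` settings; which fits depends on the cluster.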

4.2 A further wrapper around the email API

The following is Scala code:

import java.io.File
import java.util

import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.mime.content.{FileBody, StringBody}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.CharsetUtils

/**
  * Created by liuxuecheng on 2017/1/4.
  */
object EmailSendUtil {

  def sendEmail(nickname: String,subject:String,content:String,receivers:String):Unit={
    val url = "email API endpoint URL" // placeholder: URL of the /email_api/send endpoint
    val content_type = "application/x-www-form-urlencoded"

    val params = new util.ArrayList[NameValuePair]()
    params.add(new BasicNameValuePair ("nickname",nickname))
    params.add(new BasicNameValuePair ("subject",subject))
    params.add(new BasicNameValuePair ("content",content))
    params.add(new BasicNameValuePair ("receivers",receivers))
    try{
      val entity = new UrlEncodedFormEntity(params,"UTF-8")
      entity.setContentType(content_type)
      HttpClientUtil.post(null,url,entity)
    }catch{
      case e:Throwable=>e.printStackTrace()
    }
  }

  def sendAttachment(nickname: String,subject:String,content:String,receivers:String,file:File):Unit={
    val url = "email API endpoint URL" // placeholder: URL of the /email_api/sendattachment endpoint
    val body = new FileBody(file)
    val entity = MultipartEntityBuilder.create()
      .setCharset(CharsetUtils.get("UTF-8"))
      .addPart("attachment",body)
      .addPart("nickname",new StringBody(nickname,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addPart("subject",new StringBody(subject,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addPart("content",new StringBody(content,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addTextBody("receivers",receivers)
      .setContentType(ContentType.MULTIPART_FORM_DATA)
      .build()

    HttpClientUtil.post(null,url,entity)
  }
}
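Both `sendEmail` above and the `get` method of `HttpClientUtil` send parameters as `application/x-www-form-urlencoded`, which `UrlEncodedFormEntity` produces. For reference, here is a JDK-only sketch of that encoding; the class name `QueryString` is hypothetical. It also appends with `&` when the URL already carries a query string, whereas `get` always appends `"?"`, which would mangle any existing parameters.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: form encoding for POST bodies and GET query strings. */
final class QueryString {
    private QueryString() {}

    private static String enc(String s) {
        try {
            // Form encoding: space becomes '+', other reserved characters become %XX (UTF-8)
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Encodes params in iteration order, e.g. as a POST body. */
    static String encode(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            joiner.add(enc(e.getKey()) + "=" + enc(e.getValue()));
        }
        return joiner.toString();
    }

    /** Appends params to url; does not handle URL fragments (#...). */
    static String append(String url, Map<String, String> params) {
        if (params.isEmpty()) {
            return url;
        }
        String sep = url.indexOf('?') >= 0 ? "&" : "?";
        return url + sep + encode(params);
    }
}
```

`encode(params)` corresponds to the POST body that `sendEmail` sends; `append(url, params)` corresponds to the GET URL.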

4.3 Reading an HDFS file in Spark and creating a File object

The following is an excerpt:

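import java.io.File
import org.apache.spark.{SparkContext, SparkFiles}

  def getHdfsFile(sc:SparkContext,path:String):File = {
    // Needs the full HDFS URI (usually starting with hdfs:// or viewfs://) down to the file name.
    // path must end with "/"; only the first output file (part-00000) is read
    val filePath = "viewfs://xx-cluster"+path+"part-00000"
    // addFile also fetches the file into the driver's SparkFiles directory; SparkFiles.get returns the local path
    sc.addFile(filePath)
    new File(SparkFiles.get(new File(filePath).getName))
  }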
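`getHdfsFile` builds the URI by plain concatenation, so it only works when `path` starts and ends with `/`, and it only picks up the first output file. A hypothetical helper that normalizes the slashes before appending `part-00000` could look like this (`xx-cluster` remains a placeholder):

```java
/** Hypothetical helper mirroring getHdfsFile's path construction, with slash normalization. */
final class HdfsPath {
    private HdfsPath() {}

    /** Joins a cluster prefix and an output directory, then appends the first part file. */
    static String firstPart(String clusterPrefix, String dir) {
        String prefix = clusterPrefix.endsWith("/")
                ? clusterPrefix.substring(0, clusterPrefix.length() - 1)
                : clusterPrefix;
        String d = dir.startsWith("/") ? dir : "/" + dir;
        if (!d.endsWith("/")) {
            d = d + "/";
        }
        return prefix + d + "part-00000";
    }
}
```

To make sure `part-00000` holds the whole result, the job can write it with a single partition, e.g. `rdd.coalesce(1).saveAsTextFile(dir)`.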

Once you have the file, sending the email is simple: just call the `sendAttachment` wrapper from section 4.2.
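Because httpmime is one of the jars that can clash with the cluster's own HttpComponents, one alternative (not what the article uses) is to build the `multipart/form-data` body by hand and send it with the JDK's `java.net.http.HttpClient` (Java 11+). The sketch below, with the hypothetical class name `MultipartBody`, produces a body using the field names the controller's `@RequestParam` annotations expect (`nickname`, `subject`, `content`, `receivers`, `attachment`). It assumes field names and file names contain no quotes or line breaks, and that the boundary string never occurs in the content.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical JDK-only builder for a multipart/form-data body (text fields plus files). */
final class MultipartBody {
    private final String boundary;
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();

    MultipartBody(String boundary) {
        this.boundary = boundary;
    }

    /** Value for the request's Content-Type header. */
    String contentType() {
        return "multipart/form-data; boundary=" + boundary;
    }

    MultipartBody text(String name, String value) {
        write("--" + boundary + "\r\n");
        write("Content-Disposition: form-data; name=\"" + name + "\"\r\n");
        write("Content-Type: text/plain; charset=UTF-8\r\n\r\n");
        write(value);
        write("\r\n");
        return this;
    }

    MultipartBody file(String name, String fileName, byte[] content) {
        write("--" + boundary + "\r\n");
        write("Content-Disposition: form-data; name=\"" + name + "\"; filename=\"" + fileName + "\"\r\n");
        write("Content-Type: application/octet-stream\r\n\r\n");
        out.write(content, 0, content.length);
        write("\r\n");
        return this;
    }

    /** Appends the closing delimiter; call once, after all parts are added. */
    byte[] build() {
        write("--" + boundary + "--\r\n");
        return out.toByteArray();
    }

    private void write(String s) {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        out.write(b, 0, b.length);
    }
}
```

The bytes can then be posted with `HttpRequest.newBuilder(URI.create(url)).header("Content-Type", body.contentType()).POST(HttpRequest.BodyPublishers.ofByteArray(bytes)).build()` and `HttpClient.newHttpClient().send(...)`; the file content can be read with `Files.readAllBytes(file.toPath())`. A random boundary such as a UUID makes a collision with the content unlikely.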

4.4 Sending HTML from Spark

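Sometimes there is no need to write the result to HDFS: when it is small and suited to a report, collect it into driver memory, build an HTML snippet and send that. The code: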

    // Requires: import java.text.DecimalFormat
    // collect() brings the small aggregated result to the driver. Scala's sortBy is stable,
    // so the chained sorts order rows by bd, then platform, then search_flag.
    val rdd = group_rdd.groupByKey(200)
      .map(x=>{
        val raw_uv = x._2.flatMap(e=>e._1).toSeq.distinct.size
        val raw_pv = x._2.map(e=>e._2).reduce(_+_)
        val cm_uv = x._2.flatMap(e=>e._3).toSeq.distinct.size
        val cm_pv = x._2.map(e=>e._4).reduce(_+_)
        IndexEntity(x._1._1,x._1._2,x._1._3,raw_uv,raw_pv,cm_uv,cm_pv)
      }).collect().sortBy(_.search_flag).sortBy(_.platform).sortBy(_.bd)

    // Percentage with a leading zero ("0.00" rather than ".00"); "-" when the denominator is 0
    val df = new DecimalFormat("0.00")
    def ratio(part: Double, whole: Double): String =
      if (whole == 0) "-" else df.format(part / whole * 100) + "%"

    // Build the HTML table rows
    val tbody:StringBuffer = new StringBuffer()
    rdd.foreach(entity=>{
      tbody.append(s"<tr><td>${entity.bd}</td><td>${entity.platform}</td><td>${entity.search_flag}</td>" +
        s"<td>${entity.raw_uv}</td><td>${entity.cm_uv}</td><td>${ratio(entity.cm_uv, entity.raw_uv)}</td>" +
        s"<td>${entity.raw_pv}</td><td>${entity.cm_pv}</td><td>${ratio(entity.cm_pv, entity.raw_pv)}</td></tr>")
    })
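The same report logic, sketched in plain Java to make three details explicit: three chained `sortBy` calls work only because Scala's `sortBy` is stable (the last call is the primary key), which a single comparator expresses directly; a ratio such as `cm_uv.toDouble/raw_uv` yields NaN or Infinity when `raw_uv` is 0, so it needs a guard; and cell text should be HTML-escaped. `ReportHtml` and its `Row` class are hypothetical stand-ins for `IndexEntity`, reduced to the UV columns.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;

/** Hypothetical JDK-only sketch of the report rows: sort, format ratios, escape cells. */
final class ReportHtml {
    private ReportHtml() {}

    /** Hypothetical stand-in for IndexEntity, UV columns only. */
    static final class Row {
        final String bd;
        final String platform;
        final String searchFlag;
        final long rawUv;
        final long cmUv;

        Row(String bd, String platform, String searchFlag, long rawUv, long cmUv) {
            this.bd = bd;
            this.platform = platform;
            this.searchFlag = searchFlag;
            this.rawUv = rawUv;
            this.cmUv = cmUv;
        }
    }

    /** part/whole as a percentage with two decimals, or "-" when whole is 0. */
    static String pct(long part, long whole) {
        if (whole == 0) {
            return "-";
        }
        return String.format(Locale.ROOT, "%.2f%%", part * 100.0 / whole);
    }

    /** Minimal HTML escaping for text placed inside table cells. */
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;")
                .replace(">", "&gt;").replace("\"", "&quot;");
    }

    static String tbody(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        // One comparator instead of three chained stable sorts: bd, then platform, then search flag
        sorted.sort(Comparator.comparing((Row r) -> r.bd)
                .thenComparing(r -> r.platform)
                .thenComparing(r -> r.searchFlag));
        StringBuilder sb = new StringBuilder();
        for (Row r : sorted) {
            sb.append("<tr><td>").append(escape(r.bd))
              .append("</td><td>").append(escape(r.platform))
              .append("</td><td>").append(escape(r.searchFlag))
              .append("</td><td>").append(r.rawUv)
              .append("</td><td>").append(r.cmUv)
              .append("</td><td>").append(pct(r.cmUv, r.rawUv))
              .append("</td></tr>");
        }
        return sb.toString();
    }
}
```

The returned rows go inside a `<table>` with a header row, and the resulting HTML string is what gets passed as `content` to the `sendEmail` wrapper.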