Spark 1.5.2 + Kafka 0.9.0.1 + Elasticsearch 2.2.1
The install script and the service script are simplified single-machine versions with no safety checks. Anyone interested is welcome to work together on a clustered version of the install script and service.
Download: http://pan.baidu.com/s/1jIuezxO
cd npminstall
install.sh:

#!/bin/sh
DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`
upperhome=`cd "$DIRNAME/.."; pwd`

export username=root
export installpath=/data/mdware
export hadoopFileName=hadoop-2.4.1
export hadoopPackageName=hadoop-2.4.1.tar.gz
export sparkFileName=spark-1.5.2-bin-hadoop2.4
export sparkPackageName=spark-1.5.2-bin-hadoop2.4.tgz
export kafkaFileName=kafka_2.10-0.9.0.1
export kafkaPackageName=kafka_2.10-0.9.0.1.tgz
export elasticFileName=elasticsearch-2.2.1
export elasticPackageName=elasticsearch-2.2.1.tar.gz
export kibanaFileName=kibana-4.4.2-linux-x64
export kibanaPackageName=kibana-4.4.2-linux-x64.tar.gz
export installServices="hadoop spark kafka elastic kibana"

mkdir -p $installpath

for com in $installServices ; do
    if [ $com"" == "hadoop" ] ; then
        # unpack Hadoop, drop in the prepared configs (\cp bypasses the cp -i alias), then format HDFS
        cp $localhome/files/$hadoopPackageName $installpath
        cd $installpath && tar -zxf $hadoopPackageName
        \cp -r $localhome/conf/hadoop/* $installpath/$hadoopFileName/etc/hadoop/
        sh $installpath/$hadoopFileName/bin/hdfs namenode -format
        rm -rf $installpath/$hadoopPackageName
        ln -s $installpath/$hadoopFileName/ $installpath/hadoop
    fi
    if [ $com"" == "spark" ] ; then
        cp $localhome/files/$sparkPackageName $installpath
        cd $installpath && tar -zxf $sparkPackageName
        \cp -r $localhome/conf/spark-env.sh $installpath/$sparkFileName/conf/
        rm -rf $installpath/$sparkPackageName
        ln -s $installpath/$sparkFileName/ $installpath/spark
    fi
    if [ $com"" == "kafka" ] ; then
        cp $localhome/files/$kafkaPackageName $installpath
        cd $installpath && tar -zxf $kafkaPackageName
        \cp $localhome/conf/server.properties $installpath/$kafkaFileName/config/
        rm -rf $installpath/$kafkaPackageName
        ln -s $installpath/$kafkaFileName/ $installpath/kafka
    fi
    if [ $com"" == "elastic" ] ; then
        cp $localhome/files/$elasticPackageName $installpath
        cd $installpath && tar -zxf $elasticPackageName
        \cp $localhome/conf/elasticsearch.yml $installpath/$elasticFileName/config/
        rm -rf $installpath/$elasticPackageName
        ln -s $installpath/$elasticFileName/ $installpath/es
        # monitoring plugins
        $installpath/es/bin/plugin install mobz/elasticsearch-head/2.2.1
        $installpath/es/bin/plugin install lmenezes/elasticsearch-kopf/2.2.1
    fi
    if [ $com"" == "kibana" ] ; then
        cp $localhome/files/$kibanaPackageName $installpath
        cd $installpath && tar -zxf $kibanaPackageName
        rm -rf $installpath/$kibanaPackageName
        ln -s $installpath/$kibanaFileName/ $installpath/kibana
    fi
done

# register manage.sh as the npm service and enable it at boot
chmod +x $localhome/manage.sh
cp $localhome/manage.sh /etc/init.d/npm
chkconfig npm on
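install.sh expects the downloaded packages under files/ and the edited configs under conf/, right next to the two scripts; a quick check before running it (directory and file names taken from the variables above):

ls          # install.sh  manage.sh  files/  conf/
ls files/   # hadoop-2.4.1.tar.gz  spark-1.5.2-bin-hadoop2.4.tgz  kafka_2.10-0.9.0.1.tgz  elasticsearch-2.2.1.tar.gz  kibana-4.4.2-linux-x64.tar.gz
ls conf/    # hadoop/  spark-env.sh  server.properties  elasticsearch.yml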
chmod +x install.sh
./install.sh
service npm start
manage.sh (installed as /etc/init.d/npm by install.sh):

#!/bin/bash
# chkconfig: 2345 20 81
# description: start and stop npm service
# processname: npm

. /etc/rc.d/init.d/functions

prog="npm"
DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`
# give Elasticsearch roughly a quarter of total memory as heap
menSize=`free -g | awk 'NR==2{print $2}'`
men_size=`expr ${menSize} + 1`
heap_size=`expr ${men_size} / 4`
export installpath=/data/mdware

start(){
    ulimit -n 655360
    sh $installpath/hadoop/sbin/hadoop-daemon.sh start namenode
    sh $installpath/hadoop/sbin/hadoop-daemon.sh start datanode
    $installpath/hadoop/bin/hdfs dfsadmin -safemode leave
    sh $installpath/spark/sbin/start-master.sh
    sh $installpath/spark/sbin/start-slave.sh spark://localhost:7077
    nohup $installpath/kafka/bin/zookeeper-server-start.sh $installpath/kafka/config/zookeeper.properties >> $installpath/kafka/zookeeper.log &
    sleep 60
    nohup $installpath/kafka/bin/kafka-server-start.sh $installpath/kafka/config/server.properties >> $installpath/kafka/kafka.log &
    export ES_HEAP_SIZE=${heap_size}g
    $installpath/es/bin/elasticsearch -Des.insecure.allow.root=true -d
}

stop(){
    sh $installpath/hadoop/sbin/hadoop-daemon.sh stop namenode
    sh $installpath/hadoop/sbin/hadoop-daemon.sh stop datanode
    sh $installpath/spark/sbin/stop-master.sh
    sh $installpath/spark/sbin/stop-slave.sh

    # try a normal kill first ...
    zookeeper_id=`ps -ef | grep -i zookeeper.properties | grep -v grep | awk '{print $2}'`
    if [[ -z $zookeeper_id ]]; then
        echo "The task is not running ! "
    else
        kill ${zookeeper_id}
    fi

    kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
    if [[ -z $kafka_id ]]; then
        echo "The task is not running ! "
    else
        kill ${kafka_id}
    fi

    es_id=`ps -ef | grep -i elasticsearch | grep -v grep | awk '{print $2}'`
    if [[ -z $es_id ]]; then
        echo "The task is not running ! "
    else
        kill ${es_id}
    fi

    sleep 20

    # ... then force-kill whatever is still alive
    if [[ -z $zookeeper_id ]]; then
        echo "The task is not running ! "
    else
        kill -9 ${zookeeper_id}
    fi

    kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
    if [[ -z $kafka_id ]]; then
        echo "The task is not running ! "
    else
        kill -9 ${kafka_id}
    fi

    es_id=`ps -ef | grep -i elasticsearch | grep -v grep | awk '{print $2}'`
    if [[ -z $es_id ]]; then
        echo "The task is not running ! "
    else
        kill -9 ${es_id}
    fi
}

case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    *)
        echo $"Usage: $0 {start|stop}"
        exit 2
esac
exit $?
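As a worked example of the heap sizing above: on a host where free -g reports 7 GB of total memory, the script computes

free -g | awk 'NR==2{print $2}'    # -> 7
expr 7 + 1                         # -> 8  (men_size)
expr 8 / 4                         # -> 2  (heap_size)

so Elasticsearch would be started with ES_HEAP_SIZE=2g.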
Note: the service is registered to start automatically at boot.
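Whether it was started manually or at boot, a rough health check looks like this (process names as reported by jps; port 9200 assumes elasticsearch.yml keeps the default HTTP port):

jps                            # expect NameNode, DataNode, Master, Worker, QuorumPeerMain, Kafka and Elasticsearch
curl http://localhost:9200/    # Elasticsearch should answer with its 2.2.1 version banner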
Source code: https://github.com/engimatic/effectivejava/tree/master/sparkanalysis
KafkaDataProducer replays a sample data file into a Kafka topic in an endless loop, to feed the streaming job with test data (Config is the project's own properties loader):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

public class KafkaDataProducer implements Runnable {

    private static Logger log = Logger.getLogger(KafkaDataProducer.class);
    private static Producer<String, String> producer;
    private String topic;
    private String path;

    public KafkaDataProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public KafkaDataProducer(String topic, String path) {
        this.path = path;
        this.topic = topic;
        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws Exception {
        KafkaDataProducer kafkaDataProducer1 = new KafkaDataProducer("test", "datafile");
        new Thread(kafkaDataProducer1).start();

//        KafkaDataProducer kafkaDataProducer2 = new KafkaDataProducer("tcptest", "tcp.file");
//        new Thread(kafkaDataProducer2).start();
//
//        KafkaDataProducer kafkaDataProducer3 = new KafkaDataProducer("httptest", "http.file");
//        new Thread(kafkaDataProducer3).start();
    }

    @Override
    public void run() {
        BufferedReader br = null;
        try {
            while (true) {
                // re-read the whole file on every pass and push each non-empty line to the topic
                br = new BufferedReader(new FileReader(Config.getConfig("database.cnf").getProperty(path)));
                String line;
                while ((line = br.readLine()) != null) {
                    if (!"".equals(line.trim())) {
                        producer.send(new ProducerRecord<>(topic, "", line));
                    }
                }
                br.close(); // close this pass's reader before sleeping so readers do not pile up
                Thread.sleep(Long.valueOf(Config.getConfig("database.cnf").getProperty("sleep.time")));
            }
        } catch (Exception e) {
            log.error("The read streaming error: ", e);
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    log.warn("close the read streaming error: ", e);
                }
            }
        }
    }
}
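To confirm the producer is really publishing, the Kafka command line tools from the bundle can be pointed at the same topic; this sketch assumes the bundled zookeeper.properties keeps the default clientPort 2181, and the topic name test comes from main() above:

# create the topic once (skip if it already exists or auto-creation is enabled)
/data/mdware/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# tail the topic while KafkaDataProducer is running
/data/mdware/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test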
SSDPerformanceTest is the streaming job itself: it splits each tab-separated record from Kafka, builds a DBKey/DBData pair, aggregates by key and writes the result to daily Elasticsearch indices (Analysis, Config, DBKey and DBData come from the repository above):

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import com.google.common.collect.ImmutableMap;
import org.apache.commons.codec.binary.Base64;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SSDPerformanceTest extends Analysis {

    public static final Logger LOG = LoggerFactory.getLogger(SSDPerformanceTest.class);
    protected static final Pattern TAB = Pattern.compile("\t");
    private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm", Locale.CHINA);
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd", Locale.CHINA);

    public static void main(String[] args) throws IOException {
        String configfile = "database.cnf";
        Properties config = Config.getConfig(configfile);
        JavaPairReceiverInputDStream<String, byte[]> rawStream =
                setupRawStreamFromKafka(config, config.getProperty("group.id"));
        LOG.info("database config:" + config.toString());

        rawStream.foreachRDD(new Function<JavaPairRDD<String, byte[]>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception {
                JavaRDD<Map<String, ?>> es = stringJavaPairRDD.mapToPair(
                        new PairFunction<Tuple2<String, byte[]>, DBKey, DBData>() {
                    public Tuple2<DBKey, DBData> call(Tuple2<String, byte[]> stringTuple2) throws Exception {
                        String[] database = TAB.split(new String(stringTuple2._2));
                        DBKey dbKey = new DBKey();
                        DBData dbData = new DBData();

                        // the SQL text arrives Base64 encoded; keep at most 1000 characters of it
                        String sqlString = new String(Base64.decodeBase64(database[10].trim()));
                        String storageSql;
                        if (sqlString.length() > 1000) {
                            storageSql = sqlString.substring(0, 1000);
                        } else {
                            storageSql = sqlString;
                        }

                        // DBKey
                        dbKey.setProbeName(database[0].trim());
                        dbKey.setCustomService(database[1].trim());
                        dbKey.setIpClient(database[2].trim());
                        dbKey.setIpServer(database[3].trim());
                        dbKey.setPortServer(database[5].trim());
                        dbKey.setTimeStart(format.format(new Date().getTime()));
                        dbKey.setOperateType(storageSql.split(" ")[0]); // Select, Insert, Update, Drop, Procedure
                        dbKey.setDbType(database[8].trim());
                        dbKey.setResponseCode(database[9].trim());
                        dbKey.setUser(database[2].trim());
                        dbKey.setSqlString(storageSql);

                        if (!database[12].trim().equals("-")) {
                            dbData.setOperateTime(Double.parseDouble(database[12].trim()));
                        } else if (!database[7].trim().equals("-")) {
                            dbData.setOperateTime(Double.parseDouble(database[7].trim())
                                    - Double.parseDouble(database[6].trim()));
                        } else {
                            dbData.setOperateTime(0);
                        }
                        if (!database[13].trim().equals("-")) {
                            dbData.setReqTransTime(Double.parseDouble(database[13].trim()));
                        } else {
                            dbData.setReqTransTime(0);
                        }
                        if (!database[14].trim().equals("-")) {
                            dbData.setRespTransTime(Double.parseDouble(database[14].trim()));
                        } else {
                            dbData.setRespTransTime(0);
                        }
                        if (!database[15].trim().equals("-")) {
                            dbData.setRespPayload(Integer.parseInt(database[15].trim()));
                        } else {
                            dbData.setRespPayload(0);
                        }
                        dbData.setCount(1);
                        dbData.setSlowCount(1);
                        return new Tuple2<>(dbKey, dbData);
                    }
                }).filter(new Function<Tuple2<DBKey, DBData>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<DBKey, DBData> v1) throws Exception {
                        return v1 != null;
                    }
                }).reduceByKey(new Function2<DBData, DBData, DBData>() {
                    public DBData call(DBData v1, DBData v2) throws Exception {
                        DBData result = new DBData();
                        result.setOperateTime(v1.getOperateTime() + v2.getOperateTime());
                        result.setReqTransTime(v1.getReqTransTime() + v2.getReqTransTime());
                        result.setRespTransTime(v1.getRespTransTime() + v2.getRespTransTime());
                        result.setRespPayload(v1.getRespPayload() + v2.getRespPayload());
                        result.setCount(v1.getCount() + v2.getCount());
                        result.setSlowCount(v1.getSlowCount() + v2.getSlowCount());
                        return result;
                    }
                }).map(new Function<Tuple2<DBKey, DBData>, Map<String, ?>>() {
                    public Map<String, ?> call(Tuple2<DBKey, DBData> v1) throws Exception {
                        DBKey dbKey = v1._1;
                        DBData dbData = v1._2;
                        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
                        builder.put("index_name", sdf.format(format.parse(dbKey.getTimeStart())));
                        builder.put("probeName", dbKey.getProbeName());
                        builder.put("customService", dbKey.getCustomService());
                        builder.put("ipClient", dbKey.getIpClient());
                        builder.put("ipServer", dbKey.getIpServer());
                        builder.put("portServer", dbKey.getPortServer());
                        builder.put("operateType", dbKey.getOperateType());
                        builder.put("timeStart", format.parse(dbKey.getTimeStart()));
                        builder.put("dbType", dbKey.getDbType());
                        builder.put("user", dbKey.getUser());
                        builder.put("responseCode", dbKey.getResponseCode());
                        builder.put("sqlString", dbKey.getSqlString());
                        builder.put("operateTime", dbData.getOperateTime());
                        builder.put("reqTransTime", dbData.getReqTransTime());
                        builder.put("respTransTime", dbData.getRespTransTime());
                        builder.put("respPayload", dbData.getRespPayload());
                        builder.put("count", dbData.getCount());
                        builder.put("slowCount", dbData.getSlowCount());
                        return builder.build();
                    }
                }).cache();

                if (es != null) {
                    // index into a daily index derived from index_name; index_name itself is excluded from the document
                    JavaEsSpark.saveToEs(es, "ni-database-{index_name}/database",
                            ImmutableMap.of(ConfigurationOptions.ES_MAPPING_EXCLUDE, "index_name"));
                }
                return null;
            }
        });

        rawStream.context().start();
        rawStream.context().awaitTermination();
    }
}
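Once the job has processed a few batches, the documents should be visible under the daily ni-database-* indices that saveToEs writes (again assuming the default HTTP port 9200):

curl 'http://localhost:9200/ni-database-*/_count?pretty'
curl 'http://localhost:9200/ni-database-*/database/_search?size=1&pretty'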
Task resource allocation: 2 GB of memory and 2 cores.
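A minimal submit sketch matching that allocation against the single-node master started by manage.sh; the jar name and the main class package here are placeholders, not the actual values from the repository:

/data/mdware/spark/bin/spark-submit \
    --master spark://localhost:7077 \
    --class com.example.analysis.SSDPerformanceTest \
    --executor-memory 2G \
    --total-executor-cores 2 \
    /path/to/sparkanalysis-with-dependencies.jar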
P.S.: follow-up test results and configuration changes will be posted here as they happen.
For a cluster installation, see http://www.jianshu.com/p/654b5fd42a5d