Deploying and performance-testing a single-node Spark + Kafka + Elasticsearch environment

Version selection

Spark 1.5.2 + Kafka 0.9.0.1 + Elasticsearch 2.2.1

Installation and deployment

1. Installation script and files (password: 4m7l)

The install script and the service are simplified single-node versions with no safety checks. Anyone interested is welcome to work together on a cluster version of the install script and service.
http://pan.baidu.com/s/1jIuezxO

2. Using the scripts

  • vi /etc/hosts
    Add the line: 127.0.0.1 hostname
  • cd npminstall

    install.sh

#!/bin/sh

DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`
upperhome=`cd "$DIRNAME/.."; pwd`

export username=root
export installpath=/data/mdware

export hadoopFileName=hadoop-2.4.1
export hadoopPackageName=hadoop-2.4.1.tar.gz

export sparkFileName=spark-1.5.2-bin-hadoop2.4
export sparkPackageName=spark-1.5.2-bin-hadoop2.4.tgz

export kafkaFileName=kafka_2.10-0.9.0.1
export kafkaPackageName=kafka_2.10-0.9.0.1.tgz

export elasticFileName=elasticsearch-2.2.1
export elasticPackageName=elasticsearch-2.2.1.tar.gz

export kibanaFileName=kibana-4.4.2-linux-x64
export kibanaPackageName=kibana-4.4.2-linux-x64.tar.gz

export installServices="hadoop
spark
kafka
elastic
kibana"

mkdir -p $installpath

for com in $installServices ; do

        if [ "$com" = "hadoop" ] ; then
                cp $localhome/files/$hadoopPackageName $installpath
                cd $installpath && tar -zxf $hadoopPackageName
                \cp -r $localhome/conf/hadoop/* $installpath/$hadoopFileName/etc/hadoop/
                sh $installpath/$hadoopFileName/bin/hdfs namenode -format
                rm -rf $installpath/$hadoopPackageName
                ln -s $installpath/$hadoopFileName/ $installpath/hadoop
        fi

        if [ "$com" = "spark" ] ; then
                cp $localhome/files/$sparkPackageName $installpath
                cd $installpath && tar -zxf $sparkPackageName
                \cp -r $localhome/conf/spark-env.sh $installpath/$sparkFileName/conf/
                rm -rf $installpath/$sparkPackageName
                ln -s $installpath/$sparkFileName/ $installpath/spark
        fi

        if [ "$com" = "kafka" ] ; then
                cp $localhome/files/$kafkaPackageName $installpath
                cd $installpath && tar -zxf $kafkaPackageName
                \cp $localhome/conf/server.properties $installpath/$kafkaFileName/config/
                rm -rf $installpath/$kafkaPackageName
                ln -s $installpath/$kafkaFileName/ $installpath/kafka
        fi

        if [ "$com" = "elastic" ] ; then
                cp $localhome/files/$elasticPackageName $installpath
                cd $installpath && tar -zxf $elasticPackageName
                \cp $localhome/conf/elasticsearch.yml $installpath/$elasticFileName/config/
                rm -rf $installpath/$elasticPackageName
                ln -s $installpath/$elasticFileName/ $installpath/es
                $installpath/es/bin/plugin install mobz/elasticsearch-head/2.2.1
                $installpath/es/bin/plugin install lmenezes/elasticsearch-kopf/2.2.1
        fi

        if [ "$com" = "kibana" ] ; then
                cp $localhome/files/$kibanaPackageName $installpath
                cd $installpath && tar -zxf $kibanaPackageName
                rm -rf $installpath/$kibanaPackageName
                ln -s $installpath/$kibanaFileName/ $installpath/kibana
        fi
done
chmod +x $localhome/manage.sh
cp $localhome/manage.sh /etc/init.d/npm
chkconfig npm on

Make the install script executable and run it:

chmod +x install.sh
./install.sh

3. Starting the processes

service npm start

The npm service script (installed from manage.sh):

#!/bin/bash
# chkconfig: 2345 20 81
# description: start and stop npm service
# processname: npm

. /etc/rc.d/init.d/functions
prog="npm"

DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`

# Give Elasticsearch roughly a quarter of total memory, in GB (e.g. 7 GB total -> (7+1)/4 = 2 GB heap).
memSize=`free -g | awk 'NR==2{print $2}'`
mem_size=`expr ${memSize} + 1`
heap_size=`expr ${mem_size} / 4`

export installpath=/data/mdware

start(){
        ulimit -n 655360
        sh $installpath/hadoop/sbin/hadoop-daemon.sh start namenode
        sh $installpath/hadoop/sbin/hadoop-daemon.sh start datanode
        $installpath/hadoop/bin/hdfs dfsadmin -safemode leave
        sh $installpath/spark/sbin/start-master.sh
        sh $installpath/spark/sbin/start-slave.sh spark://localhost:7077
        nohup $installpath/kafka/bin/zookeeper-server-start.sh $installpath/kafka/config/zookeeper.properties >> $installpath/kafka/zookeeper.log &
        sleep 60
        nohup $installpath/kafka/bin/kafka-server-start.sh $installpath/kafka/config/server.properties >> $installpath/kafka/kafka.log &
        export ES_HEAP_SIZE=${heap_size}g
        $installpath/es/bin/elasticsearch -Des.insecure.allow.root=true -d
}

stop(){
        sh $installpath/hadoop/sbin/hadoop-daemon.sh stop namenode
        sh $installpath/hadoop/sbin/hadoop-daemon.sh stop datanode

        sh $installpath/spark/sbin/stop-master.sh
        sh $installpath/spark/sbin/stop-slave.sh
        
        zookeeper_id=`ps -ef | grep -i zookeeper.properties | grep -v grep | awk '{print $2}'`
        
        if [[ -z $zookeeper_id ]];then
                echo "The task is not running ! "
        else
                kill ${zookeeper_id}
        fi

        kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $kafka_id ]];then
                echo "The task is not running ! "
        else
                kill ${kafka_id}
        fi

        es_id=`ps -ef|grep -i elasticsearch  | grep -v "grep"|awk '{print $2}'`
        if [[ -z $es_id ]];then
                echo "The task is not running ! "
        else
                kill ${es_id}
        fi
        
        
        sleep 20
        
        if [[ -z $zookeeper_id ]];then
                echo "The task is not running ! "
        else
                kill -9 ${zookeeper_id}
        fi

        kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $kafka_id ]];then
                echo "The task is not running ! "
        else
                kill -9 ${kafka_id}
        fi

        es_id=`ps -ef|grep -i elasticsearch  | grep -v "grep"|awk '{print $2}'`
        if [[ -z $es_id ]];then
                echo "The task is not running ! "
        else
                kill -9 ${es_id}
        fi
        
}

case "$1" in
        start)
                start
                ;;
        stop)
                stop
                ;;
        *)
                echo $"Usage: $0 {start|stop}"
                exit 2
                ;;
esac
exit $?
Note: the processes are registered to start automatically at boot.

Test code

https://github.com/engimatic/effectivejava/tree/master/sparkanalysis

KafkaDataProducer drives the load test: it replays a data file into a Kafka topic in a loop, pausing between passes.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

// Config is the project's properties-file helper (a sketch follows this class).
public class KafkaDataProducer implements Runnable {
    private static Logger log = Logger.getLogger(KafkaDataProducer.class);

    // One Kafka producer per instance, so each producer thread gets its own client.
    private Producer<String, String> producer;

    private String topic;

    private String path;

    public KafkaDataProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public KafkaDataProducer(String topic, String path) {
        this.path = path;
        this.topic = topic;

        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws Exception {
        KafkaDataProducer kafkaDataProducer1 = new KafkaDataProducer("test","datafile");
        new Thread(kafkaDataProducer1).start();

//        KafkaDataProducer kafkaDataProducer2 = new KafkaDataProducer("tcptest","tcp.file");
//        new Thread(kafkaDataProducer2).start();
//
//        KafkaDataProducer kafkaDataProducer3 = new KafkaDataProducer("httptest","http.file");
//        new Thread(kafkaDataProducer3).start();

    }

    @Override
    public void run() {
        BufferedReader br = null;
        try {
            while ( true ) {
                br = new BufferedReader(new FileReader(Config.getConfig("database.cnf").getProperty(path)));
                String line;

                while ((line = br.readLine()) != null) {
                    if (!"".equals(line.trim())) {
                        producer.send(new ProducerRecord<>(topic, "", line));
                    }
                }
                Thread.sleep(Long.valueOf(Config.getConfig("database.cnf").getProperty("sleep.time")));
            }
        } catch (Exception e) {
            log.error("The read streaming error: ", e);
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    log.warn("close the read streaming error: ", e);
                }
            }
        }
    }
}
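
The producer reads all of its settings through the project's Config helper, which is not included in the excerpt above. A minimal sketch of what such a helper could look like, assuming database.cnf is an ordinary Java properties file (with keys like bootstrap.server, datafile and sleep.time, as used above) found on the classpath or in the working directory:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the repository's Config class; the real implementation may differ.
public final class Config {

    private static final Map<String, Properties> CACHE = new HashMap<>();

    private Config() {
    }

    // Load a properties file once and cache it, e.g. Config.getConfig("database.cnf").
    public static synchronized Properties getConfig(String fileName) {
        Properties props = CACHE.get(fileName);
        if (props == null) {
            props = new Properties();
            InputStream in = null;
            try {
                // Look on the classpath first, then fall back to the working directory.
                in = Config.class.getClassLoader().getResourceAsStream(fileName);
                if (in == null) {
                    in = new FileInputStream(fileName);
                }
                props.load(in);
            } catch (IOException e) {
                throw new IllegalStateException("Cannot load config file: " + fileName, e);
            } finally {
                if (in != null) {
                    try { in.close(); } catch (IOException ignored) { }
                }
            }
            CACHE.put(fileName, props);
        }
        return props;
    }
}

With a helper like this, database.cnf only needs plain key=value lines, for example bootstrap.server=localhost:9092, datafile=/data/test/database.log and sleep.time=60000 (the values are illustrative, not taken from the repository).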

SSDPerformanceTest is the Spark Streaming job: it parses the tab-separated records read from Kafka, aggregates them per key, and writes the result to Elasticsearch through elasticsearch-hadoop (JavaEsSpark).

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import com.google.common.collect.ImmutableMap;

import org.apache.commons.codec.binary.Base64;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;

// Analysis, Config, DBKey and DBData are project classes from the repository linked above.
public class SSDPerformanceTest extends Analysis {
    public static final Logger LOG = LoggerFactory.getLogger(SSDPerformanceTest.class);

    protected static final Pattern TAB = Pattern.compile("\t");

    private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm", Locale.CHINA);

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd", Locale.CHINA);

    public static void main(String[] args) throws IOException {

        String configfile = "database.cnf";

        Properties config = Config.getConfig(configfile);

        JavaPairReceiverInputDStream<String, byte[]> rawStream = setupRawStreamFromKafka(
                config, config.getProperty("group.id"));

        LOG.info("database config:" + config.toString());

        rawStream.foreachRDD(new Function<JavaPairRDD<String, byte[]>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception {
                JavaRDD<Map<String, ?>> es = stringJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, byte[]>, DBKey, DBData>() {
                    public Tuple2<DBKey, DBData> call(Tuple2<String, byte[]> stringTuple2) throws Exception {
                        String[] database = TAB.split(new String(stringTuple2._2));

                        DBKey dbKey = new DBKey();
                        DBData dbData = new DBData();
                        String sqlString = new String(Base64.decodeBase64(database[10].trim()));
                        String storageSql;
                        if(sqlString.length() > 1000){
                            storageSql = sqlString.substring(0,1000);
                        }else{
                            storageSql = sqlString;
                        }

                        //DBKey
                        dbKey.setProbeName(database[0].trim());
                        dbKey.setCustomService(database[1].trim());
                        dbKey.setIpClient(database[2].trim());
                        dbKey.setIpServer(database[3].trim());
                        dbKey.setPortServer(database[5].trim());
                        dbKey.setTimeStart(format.format(new Date().getTime()));
                        dbKey.setOperateType(storageSql.split(" ")[0]);   //Select, Insert, Update, Drop, Procedure
                        dbKey.setDbType(database[8].trim());

                        dbKey.setResponseCode(database[9].trim());
                        dbKey.setUser(database[2].trim());
                        dbKey.setSqlString(storageSql);

                        if(!database[12].trim().equals("-")) {
                            dbData.setOperateTime(Double.parseDouble(database[12].trim()));
                        }else if(!database[7].trim().equals("-")){
                            dbData.setOperateTime(Double.parseDouble(database[7].trim()) - Double.parseDouble(database[6].trim()));
                        }else{
                            dbData.setOperateTime(0);
                        }

                        if(!database[13].trim().equals("-")) {
                            dbData.setReqTransTime(Double.parseDouble(database[13].trim()));
                        }else{
                            dbData.setReqTransTime(0);
                        }

                        if(!database[14].trim().equals("-")) {
                            dbData.setRespTransTime(Double.parseDouble(database[14].trim()));
                        }else{
                            dbData.setRespTransTime(0);
                        }

                        if(!database[15].trim().equals("-")) {
                            dbData.setRespPayload(Integer.parseInt(database[15].trim()));
                        }else{
                            dbData.setRespPayload(0);
                        }

                        dbData.setCount(1);

                        dbData.setSlowCount(1);

                        return new Tuple2<>(dbKey,dbData);

                    }
                }).filter(new Function<Tuple2<DBKey, DBData>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<DBKey, DBData> v1) throws Exception {
                        return v1 != null;
                    }
                }).reduceByKey(new Function2<DBData, DBData, DBData>() {
                    public DBData call(DBData v1, DBData v2) throws Exception {
                        DBData result = new DBData();
                        result.setOperateTime(v1.getOperateTime() + v2.getOperateTime());
                        result.setReqTransTime(v1.getReqTransTime() + v2.getReqTransTime());
                        result.setRespTransTime(v1.getRespTransTime() + v2.getRespTransTime());
                        result.setRespPayload(v1.getRespPayload() + v2.getRespPayload());
                        result.setCount(v1.getCount() + v2.getCount());
                        result.setSlowCount(v1.getSlowCount() + v2.getSlowCount());
                        return result;
                    }
                }).map(new Function<Tuple2<DBKey,DBData>, Map<String, ?>>() {
                    public Map<String, ?> call(Tuple2<DBKey, DBData> v1) throws Exception {
                        DBKey dbKey = v1._1;
                        DBData dbData = v1._2;
                        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
                        builder.put("index_name", sdf.format(format.parse(dbKey.getTimeStart())));
                        builder.put("probeName",dbKey.getProbeName());
                        builder.put("customService",dbKey.getCustomService());
                        builder.put("ipClient",dbKey.getIpClient());
                        builder.put("ipServer",dbKey.getIpServer());
                        builder.put("portServer",dbKey.getPortServer());
                        builder.put("operateType",dbKey.getOperateType());
                        builder.put("timeStart",format.parse(dbKey.getTimeStart()));
                        builder.put("dbType",dbKey.getDbType());
                        builder.put("user",dbKey.getUser());
                        builder.put("responseCode",dbKey.getResponseCode());
                        builder.put("sqlString",dbKey.getSqlString());
                        builder.put("operateTime",dbData.getOperateTime());
                        builder.put("reqTransTime",dbData.getReqTransTime());
                        builder.put("respTransTime",dbData.getRespTransTime());
                        builder.put("respPayload",dbData.getRespPayload());
                        builder.put("count",dbData.getCount());
                        builder.put("slowCount",dbData.getSlowCount());
                        return builder.build();
                    }
                }).cache();

                if (es != null) {
                    JavaEsSpark.saveToEs(es, "ni-database-{index_name}/database", ImmutableMap.of
                            (ConfigurationOptions.ES_MAPPING_EXCLUDE, "index_name"));
                }
                return null;
            }
        });

        rawStream.context().start();
        rawStream.context().awaitTermination();

    }

}
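
setupRawStreamFromKafka is inherited from the Analysis base class in the repository and is not shown above. A minimal sketch of how that helper could be written against Spark 1.5's receiver-based Kafka API; the batch interval, topic name, and the zookeeper.connect/topic property keys are assumptions for illustration, not taken from the repository:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public abstract class Analysis {

    // Hypothetical sketch of the stream setup used by SSDPerformanceTest.
    protected static JavaPairReceiverInputDStream<String, byte[]> setupRawStreamFromKafka(
            Properties config, String groupId) {

        // 60-second batches are assumed here to line up with the per-minute measurements below.
        SparkConf sparkConf = new SparkConf().setAppName("SSDPerformanceTest");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", config.getProperty("zookeeper.connect", "localhost:2181"));
        kafkaParams.put("group.id", groupId);

        // One receiver thread for the topic; "test" matches the topic fed by KafkaDataProducer.
        Map<String, Integer> topics = new HashMap<>();
        topics.put(config.getProperty("topic", "test"), 1);

        // String keys, raw byte[] values, exactly what the job above expects.
        return KafkaUtils.createStream(jssc, String.class, byte[].class,
                StringDecoder.class, DefaultDecoder.class,
                kafkaParams, topics, StorageLevels.MEMORY_AND_DISK_SER);
    }
}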

Test environments

Test environment 1: virtual machine (8 GB RAM, 2 cores, no SSD)

  1. Data volume written per minute
  2. Events written per minute

Test environment 2: virtual machine (8 GB RAM, 2 cores, SSD)

  1. Data volume written per minute
  2. Events written per minute

Test environment 3: IBM server (126 GB RAM, 16 cores, no SSD)

  1. Data volume written per minute
  2. Events written per minute

Test environment 4: IBM server (126 GB RAM, 16 cores, ~160 GB SSD)

Task resource allocation: 2 GB memory, 2 cores

  1. Data volume written per minute
    (forgot to record this one)
  2. Events written per minute (see the query sketch after this list for one way to read this figure off Elasticsearch)
  • database data written on its own
  • database and tcp data written together
    database
    tcp
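
One way to pull the "events written per minute" figure back out of Elasticsearch is a date_histogram aggregation on the timeStart field over the ni-database-* indices written by the job, with a sum over the count field (the streaming job aggregates records per key before indexing, so doc_count alone would only give documents per minute). A minimal sketch using the standard ES 2.x REST API; the host, port, and class name are assumptions:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical measurement helper: prints the raw aggregation response;
// each bucket carries key_as_string (the minute) plus an "events" sum.
public class PerMinuteEventCount {

    public static void main(String[] args) throws Exception {
        // Assumed ES host/port; the index pattern matches "ni-database-{index_name}" above.
        URL url = new URL("http://localhost:9200/ni-database-*/database/_search");

        String query = "{"
                + "  \"size\": 0,"
                + "  \"aggs\": {"
                + "    \"per_minute\": {"
                + "      \"date_histogram\": { \"field\": \"timeStart\", \"interval\": \"minute\" },"
                + "      \"aggs\": { \"events\": { \"sum\": { \"field\": \"count\" } } }"
                + "    }"
                + "  }"
                + "}";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");

        OutputStream out = conn.getOutputStream();
        out.write(query.getBytes(StandardCharsets.UTF_8));
        out.close();

        BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
    }
}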

P.S.: follow-up tests and configuration changes will be posted here as they happen.
For a cluster installation, see http://www.jianshu.com/p/654b5fd42a5d
