開發環境:Hadoop+HBASE+Phoenix+flum+kafka+spark+MySQLjava
默認配置好了Hadoop的開發環境,而且已經安裝好HBASE等組件。mysql
下面經過一個簡單的案例進行整合:linux
這是整個工做的流程圖:android
第一步:獲取數據源ios
因爲外部埋點獲取資源較爲繁瑣,所以,本身寫了個自動生成相似數據代碼:web
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class Genlog { static String[] srcurls={"http://www.baidu.com","http://www.sougou.com", "http://www.360.com","http://www.taobao.com"}; static String[] oss={"android","ios","mac","win","linux"}; static String[] sexs={"f","m"}; public static void main(String[] args) throws InterruptedException { //http://xxxxx?refurl=http://www.baidu.com&pid=xx&os=andriod&sex=f/m&wx=abc Logger logger=LogManager.getLogger(Genlog.class); while(true){ String srcurl=srcurls[(int) (Math.random()*srcurls.length)]; String os=oss[(int) (Math.random()*oss.length)]; String sex=sexs[(int) (Math.random()*sexs.length)]; String url=String.format("http://xxxxx?refurl=%s&pid=xx&os=%s&wx=abc&sex=%s/m",srcurl,os,sex); logger.info(url); Thread.sleep(300); } } }
這部分代碼表示,在啓動程序後,將會不斷生成相似文中註釋類型的數據,這樣flume的source端就能夠源源不斷的獲取到數據。正則表達式
pom.xml文件就是關於log4j的依賴api core 和flum-ng便可,再也不贅述。spring
同時,在項目中,要編寫鏈接虛擬機的配置文件,放在resource下,配置文件以下:sql
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="WARN"> <Appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Console> <Flume name ="hi" compress="false" type="avro"> <agent host ="192.168.110.101" port="44444"></agent> </Flume> </Appenders> <Loggers> <Root level="INFO"> <AppenderRef ref="Console"/> <AppenderRef ref="hi"></AppenderRef> </Root> </Loggers> </Configuration>
這樣,咱們的配置數據源的項目就已經完成了,固然,在實際生產中,確定要比這複雜的多。數據庫
第二步:配置flume
配置flume/config/a1.conf,文件能夠直接touch建立,配置以下:
# 定義資源 管道 目的地 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 設置源的屬性 a1.sources.r1.type =avro a1.sources.r1.bind=192.168.110.101 a1.sources.r1.port=44444 # 設置目的地屬性 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.topic = mylog a1.sinks.k1.kafka.bootstrap.servers = 192.168.110.101:9092 # 管道屬性 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把源經過管道鏈接到目的地 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
注意更換本身的IP地址,同時,根據需求更改acks的結果,如一、-一、0,具體介紹看官網便可。此時flume是依賴kafka的。因此啓動順序請先啓動kafka,不然會報錯。
第三步:編寫spark stream項目
項目目標主要是將kafka中的數據拉取下來消費,經過內部邏輯,將數據轉變爲DataFrame格式,經過Phoenix存儲在HBASE上,以方便對數據進行分析。
項目配置文件pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.yzhl</groupId> <artifactId>spark-streaming-phoneix-kafkademo</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.1</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> </plugin> </plugins> </build> </project>
邏輯代碼以下:
import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} object LogSave extends App { //定義brokers, groupId, topics /** * 關於driver和worker的執行位置的代碼 */ val Array(brokers, groupId, topics) = Array("192.168.86.128:9092","mylog","mylog")//driver //spark上下文對象至關於connection val spark = SparkSession.builder().appName("mylog").getOrCreate()//driver //建立spark streaming 上下文 val ssc = new StreamingContext(spark.sparkContext, Seconds(5))//driver val topicsSet = topics.split(",").toSet//driver //定義kafka配置屬性 val kafkaParams = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])//driver //使用KafkaUtils工具來的createDirectStream靜態方法建立DStream對象 val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))//driver //messages中的每一條數據都是一個(key,value) 其中value指的是log中的一行數據 val lines = messages.map(_.value)//worker import spark.implicits._//driver worker //在driver端編譯成了class,以後上傳到worker中 case class MyRecord(id:String,time:String,srcUrl:String,os:String,sex:String) //爲記錄產生ID lines.print(5)//driver //foreachRDD在driver上執行, lines.foreachRDD((rdd,t) =>{ val props = scala.collection.mutable.Map[String,String]()//driver props += "table" -> "tb_mylog" props += "zkUrl" -> "jdbc:phoenix:hadoop" //從下面到toDF.都會放在worker上執行 rdd.zipWithUniqueId().map( x =>{ val p =""".+(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}).+refurl=(.*)&.+&os=(.+)&.+&sex=(.+)""".r x._1 match { case p(time,srcUrl,os,sex) => MyRecord(t.toString()+x._2,time,srcUrl,os,sex) case _ => MyRecord(null,null,null,null,null) } }).filter(_.id !=null).toDF().write.format("org.apache.phoenix.spark") .mode("overwrite") .options(props).save();//todf--save之間都是在worker上執行,save()是在driver上 }) ssc.start()//driver ssc.awaitTermination()//driver /** * spark的全部上下文的建立都在driver上執行 * spark的全部action都在driver上執行 * spark的全部transformation都在worker上執行 * */ }
這部分代碼能夠將拉取的數據進行格式化 的存儲。其中正則表達式是對數據行的拆分,並經過Phoenix存儲到HBASE上。
第四步:項目打包
我用的idea,打包很簡單,maven-->plugins-->scala:compile(編譯)-->Lifecycle的package 便可打包完成,可在target目錄下查看。
eclipse的打包也很簡單,網上一大堆。
到此,在代碼階段的操做基本完成,接下來就是在集羣上的運行過程。
第五步:啓動各個進程
本次的部署是在yarn上的,因此確定有yarn的啓動。咱們按照順序啓動。
1,啓動HDFS:start-dfs.sh
2.啓動yarn:start-yarn.sh
3.啓動zookeeper:若是是本身安裝的zookeeper,能夠直接用./zkServer.sh start
若是是用kafka自帶的zookeeper,啓動命令:bin/zookeeper-server-start.sh config/zookeeper.properties
4.啓動kafka:bin/kafka-server-start.sh config/server.properties
5.啓動flume:bin/flume-ng agent -n a1 -c conf -f conf/a1.conf 此時能夠啓動數據源的生成項目運行
6.啓動kafka的消費者consumer:bin/kafka-console-consumer.sh --bootstrap-server 192.168.110.101:9092 --topic mylog
7.啓動HBASE:start-hbase.sh
8.啓動Phoenix: ./sqlline.py localhost
第六步:以上進程都啓動成功後,能夠將打包好的jar包上傳到系統路徑
此時有一個問題必定要注意,否則確定會報錯,列如空指針的異常,但沒法查詢錯誤具體信息,根本緣由是缺乏對於的依賴包。
在下載依賴包的時候,咱們還須要將兩個必須的依賴包導入到spark的jars文件中,由於咱們打包的瘦包,沒法包含全部的依賴包。
這兩包是:spark-streaming-kafka-0-10_2.11和他的依賴包kafka_2.11。根據你本身的版本不一樣,找到對應的版本依賴包,不然會報出版本依賴的異常信息。
添加方法:cd到spark的jars目錄先,在maven官網,右鍵點擊相應的依賴包的jar,複製路徑,運用命令 」wget 複製的路徑」,也能夠本身下載到本地後上傳。
接着,在啓動的Phoenix中,建立咱們本身的表,在編碼中的表名爲tb_mylog,因此建立表:
!create table tb_mylog(id varchar(255) primary key,time varchar(255),srcUrl varchar(255),os varchar(255),sex varchar(20));
此時!tables裏面就會存在了tb_mylog個表。
第七步:運行上傳的jar包,處理數據
運行命令:spark-submit --master yarn --deploy-mode client --class 包名 jar包
運行後,能夠看到數據在不斷的寫入,spark Stream在不斷的獲取,此時,進入Phoenix中,
select * from tb_mylog,能夠看到數據在表中存在,並不斷的增加,若是機器性能不是很好,建議運行一段時間後,能夠停掉源數據的生成。
對於關閉HBASE,須要注意,不可直接stop掉HBASE,這樣數據就會丟失或者出發預寫機制,沒法將數據徹底的保存到HDFS上,因此停掉HBASE的最好方式是:先運行hbase-daemon.sh stop master,而後在運行stop-hbase.sh. 這樣既可。
因爲是基於yarn模式,因此要讀取到yarn-site.xml文件,因此在spark-env.sh中配置HADOOP_CONF-DIR=Hadoop路徑,或者YARN_CONF_DIR=yarn路徑。
注意:
若是用Phoenix鏈接spark,那麼須要Phoenix裏的Phoenix-spark-hbase.jar和Phoenix-HBASE-client.jar。
且,worker節點經過Phoenix鏈接HBASE時,本身有了客戶端,那麼HBASE的regionserver端須要Phoenix-HBASE-server.jar和Phoenix-spark-hbase.jar兩個包。
flume通訊數據源:經過通訊協議avro. 給到flume的source處,經過配置channel後,獲得下沉的位置,即獲得kafka的producer,而後經過worker節點進行消費,消費形式是kafkaDStream。
接下來是數據的分析,而後存儲到MySQL中。
第八步:存儲到數據庫中的編碼
新建項目:
import org.apache.spark.sql.{SaveMode, SparkSession} object ETLSparkSql extends App { val spark = SparkSession.builder().appName("from-hbase-etl-to-mysql using spark+phoenix").getOrCreate()//driver val props = scala.collection.mutable.Map[String,String]() //driver props += "table" -> "tb_mylog" props += "zkUrl" -> "hadoop:2181" val df = spark.read.format("org.apache.phoenix.spark").options(props).load(); df.createOrReplaceTempView("tb_mylog") val df2 = spark.sql("select srcUrl,count(1) as count_nums from tb_mylog group by srcUrl"); df2.createOrReplaceTempView("tb_url_count") val sql = """ |select | case when srcUrl = 'http://www.baidu.com' then count_nums | else 0 end as baidu, | case when srcUrl = 'http://www.souguo.com' then count_nums | else 0 end as souguo, | case when srcUrl = 'http://www.360.com' then count_nums | else 0 end as `360`, | case when srcUrl = 'http://www.taobao.com' then count_nums | else 0 end as `taobao`, | case when srcUrl not in ('http://www.baidu.com','http://www.souguo.com','http://www.taobao.com','http://www.360.com') then count_nums | else 0 end as `qita` | from tb_url_count """.stripMargin val df3 = spark.sql(sql) df3.createOrReplaceTempView("tb_case") val jdbcops = scala.collection.mutable.Map[String,String]() //driver props += "table" -> "tb_log_count" props += "url" -> "jdbc:mysql://192.168.86.1:3306/logdb" props += "user" -> "root" props += "password" -> "root" props += "driver" -> "com.mysql.jdbc.Driver" spark.sql("select sum(baidu),sum(souguo),sum(`360`),sum(taobao),sum(qita) from tb_case").write.format("jdbc").mode(SaveMode.Append).options(jdbcops).save() println("任務提交,等待結果") }
第九步:建立數據庫和表
建立logdb的數據庫,建立表tb_log_count,列名分別爲id,baidu,souguo,360,taobao,qita。
而後對項目進行編譯和打包,上傳到客戶端driver上,
啓動HDFS,啓動yarn,啓動HBASE,同時能夠執行編譯運行語句:
spark-submit --master yarn --deploy-mode client ETLSparkSql 包名
到此爲止,咱們的數據的獲取,數據的處理,數據的存儲,數據的存庫都已經完成,能夠在MySQL數據庫中查看結果了。
第十步:數據庫數據的展現
咱們用到的技術是Dubbo,對項目作微服務。本項目的Dubbo框架以下:
下面開始創建咱們的項目:
1.創建entity:
創建一個maven項目,建立一個實體類對象,並實現序列化接口,以便讀取數據庫對象。設置對應數據庫的屬性,並添加set和get方法,以方便後面的過程調用。
同時,在pom文件中,添加<packaging>jar</packaging>用來打包,此時能夠經過install進行打包,能夠在本地磁盤的.m2相應的目錄中找到對用的jar文件。
2.建立dao-interface項目
此時,建立的項目pom文件中一樣加入jar,另外,將上一個entity項目中pom文件中的信息做爲本項目的依賴,這樣兩個項目就能夠關聯到一塊兒了。接口類寫到了一個裝載實體的列表list方法。而後一樣,經過install進行打包。
3.建立dao-impl類,即dao的實現類:
此時建立的項目是spring-boot項目,這個項目要用到mybatis進行整合。
建立後,首先導入依賴問題,在pom文件中加入依賴:
<dependency> <groupId>com.yzhl</groupId> <artifactId>dao-api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.alibaba.boot</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>0.2.0</version> </dependency>
證實此時依賴的時上一個項目dao接口,同時還依賴了Dubbo.
接下來,建立一個接口類,一樣具備的時實體類的集合方法。有了接口,須要作映射文件,建立映射文件mapper.xml,文件內容大體爲
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org//dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.yzhl.dao.LogMapper"> <select id="list" resultType="com.yzhl.commen.Logentity"> select * from tb_log_count </select> </mapper>
映射完成,須要經過App作掃描,添加掃描註解:@MapperScan(basePackages = "com.yzhl.dao")
接下來編寫實現類:
@Service @Component public class LogServiceImpl implements LogService { @Autowired private LogMapper logMapper; @Override public List<Logentity> list() { return logMapper.list(); } }
同時配置properties.yml文件:
spring: datasource: url: jdbc:mysql://localhost:3306/logdb username: root password: XXoo0321 driver-class-name: com.mysql.jdbc.Driver mybatis: mapper-locations: classpath:mapping/*xml dubbo: application: id: dao-impl name: dao-impl protocol: id: dubbo name: dubbo port: 9999 registry: id: my-1 address: zookeeper://192.168.110.101:2181 scan: basePackages: com.yzhl.dao
到此,dao的實現類也已經完成了。
4.建立web項目:
一樣是spring-boot項目,pom文件依然須要dao接口項目和Dubbo的依賴,導入便可。
配置properties.yml文件:
server: port: 8888 dubbo: application: id: web name: web protocol: id: dubbo name: dubbo registry: id: my-2 address: zookeeper://192.168.110.101:2181 scan: basePackages: com.yzhl.webs
若是是非本地操做,須要在protocol中添加port端口號,且不能與前面實現類的相同,本地操做可不用添加。
建立Controller對象:
@RestController @RequestMapping("/log") public class LogController { @Reference//由於是外部的對象,這個注入只能用阿里的 private LogService logService; @GetMapping("list") @ResponseBody public List<Logentity> list(){ return logService.list(); } }
到此,咱們對數據庫的資源獲取已經完成,接下來就是利用Angular進行展現效果的編寫。
第十一步:Angular展現效果圖
新手上路,有不對的地方還請指正。