1. 保存到 Redshift 數據庫的代碼(Scala,Spark Streaming + Kafka)
package test05
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * Spark Streaming job that consumes comma-separated log lines from Kafka and
 * batch-inserts them into the `test_log2` table using connections borrowed
 * from `ConnectPoolUtil`.
 *
 * Each message is expected to carry at least six comma-separated fields, in
 * the order: time, ip, user_id, user_type, source, scene.
 */
object SaveDataToMysql {

  /** Number of leading comma-separated fields that are written for each record. */
  private val FieldCount = 6

  /** Parameterized insert: field values are bound, never spliced into the SQL text. */
  private val InsertSql =
    "insert into test_log2(time, ip, user_id, user_type, source, scene) values(?, ?, ?, ?, ?, ?)"

  /**
   * Entry point: builds the streaming context, wires the Kafka receiver stream
   * to the database sink and blocks until the job is terminated.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Silence framework logging so that only the job's own output reaches the console.
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)

    // Initialise Spark Streaming with a 10-second batch interval.
    val conf = new SparkConf().setAppName("SaveDataToMysql").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Kafka connection settings.
    val zkQuorum = "192.168.1.112:2181" // ZooKeeper quorum: IP:port[,IP:port...]
    val group = "testgroup"             // consumer group id (group.id in consumer.properties)
    val topics = "huiliyang"            // whitespace-separated list of topics to consume
    val numThreads = 2                  // receiver threads per topic

    // Several topics may feed the job at once: split the list on whitespace and
    // map every topic to its thread count.
    val topicpMap = topics.split("\\s+").filter(_.nonEmpty).map(_ -> numThreads).toMap

    ssc.checkpoint("checkpoint")

    // Keep only the message value of each (key, value) pair delivered by the receiver.
    val lines: DStream[String] = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    lines.print()

    // Persist every micro-batch, one connection and one JDBC batch per partition.
    lines.foreachRDD { rdd =>
      rdd.foreachPartition(partition => savePartition(partition))
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Splits one raw message into its fields.
   *
   * A limit of -1 keeps trailing empty fields, so a record whose last column is
   * empty still yields all of its fields.
   *
   * @param line raw comma-separated message
   * @return the fields when at least [[FieldCount]] are present, otherwise `None`
   */
  private def parseRecord(line: String): Option[Array[String]] = {
    val fields = line.split(",", -1)
    if (fields.length >= FieldCount) Some(fields) else None
  }

  /**
   * Writes all records of one partition in a single transaction.
   *
   * Malformed records are reported on stderr and skipped so that one bad
   * message does not fail the whole batch. On failure the transaction is
   * rolled back and the error is rethrown; the statement and the connection
   * are always released.
   *
   * @param lines raw messages of the partition
   */
  private def savePartition(lines: Iterator[String]): Unit = {
    // Do not borrow a connection for an empty partition.
    if (lines.hasNext) {
      val conn = ConnectPoolUtil.getConnection()
      if (conn == null) {
        // ConnectPoolUtil reports failures by returning null; fail with a clear message.
        throw new IllegalStateException("Could not obtain a database connection from ConnectPoolUtil")
      }
      try {
        conn.setAutoCommit(false) // commit manually, once per partition
        // NOTE(review): setString binds values as character data; if a target
        // column is not a character type, confirm the driver/database coerces it
        // or add an explicit cast to the SQL.
        val stmt = conn.prepareStatement(InsertSql)
        try {
          val queued = lines.foldLeft(0) { (count, line) =>
            parseRecord(line) match {
              case Some(fields) =>
                fields.take(FieldCount).zipWithIndex.foreach {
                  case (value, index) => stmt.setString(index + 1, value)
                }
                stmt.addBatch()
                count + 1
              case None =>
                System.err.println(
                  s"Skipping malformed record (expected at least $FieldCount fields): $line")
                count
            }
          }
          if (queued > 0) {
            stmt.executeBatch()
            conn.commit()
          }
        } catch {
          case e: Exception =>
            // Undo the partial batch, but never let a rollback failure hide the root cause.
            try conn.rollback()
            catch { case rollbackError: Exception => e.addSuppressed(rollbackError) }
            throw e
        } finally {
          stmt.close()
        }
      } finally {
        conn.close() // hands the connection back to the pool
      }
    }
  }
}
2. 數據庫連接池代碼(Scala):
package test05
import java.sql.{Connection, PreparedStatement, ResultSet}
import org.apache.commons.dbcp.BasicDataSource
/**
 * Process-wide JDBC connection pool backed by a Commons DBCP `BasicDataSource`.
 *
 * The pool is created lazily on first use. Creation and shutdown are
 * synchronized on this object so that concurrent callers never observe a
 * partially configured data source and never create more than one pool.
 */
object ConnectPoolUtil {
  // Read and written only while holding this object's monitor.
  private var bs: BasicDataSource = null

  /**
   * Returns the shared data source, creating and configuring it on first use.
   *
   * @return the fully configured shared data source
   */
  def getDataSource(): BasicDataSource = synchronized {
    if (bs == null) {
      // Configure a local instance first and publish it only when complete.
      val ds = new BasicDataSource()
      ds.setDriverClassName("org.postgresql.Driver")
      ds.setUrl("jdbc:postgresql://172.30.11.61:5439/test")
      ds.setUsername("*****")
      ds.setPassword("*****")
      ds.setMaxActive(200)  // maximum number of connections handed out at the same time
      ds.setInitialSize(30) // connections opened when the pool is initialised
      ds.setMinIdle(50)     // minimum number of idle connections kept in the pool
      ds.setMaxIdle(200)    // maximum number of idle connections kept in the pool
      ds.setMaxWait(1000)   // milliseconds to wait for a free connection before failing
      ds.setMinEvictableIdleTimeMillis(60 * 1000)        // idle connections become evictable after 60 s
      ds.setTimeBetweenEvictionRunsMillis(5 * 60 * 1000) // the idle-connection evictor runs every 5 min
      // testOnBorrow only takes effect in DBCP 1.x when a validation query is configured.
      ds.setValidationQuery("select 1")
      ds.setTestOnBorrow(true)
      bs = ds
    }
    bs
  }

  /**
   * Closes the shared data source, if one exists.
   *
   * The reference is cleared afterwards so that a later [[getConnection]]
   * builds a fresh pool instead of reusing a closed one.
   */
  def shutDownDataSource(): Unit = synchronized {
    if (bs != null) {
      bs.close()
      bs = null
    }
  }

  /**
   * Borrows a connection from the pool, initialising the pool if necessary.
   *
   * @return a pooled connection, or `null` when none could be obtained; the
   *         failure message is printed to stdout and callers must check for `null`
   */
  def getConnection(): Connection = {
    try {
      // The pool lookup is synchronized; the potentially blocking borrow is not.
      getDataSource().getConnection()
    } catch {
      case e: Exception =>
        println(e.getMessage)
        null
    }
  }

  /**
   * Closes the given JDBC resources in the order result set, statement,
   * connection. `null` arguments are ignored and a failure to close one
   * resource is printed without preventing the others from being closed.
   *
   * @param rs  result set to close, may be `null`
   * @param ps  prepared statement to close, may be `null`
   * @param con connection to close (returned to the pool), may be `null`
   */
  def closeCon(rs: ResultSet, ps: PreparedStatement, con: Connection): Unit = {
    closeQuietly(rs)
    closeQuietly(ps)
    closeQuietly(con)
  }

  /** Closes one resource if it is non-null, printing instead of propagating any failure. */
  private def closeQuietly(resource: AutoCloseable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }
  }
}
3. pom.xml 文件(屬性與依賴配置):
<!-- Version properties for the build. Only spark.version is referenced by the
     dependencies shown in this snippet. -->
<properties>
<!-- NOTE(review): not referenced by the visible dependencies; the Scala binary
     version is repeated in the "_2.11" artifact suffixes and must match. -->
<scala.version>2.11.8</scala.version>
<!-- Version applied to the Spark artifacts via ${spark.version}. -->
<spark.version>2.2.0</spark.version>
<!-- NOTE(review): not referenced by the visible dependencies. -->
<hadoop.version>2.7.2</hadoop.version>
<!-- Referenced only by the commented-out <scope> elements in this snippet. -->
<spark.pom.scope>compile</spark.pom.scope>
</properties>
<!-- Dependencies of the streaming job. Every Spark artifact takes its version
     from ${spark.version} so the modules cannot drift apart on an upgrade. -->
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>${spark.pom.scope}</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Kafka 0.8 integration; supplies org.apache.spark.streaming.kafka.KafkaUtils. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>${spark.pom.scope}</scope>-->
</dependency>
<!-- PostgreSQL JDBC driver (org.postgresql.Driver), used for the jdbc:postgresql connection. -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.4.1212</version>
</dependency>
<!-- MySQL JDBC driver. NOTE(review): not used by the code shown alongside this
     pom; confirm it is still needed. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
</dependencies>