Saving Spark Streaming results to S3

Saving the results parsed by Spark to S3

The difference from saving to the local filesystem is that you need to configure your AWS access key and secret key, as well as the region (endpoint). The code is as follows:

package com.alo7.spark

import java.util.Properties
import test07.DWReadS3LogToKafka_Tpuser.getProperties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.parsing.json.JSON

object TestSaveDataToS3Time {

  def main(args: Array[String]): Unit = {

    // silence the noisier loggers
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)

    // local StreamingContext with a 10-second batch interval
    val conf = new SparkConf().setAppName("ReadS3LogToKafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // S3 credentials and endpoint (region); the s3a connector reads these from the Hadoop configuration
    ssc.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<your AWS access key>")
    ssc.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<your AWS secret key>")
    ssc.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")


    // Kafka source (old receiver-based 0.8 API): the topics map is topic -> number of consumer threads
    val zkQuorum = "192.168.1.112:2181"
    val group = "testgroup"
    val topics = "test"
    val numThreads = 2
    val topicMap = topics.split(",").map((_, numThreads)).toMap

    // createStream yields (key, message) pairs; keep only the message payload
    val lines: DStream[String] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // lines.count().print()   // debug: per-batch record count
    val analysisProps = getProperties("/Users/huiliyang/config/tpuser_log_info_config.properties")

    // getKeyValue() is my data-parsing function (definition not shown here);
    // records matching the default_output pattern are filtered out
    val formatResult: DStream[String] = getKeyValue(lines, "iclass-tpuser", analysisProps)
      .filter(!_.matches(analysisProps.getProperty("default_output")))

    formatResult.count().print()

    // save the data to S3
    formatResult.saveAsTextFiles("s3a://alo7-dw/tmp/test/2017-10-26/log")

    ssc.start()
    ssc.awaitTermination()
  }
}
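saveAsTextFiles(prefix) writes one directory per batch interval, named prefix-<batch time in ms>, with one part file per partition inside. If that produces too many small objects on S3, a minimal alternative sketch (assuming the same placeholder output path as above) is to write each batch yourself with foreachRDD and coalesce:

// A sketch only: coalesce(1) funnels each batch through a single partition,
// trading write parallelism for fewer small files on S3.
formatResult.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    rdd.coalesce(1)
      .saveAsTextFile(s"s3a://alo7-dw/tmp/test/2017-10-26/log-${time.milliseconds}")
  }
}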

JARs required to integrate Spark with S3:
<properties>
  <scala.version>2.11.8</scala.version>
  <spark.version>2.2.0</spark.version>
  <hadoop.version>2.7.2</hadoop.version>
  <spark.pom.scope>compile</spark.pom.scope>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <!--<scope>${spark.pom.scope}</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
  </dependency>
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-redshift_2.11</artifactId>
    <version>3.0.0-preview1</version>
  </dependency>
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>3.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <!--<scope>${spark.pom.scope}</scope>-->
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-paranamer</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>net.java.dev.jets3t</groupId>
    <artifactId>jets3t</artifactId>
    <version>0.9.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.4</version>
  </dependency>
</dependencies>
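Note: hadoop-aws should be kept on exactly the same version as hadoop-client (both ${hadoop.version}, i.e. 2.7.2 here); mismatched Hadoop artifacts are a common cause of ClassNotFoundException or NoSuchMethodError when the s3a filesystem initializes. The Jackson artifacts are likewise pinned to a single version (2.6.5) to avoid conflicts with Spark's own Jackson dependency.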