While working with MongoDB data recently, I ran into a rather awkward format: the account/password logs are all appended inside a single record, which makes it very tedious to extract the validity duration of one password, let alone the average duration across all passwords.
I tried all kinds of iterative computations, which were painful, and the structure reported by printSchema() was irregular as well.
After working through it with a colleague, we switched to an explicit StructType schema, which turned out to be very efficient.
The code is as follows:
import java.sql.{DriverManager, ResultSet}
import mongoDb_foundation_data20180913.url
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession

object devicests_20180916 {

  // spark-submit --driver-class-path /usr/local/jdk/lib/mysql-connector-java-5.1.46.jar --class "devicests_20180916" /testdata/u3.jar

  val url = "jdbc:mysql://192.168.1.10:3306/saas?user=saas&password=saas2018"
  //val url = "jdbc:mysql://134.175.180.116:3306/saas?user=saas&password=saas2018"

  val conn = DriverManager.getConnection(url)

  def main(args: Array[String]): Unit = {

    val conn = DriverManager.getConnection(url)
    val conf = new SparkConf().setAppName("appName").setMaster("local")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder()
      .appName("Spark SQL basic example")
      .config("spark.debug.maxToStringFields", "200")
      .getOrCreate()

    spark.sql("use saas")

    import spark.implicits._

    val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)

    // val logData = spark.read.textFile("file:////mysqldata/aasdata/2018-08-17/devices_2018-08-17")
    // val log = spark.read.json(logData)

    val prop = new java.util.Properties

    // log.createOrReplaceTempView("devicests_states")
    // df.write.mode("append").jdbc(url, "saas.devicests_states", prop)
    // import org.apache.calcite.adapter

    // Explicit schema for the nested device documents exported from MongoDB.
    val schema = new StructType()
      .add("__v", StringType)
      .add("_id", new StructType()
        .add("$oid", StringType))
      .add("device_type", StringType)
      .add("hardware_info", new StructType()
        .add("cid", StringType)
        .add("mac", StringType)
        .add("sn", StringType)
        .add("versions", new StructType()
          .add("app_version", StringType)
          .add("hardware_version", StringType)
          .add("zigbee_version", StringType)))
      .add("model_id", StringType)
      .add("name", StringType)
      .add("nickname", StringType)
      .add("parent", StringType)
      .add("services", ArrayType(StringType))
      .add("states", new StructType()
        .add("onoff_line", StringType)
        .add("passwords",
          // Spark infers passwords as a struct by default, which makes explode and
          // map_values hard to use; it has to be declared manually as Map[String, Struct].
          MapType(StringType, new StructType()
            .add("description", StringType)
            .add("id", StringType)
            .add("is_default", StringType)
            .add("name", StringType)
            .add("permission", new StructType()
              .add("begin", StringType)
              .add("end", StringType)
              .add("status", StringType))
            .add("status", IntegerType)
            .add("time", StringType)))
        .add("power", StringType))
      .add("status", IntegerType)
      .add("time", StringType)
      .add("uuid", StringType)

    // Read the daily dump with the explicit schema and expose it as a temp view.
    spark.read.schema(schema)
      .json(s"file:///mysqldata/aasdata/2018-09-12/devices_2018-09-12")
      .createOrReplaceTempView("devices")

    // Explode the passwords map into one row per password, then aggregate per device.
    val res = spark.sql(
      """
        |SELECT uuid,
        |       COUNT(passwords.permission) AS count,
        |       AVG(passwords.permission.end - passwords.permission.begin) AS avg
        |FROM
        |  (
        |    SELECT uuid, explode(map_values(states.passwords)) AS passwords
        |    FROM devices
        |  )
        |WHERE
        |  passwords.permission.begin IS NOT NULL
        |  AND passwords.permission.end IS NOT NULL
        |GROUP BY uuid""".stripMargin) //.collect.head

    // Write the per-device password count and average duration back to MySQL.
    res.write.mode("overwrite").jdbc(url, "saas.res_count_avg", prop)

    // val count = Long(res(0))
    // val avg = Double(res(1))
  }
}
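To make the schema trick easier to see in isolation, here is a minimal, self-contained sketch. It is not part of the original job: the object name, the toy device record and its values are made up for illustration, and only the field layout mirrors the real data. It contrasts what schema inference does with the passwords field against the explicit MapType declaration that lets explode(map_values(...)) and the count/average query work:

// Minimal demo: inferred struct vs explicit MapType for the passwords field.
// All names and values here are illustrative only.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object passwords_maptype_demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("passwords-maptype-demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // One toy device record with two passwords.
    val sample = Seq(
      """{"uuid":"dev-001","states":{"passwords":{"p1":{"permission":{"begin":"100","end":"160"}},"p2":{"permission":{"begin":"100","end":"220"}}}}}"""
    ).toDS()

    // Without an explicit schema, passwords is inferred as struct<p1:..., p2:...>,
    // i.e. one hard-coded field per password key.
    spark.read.json(sample).printSchema()

    // With MapType the keys stay dynamic, so map_values/explode apply.
    val schema = new StructType()
      .add("uuid", StringType)
      .add("states", new StructType()
        .add("passwords", MapType(StringType, new StructType()
          .add("permission", new StructType()
            .add("begin", StringType)
            .add("end", StringType)))))

    spark.read.schema(schema).json(sample).createOrReplaceTempView("devices")

    spark.sql(
      """SELECT uuid,
        |       COUNT(passwords.permission) AS count,
        |       AVG(passwords.permission.end - passwords.permission.begin) AS avg
        |FROM (SELECT uuid, explode(map_values(states.passwords)) AS passwords FROM devices)
        |WHERE passwords.permission.begin IS NOT NULL
        |  AND passwords.permission.end IS NOT NULL
        |GROUP BY uuid""".stripMargin).show()

    spark.stop()
  }
}

With the inferred schema the query would have to name every password key explicitly; with the MapType declaration the same SQL keeps working no matter how many passwords a device has accumulated.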