// Streaming job: watches an HDFS directory for new text files, splits each line on
// commas, and bulk-writes the resulting records into the HBase table "UAV_LOG".
// Micro-batches are formed every 10 seconds.
val ssc = new StreamingContext(conf, Seconds(10))

// Each element of `lines` is one input line split into its comma-separated fields.
// The stream is redistributed over 12 partitions before the split.
val lines = ssc
  .textFileStream("hdfs://master:9000/woozoom/")
  .repartition(12)
  .map(_.split(","))

lines.foreachRDD { rdd =>
  // Only batches holding more than 100 records are written; smaller batches are
  // not persisted at all.
  // NOTE(review): count() is an extra action on an uncached RDD, so the batch is
  // presumably evaluated twice (once here, once for the write) — confirm whether
  // caching the RDD is worthwhile.
  if (rdd.count() > 100) {
    rdd.foreachPartition { part =>
      // Skip empty partitions so no HBase table handle is opened for nothing.
      if (part.nonEmpty) {
        // The table handle is created here, inside the partition closure, rather
        // than on the driver, so it does not have to be serialized with the task.
        val hbaseConf = HBaseConfiguration.create()
        val htable = new HTable(hbaseConf, TableName.valueOf("UAV_LOG"))
        try {
          // Buffer puts client-side (3 MB write buffer) instead of flushing on every put.
          htable.setAutoFlush(false, false)
          htable.setWriteBufferSize(3 * 1024 * 1024)
          part.foreach(log => htable.put(convertFromLogToHabse(log)))
          // Push whatever is still sitting in the write buffer.
          htable.flushCommits()
        } finally {
          // Always release the table, even when a put or the flush fails;
          // otherwise every batch leaks a handle per partition.
          htable.close()
        }
      }
    }
  }
}

ssc.start()
ssc.awaitTermination()