Environment: Spark 2.3.3, Kafka 2.11-1.0.2, Java jdk1.8.0_191, HBase 1.2.11.
Imports and configuration (the `os`, `json`, and `logging` imports and the logger are added here because the functions below rely on them):

```python
import os
import json
import logging

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

KAFKA_BROKER_LIST = "10.2.XX.XX:9092,10.2.XX.XX:9092,10.2.XX.XX:9092"
KAFKA_TOPICS = ["streamingTest"]
SPARK_STREAMING_TIME_DELAY = 5         # batch interval, in seconds
LOCAL_OFFSET_FILE = "offset_test.txt"  # consumed offsets are persisted here

# Filled in by get_offset_ranges() on every batch.
kafka_topic_partition_offset_ranges = []
```
`get_offset_ranges()` is applied through `transform()` so that it sees the raw KafkaRDD (`offsetRanges()` only exists on the RDDs produced directly by the direct stream); it records the batch's offset ranges in the global list and returns the RDD unchanged:

```python
def get_offset_ranges(rdd):
    # Remember this batch's offset ranges, then pass the RDD through untouched.
    global kafka_topic_partition_offset_ranges
    kafka_topic_partition_offset_ranges = rdd.offsetRanges()
    return rdd
```
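For reference, each element returned by `rdd.offsetRanges()` is an `OffsetRange` whose `topic`, `partition`, `fromOffset`, and `untilOffset` attributes are exactly what `save_offset_ranges()` below serializes. A minimal debugging sketch (the helper name and print format are illustrative, not part of the original script):

```python
def debug_print_offset_ranges():
    # Illustration only: show what the last batch consumed from each partition.
    for o in kafka_topic_partition_offset_ranges:
        print("%s[%d]: %d -> %d" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
```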
`save_offset_ranges()` then writes those offsets to a JSON file next to the script:

```python
def save_offset_ranges(rdd):
    # Persist the offsets captured by get_offset_ranges() to a local JSON file.
    root_path = os.path.dirname(os.path.realpath(__file__))
    local_offset_path = os.path.join(root_path, LOCAL_OFFSET_FILE)
    data = []
    for o in kafka_topic_partition_offset_ranges:
        data.append({"topic": o.topic, "partition": o.partition,
                     "fromOffset": o.fromOffset, "untilOffset": o.untilOffset})
    with open(local_offset_path, 'w') as f:
        f.write(json.dumps(data))
```
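With the single-topic configuration above, the saved file is a JSON array with one entry per partition. For a three-partition `streamingTest` topic it would look roughly like this (the offset values are made up):

```python
# Illustrative contents of offset_test.txt; the numbers are not real offsets.
[
    {"topic": "streamingTest", "partition": 0, "fromOffset": 100, "untilOffset": 150},
    {"topic": "streamingTest", "partition": 1, "fromOffset": 98, "untilOffset": 147},
    {"topic": "streamingTest", "partition": 2, "fromOffset": 101, "untilOffset": 149}
]
```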
`deal_data()` reshapes each parsed message into the `(rowkey, [rowkey, family, qualifier, value])` form expected by `StringListToPutConverter` and writes the batch to HBase with `saveAsNewAPIHadoopDataset()`. `save_by_spark_streaming()` wires everything together, resuming from the previously saved offsets when the offset file exists:

```python
def deal_data(rdd):
    def convert_dict_to_tuple(dict2):
        # {"rowkey": {"family:qualifier": value, ...}} ->
        #   [(rowkey, [family, qualifier], value), ...]
        tuple2 = []
        for rowkey, values in dict2.items():
            for k, v in values.items():
                tuple2.append((rowkey, k.split(':'), v))
        return tuple2

    if rdd.isEmpty():
        return  # nothing arrived in this batch

    # Reshape into (rowkey, [rowkey, family, qualifier, value]) for the Put converter.
    rdd1 = rdd.flatMap(lambda x: convert_dict_to_tuple(x)) \
              .map(lambda x: (x[0], [x[0], x[1][0], x[1][1], x[2]]))
    data = rdd1.first()
    logger.warning('rdd data[0]: {}'.format(data))

    host = 'master,slave1,slave2'
    table = 'TEST:somestatus'
    keyConv = 'org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter'
    valueConv = 'org.apache.spark.examples.pythonconverters.StringListToPutConverter'
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    rdd1.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)


def save_by_spark_streaming():
    root_path = os.path.dirname(os.path.realpath(__file__))
    record_path = os.path.join(root_path, LOCAL_OFFSET_FILE)

    # Resume from the offsets saved by the previous run, if the file exists.
    from_offsets = {}
    if os.path.exists(record_path):
        with open(record_path, "r") as f:
            offset_data = json.loads(f.read())
        for o in offset_data:
            if o['topic'] not in KAFKA_TOPICS:
                raise Exception("the topic name in %s is incorrect" % LOCAL_OFFSET_FILE)
            topic_partition = TopicAndPartition(o['topic'], o['partition'])
            from_offsets[topic_partition] = int(o['untilOffset'])
        logger.warning("partitions start from offsets: %s" % from_offsets)

    sc = SparkContext(appName="test-kafka-integrating-streaming")
    ssc = StreamingContext(sc, SPARK_STREAMING_TIME_DELAY)
    kvs = KafkaUtils.createDirectStream(ssc=ssc, topics=KAFKA_TOPICS,
                                        fromOffsets=from_offsets,
                                        kafkaParams={"metadata.broker.list": KAFKA_BROKER_LIST})

    # Values arrive as JSON strings; parse them and write each batch to HBase.
    kvs.map(lambda x: json.loads(x[1])).foreachRDD(deal_data)
    # Record every batch's offsets so the next run can resume from them.
    kvs.transform(get_offset_ranges).foreachRDD(save_offset_ranges)

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()


if __name__ == '__main__':
    save_by_spark_streaming()
```
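To make the HBase write concrete, here is a hypothetical input message (as it looks after the `json.loads` map) and the rows `deal_data()` derives from it; the rowkey, the `info` column family, and the values are invented for illustration:

```python
# A made-up message as delivered by kvs.map(lambda x: json.loads(x[1])):
message = {"row-001": {"info:status": "online", "info:updated": "1555000000"}}

# convert_dict_to_tuple(message) yields:
#   [("row-001", ["info", "status"], "online"),
#    ("row-001", ["info", "updated"], "1555000000")]
#
# and the subsequent map() reshapes each element into the
# (rowkey, [rowkey, family, qualifier, value]) list that
# StringListToPutConverter turns into an HBase Put:
#   ("row-001", ["row-001", "info", "status", "online"])
#   ("row-001", ["row-001", "info", "updated", "1555000000"])
```

Each such list ends up as one cell written to the TEST:somestatus table.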