MongoDB Spark Connector 實戰指南

Why Spark with MongoDB?

  1. 高性能,官方號稱 100x faster,由於能夠全內存運行,性能提高確定是很明顯的
  2. 簡單易用,支持 Java、Python、Scala、SQL 等多種語言,使得構建分析應用很是簡單
  3. 統一構建 ,支持多種數據源,經過 Spark RDD 屏蔽底層數據差別,同一個分析應用可運行於不一樣的數據源;
  4. 應用場景普遍,能同時支持批處理以及流式處理

MongoDB Spark Connector 爲官方推出,用於適配 Spark 操做 MongoDB 數據;本文以 Python 爲例,介紹 MongoDB Spark Connector 的使用,幫助你基於 MongoDB 構建第一個分析應用。java

準備 MongoDB 環境

安裝 MongoDB 參考 Install MongoDB Community Edition on Linuxpython

mkdir mongodata
mongod --dbpath mongodata --port 9555

準備 Spark python 環境

參考 PySpark - Quick Guidelinux

下載 Spark

cd /home/mongo-spark
wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
tar zxvf spark-2.4.4-bin-hadoop2.7.tgz

設置 Spark 環境變量

export SPARK_HOME=/home/mongo-spark/spark-2.4.4-bin-hadoop2.7
export PATH=$PATH:/home/mongo-spark/spark-2.4.4-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH

運行 Spark RDD 示例

# count.py
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()

$SPARK_HOME/bin/spark-submit count.py
Number of elements in RDD → 8

若是上述程序運行成功,說明 Spark python 環境準備成功,還能夠測試 Spark 的其餘 RDD 操做,好比 collector、filter、map、reduce、join 等,更多示例參考 PySpark - Quick Guidesql

Spark 操做 MongoDB 數據

參考 Spark Connector Python Guidemongodb

準備測試數據 test.coll01 插入3條測試數據,test.coll02 未空

mongo --port 9555

> db.coll01.find()
{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }
> db.coll02.find()

準備操做腳本,將輸入集合的數據按條件進行過濾,寫到輸出集合

# mongo-spark-test.py
from pyspark.sql import SparkSession

# Create Spark Session

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:9555/test.coll01") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:9555/test.coll") \
    .getOrCreate()


# Read from MongoDB
df = spark.read.format("mongo").load()
df.show()

# Filter and Write
df.filter(df['qty'] >= 10).write.format("mongo").mode("append").save()    

# Use SQL 
# df.createOrReplaceTempView("temp")
# some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
# some_fruit.show()

運行腳本

$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 mongo-spark-test.py

mongo --port 9555

> db.coll02.find()
{ "_id" : 2, "qty" : 10, "type" : "orange" }
{ "_id" : 3, "qty" : 15, "type" : "banana" }

 

閱讀原文apache

本文爲雲棲社區原創內容,未經容許不得轉載。api

相關文章
相關標籤/搜索