一、數據源讀取python
使用的時候,須要加載驅動 --jars 或者添加到classpath中 或scaddjar算法
Spark對Oracle數據庫讀取,代碼以下:sql
conf = SparkConf().setAppName(string_test)
sc = SparkContext(conf=conf)
ctx = SQLContext(sc)
sqltext = "(select dbms_lob.substr(title,500) as title,id,content,country,languages,time as publishDate,source,subject,source_url from news t where id <= 24) news"
news =ctx.read \
.format("jdbc") \
.option("url", "jdbc:oracle:thin:username/password@//ip:port/sid") \
.option("dbtable", sql) \
.option("user", "user") \
.option("password", "password") \
.option("driver", "oracle.jdbc.driver.OracleDriver") \
.load()
news.registerTempTable("news")
Spark 對Mongo讀數據mongodb
ctx = SQLContext(sc)
mongourl = "mongodb://username:password@ip:port"
mongoDB = "dbname"
mongoCollection = "collectionName"
mongoRows = ctx.read.format("com.mongodb.spark.sql").options(uri=mongourl,database=mongoDB, collection=mongoCollection).load()
mongoResultRdd = mongoRows.rdd
二、機器學習算法轉換數據庫
機器學習算法有兩類不能直接添加到spark中:oracle
1) 包中含有複雜依賴關係的,如scipy、numpy等,scipy.special.beta函數在spark中不可使用的。機器學習
2) 包不是.py結尾的,而是有第三方編譯包的,不能夠添加到spark中函數
解決辦法:學習
在spark改寫的代碼中使用到上述相關的程序,闊以用subprocess調用python程序,以進行數據處理,而後獲得程序返回結果。以下:url
test= subprocess.getoutput("python /home/pytest.py \""+content.replace("\'","’")+"\"")re= test[test.index("::")+2:len(test)].replace(" ","")