Distribute the SSH public key to every machine so the master can log in without a password:

```bash
for f in `cat ~/machines`; do
  scp .ssh/id_dsa.pub $f:~/
  ssh $f "cat id_dsa.pub >> .ssh/authorized_keys"
done
```
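A quick way to confirm that passwordless login now works from the master (a sketch, assuming `~/machines` lists one host per line):

```bash
# BatchMode makes ssh fail instead of falling back to a password prompt.
for f in `cat ~/machines`; do
  ssh -o BatchMode=yes $f hostname
done
```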
Copy the Spark tarball to every machine and unpack it under /opt/spark:

```bash
for f in `cat machines`; do
  scp ~/software/soft/spark-2.1.0-bin-hadoop2.7.tgz $f:~/
  ssh -t $f "
    sudo mkdir /opt/spark
    cd /opt/spark/
    sudo tar -zxvf /home/keke.zhaokk/spark-2.1.0-bin-hadoop2.7.tgz
    sudo ln -s spark-2.1.0-bin-hadoop2.7 current
    sudo chown -R keke.zhaokk /opt/spark/*
  "
done
```
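An optional check that the unpacking and the `current` symlink worked on every host (a sketch based on the paths used above):

```bash
for f in `cat machines`; do
  ssh $f "ls -ld /opt/spark/current && ls /opt/spark/current/bin/spark-submit"
done
```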
Modify the configuration. Note that `SPARK_MASTER_IP=10.11.143.30` in `spark-env.sh` needs to be changed to `SPARK_MASTER_HOST=10.11.143.30`.
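One hedged way to make that edit in place (assuming GNU `sed` and that the file is `$SPARK_HOME/conf/spark-env.sh`):

```bash
# Rename the deprecated variable in place.
sed -i 's/^SPARK_MASTER_IP=/SPARK_MASTER_HOST=/' $SPARK_HOME/conf/spark-env.sh
```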
Copy the configuration files to every machine:
```bash
for f in `cat machines`; do
  scp -r $SPARK_HOME/conf/ $f:~/
  ssh $f "
    mv conf/slaves $SPARK_HOME/conf/
    mv conf/*.sh $SPARK_HOME/conf/
  "
done
```
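To confirm the copy landed consistently, a simple checksum comparison across hosts (a sketch; `md5sum` is standard coreutils):

```bash
for f in `cat machines`; do
  echo "== $f =="
  ssh $f "md5sum $SPARK_HOME/conf/slaves $SPARK_HOME/conf/spark-env.sh"
done
```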
Cluster layout:

Role | IP | Installed software |
---|---|---|
worker | 10.11.143.24 | python(Anaconda3 4.0.0), R, spark-2.X-bin-hadoopX.X |
worker | 10.11.143.26 | python(Anaconda3 4.0.0), R, spark-2.X-bin-hadoopX.X |
master | 10.11.143.30 | python(Anaconda3 4.0.0), R, spark-2.X-bin-hadoopX.X |
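The `machines` file consumed by the loops above is not shown in the original; presumably it just lists these hosts, one per line, e.g.:

```bash
$ cat ~/machines
10.11.143.24
10.11.143.26
10.11.143.30
```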
Use `start-all.sh` and `stop-all.sh` to start and stop the cluster; once it is running, the master web UI is available at http://10.11.143.30:8080/. Connect with an interactive shell:

```bash
spark-shell --total-executor-cores 30 --executor-memory 50g \
  --master spark://10.11.143.30:7077
# or
pyspark --total-executor-cores 10 --executor-memory 50g \
  --master spark://10.11.143.30:7077
```
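To check that the master and worker daemons actually came up on each node (a sketch, assuming the JDK's `jps` tool is on the PATH of non-interactive SSH sessions):

```bash
for f in `cat ~/machines`; do
  echo "== $f =="
  ssh $f "jps | grep -E 'Master|Worker'"
done
```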
The same cluster can be used from R via SparkR:

```r
## Configuration
library(SparkR)
sc = sparkR.init(
  master = "spark://10.11.143.30:7077",
  sparkEnvir = list(spark.executor.memory = "100g", spark.cores.max = "20")
)
sqlContext = sparkRSQL.init(sc)

## Working with data
### Local data
read.df(sqlContext, paste0(Sys.getenv("SPARK_HOME"),
        "/examples/src/main/resources/people.json"), "json") -> people
head(people)

### From an existing data.frame
createDataFrame(sqlContext, data_dataframe) -> data_spark
head(data_spark)
printSchema(data_spark)

### SQL
registerTempTable(people, "people")
teenagers = sql(sqlContext, "select name from people where age>13 and age<=19")

### Convert back to a data.frame
as.data.frame(teenagers) -> T

## Stop
sparkR.stop()
```
Submitting one of the bundled examples as a batch job:

```bash
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkLR \
  --master spark://10.11.143.30:7077 \
  $SPARK_HOME/examples/jars/spark-examples_2.11-2.0.0-preview.jar > out.log
```
For reference, the configuration files on the master (comment and blank lines filtered out with `sed`):

```bash
$sed -r '/^#/d;/^$/d' spark-env.sh
SPARK_MASTER_IP=10.11.143.30
#SPARK_LOCAL_IP=10.11.143.26    # machine .26 ran into an error and needs this line added

[keke.zhaokk@r73f16622.zui /opt/spark/spark-2.0.0-preview-bin-hadoop2.7/conf]
$sed -r '/^#/d;/^$/d' spark-defaults.conf
spark.master             spark://10.11.143.30:7077
spark.executor.memory    100g
spark.cores.max          30

[keke.zhaokk@r73f16622.zui /opt/spark/spark-2.0.0-preview-bin-hadoop2.7/conf]
$sed -r '/^#/d;/^$/d' slaves
10.11.143.24
10.11.143.26
10.11.143.30

$tail -6 /etc/bashrc
export JAVA_HOME=/opt/taobao/install/ajdk-8.0.0-b60
export PATH=$PATH:$JAVA_HOME/bin                       # not needed on worker machines
export SPARK_HOME=/opt/spark/spark-2.0.0-preview-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin     # not needed on worker machines
```
To drive the cluster from a plain Python interpreter, set the relevant environment variables first:

```python
import os, sys

os.environ['SPARK_HOME'] = '/opt/spark/spark-2.0.0-preview-bin-hadoop2.7'
os.environ['JAVA_HOME'] = '/opt/taobao/install/ajdk-8.0.0-b60'
os.environ['PYSPARK_SUBMIT_ARGS'] = '''
--total-executor-cores 10
--executor-memory 50g
--master spark://10.11.143.30:7077
pyspark-shell
'''
```
Then put the PySpark modules on `sys.path` and execute pyspark's shell bootstrap, which creates `sc` and `spark`; a word count over the bundled README.md serves as a smoke test:

```python
import os, sys

spark_home = os.environ.get('SPARK_HOME', None)
if not any(spark_home in p for p in sys.path):
    sys.path.insert(0, os.path.join(spark_home, "python"))
    sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.1-src.zip'))

# Run pyspark's interactive-shell bootstrap to create the SparkContext.
with open(os.path.join(spark_home, "python/pyspark/shell.py")) as f:
    code = compile(f.read(), "shell.py", 'exec')
    exec(code)

# Word count over the bundled README as a smoke test.
text_file = sc.textFile(spark_home + "/README.md")
word_counts = text_file \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
len(word_counts.collect())
```
Output:

```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0-preview
      /_/

Using Python version 2.7.8 (default, Nov 27 2014 17:41:17)
SparkSession available as 'spark'.

263
```
Finally, stop the context:

```python
sc.stop()
```