Daily notes

 

SIGNAL=${SIGNAL:-TERM}

PIDS=$(jps -lm | grep -i 'kafka\.Kafka' | awk '{print $1}')
if [ -z "$PIDS" ]; then
  echo "No kafka server to stop"
  exit 1
else
  kill -s $SIGNAL $PIDS
fi

 

PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')

if [ -z "$PIDS" ]; then
  echo "No kafka server to stop"
  exit 1
else
  kill -s TERM $PIDS
fi

 

 

1. Collection of interview questions

2. Distributed algorithm implementations

3. Spark MLlib

 

 

 

 



package sqlparser;

import java.io.*;

public class func {
    // Read an entire file into a UTF-8 string.
    public static String readToString(String fileName) {
        String encoding = "UTF-8";
        File file = new File(fileName);
        Long filelength = file.length();
        byte[] filecontent = new byte[filelength.intValue()];
        try {
            FileInputStream in = new FileInputStream(file);
            in.read(filecontent);
            in.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            return new String(filecontent, encoding);
        } catch (UnsupportedEncodingException e) {
            System.err.println("The OS does not support " + encoding);
            e.printStackTrace();
            return null;
        }
    }
}





秋天的顏色01 name+QQ
Paic880807











import numpy as np
import pandas as pd
pd.set_option('display.max_columns', 10)
pd.set_option('expand_frame_repr', False)
def loadData():
    df_off = pd.read_csv(r'ccf_offline_stage1_train.csv')
    df_on = pd.read_csv(r'ccf_online_stage1_train.csv')
    df_test = pd.read_csv(r'ccf_offline_stage1_test_revised.csv')
    return df_off, df_on, df_test
    # return df_off[:10], df_on[:10], df_test[:10]
# User_id Merchant_id Coupon_id Discount_rate Distance Date_received Date
# 0 1439408 2632 NaN NaN 0.0 NaN 20160217.0
# 1 1439408 4663 11002.0 150:20 1.0 20160528.0 NaN
# 2 1439408 2632 8591.0 20:1 0.0 20160217.0 NaN
# 3 1439408 2632 1078.0 20:1 0.0 20160319.0 NaN
# 4 1439408 2632 8591.0 20:1 0.0 20160613.0 NaN
# User_id Merchant_id Action Coupon_id Discount_rate Date_received Date
# 0 13740231 18907 2 100017492 500:50 20160513.0 NaN
# 1 13740231 34805 1 NaN NaN NaN 20160321.0
# 2 14336199 18907 0 NaN NaN NaN 20160618.0
# 3 14336199 18907 0 NaN NaN NaN 20160618.0
# 4 14336199 18907 0 NaN NaN NaN 20160618.0
# User_id Merchant_id Coupon_id Discount_rate Distance Date_received
# 0 4129537 450 9983 30:5 1.0 20160712
# 1 6949378 1300 3429 30:5 NaN 20160706
# 2 2166529 7113 6928 200:20 5.0 20160727
# 3 2166529 7113 1808 100:10 5.0 20160727
# 4 6172162 7605 6500 30:1 2.0 20160708
# 0 977900 'User_id','Merchant_id','Coupon_id','Discount_rate','Date_received','Date'
# -1 701602
# 1 75382
# Name: label, dtype: int64
# -1 10557469
# 0 655898
# 1 216459
df_off,df_on,df_test = loadData()

# Offline samples: 1 = coupon received and used, 0 = coupon received but not used,
# -1 = no coupon involved.
df_off['label'] = -1
df_off.loc[df_off['Coupon_id'].notnull() & df_off['Date'].notnull(), 'label'] = 1
df_off.loc[df_off['Coupon_id'].notnull() & df_off['Date'].isnull(), 'label'] = 0

# Same labelling for the online data.
df_on['label'] = -1
df_on.loc[df_on['Coupon_id'].notnull() & df_on['Date'].notnull(), 'label'] = 1
df_on.loc[df_on['Coupon_id'].notnull() & df_on['Date'].isnull(), 'label'] = 0


# Keep only samples where a coupon was received (label 0 or 1).
real_off = df_off[df_off.label.isin([0, 1])]
real_on = df_on[df_on.label.isin([0, 1])]

cols = ['User_id', 'Merchant_id', 'Coupon_id', 'Discount_rate', 'Date_received', 'Date', 'label']
real_all = pd.concat([real_off[cols], real_on[cols]])
print(real_all.iloc[:, 0].size, real_off.iloc[:, 0].size, real_on.iloc[:, 0].size)

# Day of week on which the coupon was received.
real_all['tmp'] = pd.to_datetime(real_all['Date_received'].astype(int).apply(str))
real_all['weekday'] = real_all['tmp'].dt.day_name()
print(real_all.groupby(['weekday', 'label']).count())
# pd.pivot_table(real_all, values='label', index='weekday')




















http://mooc.study.163.com/university/deeplearning_ai#/c



__author__ = 'Administrator'
import time
import pandas as pd

def runtime(func):
    # Simple timing decorator.
    def wrapper(*args, **kwargs):
        t1 = time.time()
        func(*args, **kwargs)
        t2 = time.time()
        print("{0} took {1:.2f}s".format(func.__name__, t2 - t1))
    return wrapper


def loadData():
    df_train = pd.read_csv("weibo_train_data.txt", header=None, sep='\t')
    df_train.columns = ["uid", "mid", "date", "forward", "comment", "like", "content"]
    df_test = pd.read_csv("weibo_predict_data.txt", header=None, sep='\t')
    df_test.columns = ["uid", "mid", "date", "content"]
    return df_train, df_test

def dataProcess(data):
    # Per-user median and mean of forward, comment and like counts.
    df = data.groupby('uid')[['forward', 'comment', 'like']].agg(['median', 'mean'])
    df.columns = ['forward_median', 'forward_mean', 'comment_median', 'comment_mean', 'like_median', 'like_mean']
    train_stat = df.apply(pd.Series.round)  # rounded stats (not used below)
    uid_dict = {}
    for uid, row in df.iterrows():
        uid_dict[uid] = row
    return uid_dict


def fill_with_fixed_data(f, c, l):
    # Predict constant forward/comment/like counts for every test record.
    df_train, df_test = loadData()
    df1 = df_test[['uid', 'mid']].copy()
    df1['forward'] = f
    df1['comment'] = c
    df1['like'] = l
    result = []
    for _, row in df1.iterrows():
        result.append("{0}\t{1}\t{2},{3},{4}\n".format(row[0], row[1], row[2], row[3], row[4]))
    filename = "weibo_predict_{}_{}_{}.txt".format(f, c, l)
    fh = open(filename, 'w')
    fh.writelines(result)
    fh.close()
    return result


def fill_with_stat_data(stat='median'):
    # Predict each user's historical median (or mean); fall back to 0 for unseen users.
    df_train, df_test = loadData()
    uid_dict = dataProcess(df_train)
    df1 = df_test[['uid', 'mid']].copy()
    forward, comment, like = [], [], []
    # print(uid_dict)  # debug output of the per-user statistics
    for uid in df_test['uid']:
        if uid in uid_dict:
            forward.append(int(uid_dict[uid]["forward_" + stat]))
            comment.append(int(uid_dict[uid]["comment_" + stat]))
            like.append(int(uid_dict[uid]["like_" + stat]))
        else:
            forward.append(0)
            comment.append(0)
            like.append(0)
    df1['forward'] = forward
    df1['comment'] = comment
    df1['like'] = like
    result = []
    for _, row in df1.iterrows():
        result.append("{0}\t{1}\t{2},{3},{4}\n".format(row[0], row[1], row[2], row[3], row[4]))
    filename = "weibo_predict_{}.txt".format(stat)
    fh = open(filename, 'w')
    fh.writelines(result)
    fh.close()
    return result

fill_with_stat_data()










from numpy import *
import operator
from functools import reduce

def loadDataSet():
    postingList = [['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
                   ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
                   ['my', 'dalmation', 'is', 'so', 'cute', 'I', 'love', 'him'],
                   ['stop', 'posting', 'stupid', 'worthless', 'garbage'],
                   ['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'],
                   ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
    classVec = [0, 1, 0, 1, 0, 1]  # 1 is abusive, 0 not
    return postingList, classVec


def createVocabList(dataSet):
    # Union of all words across the documents.
    vocabSet = set(reduce(operator.add, dataSet))
    return list(vocabSet)


def setOfWords2Vec(vocabList, inputSet):
    # Set-of-words model: 1 if the word appears in the document, else 0.
    returnVec = [0] * len(vocabList)
    for word in inputSet:
        if word in vocabList:
            returnVec[vocabList.index(word)] = 1
        else:
            print("the word: %s is not in my Vocabulary!" % word)
    return returnVec






















from numpy import *
import operator


class kNN(object):
    def __init__(self, **kwargs):
        pass

    def data2matrix(self):
        # Parse the dating data set: 3 numeric features per line plus a label.
        fr = open(r'G:\zqh_work\ML\datasets\ml_ac\Ch02\datingTestSet2.txt')
        lines = fr.readlines()
        line_num = len(lines)
        mat = zeros((line_num, 3))
        labels = []
        for i in range(line_num):
            fields = lines[i].strip().split('\t')
            mat[i] = fields[0:3]
            labels.append(fields[-1])
        return mat, labels

    def norm_data(self, mat):
        # Scale every feature to [0, 1].
        min_vals = mat.min(0)
        max_vals = mat.max(0)
        diff = max_vals - min_vals
        rows = mat.shape[0]
        norm_mat = (mat - tile(min_vals, (rows, 1))) / tile(diff, (rows, 1))
        return norm_mat

    def classify(self, inX, norm_mat, labels, k):
        # Vote among the k nearest neighbours by Euclidean distance.
        rows = norm_mat.shape[0]
        diff_mat = tile(inX, (rows, 1)) - norm_mat
        distances = (diff_mat ** 2).sum(axis=1) ** 0.5
        sorted_idx = distances.argsort()
        class_count = {}
        for i in range(k):
            label = labels[sorted_idx[i]]
            class_count[label] = class_count.get(label, 0) + 1
        sorted_count = sorted(class_count.items(), key=operator.itemgetter(1), reverse=True)
        return sorted_count[0][0]

    def test(self):
        mat, labels = self.data2matrix()
        norm_mat = self.norm_data(mat)
        print(norm_mat)
        return norm_mat


if __name__ == '__main__':
    knn = kNN()
    knn.test()

http://keras-cn.readthedocs.io/en/latest/

http://wiki.jikexueyuan.com/project/tensorflow-zh/get_started/introduction.html

 

 

https://segmentfault.com/a/1190000002766035

 

 

https://www.jianshu.com/p/8bb456cb7c77 

 

http://cache.baiducontent.com/c?m=9f65cb4a8c8507ed4fece763104d96275e03c1743ca083572c85c91f84642c1c0733fee37c6243198385212240f8543d8883560b200356b799c28f4ac9fecf6879877a74250b873105d36eb8ca36768373c100beb81897adf04584afa2929d07139344040a97f0fc4d01648b2cae033093b1993f025e60eda76734b81f2c74c33441c650f997256f77d1b189081b837d867610e7ef68f52913c548e2485b7702fd0ca6092131309758268f1e6e4585ea2dbb7d3306&p=c2769a479d9e0bb312bd9b7e0d1488&newp=8465c64ad49506e42abd9b7e0d1496231610db2151d7d4146b82c825d7331b001c3bbfb423251003d2c0776600af495ee8f5367630032ba3dda5c91d9fb4c57479de607f02&user=baidu&fm=sc&query=org%2Eapache%2Espark%2Esql%2Eexecution%2EBufferedRowIterator%2EhasNext&qid=853831ee00006451&p1=7

org.apache.spark.sql.execution.BufferedRowIterator.hasNext

Is spark.write a distributed write?

Transaction control in Scala?

YARN web UI configuration: which machine is the RM (ResourceManager) on?

Why not use yarn-cluster? Because it is harder to collect logs?

How do you view executor logs?

In which situations does each of Spark's configuration files apply?

 

 https://www.cnblogs.com/sorco/p/7070922.html

 

http://hongjiang.info/scala/    寫點什麼

Spark executor logs: $SPARK_HOME/work/$app_id/$executor_id/stdout

To summarize the JVM parameter settings for the different Spark roles:

(1) Driver JVM parameters:
-Xmx / -Xms: in yarn-client mode they default to the SPARK_DRIVER_MEMORY value in the spark-env file, with -Xmx and -Xms set to the same size; in yarn-cluster mode they are taken from spark.driver.extraJavaOptions in spark-defaults.conf.
PermSize: in yarn-client mode it defaults to the JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS" value in the spark-class file; in yarn-cluster mode it is taken from spark.driver.extraJavaOptions in spark-defaults.conf.
GC options: in yarn-client mode they default to JAVA_OPTS in the spark-class file; in yarn-cluster mode they are taken from spark.driver.extraJavaOptions in spark-defaults.conf.
All of the above can finally be overridden by the --driver-java-options argument of spark-submit.

(2) Executor JVM parameters:
-Xmx / -Xms: in yarn-client mode they default to the SPARK_EXECUTOR_MEMORY value in the spark-env file, with -Xmx and -Xms set to the same size; in yarn-cluster mode they are taken from spark.executor.extraJavaOptions in spark-defaults.conf.
PermSize: in both modes it is taken from spark.executor.extraJavaOptions in spark-defaults.conf.
GC options: in both modes they are taken from spark.executor.extraJavaOptions in spark-defaults.conf.

(3) Number of executors and CPU cores per executor
In yarn-client mode the number of executors is set by SPARK_EXECUTOR_INSTANCES in spark-env and the cores per instance by SPARK_EXECUTOR_CORES; in yarn-cluster mode the number of executors is set by the --num-executors argument of spark-submit (default 2), and the CPU cores used by each executor by --executor-cores (default 1).
The runtime information of each executor can be seen with the yarn logs command, similar to the following:

14/08/13 18:12:59 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms1024m -Xmx1024m , -XX:PermSize=256M -XX:MaxPermSize=256M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -Xloggc:/tmp/spark_gc.log, -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler, 1, sparktest2, 3, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)

Here, akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler is the driver-side CoarseGrainedScheduler endpoint the executor registers with, the 1 that follows is the executor ID, sparktest2 is the host the executor runs on, and the 3 after that is the number of CPU cores allocated to this executor.
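As a minimal sketch (assuming Spark on YARN; all values are placeholders, not from the quoted summary), the executor-side settings above correspond to these configuration keys when set programmatically on a SparkConf:

import org.apache.spark.SparkConf

// Illustrative only: the same executor sizing and JVM options expressed as
// SparkConf entries instead of spark-defaults.conf / spark-env.sh.
val conf = new SparkConf()
  .setAppName("jvm-options-demo")            // hypothetical application name
  .set("spark.executor.memory", "1g")        // becomes the executor's -Xms/-Xmx
  .set("spark.executor.instances", "2")      // number of executors on YARN
  .set("spark.executor.cores", "1")          // CPU cores per executor
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

// The driver's own -Xmx cannot be changed from inside an already-running driver;
// pass --driver-memory / --driver-java-options to spark-submit for that.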

 

First add SPARK_HISTORY_OPTS in spark-env.sh;

then start the start-history-server.sh service;

you will then see a HistoryServer process running, listening on port 18080.

After that you can browse it at http://hostname:18080.



Author: 俺是亮哥
Link: https://www.jianshu.com/p/65a3476757a5
Source: Jianshu (簡書)
Copyright belongs to the author. For commercial reuse, contact the author for authorization; for non-commercial reuse, please credit the source.

 

The crux of the problem is that the closure cannot be serialized. In this example the closure consists of the function parser plus an implicit parameter it depends on: formats. The problem lies in that implicit parameter: its type is DefaultFormats, a class that declares no way to serialize or deserialize itself, so Spark cannot serialize formats and therefore cannot ship the task to remote executors.

The implicit parameter formats is there for extract, whose signature is:

org.json4s.ExtractableJsonAstNode#extract[A](implicit formats: Formats, mf: scala.reflect.Manifest[A]): A = ...

Once the root cause is found, the fix is easy. We do not actually need to serialize formats at all; for our purposes it is stateless. So we can simply declare it as a globally static value and bypass serialization: move the implicit val formats = DefaultFormats declaration from inside the method up to a field of the App object.
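A minimal sketch of that change, assuming a toy case class and JSON layout since the surrounding code is not shown here:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object ParserApp {
  // Declared once at the object (static) level: each executor initialises it locally
  // when the object is first referenced, so Spark never needs to serialize DefaultFormats.
  implicit val formats: Formats = DefaultFormats

  // Illustrative schema, not from the original article.
  case class Event(id: String, value: Int)

  // The closure shipped to executors now captures only this function, not formats.
  def parser(line: String): Event = parse(line).extract[Event]
}

Because formats now lives on a stable object instead of inside the closure, the task no longer drags a non-serializable Formats instance along with it.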

 

 

{
    "color_scheme": "Packages/Color Scheme - Default/Monokai.tmTheme",
    "font_size": 13,
    "ignored_packages":
    [
        "Vintage"
    ],
    "preview_on_click": false,
    "word_wrap": true
}

 

Ambari deployment

Scala: static methods, singleton objects, companion objects

Spark Job Server

Spark ETL

Spark resource limits

YARN resource queue limits and per-user limits

Algorithms

Mounting HDFS so it can be accessed like a local directory

 

F5

keepalived

SDG agent for collecting redo logs

ActiveMQ vs Kafka

Tomcat backup

finixs

The concept of groups? Hadoop groups

Hue permission control

Spark Thrift service

spark-submit

Does Ranger only control SDO, not the command line?

Pairing the metadata captured by SMS with CTM

 

 

Ranger ............

Scala closures

Java inner classes

/etc/security/limits.conf

 

Sync command: scp -r /seabox/develop/  26.6.0.141:/seabox

 

Predicate pushdown

Organize the business knowledge that needs to be understood

 

(select z.*,row_number() over(partition by z.deal_no order by z.biz_date desc) rn
from bridge.summit_i_repo_general_info_ib z
where z.deal_no not in (select distinct deal_no from bridge.summit_i_repo_general_info_ib where deal_status in ('3', '4') and biz_date <= '{DATE_YYYYMMDD}')
)

 

 

How do you match "received <number> rows"?

START_TIME=`date "+%Y-%m-%d %H:%M:%S"`   ????????

 awk -F: '{print"用戶賬號:"$1}'

Sqoop parameters

Export from Kettle to an XML file

 http://confluence.paic.com.cn:6060/pages/viewpage.action?pageId=2132765

 http://www.docin.com/p-1354952858.html

Oracle connection methods:

JDBC

ODBC

OCI

JNDI

 http://logging.apache.org/log4j/2.x/

 

View current processes: ps

Can be used to find which host an application is running on: ps -aux | grep hive

An example of collecting log4j logs with Flume:

http://blog.csdn.net/nsrainbow/article/details/36875123

 

H75244

Uy1caTod6Hgb

 

If delimiters, bucketing, etc. were not defined when a table was created, can they still be added afterwards?

 

 

// Two sample DataFrames plus a one-column DataFrame listing the join keys.
val dfa = sc.parallelize(List(("1", "aa", "ab"), ("2", "bb", "bb"), ("4", "dd", "dd"))).toDF("key", "val1", "val2")
val dfb = sc.parallelize(List(("1", "aa", "ab"), ("2", "bb", "cc"), ("3", "cc", "cc"))).toDF("key", "val1", "val2")
val dfc = sc.parallelize(List(("key"), ("val1"))).toDF("pkey")

// Full outer join on a fixed pair of columns.
val rv1 = dfb.join(dfa, dfa("key") === dfb("key") and dfa("val1") === dfb("val1"), "outer").show()

// Build the same join condition dynamically from the column names stored in dfc.
val tmp = dfc.select("pkey").collect().map(_(0).toString())
val mid = new Array[org.apache.spark.sql.Column](tmp.length)
for (i <- 0 until tmp.length) mid(i) = dfa(tmp(i)) === dfb(tmp(i))
val rv2 = dfb.join(dfa, mid.reduce(_ and _), "outer")

 

import org.apache.spark.sql.functions._

// For every column, take dfa's value when the dfb side of the outer join is missing.
val cols = dfb.columns
val all_col = new Array[org.apache.spark.sql.Column](cols.length)
for (i <- 0 until cols.length) all_col(i) = when(dfb("key").isNull, dfa(cols(i))).otherwise(dfb(cols(i))).as(cols(i))

val rv3 = rv2.select(all_col: _*).show()

// Same idea for a single column.
rv2.select(when(dfb("key").isNull, dfa("key")).otherwise(dfb("key")).as("key"))

 

 

 

import scala.collection.mutable.ArrayBuffer

// Split dfb's columns into numeric and string ones.
val cols = dfb.columns
val a = dfb.dtypes
val b = new ArrayBuffer[String]()
for (i <- a if i._2 == "IntegerType") b += i._1

// Numeric columns: use 0 when the dfb side of the outer join is missing.
val numArray = b.toArray
val num_col = new Array[org.apache.spark.sql.Column](numArray.length)
for (i <- 0 until numArray.length) num_col(i) = when(dfb("key").isNull, lit(0)).otherwise(dfb(numArray(i))).as(numArray(i))

// String columns: fall back to dfa's value instead.
val strArray = cols.filterNot(numArray.contains(_))
val str_col = new Array[org.apache.spark.sql.Column](strArray.length)
for (i <- 0 until strArray.length) str_col(i) = when(dfb("key").isNull, dfa(strArray(i))).otherwise(dfb(strArray(i))).as(strArray(i))

val rv3 = rv2.select((num_col ++ str_col): _*)
rv3.show()
