Recently a reader of my article "Integrating Kafka into Spark Streaming: Code Examples and Challenges" wrote a job that puts Kafka producer objects into a pool and ships the pool to the executors via broadcast.
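Roughly, the pattern looks like the sketch below, assuming Apache Commons Pool 2 and the Kafka producer API. The method name createKafkaProducerPool comes up again later; the other class names and config values are illustrative, not the reader's actual code.

```scala
import java.util.Properties

import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Factory the pool uses to create producers on demand (illustrative name).
class KafkaProducerFactory(brokers: String)
    extends BasePooledObjectFactory[KafkaProducer[String, String]] {

  override def create(): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  override def wrap(producer: KafkaProducer[String, String]): PooledObject[KafkaProducer[String, String]] =
    new DefaultPooledObject(producer)
}

object KafkaProducerPool {
  def createKafkaProducerPool(brokers: String): GenericObjectPool[KafkaProducer[String, String]] =
    new GenericObjectPool(new KafkaProducerFactory(brokers))
}

// Driver side: broadcast the pool, then reuse producers inside each partition.
// Note that GenericObjectPool itself is NOT java-serializable, which is exactly
// the kind of thing that makes a broadcast like this blow up; see below.
// val pool = ssc.sparkContext.broadcast(KafkaProducerPool.createKafkaProducerPool("broker1:9092"))
// stream.foreachRDD { rdd =>
//   rdd.foreachPartition { records =>
//     val producer = pool.value.borrowObject()
//     records.foreach(msg => producer.send(new ProducerRecord("output-topic", msg)))
//     pool.value.returnObject(producer)
//   }
// }
```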
Then, during development and testing, the job failed with a serialization error.
So they came to me and asked: "My code is pretty much the same as yours, so why does it throw this error?"
In fact, whatever you broadcast, Spark has to serialize, so the broadcast value must be serializable; you should also avoid broadcasting large objects.

Later I asked for the code and looked it over: the createKafkaProducerPool method had been pulled out into a separate class, and that class extends Serializable. My immediate reaction was that if createKafkaProducerPool were written in the main method, that is, on the driver side, this problem would surely not occur, and that is what I suggested. I also suspected the cluster had Kryo serialization enabled, and yet the class containing createKafkaProducerPool extends Serializable, which left me puzzled.
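That suggestion, creating the pool on the driver while keeping the broadcast value itself trivially serializable, corresponds roughly to the well-known "lazy sink" pattern sketched below. This is a minimal sketch, not the reader's code; KafkaSink and its config values are illustrative names of my own. The heavyweight KafkaProducer is never serialized: it is built lazily, once per executor JVM, from a small serializable factory closure.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// The broadcast value is just a thin serializable shell plus a factory closure.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // @transient + lazy: never serialized; each executor builds its own producer
  // the first time send() is called.
  @transient private lazy val producer = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = new KafkaSink(() => {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  })
}

// In main(), on the driver:
// val sink = ssc.sparkContext.broadcast(KafkaSink("broker1:9092"))
// stream.foreachRDD(_.foreach(msg => sink.value.send("output-topic", msg)))
```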
The closures (the anonymous functions passed into RDD.map(…)) are serialized by Spark before being distributed. Hadoop does not have this problem because it serializes the whole .jar in binary form and copies it over the network. Spark uses Java serialization by default, but that is very slow compared to, say, Kryo, so we use Kryo through a wrapper (Spark doesn't support Kryo serialization for closures, not yet).
And up till now the org.dbpedia.extraction.spark.serialize.KryoSerializationWrapper class has been working perfectly. Some freak extractors seem to fail, though.
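The wrapper idea in that quote can be sketched as a small Java-serializable shell that carries the wrapped value as a Kryo-encoded byte array. What follows is a simplified illustration assuming the Kryo library on the classpath; it is not the actual DBpedia KryoSerializationWrapper source, and real code would configure Kryo more carefully (e.g. via Twitter chill, as Spark itself does).

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Java-serializable shell around a Kryo-encoded payload. The wrapped value is
// transient, so plain Java serialization only ever sees the byte array.
class KryoSerializationWrapper[T](@transient private var value: T) extends Serializable {

  private var bytes: Array[Byte] = _

  def getValue: T = {
    if (value == null && bytes != null) {
      // First access after Java deserialization: decode the Kryo payload.
      val kryo = new Kryo()
      kryo.setRegistrationRequired(false) // sketch only; tune registration in real code
      value = kryo.readClassAndObject(new Input(bytes)).asInstanceOf[T]
      bytes = null
    }
    value
  }

  // Hook invoked by Java serialization: encode the value with Kryo first,
  // then let the default mechanism write the resulting byte array.
  private def writeObject(out: ObjectOutputStream): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)
    val buffer = new ByteArrayOutputStream()
    val output = new Output(buffer)
    kryo.writeClassAndObject(output, value)
    output.close()
    bytes = buffer.toByteArray
    out.defaultWriteObject()
  }
}

// Hypothetical usage inside a Spark job:
// val wrapped = new KryoSerializationWrapper(someNonJavaSerializableThing)
// rdd.map(x => wrapped.getValue.process(x))
```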
Copyright notice: this is the blogger's original article; it may not be reproduced without the blogger's permission.