Introducing DataFrames in Apache Spark for Large Scale Data Science(中英雙語)

文章標題

Introducing DataFrames in Apache Spark for Large Scale Data Sciencehtml

一個用於大規模數據科學的API——DataFramegit

做者介紹

 Reynold XinMichael Armbrust and Davies Liu github

文章正文

Today, we are excited to announce a new DataFrame API designed to make big data processing even easier for a wider audience.sql

今天,咱們正式宣佈Spark新的API——DataFrame 。做爲2014–2015年Spark最大的API改動,DataFrame可以使得大數據更爲簡單,從而擁有更普遍的受衆羣體。數據庫

When we first open sourced Apache Spark, we aimed to provide a simple API for distributed data processing in general-purpose programming languages (Java, Python, Scala). Spark enabled distributed data processing through functional transformations on distributed collections of data (RDDs). This was an incredibly powerful API: tasks that used to take thousands of lines of code to express could be reduced to dozens.express

咱們最先在設計Spark的時候,其中一個很重要的目標就是給大數據生態圈提供基於通用編程語言的(Java、Scala、Python)簡單易用的API。Spark本來的RDD API經過函數式編程的模式把分佈式數據處理轉換成分佈式數據集(distributed collections)。本來須要上千行用Hadoop MapReduce實現的代碼,在Spark這個API上減小到了數十行。apache

  • dozens ['dʌznz] 幾十,許多;(一)打,十二個( dozen的名詞複數 )

As Spark continues to grow, we want to enable wider audiences beyond 「Big Data」 engineers to leverage the power of distributed processing. The new DataFrames API was created with this goal in mind.  This API is inspired by data frames in R and Python (Pandas), but designed from the ground-up to support modern big data and data science applications. As an extension to the existing RDD API, DataFrames feature:編程

而後隨着Spark的不斷壯大,咱們但願擁有更普遍的受衆羣體利用其進行分佈式處理,不侷限於「大數據」工程師。這個新的DataFrame API在R和Python data frame的設計靈感之上,專門爲了數據科學應用設計,具備如下功能特性:json

  • leverage  [ˈli:vərɪdʒ] 槓桿做用;影響力;優點,力量;舉債經營
  • ground-up 碾碎的;磨成粉的 從新
  • Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster
  • Support for a wide array of data formats and storage systems
  • State-of-the-art optimization and code generation through the Spark SQL Catalystoptimizer
  • Seamless integration with all big data tooling and infrastructure via Spark
  • APIs for Python, Java, Scala, and R (in development via SparkR)
  • 從KB到PB級的數據量支持;
  • 多種數據格式和多種存儲系統支持;
  • 經過Spark SQL的Catalyst優化器進行先進的優化,生成代碼;
  • 經過Spark無縫集成全部大數據工具與基礎設施;
  • 爲Python、Java、Scala和R語言(SparkR)API。

For new users familiar with data frames in other programming languages, this API should make them feel at home. For existing Spark users, this extended API will make Spark easier to program, and at the same time improve performance through intelligent optimizations and code-generation.api

對於以前熟悉其餘語言中data frames的新用戶來講,這個新的API可讓Spark的初體驗變得更加友好。而對於那些已經在使用的用戶來講,這個API會讓基於Spark的編程更加容易,同時其智能優化和代碼生成也能幫助用戶得到更好的性能。

一、What Are DataFrames?

In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

在Spark中,DataFrame是一個以命名列方式組織的分佈式數據集,等同於關係型數據庫中的一個表,也至關於R/Python中的data frames(可是進行了更多的優化)。DataFrames能夠由結構化數據文件轉換而來,也能夠從Hive中的表得來,以及能夠轉換自外部數據庫或現有的RDD。

  • conceptually [kən'septʃʊrlɪ] 概念地 從概念上講
  • equivalent  [ɪˈkwɪvələnt] 相等的,至關的,等效的;等價的,等積的;[化學]當量的

The following example shows how to construct DataFrames in Python. A similar API is available in Scala and Java.

下面代碼演示瞭如何使用Python構造DataFrames,而在Scala和Java中也有相似的API能夠調用。

// Constructs a DataFrame from the users table in Hive.
users = context.table(「users」)

// from JSON files in S3
logs = context.load(「s3n://path/to/data.json」, 「json」)

二、How Can One Use DataFrames?

Once built, DataFrames provide a domain-specific language for distributed data manipulation.  Here is an example of using DataFrames to manipulate the demographic data of a large population of users:

一經構建,DataFrames就會爲分佈式數據處理提供一個指定的DSL(domain-specific language )。

  • demographic  [ˌdemə'ɡræfɪk] 人口統計學的;人口統計的
// Create a new DataFrame that contains 「young users」 only
young = users.filter(users.age < 21)

// Alternatively, using Pandas-like syntax
young = users[users.age < 21]

// Increment everybody’s age by 1
young.select(young.name, young.age + 1)

// Count the number of young users by gender
young.groupBy(「gender」).count()

// Join young users with another DataFrame called logs
young.join(logs, logs.userId == users.userId, 「left_outer」)

You can also incorporate SQL while working with DataFrames, using Spark SQL. This example counts the number of users in the young DataFrame.

經過Spark SQL,你還能夠用SQL的方式操做DataFrames。下面這個例子統計了「young」 DataFrame中的用戶數量。

young.registerTempTable(「young」)

context.sql(「SELECT count(*) FROM young」)

In Python, you can also convert freely between Pandas DataFrame and Spark DataFrame:

在Python中,Pandas DataFrame和Spark DataFrame還能夠自由轉換。

// Convert Spark DataFrame to Pandas
pandas_df = young.toPandas()

// Create a Spark DataFrame from Pandas
spark_df = context.createDataFrame(pandas_df)

Similar to RDDs, DataFrames are evaluated lazily. That is to say, computation only happens when an action (e.g. display result, save output) is required. This allows their executions to be optimized, by applying techniques such as predicate push-downs and bytecode generation, as explained later in the section 「Under the Hood: Intelligent Optimization and Code Generation」. All DataFrame operations are also automatically parallelized and distributed on clusters.

相似於RDD,DataFrame一樣使用了lazy的方式。也就是說,只有動做真正發生時(如顯示結果,保存輸出),計算纔會進行。從而,經過一些技術,好比predicate push-downs和bytecode generation,執行過程能夠進行適當的優化(詳情見下文)。同時,全部的DataFrames也會自動的在集羣上並行和分佈執行。

三、Supported Data Formats and Sources

Modern applications often need to collect and analyze data from a variety of sources. Out of the box, DataFrame supports reading data from the most popular formats, including JSON files, Parquet files, Hive tables. It can read from local file systems, distributed file systems (HDFS), cloud storage (S3), and external relational database systems via JDBC. In addition, through Spark SQL’s external data sources API, DataFrames can be extended to support any third-party data formats or sources. Existing third-party extensions already include Avro, CSV, ElasticSearch, and Cassandra.

現代的應用程序一般須要收集和分析來自各類不一樣數據源的數據,而DataFrame與生俱來就支持讀取最流行的格式,包括JSON文件、Parquet文件和Hive表格。DataFrame還支持從多種類型的文件系統中讀取,好比本地文件系統、分佈式文件系統(HDFS)以及雲存儲(S3)。同時,配合JDBC,它還能夠讀取外部關係型數據庫系統。此外,經過Spark SQL的外部數據源(external data sources) API,DataFrames能夠更普遍地支持任何第三方數據格式和數據源。值得一提的是,當下的第三方擴展已經包含Avro、CSV、ElasticSearch和Cassandra。

  • out of the box [aʊt ʌv ði bɑks]  (澳/新西蘭,非正式)很是好

DataFrames’ support for data sources enables applications to easily combine data from disparate sources (known as federated query processing in database systems). For example, the following code snippet joins a site’s textual traffic log stored in S3 with a PostgreSQL database to count the number of times each user has visited the site.

DataFrames對數據源的支持能力容許應用程序能夠輕鬆地組合來自不一樣數據源的數據。下面的代碼片斷則展現了存儲在S3上網站的一個文本流量日誌(textual traffic log)與一個PostgreSQL數據庫的join操做,目的是計算網站用戶訪問該網站的次數。

  • disparate ['dɪspərət] 徹底不一樣的;從根本上種類有區分或不一樣的
users = context.jdbc(「jdbc:postgresql:production」, 「users」)

logs = context.load(「/path/to/traffic.log」)

logs.join(users, logs.userId == users.userId, 「left_outer」) \

.groupBy(「userId」).agg({「*」: 「count」})

四、Application: Advanced Analytics and Machine Learning

Data scientists are employing increasingly sophisticated techniques that go beyond joins and aggregations. To support this, DataFrames can be used directly in MLlib’s machine learning pipeline API. In addition, programs can run arbitrarily complex user functions on DataFrames.

當下,數據科學家們使用的技術已日益複雜,超越了joins和aggregations。爲了更好地支持他們的使用,DateFrames能夠直接在MLlib的machine learning pipeline API中使用。此外,在DataFrames中,程序還能夠運行任意複雜的用戶函數。

  • sophisticated [səˈfɪstɪˌketɪd]  複雜的;精緻的;富有經驗的;深奧微妙的
  • arbitrarily  [ˌɑrbəˈtrɛrəlɪ] 任意地;武斷地;反覆無常地;肆意地

Most common advanced analytics tasks can be specified using the new pipeline API in MLlib. For example, the following code creates a simple text classification pipeline consisting of a tokenizer, a hashing term frequency feature extractor, and logistic regression.

經過Spark,用戶可使用MLlib中新的pipelineAPI來指定高級分析任務。例如,下面的代碼建立了一個簡單的文本分類(text classification)管道。該管道由一個tokenizer,一個hashing term frequency feature extractor和logistic regression組成。

tokenizer = Tokenizer(inputCol=」text」, outputCol=」words」)

hashingTF = HashingTF(inputCol=」words」, outputCol=」features」)

lr = LogisticRegression(maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

Once the pipeline is setup, we can use it to train on a DataFrame directly:

一旦管道設置好,咱們能夠直接使用它在DataFrame上進行訓練。

df = context.load(「/path/to/data」)

model = pipeline.fit(df)

For more complicated tasks beyond what the machine learning pipeline API provides, applications can also apply arbitrarily complex functions on a DataFrame, which can also be manipulated using Spark’s existing RDD API. The following snippet performs a word count, the 「hello world」 of big data, on the 「bio」 column of a DataFrame.

對於那些複雜程度超出了machine learning pipeline API能力的任務,應用程序也能夠經過DataFrames提供任意複雜的函數,固然這也能夠經過Spark已有的RDD API來實現。下面代碼段實現的是一個DataFrame「bio」列上的word count(大數據時代的Hello World)。

  • manipulate  [məˈnɪpjəˌlet] 操縱;操做,處理;巧妙地控制;[醫] 推拿,調整
  • snippet  [ˈsnɪpɪt] 小片,片斷;不知天高地厚的年輕人
df = context.load(「/path/to/people.json」)
// RDD-style methods such as map, flatMap are available on DataFrames
// Split the bio text into multiple words.
words = df.select(「bio」).flatMap(lambda row: row.bio.split(」 「))
// Create a new DataFrame to count the number of words words_df = words.map(lambda w: Row(word=w, cnt=1)).toDF() word_counts = words_df.groupBy(「word」).sum()

 

五、Under the Hood: Intelligent Optimization and Code Generation

Unlike the eagerly evaluated data frames in R and Python, DataFrames in Spark have their execution automatically optimized by a query optimizer. Before any computation on a DataFrame starts, the Catalyst optimizer compiles the operations that were used to build the DataFrame into a physical plan for execution. Because the optimizer understands the semantics of operations and structure of the data, it can make intelligent decisions to speed up computation.

與R/Python中data frame使用的eager方式不一樣,Spark中的DataFrames執行會被查詢優化器自動優化。在DataFrame上的計算開始以前,Catalyst優化器會編譯操做,這將把DataFrame構建成物理計劃來執行。由於優化器清楚操做的語義和數據的結構,因此它能夠爲計算加速制定智能的決策。

  • eagerly [ˈiɡɚlɪ] 渴望地,熱切地

At a high level, there are two kinds of optimizations. First, Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data. In the case of Parquet files, entire blocks can be skipped and comparisons on strings can be turned into cheaper integer comparisons via dictionary encoding. In the case of relational databases, predicates are pushed down into the external databases to reduce the amount of data traffic.

在高等級上,這裏存在兩種類型的優化。首先,Catalyst提供了邏輯優化,好比謂詞下推(predicate pushdown)。優化器能夠將謂詞過濾下推到數據源,從而使物理執行跳過無關數據。在使用Parquet的狀況下,更可能存在文件被整塊跳過的狀況,同時系統還經過字典編碼把字符串對比轉換爲開銷更小的整數對比。在關係型數據庫中,謂詞則被下推到外部數據庫用以減小數據傳輸。

  • irrelevant  [ɪˈrɛləvənt] 不相干的;不恰當;缺少時代性的,落後於潮流的;牛頭不對馬嘴
  • traffic [ˈtræfɪk] 交通,運輸量;(非法的)交易;通訊量;交際 傳輸

Second, Catalyst compiles operations into physical plans for execution and generates JVM bytecode for those plans that is often more optimized than hand-written code. For example, it can choose intelligently between broadcast joins and shuffle joins to reduce network traffic. It can also perform lower level optimizations such as eliminating expensive object allocations and reducing virtual function calls. As a result, we expect performance improvements for existing Spark programs when they migrate to DataFrames.

第二,爲了更好地執行,Catalyst將操做編譯爲物理計劃,並生成JVM bytecode,這些一般會比人工編碼更加優化。例如,它能夠智能地選擇broadcast joins和shuffle joins來減小網絡傳輸。其次,一樣存在一些較爲低級的優化,如消除代價昂貴的對象分配及減小虛擬函數調用。所以,咱們認爲現有的Spark項目遷移到DataFrames後,性能會有所改觀。

  • hand-written code 人工編碼 手寫代碼
  • network traffic 網絡傳輸
  • perform [pərˈfɔ:rm] 執行;履行;表演;扮演

Since the optimizer generates JVM bytecode for execution, Python users will experience the same high performance as Scala and Java users.

同時,鑑於優化器爲執行生成了JVM bytecode,Python用戶將擁有與Scala和Java用戶同樣的高性能體驗。

The above chart compares the runtime performance of running group-by-aggregation on 10 million integer pairs on a single machine (source code). Since both Scala and Python DataFrame operations are compiled into JVM bytecode for execution, there is little difference between the two languages, and both outperform the vanilla Python RDD variant by a factor of 5 and Scala RDD variant by a factor of 2.

上圖是在單個機器上對1000萬個整數進行分組聚合(group-by-aggregation)的運行時性能對比。在綠色部分,爲了更好地執行,Scala和Python的DataFrame操做都被編譯成了JVM bytecode,致使這兩種語言在性能上基本有着一樣的表現。同時,二者性能均優於普通Python RDD實現的4倍,也達到了Scala RDD實現的兩倍。

DataFrames were inspired by previous distributed data frame efforts, including Adatao’s DDF and Ayasdi’s BigDF. However, the main difference from these projects is that DataFrames go through the Catalyst optimizer, enabling optimized execution similar to that of Spark SQL queries. As we improve the Catalyst optimizer, the engine also becomes smarter, making applications faster with each new release of Spark.

無論選擇了哪一種語言,Catalyst優化器都實現了DataFrame程序的優化執行。同時,隨着Catalyst優化器的不斷改善,引擎也會變得更智能,從而對比已有版本,Spark的每個新版本都會有性能上的提高。

Our data science team at Databricks has been using this new DataFrame API on our internal data pipelines. It has brought performance improvements to our Spark programs while making them more concise and easier to understand. We are very excited about it and believe it will make big data processing more accessible to a wider array of users.

在Databricks,數據科學家團隊已經將DataFrame API搭載在內部的數據管道上。Spark程序性能的改進已經在咱們內部獲得證明,而程序也更加的簡潔易懂。毫無疑問,這將大幅度地下降大數據使用門檻,讓大數據技術爲更多人使用。

  • concise  [kənˈsaɪs]  簡約;簡明的,簡潔的;精煉

This API will be released as part of Spark 1.3 in early March. If you can’t wait, check out Spark from GitHub to try it out. If you are in the Bay Area at the Strata conference, please join us on Feb 17 in San Jose for a meetup on this topic.

這個API將在3月初做爲Spark1.3版本的一部分發布。若是你火燒眉毛,能夠關注Spark在GitHub上的進展。若是你在加州灣區參加Strata conference,2月17日聖何塞有一個關於這個主題的Meetup。

  • bay [be] 灣,海灣;犬吠聲;月桂樹;吊窗,凸窗
  • San Jose [sɑnhoˈze] 聖何塞(美國城市)

This effort would not have been possible without the prior data frame implementations, and thus we would like to thank the developers of R, Pandas, DDF and BigDF for their work.

To try out DataFrames, get a free trial of Databricks or use the Community Edition. 

參考文獻

  • https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
  • https://blog.csdn.net/mt0803/article/details/50464124
  • https://blog.csdn.net/yhao2014/article/details/44979041
相關文章
相關標籤/搜索