Spark筆記之Catalog

 

1、什麼是Catalog

Spark SQL提供了執行sql語句的支持,sql語句是以表的方式組織使用數據的,而表自己是如何組織存儲的呢,確定是存在一些元數據之類的東西了,Catalog就是Spark 2.0以後提供的訪問元數據的類:html

image

Catalog提供一些API用來對數據庫、表、視圖、緩存、列、函數(UDF/UDAF)進行操做,下文將一一介紹。java

 

2、如何使用Catalog

獲得Catalog:sql

val spark = SparkSession.builder().master("local[*]").appName("catalog-study").getOrCreate()
val catalog = spark.catalog 

Catalog相關的代碼存放在org.apache.spark.sql.catalog下:數據庫

image

上面的Catalog只是一個接口定義規範,具體實現還有一個org.apache.spark.sql.internal.CatalogImpl,若是隻是使用Spark完成工做的話只閱讀接口定義基本夠用了。apache

 

3、相關API

數據庫相關

看數據庫相關的操做以前先看一下Catalog對數據庫的表示:api

/**
 * A database in Spark, as returned by the `listDatabases` method defined in [[Catalog]].
 *
 * @param name name of the database.
 * @param description description of the database.
 * @param locationUri path (in the form of a uri) to data files.
 * @since 2.0.0
 */
@InterfaceStability.Stable
class Database(
    val name: String,
    @Nullable val description: String,
    val locationUri: String)
  extends DefinedByConstructorParams {

  override def toString: String = {
    "Database[" +
      s"name='$name', " +
      Option(description).map { d => s"description='$d', " }.getOrElse("") +
      s"path='$locationUri']"
  }

}

Catalog使用三個字段表示一個數據庫:緩存

name:數據庫名字app

descripttion:數據庫描述,能夠認爲是註釋ide

locationUri:數據庫的數據保存位置函數

 

currentDatabase: String

返回當前使用的數據庫,至關於select database();

setCurrentDatabase(dbName: String): Unit
設置當前使用的數據庫,至關於use database_name;
listDatabases(): Dataset[Database]

查看全部數據庫,至關於show databases;

getDatabase(dbName: String): Database

獲取某數據庫的元數據,返回值是Database類型的,若是指定的數據庫不存在則會@throws[AnalysisException]("database does not exist")

databaseExists(dbName: String): Boolean

判斷某個數據庫是否已經存在,返回boolean值。

爲了不拋異常對單個數據庫進行getDatabase獲取元數據以前仍是先使用databaseExists肯定數據庫已經存在。

 

表/視圖相關

一樣的,對錶或視圖Catalog也用一個class來表示:

/**
 * A table in Spark, as returned by the `listTables` method in [[Catalog]].
 *
 * @param name name of the table.
 * @param database name of the database the table belongs to.
 * @param description description of the table.
 * @param tableType type of the table (e.g. view, table).
 * @param isTemporary whether the table is a temporary table.
 * @since 2.0.0
 */
@InterfaceStability.Stable
class Table(
    val name: String,
    @Nullable val database: String,
    @Nullable val description: String,
    val tableType: String,
    val isTemporary: Boolean)
  extends DefinedByConstructorParams {

  override def toString: String = {
    "Table[" +
      s"name='$name', " +
      Option(database).map { d => s"database='$d', " }.getOrElse("") +
      Option(description).map { d => s"description='$d', " }.getOrElse("") +
      s"tableType='$tableType', " +
      s"isTemporary='$isTemporary']"
  }

}

name:表的名字

database:表所屬的數據庫的名字

description:表的描述信息

tableType:用於區分是表仍是視圖,兩個取值:table或view。

isTemporary:是不是臨時表或臨時視圖,解釋一下啥是臨時表,臨時表就是使用Dataset或DataFrame的createOrReplaceTempView等相似的API註冊的視圖或表,當這次Spark任務結束後這些表就沒了,再次使用的話還要再進行註冊,而非臨時表就是在Hive中真實存在的,開啓Hive支持就可以直接使用的,本次Spark任務結束後表仍然能存在,下次啓動不須要從新作任何處理就可以使用,表是持久的,這種不是臨時表。

 

listTables(): Dataset[Table]

查看全部表或視圖,至關於show tables;

listTables(dbName: String): Dataset[Table]

返回指定數據庫下的表或視圖,若是指定的數據庫不存在則會拋出@throws[AnalysisException]("database does not exist")表示數據庫不存在。

getTable(tableName: String): Table
getTable(dbName: String, tableName: String): Table

獲取表的元信息,不存在則會拋出異常。

tableExists(tableName: String): Boolean
tableExists(dbName: String, tableName: String): Boolean

判斷表或視圖是否存在,返回boolean值。

dropTempView(viewName: String): Boolean
dropGlobalTempView(viewName: String): Boolean

使用createOrReplaceTempView相似API註冊的臨時視圖可使用此方法刪除,若是這個視圖已經被緩存過的話會自動清除緩存。

recoverPartitions(tableName: String): Unit

 

isCached(tableName: String): Boolean

用於判斷一個表否已經緩存過了。

cacheTable(tableName: String): Unit
cacheTable(tableName: String, storageLevel: StorageLevel): Unit
用於緩存表
uncacheTable(tableName: String): Unit

對錶取消緩存

clearCache(): Unit

清空全部緩存

refreshTable(tableName: String): Unit

Spark爲了性能考慮,對錶的元數據作了緩存,因此當被緩存的表已經改變時也必須刷新元數據從新緩存。

refreshByPath(path: String): Unit

 

createTable(tableName: String, path: String): DataFrame
createTable(tableName: String, path: String, source: String): DataFrame
createTable(tableName: String, source: String, options: java.util.Map[String, String]): DataFrame
createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
createTable(tableName: String, source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame
createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame  
 

函數相關

Catalog對函數的表示:

/**
 * A user-defined function in Spark, as returned by `listFunctions` method in [[Catalog]].
 *
 * @param name name of the function.
 * @param database name of the database the function belongs to.
 * @param description description of the function; description can be null.
 * @param className the fully qualified class name of the function.
 * @param isTemporary whether the function is a temporary function or not.
 * @since 2.0.0
 */
@InterfaceStability.Stable
class Function(
    val name: String,
    @Nullable val database: String,
    @Nullable val description: String,
    val className: String,
    val isTemporary: Boolean)
  extends DefinedByConstructorParams {

  override def toString: String = {
    "Function[" +
      s"name='$name', " +
      Option(database).map { d => s"database='$d', " }.getOrElse("") +
      Option(description).map { d => s"description='$d', " }.getOrElse("") +
      s"className='$className', " +
      s"isTemporary='$isTemporary']"
  }

}

name:函數的名字

database:函數註冊在哪一個數據庫下,函數是跟數據庫綁定的

description:對函數的描述信息,能夠理解成註釋

className:函數其實就是一個class,調用函數就是調用類的方法,className表示函數對應的class的全路徑類名

isTemporary:是不是臨時函數。

 

listFunctions(): Dataset[Function]

列出當前數據庫下的全部函數,包括註冊的臨時函數。

listFunctions(dbName: String): Dataset[Function]

列出指定數據庫下注冊的全部函數,包括臨時函數,若是指定的數據庫不存在的話則會拋出@throws[AnalysisException]("database does not exist")表示數據庫不存在。

getFunction(functionName: String): Function
getFunction(dbName: String, functionName: String): Function

獲取函數的元信息,函數不存在則會拋出異常。

functionExists(functionName: String): Boolean
functionExists(dbName: String, functionName: String): Boolean
判斷函數是否存在,返回boolean值。

 

對錶或視圖的列相關的操做

Catalog對列的表示:

/**
 * A column in Spark, as returned by `listColumns` method in [[Catalog]].
 *
 * @param name name of the column.
 * @param description description of the column.
 * @param dataType data type of the column.
 * @param nullable whether the column is nullable.
 * @param isPartition whether the column is a partition column.
 * @param isBucket whether the column is a bucket column.
 * @since 2.0.0
 */
@InterfaceStability.Stable
class Column(
    val name: String,
    @Nullable val description: String,
    val dataType: String,
    val nullable: Boolean,
    val isPartition: Boolean,
    val isBucket: Boolean)
  extends DefinedByConstructorParams {

  override def toString: String = {
    "Column[" +
      s"name='$name', " +
      Option(description).map { d => s"description='$d', " }.getOrElse("") +
      s"dataType='$dataType', " +
      s"nullable='$nullable', " +
      s"isPartition='$isPartition', " +
      s"isBucket='$isBucket']"
  }

}

name:列的名字

description:列的描述信息,與註釋差很少

dataType:列的數據類型

nullable:列是否容許爲null

isPartition:是不是分區列

isBucket:是不是桶列

 

listColumns(tableName: String): Dataset[Column]
listColumns(dbName: String, tableName: String): Dataset[Column]

列出指定的表或視圖有哪些列,表不存在則拋異常。

 

相關資料:

1.  Spark 2.0介紹:Catalog API介紹和使用 

2. Java Doc: Class Catalog

 

.

相關文章
相關標籤/搜索