python操做hadoop HDFS api使用

doc:http://pyhdfs.readthedocs.io/en/latest/html

pip install hdfs  python

https://hdfscli.readthedocs.io/en/latest/quickstart.htmlgit

此外還有一個庫pyhdfsgithub

https://github.com/jingw/pyhdfs/blob/master/README.rstshell

通常也能夠直接hadoop HDFS 執行hdfscli command操做編程

hdfs庫文檔入門

命令行界面

默認狀況下,HdfsCLI帶有單個入口點hdfscli,該入口點提供了方便的界面來執行常見操做。它的全部命令都接受一個自 --alias變量(如上所述),該自變量定義了針對哪一個集羣進行操做。json

下載和上傳文件

HdfsCLI支持從HDFS透明地下載和上傳文件和文件夾(咱們也能夠使用該--threads 選項指定並行度)。api

$ # Write a single file to HDFS.
$ hdfscli upload --alias=dev weights.json models/
$ # Read all files inside a folder from HDFS and store them locally.
$ hdfscli download export/results/ "results-$(date +%F)"

若是讀取(或寫入)單個文件,則還能夠經過將其內容-用做路徑參數,將其內容流式傳輸到標準輸出(從標準輸入返回)。session

$ # Read a file from HDFS and append its contents to a local log file.
$ hdfscli download logs/1987-03-23.txt - >>logs

默認狀況下,若是嘗試寫入現有路徑(在本地或在HDFS上),HdfsCLI將引起錯誤。咱們能夠使用該--force選項強制覆蓋路徑 。app

互動殼

interactive命令(在未指定任何命令時也使用)將建立一個HDFS客戶端,並將其公開在python shell中(若是可用,請使用IPython)。這使得在HDFS上執行文件系統操做並與其數據進行交互變得很方便。有關可用方法的概述,請參見下面的Python綁定

$ hdfscli --alias=dev

Welcome to the interactive HDFS python shell.
The HDFS client is available as `CLIENT`.

In [1]: CLIENT.list('data/')
Out[1]: ['1.json', '2.json']

In [2]: CLIENT.status('data/2.json')
Out[2]: {
  'accessTime': 1439743128690,
  'blockSize': 134217728,
  'childrenNum': 0,
  'fileId': 16389,
  'group': 'supergroup',
  'length': 2,
  'modificationTime': 1439743129392,
  'owner': 'drwho',
  'pathSuffix': '',
  'permission': '755',
  'replication': 1,
  'storagePolicy': 0,
  'type': 'FILE'
}

In [3]: CLIENT.delete('data/2.json')
Out[3]: True

利用python的所有功能,咱們能夠輕鬆地執行更復雜的操做,例如重命名與某些模式匹配的文件夾,刪除一段時間未訪問的文件,查找某個用戶擁有的全部路徑等。

更多

cf. 有關命令和選項的完整列表。hdfscli --help

Python綁定

實例化客戶端

獲取hdfs.client.Client實例的最簡單方法是使用上述的Interactive Shell,在該Shell中客戶端將自動可用。要以編程方式實例化客戶端,有兩種選擇:

第一種是導入客戶端類並直接調用其構造函數。這是最直接,最靈活的方法,可是不容許咱們重複使用已配置的別名:

from hdfs import InsecureClient
client = InsecureClient('http://host:port', user='ann')

第二種方法利用hdfs.config.Config該類加載現有的配置文件(默認與CLI相同)並從現有別名建立客戶端:

from hdfs import Config
client = Config().get_client('dev')

讀寫文件

read()方法提供了相似文件的界面,用於從HDFS讀取文件。它必須在一個with塊中使用(確保始終正確關閉鏈接):

# Loading a file in memory.
with client.read('features') as reader:
  features = reader.read()

# Directly deserializing a JSON object.
with client.read('model.json', encoding='utf-8') as reader:
  from json import load
  model = load(reader)

若是chunk_size傳遞了參數,則該方法將返回一個生成器,有時使流文件內容更簡單。

# Stream a file.
with client.read('features', chunk_size=8096) as reader:
  for chunk in reader:
    pass

一樣,若是delimiter傳遞了一個參數,則該方法將返回定界塊的生成器。

with client.read('samples.csv', encoding='utf-8', delimiter='\n') as reader:
  for line in reader:
    pass

使用如下write() 方法將文件寫入HDFS:該方法返回相似文件的可寫對象:

# Writing part of a file.
with open('samples') as reader, client.write('samples') as writer:
  for line in reader:
    if line.startswith('-'):
      writer.write(line)

# Writing a serialized JSON object.
with client.write('model.json', encoding='utf-8') as writer:
  from json import dump
  dump(model, writer)

爲了方便起見,還能夠將可迭代的data參數直接傳遞給該方法。

# This is equivalent to the JSON example above.
from json import dumps
client.write('model.json', dumps(model))

探索文件系統

全部Client子類都公開了各類與HDFS交互的方法。大多數都是在WebHDFS操做以後直接建模的,其中一些在下面的代碼段中顯示:

# Retrieving a file or folder content summary.
content = client.content('dat')

# Listing all files inside a directory.
fnames = client.list('dat')

# Retrieving a file or folder status.
status = client.status('dat/features')

# Renaming ("moving") a file.
client.rename('dat/features', 'features')

# Deleting a file or folder.
client.delete('dat', recursive=True)

基於這些方法的其餘方法可提供更多高級功能:

# Download a file or folder locally.
client.download('dat', 'dat', n_threads=5)

# Get all files under a given folder (arbitrary depth).
import posixpath as psp
fpaths = [
  psp.join(dpath, fname)
  for dpath, _, fnames in client.walk('predictions')
  for fname in fnames
]

有關可用方法的完整列表,請參見API參考

檢查路徑是否存在

上述大多數方法都會HdfsError 在缺乏的路徑上引起if調用。推薦的檢查路徑是否存在的方法是使用帶有參數的content()或 status()方法strict=False(在這種狀況下,它們將None在缺乏的路徑上返回)。

更多

請參閱高級用法部分以瞭解更多信息。

=========================

2:Client——建立集羣鏈接

> from hdfs import *  

> client = Client("http://s100:50070")  

 

其餘參數說明:

      classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)

                       url:ip:端口

                       root:制定的hdfs根目錄

                       proxy:制定登錄的用戶身份

                       timeout:設置的超時時間

session:鏈接標識

 

 client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)  

>>> client.list("/")  

[u'home',u'input', u'output', u'tmp']

 

 

3:dir——查看支持的方法

>dir(client)

 

4:status——獲取路徑的具體信息

其餘參數:status(hdfs_path, strict=True)

              hdfs_path:就是hdfs路徑

              strict:設置爲True時,若是hdfs_path路徑不存在就會拋出異常,若是設置爲False,若是路徑爲不存在,則返回None

 

5:list——獲取指定路徑的子目錄信息

>client.list("/")

[u'home',u'input', u'output', u'tmp']

 

其餘參數:list(hdfs_path, status=False)

             status:爲True時,也返回子目錄的狀態信息,默認爲Flase

 

6:makedirs——建立目錄

>client.makedirs("/123")

其餘參數:makedirs(hdfs_path, permission=None)

               permission:設置權限

>client.makedirs("/test",permission=777)

 

 

7: rename—重命名

>client.rename("/123","/test")

 

8:delete—刪除

>client.delete("/test")

其餘參數:delete(hdfs_path, recursive=False)

              recursive:刪除文件和其子目錄,設置爲False若是不存在,則會拋出異常,默認爲False

 

9:upload——上傳數據

>client.upload("/test","F:\[PPT]Google Protocol Buffers.pdf");

其餘參數:upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None, 

                              chunk_size=65536,progress=None, cleanup=True, **kwargs)

              overwrite:是不是覆蓋性上傳文件

              n_threads:啓動的線程數目

              temp_dir:當overwrite=true時,遠程文件一旦存在,則會在上傳完以後進行交換

              chunk_size:文件上傳的大小區間

              progress:回調函數來跟蹤進度,爲每一chunk_size字節。它將傳遞兩個參數,文件上傳的路徑和傳輸的字節數。一旦完成,-1將做爲第二個參數

              cleanup:若是在上傳任何文件時發生錯誤,則刪除該文件

 

 

10:download——下載

>client.download("/test/NOTICE.txt","/home")

 

11:read——讀取文件

withclient.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:
    print reader.read()

其餘參數:read(*args, **kwds)

             hdfs_path:hdfs路徑

             offset:設置開始的字節位置

             length:讀取的長度(字節爲單位)

             buffer_size:用於傳輸數據的字節的緩衝區的大小。默認值設置在HDFS配置。

             encoding:制定編碼

             chunk_size:若是設置爲正數,上下文管理器將返回一個發生器產生的每一chunk_size字節而不是一個相似文件的對象

             delimiter:若是設置,上下文管理器將返回一個發生器產生每次遇到分隔符。此參數要求指定的編碼。

             progress:回調函數來跟蹤進度,爲每一chunk_size字節(不可用,若是塊大小不是指定)。它將傳遞兩個參數,文件上傳的路徑和傳輸的字節數。稱爲一次與- 1做爲第二個參數。

相關文章
相關標籤/搜索