Hadoop configuration reference: http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.9.0/html
1: Hadoop Overview
In short: Hadoop grew out of the Nutch project (Tom White being one of its best-known community contributors), later drew Yahoo's attention and developed rapidly, and today is widely used in fields such as healthcare and molecular biology.
What it can do: provide a base platform for processing data.
1. Faster reads
Principle: reading 100 TB of data from a single disk takes a very long time, but if the data is spread over 100 disks and the disks share their files, the read speed can improve by roughly 100x.
Achieving this requires solving two main problems: 1) failures of individual disks — Hadoop provides HDFS (Hadoop Distributed FileSystem), which copes with disk failures mainly by keeping replicas of each file;
2) combining the data read back from each disk correctly — Hadoop provides MapReduce (a programming model) that abstracts away the disk read/write details and turns the computation into two parts, map and reduce.
Official site: http://hadoop.apache.org
Companion resources: http://hadoopbook.com
Original Hadoop modules: Hadoop Common (base module, networking); Hadoop HDFS (distributed storage); Hadoop MapReduce (distributed computation)
Later versions added Hadoop YARN (resource management and scheduling, something like Hadoop's operating system); on top of this layer many application-level frameworks appeared (Hive, Storm, Spark, Flink, MapReduce)
2: MapReduce
Working example: from national weather data, find the highest temperature for each year; the records are stored as binary log files.
This is exactly the kind of problem Hadoop's MapReduce is suited for. The approach: a map function turns each year's log records into (year, temperature) pairs, and a reduce function then processes all values collected for the same key — here taking the maximum — so the MapReduce job yields the highest temperature for every year.
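A minimal sketch of such a max-temperature job, assuming each input line simply holds a year and an integer temperature separated by whitespace (the real weather records need extra parsing); the driver setup would mirror the WordCount driver shown later in section 2.1:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperature {

    public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Assumed line layout: "<year> <temperature>"
            String[] fields = value.toString().trim().split("\\s+");
            if (fields.length >= 2) {
                context.write(new Text(fields[0]), new IntWritable(Integer.parseInt(fields[1])));
            }
        }
    }

    public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text year, Iterable<IntWritable> temps, Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable t : temps) {
                max = Math.max(max, t.get());   // keep the largest temperature seen for this year
            }
            context.write(year, new IntWritable(max));
        }
    }
}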
Hadoop Streaming is a utility built on the MapReduce API that lets you write the map and reduce functions in any language that can read standard input and write standard output.
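For example, a streaming job can use ordinary shell commands as the mapper and reducer (the jar path below is an assumption — it varies by version and distribution):
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.9.0.jar -input /input -output /output -mapper /bin/cat -reducer /usr/bin/wc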
Overview:
<1> A distributed computation job is split into two phases: Mapper and Reducer
<2> Shuffle: the flow that connects the Mapper and Reducer phases
I. Shuffle write phase
The mapper tasks write their output data to local disk
II. Shuffle read phase
The reducer tasks read the data remotely from the mappers' disks
<3> Use case: offline batch processing; relatively slow
<4> Drawbacks: each task must repeatedly request and release resources, and the framework relies heavily on disk
Workflow:
<1> Split the input files
<2> Mapper processes handle the splits
<3> Shuffle
<4> Reducer processes aggregate the data
<5> Output files are written
2.1 A closer look
1) Ways to run the examples
a. Running locally on a single machine
<1> Copy the two Windows support files, winutils.exe and hadoop.dll, into the C:\java\hadoop-2.6.0-cdh5.9.0\bin directory
<2> Set the HADOOP_HOME environment variable (requires restarting the machine)
To set it temporarily from code instead: System.setProperty("hadoop.home.dir", "C:\\java\\hadoop-2.6.0-cdh5.9.0");
<3> Modify the NativeIO class so that the call site of access0 simply returns true
WordCount example, run locally as a plain Java MapReduce program:
package org.mapreduce.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Temporarily set the HADOOP_HOME environment variable
    static {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0-cdh5.9.0");
    }

    /**
     * By default MapReduce splits the input with TextInputFormat and hands the records to the Mapper.
     * TextInputFormat: key = the offset of the current line, value = the line itself.
     * Mapper type parameters: input key Long, input value String, output key String, output value Long.
     * To keep serialized data small and transfers fast, MapReduce wraps the basic types
     * with its own serializable types (LongWritable, Text, ...).
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        /**
         * Split each line and emit every word with a count of 1.
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            String words = value.toString();
            // Split the line into words
            String[] wordArr = words.split(" ");
            // Iterate over the words
            for (String word : wordArr) {
                // Output format: <word, 1>
                context.write(new Text(word), one);
            }
        }
    }

    /**
     * Global aggregation.
     * Reducer type parameters: input key String, input value Long, output key String, output value Long.
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        /**
         * Aggregate the map output globally.
         * key: the word, values: the counts, e.g. [1, 1, 1]
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable value : values) {
                // Accumulate the word count
                sum += value.get();
            }
            // Emit the final result
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Driver method.
     */
    public static void main(String[] args) throws IllegalArgumentException, IOException,
            ClassNotFoundException, InterruptedException {
        // 0. Create a Job
        Configuration conf = new Configuration();
        // Connect to a remote Hadoop environment if needed
        // conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
        Job job = Job.getInstance(conf, "word-count");
        // The class used to locate the jar
        job.setJarByClass(WordCount.class);
        // 1. Input files
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 2. Mapper logic
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 3. Shuffle (nothing to customize here)
        // 4. Reducer logic
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 5. Output files
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Run the Job
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 1 : 0);
    }
}
The customized NativeIO class (a modified copy of Hadoop's original, with access() changed to return true):
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.io.nativeio; import java.io.File; import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException; import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.PerformanceAdvisory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import sun.misc.Unsafe; import com.google.common.annotations.VisibleForTesting; /** * JNI wrappers for various native IO-related calls not available in Java. * These functions should generally be used alongside a fallback to another * more portable mechanism. */ @InterfaceAudience.Private @InterfaceStability.Unstable public class NativeIO { public static class POSIX { // Flags for open() call from bits/fcntl.h public static final int O_RDONLY = 00; public static final int O_WRONLY = 01; public static final int O_RDWR = 02; public static final int O_CREAT = 0100; public static final int O_EXCL = 0200; public static final int O_NOCTTY = 0400; public static final int O_TRUNC = 01000; public static final int O_APPEND = 02000; public static final int O_NONBLOCK = 04000; public static final int O_SYNC = 010000; public static final int O_ASYNC = 020000; public static final int O_FSYNC = O_SYNC; public static final int O_NDELAY = O_NONBLOCK; // Flags for posix_fadvise() from bits/fcntl.h /* No further special treatment. */ public static final int POSIX_FADV_NORMAL = 0; /* Expect random page references. */ public static final int POSIX_FADV_RANDOM = 1; /* Expect sequential page references. */ public static final int POSIX_FADV_SEQUENTIAL = 2; /* Will need these pages. */ public static final int POSIX_FADV_WILLNEED = 3; /* Don't need these pages. */ public static final int POSIX_FADV_DONTNEED = 4; /* Data will be accessed once. */ public static final int POSIX_FADV_NOREUSE = 5; /* Wait upon writeout of all pages in the range before performing the write. */ public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1; /* Initiate writeout of all those dirty pages in the range which are not presently under writeback. 
*/ public static final int SYNC_FILE_RANGE_WRITE = 2; /* Wait upon writeout of all pages in the range after performing the write. */ public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4; private static final Log LOG = LogFactory.getLog(NativeIO.class); private static boolean nativeLoaded = false; private static boolean fadvisePossible = true; private static boolean syncFileRangePossible = true; static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY = "hadoop.workaround.non.threadsafe.getpwuid"; static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = true; private static long cacheTimeout = -1; private static CacheManipulator cacheManipulator = new CacheManipulator(); public static CacheManipulator getCacheManipulator() { return cacheManipulator; } public static void setCacheManipulator(CacheManipulator cacheManipulator) { POSIX.cacheManipulator = cacheManipulator; } /** * Used to manipulate the operating system cache. */ @VisibleForTesting public static class CacheManipulator { public void mlock(String identifier, ByteBuffer buffer, long len) throws IOException { POSIX.mlock(buffer, len); } public long getMemlockLimit() { return NativeIO.getMemlockLimit(); } public long getOperatingSystemPageSize() { return NativeIO.getOperatingSystemPageSize(); } public void posixFadviseIfPossible(String identifier, FileDescriptor fd, long offset, long len, int flags) throws NativeIOException { NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset, len, flags); } public boolean verifyCanMlock() { return NativeIO.isAvailable(); } } /** * A CacheManipulator used for testing which does not actually call mlock. * This allows many tests to be run even when the operating system does not * allow mlock, or only allows limited mlocking. */ @VisibleForTesting public static class NoMlockCacheManipulator extends CacheManipulator { public void mlock(String identifier, ByteBuffer buffer, long len) throws IOException { LOG.info("mlocking " + identifier); } public long getMemlockLimit() { return 1125899906842624L; } public long getOperatingSystemPageSize() { return 4096; } public boolean verifyCanMlock() { return true; } } static { if (NativeCodeLoader.isNativeCodeLoaded()) { try { Configuration conf = new Configuration(); workaroundNonThreadSafePasswdCalls = conf.getBoolean( WORKAROUND_NON_THREADSAFE_CALLS_KEY, WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT); initNative(); nativeLoaded = true; cacheTimeout = conf.getLong( CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY, CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) * 1000; LOG.debug("Initialized cache for IDs to User/Group mapping with a " + " cache timeout of " + cacheTimeout/1000 + " seconds."); } catch (Throwable t) { // This can happen if the user has an older version of libhadoop.so // installed - in this case we can continue without native IO // after warning PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t); } } } /** * Return true if the JNI-based native IO extensions are available. */ public static boolean isAvailable() { return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded; } private static void assertCodeLoaded() throws IOException { if (!isAvailable()) { throw new IOException("NativeIO was not loaded"); } } /** Wrapper around open(2) */ public static native FileDescriptor open(String path, int flags, int mode) throws IOException; /** Wrapper around fstat(2) */ private static native Stat fstat(FileDescriptor fd) throws IOException; /** Native chmod implementation. 
On UNIX, it is a wrapper around chmod(2) */ private static native void chmodImpl(String path, int mode) throws IOException; public static void chmod(String path, int mode) throws IOException { if (!Shell.WINDOWS) { chmodImpl(path, mode); } else { try { chmodImpl(path, mode); } catch (NativeIOException nioe) { if (nioe.getErrorCode() == 3) { throw new NativeIOException("No such file or directory", Errno.ENOENT); } else { LOG.warn(String.format("NativeIO.chmod error (%d): %s", nioe.getErrorCode(), nioe.getMessage())); throw new NativeIOException("Unknown error", Errno.UNKNOWN); } } } } /** Wrapper around posix_fadvise(2) */ static native void posix_fadvise( FileDescriptor fd, long offset, long len, int flags) throws NativeIOException; /** Wrapper around sync_file_range(2) */ static native void sync_file_range( FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException; /** * Call posix_fadvise on the given file descriptor. See the manpage * for this syscall for more information. On systems where this * call is not available, does nothing. * * @throws NativeIOException if there is an error with the syscall */ static void posixFadviseIfPossible(String identifier, FileDescriptor fd, long offset, long len, int flags) throws NativeIOException { if (nativeLoaded && fadvisePossible) { try { posix_fadvise(fd, offset, len, flags); } catch (UnsupportedOperationException uoe) { fadvisePossible = false; } catch (UnsatisfiedLinkError ule) { fadvisePossible = false; } } } /** * Call sync_file_range on the given file descriptor. See the manpage * for this syscall for more information. On systems where this * call is not available, does nothing. * * @throws NativeIOException if there is an error with the syscall */ public static void syncFileRangeIfPossible( FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException { if (nativeLoaded && syncFileRangePossible) { try { sync_file_range(fd, offset, nbytes, flags); } catch (UnsupportedOperationException uoe) { syncFileRangePossible = false; } catch (UnsatisfiedLinkError ule) { syncFileRangePossible = false; } } } static native void mlock_native( ByteBuffer buffer, long len) throws NativeIOException; /** * Locks the provided direct ByteBuffer into memory, preventing it from * swapping out. After a buffer is locked, future accesses will not incur * a page fault. * * See the mlock(2) man page for more information. * * @throws NativeIOException */ static void mlock(ByteBuffer buffer, long len) throws IOException { assertCodeLoaded(); if (!buffer.isDirect()) { throw new IOException("Cannot mlock a non-direct ByteBuffer"); } mlock_native(buffer, len); } /** * Unmaps the block from memory. See munmap(2). * * There isn't any portable way to unmap a memory region in Java. * So we use the sun.nio method here. * Note that unmapping a memory region could cause crashes if code * continues to reference the unmapped code. However, if we don't * manually unmap the memory, we are dependent on the finalizer to * do it, and we have no idea when the finalizer will run. * * @param buffer The buffer to unmap. 
*/ public static void munmap(MappedByteBuffer buffer) { if (buffer instanceof sun.nio.ch.DirectBuffer) { sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer)buffer).cleaner(); cleaner.clean(); } } /** Linux only methods used for getOwner() implementation */ private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException; private static native String getUserName(long uid) throws IOException; /** * Result type of the fstat call */ public static class Stat { private int ownerId, groupId; private String owner, group; private int mode; // Mode constants public static final int S_IFMT = 0170000; /* type of file */ public static final int S_IFIFO = 0010000; /* named pipe (fifo) */ public static final int S_IFCHR = 0020000; /* character special */ public static final int S_IFDIR = 0040000; /* directory */ public static final int S_IFBLK = 0060000; /* block special */ public static final int S_IFREG = 0100000; /* regular */ public static final int S_IFLNK = 0120000; /* symbolic link */ public static final int S_IFSOCK = 0140000; /* socket */ public static final int S_IFWHT = 0160000; /* whiteout */ public static final int S_ISUID = 0004000; /* set user id on execution */ public static final int S_ISGID = 0002000; /* set group id on execution */ public static final int S_ISVTX = 0001000; /* save swapped text even after use */ public static final int S_IRUSR = 0000400; /* read permission, owner */ public static final int S_IWUSR = 0000200; /* write permission, owner */ public static final int S_IXUSR = 0000100; /* execute/search permission, owner */ Stat(int ownerId, int groupId, int mode) { this.ownerId = ownerId; this.groupId = groupId; this.mode = mode; } Stat(String owner, String group, int mode) { if (!Shell.WINDOWS) { this.owner = owner; } else { this.owner = stripDomain(owner); } if (!Shell.WINDOWS) { this.group = group; } else { this.group = stripDomain(group); } this.mode = mode; } @Override public String toString() { return "Stat(owner='" + owner + "', group='" + group + "'" + ", mode=" + mode + ")"; } public String getOwner() { return owner; } public String getGroup() { return group; } public int getMode() { return mode; } } /** * Returns the file stat for a file descriptor. * * @param fd file descriptor. * @return the file descriptor file stat. * @throws IOException thrown if there was an IO error while obtaining the file stat. */ public static Stat getFstat(FileDescriptor fd) throws IOException { Stat stat = null; if (!Shell.WINDOWS) { stat = fstat(fd); stat.owner = getName(IdCache.USER, stat.ownerId); stat.group = getName(IdCache.GROUP, stat.groupId); } else { try { stat = fstat(fd); } catch (NativeIOException nioe) { if (nioe.getErrorCode() == 6) { throw new NativeIOException("The handle is invalid.", Errno.EBADF); } else { LOG.warn(String.format("NativeIO.getFstat error (%d): %s", nioe.getErrorCode(), nioe.getMessage())); throw new NativeIOException("Unknown error", Errno.UNKNOWN); } } } return stat; } private static String getName(IdCache domain, int id) throws IOException { Map<Integer, CachedName> idNameCache = (domain == IdCache.USER) ? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE; String name; CachedName cachedName = idNameCache.get(id); long now = System.currentTimeMillis(); if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) { name = cachedName.name; } else { name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id); if (LOG.isDebugEnabled()) { String type = (domain == IdCache.USER) ? 
"UserName" : "GroupName"; LOG.debug("Got " + type + " " + name + " for ID " + id + " from the native implementation"); } cachedName = new CachedName(name, now); idNameCache.put(id, cachedName); } return name; } static native String getUserName(int uid) throws IOException; static native String getGroupName(int uid) throws IOException; private static class CachedName { final long timestamp; final String name; public CachedName(String name, long timestamp) { this.name = name; this.timestamp = timestamp; } } private static final Map<Integer, CachedName> USER_ID_NAME_CACHE = new ConcurrentHashMap<Integer, CachedName>(); private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE = new ConcurrentHashMap<Integer, CachedName>(); private enum IdCache { USER, GROUP } public final static int MMAP_PROT_READ = 0x1; public final static int MMAP_PROT_WRITE = 0x2; public final static int MMAP_PROT_EXEC = 0x4; public static native long mmap(FileDescriptor fd, int prot, boolean shared, long length) throws IOException; public static native void munmap(long addr, long length) throws IOException; } private static boolean workaroundNonThreadSafePasswdCalls = false; public static class Windows { // Flags for CreateFile() call on Windows public static final long GENERIC_READ = 0x80000000L; public static final long GENERIC_WRITE = 0x40000000L; public static final long FILE_SHARE_READ = 0x00000001L; public static final long FILE_SHARE_WRITE = 0x00000002L; public static final long FILE_SHARE_DELETE = 0x00000004L; public static final long CREATE_NEW = 1; public static final long CREATE_ALWAYS = 2; public static final long OPEN_EXISTING = 3; public static final long OPEN_ALWAYS = 4; public static final long TRUNCATE_EXISTING = 5; public static final long FILE_BEGIN = 0; public static final long FILE_CURRENT = 1; public static final long FILE_END = 2; public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L; /** * Create a directory with permissions set to the specified mode. By setting * permissions at creation time, we avoid issues related to the user lacking * WRITE_DAC rights on subsequent chmod calls. One example where this can * occur is writing to an SMB share where the user does not have Full Control * rights, and therefore WRITE_DAC is denied. * * @param path directory to create * @param mode permissions of new directory * @throws IOException if there is an I/O error */ public static void createDirectoryWithMode(File path, int mode) throws IOException { createDirectoryWithMode0(path.getAbsolutePath(), mode); } /** Wrapper around CreateDirectory() on Windows */ private static native void createDirectoryWithMode0(String path, int mode) throws NativeIOException; /** Wrapper around CreateFile() on Windows */ public static native FileDescriptor createFile(String path, long desiredAccess, long shareMode, long creationDisposition) throws IOException; /** * Create a file for write with permissions set to the specified mode. By * setting permissions at creation time, we avoid issues related to the user * lacking WRITE_DAC rights on subsequent chmod calls. One example where * this can occur is writing to an SMB share where the user does not have * Full Control rights, and therefore WRITE_DAC is denied. * * This method mimics the semantics implemented by the JDK in * {@link java.io.FileOutputStream}. The file is opened for truncate or * append, the sharing mode allows other readers and writers, and paths * longer than MAX_PATH are supported. (See io_util_md.c in the JDK.) 
* * @param path file to create * @param append if true, then open file for append * @param mode permissions of new directory * @return FileOutputStream of opened file * @throws IOException if there is an I/O error */ public static FileOutputStream createFileOutputStreamWithMode(File path, boolean append, int mode) throws IOException { long desiredAccess = GENERIC_WRITE; long shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE; long creationDisposition = append ? OPEN_ALWAYS : CREATE_ALWAYS; return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(), desiredAccess, shareMode, creationDisposition, mode)); } /** Wrapper around CreateFile() with security descriptor on Windows */ private static native FileDescriptor createFileWithMode0(String path, long desiredAccess, long shareMode, long creationDisposition, int mode) throws NativeIOException; /** Wrapper around SetFilePointer() on Windows */ public static native long setFilePointer(FileDescriptor fd, long distanceToMove, long moveMethod) throws IOException; /** Windows only methods used for getOwner() implementation */ private static native String getOwner(FileDescriptor fd) throws IOException; /** Supported list of Windows access right flags */ public static enum AccessRight { ACCESS_READ (0x0001), // FILE_READ_DATA ACCESS_WRITE (0x0002), // FILE_WRITE_DATA ACCESS_EXECUTE (0x0020); // FILE_EXECUTE private final int accessRight; AccessRight(int access) { accessRight = access; } public int accessRight() { return accessRight; } }; /** Windows only method used to check if the current process has requested * access rights on the given path. */ private static native boolean access0(String path, int requestedAccess); /** * Checks whether the current process has desired access rights on * the given path. * * Longer term this native function can be substituted with JDK7 * function Files#isReadable, isWritable, isExecutable. * * @param path input path * @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE * @return true if access is allowed * @throws IOException I/O exception on error */ public static boolean access(String path, AccessRight desiredAccess) throws IOException { return true; } /** * Extends both the minimum and maximum working set size of the current * process. This method gets the current minimum and maximum working set * size, adds the requested amount to each and then sets the minimum and * maximum working set size to the new values. Controlling the working set * size of the process also controls the amount of memory it can lock. 
* * @param delta amount to increment minimum and maximum working set size * @throws IOException for any error * @see POSIX#mlock(ByteBuffer, long) */ public static native void extendWorkingSetSize(long delta) throws IOException; static { if (NativeCodeLoader.isNativeCodeLoaded()) { try { initNative(); nativeLoaded = true; } catch (Throwable t) { // This can happen if the user has an older version of libhadoop.so // installed - in this case we can continue without native IO // after warning PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t); } } } } private static final Log LOG = LogFactory.getLog(NativeIO.class); private static boolean nativeLoaded = false; static { if (NativeCodeLoader.isNativeCodeLoaded()) { try { initNative(); nativeLoaded = true; } catch (Throwable t) { // This can happen if the user has an older version of libhadoop.so // installed - in this case we can continue without native IO // after warning PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t); } } } /** * Return true if the JNI-based native IO extensions are available. */ public static boolean isAvailable() { return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded; } /** Initialize the JNI method ID and class ID cache */ private static native void initNative(); /** * Get the maximum number of bytes that can be locked into memory at any * given point. * * @return 0 if no bytes can be locked into memory; * Long.MAX_VALUE if there is no limit; * The number of bytes that can be locked into memory otherwise. */ static long getMemlockLimit() { return isAvailable() ? getMemlockLimit0() : 0; } private static native long getMemlockLimit0(); /** * @return the operating system's page size. */ static long getOperatingSystemPageSize() { try { Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); Unsafe unsafe = (Unsafe)f.get(null); return unsafe.pageSize(); } catch (Throwable e) { LOG.warn("Unable to get operating system page size. Guessing 4096.", e); return 4096; } } private static class CachedUid { final long timestamp; final String username; public CachedUid(String username, long timestamp) { this.timestamp = timestamp; this.username = username; } } private static final Map<Long, CachedUid> uidCache = new ConcurrentHashMap<Long, CachedUid>(); private static long cacheTimeout; private static boolean initialized = false; /** * The Windows logon name has two part, NetBIOS domain name and * user account name, of the format DOMAIN\UserName. This method * will remove the domain part of the full logon name. * * @param Fthe full principal name containing the domain * @return name with domain removed */ private static String stripDomain(String name) { int i = name.indexOf('\\'); if (i != -1) name = name.substring(i + 1); return name; } public static String getOwner(FileDescriptor fd) throws IOException { ensureInitialized(); if (Shell.WINDOWS) { String owner = Windows.getOwner(fd); owner = stripDomain(owner); return owner; } else { long uid = POSIX.getUIDforFDOwnerforOwner(fd); CachedUid cUid = uidCache.get(uid); long now = System.currentTimeMillis(); if (cUid != null && (cUid.timestamp + cacheTimeout) > now) { return cUid.username; } String user = POSIX.getUserName(uid); LOG.info("Got UserName " + user + " for UID " + uid + " from the native implementation"); cUid = new CachedUid(user, now); uidCache.put(uid, cUid); return user; } } /** * Create a FileInputStream that shares delete permission on the * file opened, i.e. 
other process can delete the file the * FileInputStream is reading. Only Windows implementation uses * the native interface. */ public static FileInputStream getShareDeleteFileInputStream(File f) throws IOException { if (!Shell.WINDOWS) { // On Linux the default FileInputStream shares delete permission // on the file opened. // return new FileInputStream(f); } else { // Use Windows native interface to create a FileInputStream that // shares delete permission on the file opened. // FileDescriptor fd = Windows.createFile( f.getAbsolutePath(), Windows.GENERIC_READ, Windows.FILE_SHARE_READ | Windows.FILE_SHARE_WRITE | Windows.FILE_SHARE_DELETE, Windows.OPEN_EXISTING); return new FileInputStream(fd); } } /** * Create a FileInputStream that shares delete permission on the * file opened at a given offset, i.e. other process can delete * the file the FileInputStream is reading. Only Windows implementation * uses the native interface. */ public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset) throws IOException { if (!Shell.WINDOWS) { RandomAccessFile rf = new RandomAccessFile(f, "r"); if (seekOffset > 0) { rf.seek(seekOffset); } return new FileInputStream(rf.getFD()); } else { // Use Windows native interface to create a FileInputStream that // shares delete permission on the file opened, and set it to the // given offset. // FileDescriptor fd = NativeIO.Windows.createFile( f.getAbsolutePath(), NativeIO.Windows.GENERIC_READ, NativeIO.Windows.FILE_SHARE_READ | NativeIO.Windows.FILE_SHARE_WRITE | NativeIO.Windows.FILE_SHARE_DELETE, NativeIO.Windows.OPEN_EXISTING); if (seekOffset > 0) NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN); return new FileInputStream(fd); } } /** * Create the specified File for write access, ensuring that it does not exist. * @param f the file that we want to create * @param permissions we want to have on the file (if security is enabled) * * @throws AlreadyExistsException if the file already exists * @throws IOException if any other error occurred */ public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions) throws IOException { if (!Shell.WINDOWS) { // Use the native wrapper around open(2) try { FileDescriptor fd = NativeIO.POSIX.open(f.getAbsolutePath(), NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT | NativeIO.POSIX.O_EXCL, permissions); return new FileOutputStream(fd); } catch (NativeIOException nioe) { if (nioe.getErrno() == Errno.EEXIST) { throw new AlreadyExistsException(nioe); } throw nioe; } } else { // Use the Windows native APIs to create equivalent FileOutputStream try { FileDescriptor fd = NativeIO.Windows.createFile(f.getCanonicalPath(), NativeIO.Windows.GENERIC_WRITE, NativeIO.Windows.FILE_SHARE_DELETE | NativeIO.Windows.FILE_SHARE_READ | NativeIO.Windows.FILE_SHARE_WRITE, NativeIO.Windows.CREATE_NEW); NativeIO.POSIX.chmod(f.getCanonicalPath(), permissions); return new FileOutputStream(fd); } catch (NativeIOException nioe) { if (nioe.getErrorCode() == 80) { // ERROR_FILE_EXISTS // 80 (0x50) // The file exists throw new AlreadyExistsException(nioe); } throw nioe; } } } private synchronized static void ensureInitialized() { if (!initialized) { cacheTimeout = new Configuration().getLong("hadoop.security.uid.cache.secs", 4*60*60) * 1000; LOG.info("Initialized cache for UID to User mapping with a cache" + " timeout of " + cacheTimeout/1000 + " seconds."); initialized = true; } } /** * A version of renameTo that throws a descriptive exception when it fails. 
* * @param src The source path * @param dst The destination path * * @throws NativeIOException On failure. */ public static void renameTo(File src, File dst) throws IOException { if (!nativeLoaded) { if (!src.renameTo(dst)) { throw new IOException("renameTo(src=" + src + ", dst=" + dst + ") failed."); } } else { renameTo0(src.getAbsolutePath(), dst.getAbsolutePath()); } } public static void link(File src, File dst) throws IOException { if (!nativeLoaded) { HardLink.createHardLink(src, dst); } else { link0(src.getAbsolutePath(), dst.getAbsolutePath()); } } /** * A version of renameTo that throws a descriptive exception when it fails. * * @param src The source path * @param dst The destination path * * @throws NativeIOException On failure. */ private static native void renameTo0(String src, String dst) throws NativeIOException; private static native void link0(String src, String dst) throws NativeIOException; /** * Unbuffered file copy from src to dst without tainting OS buffer cache * * In POSIX platform: * It uses FileChannel#transferTo() which internally attempts * unbuffered IO on OS with native sendfile64() support and falls back to * buffered IO otherwise. * * It minimizes the number of FileChannel#transferTo call by passing the the * src file size directly instead of a smaller size as the 3rd parameter. * This saves the number of sendfile64() system call when native sendfile64() * is supported. In the two fall back cases where sendfile is not supported, * FileChannle#transferTo already has its own batching of size 8 MB and 8 KB, * respectively. * * In Windows Platform: * It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING * flag, which is supported on Windows Server 2008 and above. * * Ideally, we should use FileChannel#transferTo() across both POSIX and Windows * platform. Unfortunately, the wrapper(Java_sun_nio_ch_FileChannelImpl_transferTo0) * used by FileChannel#transferTo for unbuffered IO is not implemented on Windows. * Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0 * on Windows simply returns IOS_UNSUPPORTED. * * Note: This simple native wrapper does minimal parameter checking before copy and * consistency check (e.g., size) after copy. * It is recommended to use wrapper function like * the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs with pre/post copy * checks. * * @param src The source path * @param dst The destination path * @throws IOException */ public static void copyFileUnbuffered(File src, File dst) throws IOException { if (nativeLoaded && Shell.WINDOWS) { copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath()); } else { FileInputStream fis = null; FileOutputStream fos = null; FileChannel input = null; FileChannel output = null; try { fis = new FileInputStream(src); fos = new FileOutputStream(dst); input = fis.getChannel(); output = fos.getChannel(); long remaining = input.size(); long position = 0; long transferred = 0; while (remaining > 0) { transferred = input.transferTo(position, remaining, output); remaining -= transferred; position += transferred; } } finally { IOUtils.cleanup(LOG, output); IOUtils.cleanup(LOG, fos); IOUtils.cleanup(LOG, input); IOUtils.cleanup(LOG, fis); } } } private static native void copyFileUnbuffered0(String src, String dst) throws NativeIOException; }
b. Remote execution
Meaning: code on a Windows machine connects directly to the Hadoop environment on a Linux cluster and runs there; results can be stored locally or on the HDFS server
conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
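A minimal sketch of the client-side settings for such a remote submission (the hostname is the one used elsewhere in these notes; treat the exact property set as an assumption, since a real cross-platform submission from Windows often needs additional configuration):
Configuration conf = new Configuration();
// Point the client at the remote HDFS and YARN services
conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop-senior01.test.com");
Job job = Job.getInstance(conf, "word-count");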
c. Packaging a jar and running it on the Hadoop cluster
<1> Two ways to build the jar
I. Export the jar from Eclipse
II. Build the jar with Maven
<2> Run it on the Linux environment
bin/yarn jar hadoop-test.jar file:/opt/module/hadoop-2.6.0/LICENSE.txt file:/opt/out
d. InputFormat
<1> Responsibilities
I. Split the input files into InputSplit slices
II. Create a RecordReader that feeds each InputSplit to a Mapper process
<2> Subclasses
DBInputFormat / FileInputFormat
FileInputFormat: TextInputFormat / KeyValueTextInputFormat / SequenceFileInputFormat / NLineInputFormat / CombineFileInputFormat
<3> Using SequenceFileInputFormat
I. Generate a SequenceFile (a binary file of <k,v> records)
II. In the map/reduce/driver code:
job.setInputFormatClass(SequenceFileInputFormat.class);
WordCount (this variant takes as input the SequenceFile produced by the GenerateSequenceFile program below and runs on the YARN platform)
package sequencefile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; public class WordCount { //臨時配置HADOOP_HOME環境變量 static { System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0-cdh5.9.0"); } /** * 默認MapReduce是經過TextInputFormat進行切片,並交給Mapper進行處理 * TextInputFormat:key:當前行的首字母的索引,value:當前行數據 * Mapper類參數:輸入key類型:Long,輸入Value類型:String,輸出key類型:String,輸出Value類型:Long * MapReduce爲了網絡傳輸時序列化文件比較小,執行速度快,對基本類型進行包裝,實現本身的序列化 * @author Administrator * */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { LongWritable one = new LongWritable(1); /** * 將每行數據拆分,拆分完輸出每一個單詞和個數 */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { String words = value.toString(); //將每行數據拆分紅各個單詞 String[] wordArr = words.split(" "); //遍歷各個單詞 for (String word : wordArr) { //輸出格式<單詞,1> context.write(new Text(word), one); } } } /** * 進行全局聚合 * Reducer參數:輸入key類型:String,輸入Value類型:Long,輸出key類型:String,輸出Value類型:Long * @author Administrator * */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { /** * 將map輸出結果進行全局聚合 * key:單詞, values:個數[1,1,1] */ @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { Long sum = 0L; for (LongWritable value : values) { //累加單詞個數 sum += value.get(); } //輸出最終數據結果 context.write(key, new LongWritable(sum)); } } /** * 驅動方法 * @param args * @throws IllegalArgumentException * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { //0.建立一個Job Configuration conf = new Configuration(); //鏈接hadoop環境 // conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020"); Job job = Job.getInstance(conf, "word-count"); //經過類名打成jar包 job.setJarByClass(WordCount.class); //1.輸入文件 FileInputFormat.addInputPath(job, new Path(args[0])); //指定以SequenceFileInputFormat處理sequencefile文件 job.setInputFormatClass(SequenceFileInputFormat.class); //2.編寫mapper處理邏輯 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //3.shuffle流程(暫時不用處理) //4.編寫reducer處理邏輯 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //5.輸出文件 FileOutputFormat.setOutputPath(job, new Path(args[1])); //序列化文件的壓縮類型:None、Block、Record SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); //壓縮格式:default、gzip、lz四、snappy SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); //指定輸出格式爲序列化文件輸出 job.setOutputFormatClass(SequenceFileOutputFormat.class); //6.運行Job boolean result = job.waitForCompletion(true); 
System.out.println(result ? 1 : 0); } }
GenerateSequenceFile
package sequencefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

/**
 * Generate a SequenceFile.
 */
public class GenerateSequenceFile {

    public static void main(String[] args) throws IOException {
        // 1. SequenceFiles are produced through the SequenceFile class.
        // createWriter parameters: conf: Hadoop configuration, name: file name,
        // keyClass: data type of the key, valClass: data type of the value.
        // Target file name
        Writer.Option name = Writer.file(new Path("file:/e:/sf"));
        // Key type
        Writer.Option keyClass = Writer.keyClass(LongWritable.class);
        // Value type
        Writer.Option valClass = Writer.valueClass(Text.class);
        // Hadoop configuration
        Configuration conf = new Configuration();
        // Create the output writer
        Writer writer = SequenceFile.createWriter(conf, name, keyClass, valClass);

        // Read the plain-text input file
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("file:/e:/words.txt"));
        String line = null;
        long num = 0L;
        while ((line = in.readLine()) != null) {
            // Use an ever-increasing key
            num++;
            // Append each line to the SequenceFile
            writer.append(new LongWritable(num), new Text(line));
        }
        IOUtils.closeStream(writer);
    }
}
e. Input splits (InputSplit)
<1> When does splitting happen?
The client performs the splitting; the splits are then handed to the YARN cluster for execution
<2> What a split stores
The data length and the data's storage locations
<3> Split size
minSize = max{minSplitSize, mapred.min.split.size}
maxSize = mapred.max.split.size
splitSize = max{minSize, min{maxSize, blockSize}}
<4> Number of splits (= number of mapper processes)
total input size / split size
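A quick worked example of the split-size rule (the numbers are illustrative): with the default 128 MB block size, minSize = 1 and maxSize = Long.MAX_VALUE, the split size is 128 MB, so a 300 MB file yields 3 splits (128 MB + 128 MB + 44 MB). A minimal sketch of the arithmetic:
public class SplitSizeDemo {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;  // dfs.blocksize (default 128 MB)
        long minSize   = 1L;                  // lower bound on the split size
        long maxSize   = Long.MAX_VALUE;      // upper bound on the split size
        // splitSize = max(minSize, min(maxSize, blockSize))
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        long fileSize  = 300L * 1024 * 1024;  // a 300 MB input file
        long numSplits = (fileSize + splitSize - 1) / splitSize;
        System.out.println(splitSize + " bytes per split, " + numSplits + " splits"); // 3 splits
    }
}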
f. Number of reducers
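The reducer count is not derived from the input size; it defaults to 1 and is set explicitly on the job. A minimal fragment for the WordCount driver above:
job.setNumReduceTasks(2);   // produces one output file part-r-0000N per reducer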
g. OutputFormat
<1> Responsibilities
I. Check whether the output directory already exists
II. Write the results out to a table or file
<2> Subclasses
DBOutputFormat / FileOutputFormat
FileOutputFormat: TextOutputFormat / MapFileOutputFormat / SequenceFileOutputFormat / MultipleOutputs
<3> Using SequenceFileOutputFormat
I. Produces a SequenceFile (a binary file of <k,v> records)
II. In the map/reduce/driver code:
job.setOutputFormatClass(SequenceFileOutputFormat.class);
h. The Partitioner
<1> Responsibilities
I. Position: between the mapper and reducer logic, at the start of the shuffle write phase
II. Distributes the map output across the different reducers
III. The number of partitions equals the number of reducers
<2> Partitioner subclasses
HashPartitioner, KeyFieldBasedPartitioner, BinaryPartitioner, TotalOrderPartitioner
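A minimal custom Partitioner sketch (a hypothetical example matching the Text/LongWritable map output of the WordCount job above: words starting with a–m go to reducer 0, everything else to reducer 1):
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class AlphabetPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        if (numPartitions < 2 || key.toString().isEmpty()) {
            return 0;                                   // single reducer or empty key: partition 0
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        return (first >= 'a' && first <= 'm') ? 0 : 1;  // two partitions keyed on the first letter
    }
}
The driver would then register it with job.setPartitionerClass(AlphabetPartitioner.class) and job.setNumReduceTasks(2).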
2.2 The shuffle flow
1) Position
Between the mapper and reducer logic; it is the link that connects map and reduce
2) Purpose
"Shuffle" originally means mixing a deck of cards: turning ordered data into data that is as random as possible. The shuffle in MapReduce is more like the reverse — it turns a set of unordered data into ordered, grouped data
Everything from the shuffle write phase (map side) to the shuffle read phase (reduce side) is broadly called the shuffle. It spans both sides: on the map side it includes the spill process, on the reduce side the copy and sort processes
3) Write phase
a. Map output is partitioned, then collected into the in-memory circular buffer (kvbuffer)
b. Sort orders the data in the buffer
<1> First by partition
<2> Within each partition, by key
c. The spill thread writes the buffer to local disk
Every time the buffer fills up it spills, producing many small files
d. Merge combines the small spill files into larger files
4) Read phase
a. Copy
The reduce side reads its partition of the data from each mapper's disk over HTTP
b. Merge-sort
<1> If the fetched data fits in the reducer's memory, it is kept there; once memory use reaches a threshold, it is merged into a file on disk
<2> If the fetched data does not fit in memory, it is written straight to disk.
One file is created per mapper fetched from; once the number of files reaches a threshold, they are merged into one large file.
3: HDFS, the distributed file system
3.1 Design goals
Designed for: 1) very large files; 2) streaming data access; 3) commodity hardware; less suited to: 4) low-latency data access; 5) lots of small files; 6) multiple writers or arbitrary file modifications
3.2 Concepts
3.2.1 Blocks
Ordinary file systems have a block concept (e.g. 512 bytes), the smallest unit of reads and writes. HDFS also has blocks, but they default to 128 MB. The reasons for such large blocks:
1) minimize the cost of seeks
2) with blocks, a single file can be stored across different machines
3) using an abstract block rather than a whole file as the storage unit greatly simplifies the design of the storage subsystem
The fsck command shows block information
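For example (the path is an arbitrary illustration):
bin/hdfs fsck /user/test -files -blocks -locations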
3.2.2 Namenode and datanode
Namenode: manages the filesystem namespace; it maintains the filesystem tree and the metadata for all files and directories in the tree. This information is persisted on the local disk as two kinds of files: the namespace image and the edit log
Datanode: the workhorse of the filesystem; it stores and retrieves data blocks as needed
Client: a POSIX-like (Portable Operating System Interface) filesystem interface through which user code talks to the namenode and datanodes
HDFS federation: the namenode keeps the mapping of every file and every block in memory, which means that for a very large cluster with many files, memory becomes a bottleneck that limits horizontal scaling; federation addresses this by splitting the namespace across several namenodes
To access a federated cluster, the client uses client-side mount tables to map file paths to namenodes
3.2.3 Block caching
A datanode normally reads blocks from disk, but for frequently accessed files the blocks can be explicitly cached in memory; users or applications can add a cache directive to tell the namenode which files to cache and for how long
3.2.4 HDFS high availability
Using NFS or the quorum journal manager (QJM), a standby namenode can take over quickly (hot failover) when the active namenode fails; if the standby namenode also fails, the administrator can still declare a new namenode and perform a cold start
3.2.5 Filesystems
Basic filesystem operations, e.g.: % hadoop fs -copyFromLocal input/docs/qq.txt hdfs://localhost/user/qq.txt  (copy a local file into HDFS)
HDFS has a permissions model
Hadoop has an abstract notion of a filesystem, of which HDFS is just one implementation; the abstraction defines many interfaces that Java code can call
Besides direct Java calls, these APIs can also be reached over HTTP, but that should be a last resort because of the extra latency (served by WebHDFS, optionally through the HttpFS proxy)
HttpFS: provides a proxy
WebHDFS: the HTTP interfaces are built on this protocol
libhdfs: a C library that lets you call the Hadoop filesystem API
NFS gateway: it is possible to mount HDFS as a local client filesystem through an NFSv3 gateway (the configuration details are easy to look up)
FUSE: Filesystem in Userspace, allows filesystems implemented in user space to be integrated as Unix filesystems (the NFS gateway is generally the better-recommended route)
3.2.6 The Java interface
1) Reading data from a Hadoop URL (see the sketch after this list)
2) Reading data through the FileSystem API: the FSDataInputStream object
3) Writing data: FSDataOutputStream
4) Creating directories: mkdirs() (files are created with create())
5) Querying the filesystem: file metadata is represented by FileStatus, which holds the file length, block size, replication, modification time, owner and permissions; FileSystem's listStatus lists files
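A minimal sketch of item 1, reading from HDFS through java.net.URL (the hdfs:// URL in the comment is an assumed example):
import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class UrlCat {
    static {
        // May only be called once per JVM; teaches java.net.URL to understand hdfs:// URLs
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws Exception {
        InputStream in = null;
        try {
            // e.g. hdfs://hadoop-senior01.test.com:8020/test/word.txt
            in = new URL(args[0]).openStream();
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}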
3.2.7 File patterns
Wildcards can match many files at once: globStatus() returns an array of FileStatus objects for all paths matching the given pattern (glob characters such as *, ?, [ab], ...) — see the sketch after this list
The PathFilter object: programmatic filtering that complements glob patterns
Deleting data: delete()
Coherency model: hflush, hsync
Parallel copying with distcp: % hadoop distcp file1 file2
Keeping an HDFS cluster balanced is very good for overall performance; the balancer tool is used to achieve this
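A minimal sketch of globStatus() with a PathFilter (the NameNode address and the /logs/2012/* directory layout are assumptions):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class GlobDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
        FileSystem fs = FileSystem.get(conf);

        // Glob for all of 2012's subdirectories, then filter the matches further
        FileStatus[] matches = fs.globStatus(new Path("/logs/2012/*"), new PathFilter() {
            @Override
            public boolean accept(Path path) {
                // The PathFilter refines the glob result, e.g. drop a "tmp" directory
                return !path.getName().equals("tmp");
            }
        });
        if (matches != null) {
            for (FileStatus status : matches) {
                System.out.println(status.getPath());
            }
        }
        fs.close();
    }
}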
3.3 A deeper look
1) The three components
NameNode, DataNode, SecondaryNameNode
2) NameNode
a. Role
Stores metadata (file name, creation time, size, permissions, and the file-to-block mapping)
b. Data directory
dfs.namenode.name.dir
3) DataNode
a. Role
Stores the actual data
b. Data directory
dfs.datanode.data.dir
c. Blocks: 128 MB by default, configured via dfs.blocksize
d. Replica policy
<1> 3 replicas by default, configured via dfs.replication
<2> Placement:
I. If the client runs inside the cluster, the first replica goes on the client's machine; otherwise the first replica goes on a randomly chosen, not-too-busy node
II. The second replica goes on a node in a different rack from the first
III. The third replica goes on a different node in the same rack as the second
IV. Any further replicas are placed randomly
e. DataNode/NameNode communication
<1> After startup a DN registers with the NN, then periodically (every hour) uploads a block report
Block report: the block-to-datanode mapping (the second mapping)
Purpose: the block reports let the NN refresh the mapping it holds in memory
<2> The DN sends a heartbeat every 3 s; if the NN hears nothing for 10 minutes, the DN is considered dead
4) SecondaryNameNode
a. Role
Relieves pressure on the NameNode by merging the edits log with the fsimage image file
b. Flow
<1> Periodically sends a request to the NN for the fsimage and edits files
<2> On receiving the request, the NN starts a new, empty edits.new file
<3> The NN sends fsimage and edits to the SNN
<4> The SNN loads fsimage into memory and applies the edits file
<5> The SNN writes out a new image file, fsimage.ckpt
<6> The SNN sends fsimage.ckpt back to the NN
<7> The NN replaces fsimage with fsimage.ckpt and renames edits.new to edits
5) Read and write flows
a. Write flow
<1> The client contacts the NN to create the file
<2> The NN checks whether the file exists and whether the client has permission; if so it creates the file, otherwise the call fails with an error
<3> The client splits the data into chunks and places them in a buffer queue; for each chunk it sends a request to the NN, and the NN returns a list of DNs
<4> The client connects to the DN list and writes the data
<5> Each DN forwards the data to the other DNs according to the replica policy
<6> The DNs return an ACK to the client; on success the next chunk is sent, on failure it is retried
b. Read flow
<1> The client contacts the NN to read a file
<2> The NN looks up the file-to-block and block-to-DN mappings and returns them to the client
<3> The client opens an input stream and, using the returned mappings, asks the DNs for the blocks
<4> The DNs read the block data and return it to the client
<5> The client verifies each block with its checksum; if a block is corrupt it reads the replica on another DN, otherwise it moves on to the next block
6) Safe mode
a. Meaning
Clients can only read; writes and deletes are not allowed
b. Purpose
After startup the NN enters safe mode to check the integrity of the blocks and DNs
c. Exit conditions
<1> The proportion of blocks meeting the minimum replication requirement reaches a threshold
dfs.namenode.replication.min: minimum replication requirement, default 1
dfs.namenode.safemode.threshold-pct: the proportion, default 0.999f
<2> Enough DNs are available
dfs.namenode.safemode.min.datanodes: minimum number of live DNs, default 0
<3> After the first two conditions are met, safe mode is held for a further period
dfs.namenode.safemode.extension: how long to hold, default 30000 ms
3.4 The HDFS command line and Java API
1) HDFS command line
a. bin/hdfs dfs commands:
<1> -help: list the available commands
<2> -mkdir: create a directory; -p: create parent directories as needed
<3> -put: upload a local file to the HDFS server
-copyFromLocal
-moveFromLocal
<4> -ls: list the files and subdirectories of a directory; -R: recurse into subdirectories
<5> -du(s): show the size of a directory or file
-count [-q]
<6> -mv/-cp: move/copy a directory or file
<7> -rm -r: delete a directory or file; -r: delete recursively
<8> -get: download a file from the server to the local machine
-copyToLocal
-moveToLocal
<9> -cat/-text: print a text file stored on the server
b. bin/hdfs dfsadmin commands
<1> -report: show basic filesystem information and statistics
<2> -safemode enter | leave | wait: safe-mode control
<3> -refreshNodes: re-read the hosts and exclude files; used when adding or decommissioning nodes
<4> -finalizeUpgrade: finalize an HDFS upgrade
<5> -setQuota <quota> <dirname>: set the quota <quota> on each directory <dirname>
<6> -clrQuota <dirname>: clear the quota on each directory
2) HDFS Java API (FileSystem)
Reference: http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.9.0/api/index.html
Create a directory: mkdirs; upload a file: create/put or copyFromLocalFile; list a directory: listStatus; show metadata of a file or directory: getFileStatus; download a file: open/get or copyToLocalFile; delete a file or directory: delete
A Java example calling these methods:
package org.hdfs.test; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class HdfsTest { public static void main(String[] args) { // // 獲取訪問入口:FileSystem // FileSystem fileSystem = getHadoopFileSystem(); // System.out.println(fileSystem); // //建立目錄 // boolean result = createPath("/test"); // System.out.println(result); // //建立文件 // boolean result = createFile("/test/test.txt", "hello world"); // System.out.println(result); //上傳文件 //輸出pathName能夠是目錄也能夠是文件 // putFile2HDFS("E://word.txt", "/test"); //輸出pathName必須是文件 // putFile2HDFS2("E://word.txt", "/test/word1.txt"); //獲取元數據信息 // list("/test"); //下載文件 //第二個參數是文件路徑 // getFileFromHDFS("/test/test.txt", "E://test"); //第二個參數是文件路徑 // getFileFromHDFS2("/test/word1.txt", "E://word1"); //刪除文件或目錄 delete("/test"); } /** * 生成文件系統FileSystem * @return */ public static FileSystem getHadoopFileSystem() { Configuration conf = new Configuration(); //執行NameNode訪問地址 conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020"); try { //經過調用FileSystem工廠模式get方法生成FileSystem FileSystem fileSystem = FileSystem.get(conf); return fileSystem; } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } /** * 建立目錄 * @param pathName * @return */ public static boolean createPath(String pathName) { boolean result = false; //1.獲取文件系統 FileSystem fileSystem = getHadoopFileSystem(); //2.調用文件系統的mkdirs建立目錄 Path path = new Path(pathName); try { result = fileSystem.mkdirs(path); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //3.將文件系統關閉 close(fileSystem); } return result; } /** * 建立文件 * @param pathName * @return */ public static boolean createFile(String pathName, String content) { boolean result = false; //1.獲取文件系統 FileSystem fileSystem = getHadoopFileSystem(); //2.調用文件系統create方法建立文件 try { //2.1建立文件 FSDataOutputStream out = fileSystem.create(new Path(pathName)); //2.2寫入數據 out.writeUTF(content); result = true; } catch (IllegalArgumentException | IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //3.關閉文件系統 close(fileSystem); } return result; } /** * 上傳文件 * @param srcPathName * @param dstPathName */ public static void putFile2HDFS(String srcPathName, String dstPathName) { //1.獲取文件系統 FileSystem fileSystem = getHadoopFileSystem(); //2.調用文件系統中的copyFromLocalFile上傳文件 try { fileSystem.copyFromLocalFile(new Path(srcPathName), new Path(dstPathName)); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //3.關閉文件系統 close(fileSystem); } } /** * 上傳文件(經過輸入輸出流) * @param srcPathName * @param dstPathName */ public static void putFile2HDFS2(String srcPathName, String dstPathName) { //1.獲取文件系統 FileSystem fileSystem = getHadoopFileSystem(); //2.建立輸出文件 try { //經過上傳文件,生成輸出流 FSDataOutputStream out = fileSystem.create(new Path(dstPathName)); //經過本地文件生成輸入流 FileInputStream in = new FileInputStream(srcPathName); //經過IOUtils的copyBytes方法傳遞數據流 IOUtils.copyBytes(in, out, 4096, true); } catch (IllegalArgumentException | IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { close(fileSystem); } 
} /** * 查看指定目錄或文件的元數據信息 * @param pathName */ public static void list(String pathName) { //1.獲取文件系統 FileSystem fileSystem = getHadoopFileSystem(); //2.調用文件系統的listStatus方法獲取元數據列表信息 try { FileStatus[] list = fileSystem.listStatus(new Path(pathName)); for (FileStatus fileStatus : list) { boolean isDir = fileStatus.isDirectory(); String path = fileStatus.getPath().toString(); short replication = fileStatus.getReplication(); System.out.println(isDir + "::" + path + "::" + replication); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //3.關閉文件系統 close(fileSystem); } } /** * 下載文件 * @param srcPathName * @param dstPathName */ public static void getFileFromHDFS(String srcPathName, String dstPathName) { //1.獲取文件系統 FileSystem fileSystem = getHadoopFileSystem(); //2.調用文件系統的copyToLocalFile方法下載文件 try { //因爲本地是windows系統,沒有安裝hadoop環境,因此使用第四個參數指定使用本地文件系統 fileSystem.copyToLocalFile(false, new Path(srcPathName), new Path(dstPathName), true); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //3.關閉文件系統 close(fileSystem); } } /** * 下載文件 * @param srcPathName * @param dstPathName */ public static void getFileFromHDFS2(String srcPathName, String dstPathName) { //1.獲取文件系統 FileSystem fileSystem = getHadoopFileSystem(); //2.經過輸入輸出流進行下載 try { //2.1hdfs文件經過輸入流讀取到內存 FSDataInputStream in = fileSystem.open(new Path(srcPathName)); //2.2內存中的數據經過輸出流輸出到本地文件中 // FileOutputStream out = new FileOutputStream(dstPathName); //3.3IOUtils的copyBytes方法複製數據流 // IOUtils.copyBytes(in, out, 4096, true); IOUtils.copyBytes(in, System.out, 4096, true); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } //3.關閉文件系統 close(fileSystem); } /** * 刪除文件 * @param pathName */ public static void delete(String pathName) { //1.獲取文件系統 FileSystem fileSystem = getHadoopFileSystem(); //2.調用文件系統的delete方法刪除文件 try { fileSystem.delete(new Path(pathName), true); } catch (IllegalArgumentException | IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //3.關閉文件系統 close(fileSystem); } } /** * 關閉文件系統 * @param fileSystem */ public static void close(FileSystem fileSystem) { try { fileSystem.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; }
a. Hadoop enforces permissions by default; this can be switched off
In hdfs-site.xml, set dfs.permissions.enabled to false
Note: this must be configured in the server's hdfs-site.xml, and the server restarted
b. Getting file metadata
Replica policy: the dfs.replication setting can be specified on the client side
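For example, a client can override the replication factor before writing any files (a sketch; the hostname and the value 2 are arbitrary):
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
conf.set("dfs.replication", "2");   // client-side setting, applied to files this client creates
FileSystem fileSystem = FileSystem.get(conf);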
c. A note on calling copyToLocalFile when downloading files
Because the local machine is Windows without a Hadoop installation, the fourth parameter is used to force the local (raw) file system:
fileSystem.copyToLocalFile(false, new Path(srcPathName), new Path(dstPathName), true);
4: YARN (resource scheduling and job management)
1. The four components
ResourceManager (RM), NodeManager (NM), ApplicationMaster (AM), Container
2. Execution flow
<1> The client connects to the RM to submit a job and the RM returns a job ID (note: the RM consists of the ApplicationsManager and the ResourceScheduler)
<2> The RM's ApplicationsManager contacts an NM and has it create an AM to handle the client's job
<3> The AM contacts the RM's ApplicationsManager to request NodeManagers
<4> The AM asks the ResourceScheduler for the resources the client's job needs (CPU, memory, disk, network)
<5> The AM contacts the NMs and ships them the client's job program together with the allocated resources (CPU, memory, disk, network)
<6> The NMs launch Container processes to run the job's tasks
<7> The Containers report their runtime state back to the AM in real time
<8> The AM reports the task status to the RM's ApplicationsManager
<9> The client can query the RM or the AM for the job's progress
Note: after an NM starts it registers with the RM and keeps sending heartbeats to show it is alive
3. Details
1) Resource scheduling
a. The scheduler (Resource Scheduler)
<1> FIFO Scheduler
Jobs are executed from a first-in, first-out queue in submission order
<2> Capacity Scheduler (Yahoo)
The default in the Apache distribution
Jobs go into different queues, and each queue allocates resources using FIFO or DRF
<3> Fair Scheduler (Facebook)
The default in the CDH distribution
Queues are created dynamically or specified explicitly; each queue allocates resources using Fair (the default), FIFO, or DRF (dominant resource fairness)
Note on the DRF (dominant resource fairness) algorithm: each job is scheduled by its dominant resource, the resource of which it demands the largest share, e.g.:
Job 1: CPU is its dominant resource
Job 2: memory is its dominant resource — DRF then equalizes each job's share of its own dominant resource
b. Capacity Scheduler configuration
<1> Configure capacity-scheduler.xml:
yarn.scheduler.capacity.root.queues: prod,dev
yarn.scheduler.capacity.root.dev.queues: eng,science
yarn.scheduler.capacity.root.prod.capacity: 40
yarn.scheduler.capacity.root.dev.capacity: 60
yarn.scheduler.capacity.root.dev.maximum-capacity: 75
yarn.scheduler.capacity.root.dev.eng.capacity: 50
yarn.scheduler.capacity.root.dev.science.capacity: 50
<2> Configure yarn-site.xml:
yarn.resourcemanager.scheduler.class:
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
c. Running with the Capacity Scheduler
<1> Choose the queue a job runs in with mapreduce.job.queuename
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.9.0.jar wordcount -Dmapreduce.job.queuename=eng file:/opt/module/hadoop-2.6.0/NOTICE.txt file:/opt/output
<2> View the scheduler
Under "Scheduler" at http://hadoop-senior01.test.com:8088
d. Fair Scheduler configuration
<1> Remove yarn.resourcemanager.scheduler.class from yarn-site.xml and keep the default
<2> If a job is submitted without a queue name, a queue named after the submitting user is created and used;
if a queue name is given, the job runs in that queue
<3> fair-scheduler.xml configuration
<allocations>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<queue name="prod">
<weight>40</weight>
<schedulingPolicy>fifo</schedulingPolicy>
</queue>
<queue name="dev">
<weight>60</weight>
<queue name="eng"/>
<queue name="science"/>
</queue>
<queuePlacementPolicy>
<rule name="specified" create="false"/>
<rule name="primaryGroup" create="false"/>
<rule name="default" queue="dev.eng"/>
</queuePlacementPolicy>
</allocations>
e. Running with the Fair Scheduler
<1> Either choose the queue with mapreduce.job.queuename or leave it unspecified
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.9.0.jar wordcount -Dmapreduce.job.queuename=eng file:/opt/module/hadoop-2.6.0/NOTICE.txt file:/opt/output
<2> View the scheduler
Under "Scheduler" at http://hadoop-senior01.test.com:8088
2) Resource isolation (NodeManager)
a. Meaning
A NodeManager runs many Container processes; the resources each process needs must be isolated so the containers do not interfere with each other
b. Isolation dimensions
Memory isolation and CPU isolation
c. The two YARN container executors
DefaultContainerExecutor (memory isolation) and LinuxContainerExecutor (memory isolation plus CPU isolation via cgroups)
Note: both executors implement memory isolation by monitoring the container's threads
5: The ecosystem
1) Hadoop: distributed storage, distributed computation, resource scheduling and job management
hdfs, mapreduce, yarn, common
2) Lucene: an indexing and search library; Solr: an index server
3) Nutch: an open-source search engine
4) HBase/Cassandra: open-source column-oriented NoSQL databases modeled on Google's BigTable
5) Hive: a SQL-based distributed computation engine and data warehouse
Pig: a computation engine driven by Pig Latin scripts
6) Thrift/Avro: RPC frameworks for network communication
7) BigTop: project testing, packaging and deployment
8) Oozie/Azkaban: workflow schedulers for big data
9) Chukwa/Scribe/Flume: data collection frameworks
10) Whirr: a library for deploying clusters as cloud services
11) Sqoop: a data migration tool
12) ZooKeeper: a distributed coordination service
13) HAMA: a graph computation framework
14) Mahout: a machine learning framework
6: Installation and configuration
1. Three modes: standalone, pseudo-distributed, fully distributed
2. Three main branches: the Apache version (Apache Software Foundation), the CDH version (Cloudera), and the HDP version (Hortonworks)
Downloads: https://archive.cloudera.com/cdh5/cdh/5/; https://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.9.0.tar.gz
For step-by-step details, see the separate Hadoop installation notes
3. Standalone mode (only suitable for running MapReduce jobs on a single machine)
Run the mapreduce-examples.jar sample directly
1) Upload the local Hadoop tarball to the Linux server with rz; I keep installers in a dedicated /opt/software directory
2) Unpack it: tar zxf hadoop-2.6.0-cdh5.9.0.tar.gz -C /opt/module (unpack into the chosen directory, /opt/module)
3) Rename it: mv hadoop-2.6.0-cdh5.9.0 hadoop-2.6.0
4) Test with a small demo
Create an input directory holding a few xml files, then run the test job: bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.9.0.jar grep input output 'dfs[a-z.]+'
Check the result: the output directory contains a _SUCCESS marker and a data file: cat part-r-00000 (seeing "dfsadmin" in the output means it worked)
There is not much more you can do with standalone Hadoop; this is just its simplest demo
4.僞分佈式
I.HDFS
<1>配置core-site.xml
fs.defaultFS
et/hadoop/core-site.xml(配置這目的:經過指定端口來訪問HDFS的主節點NameNode) <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property> </configuration>
<2>配置hdfs-site.xml
dfs.replication
etc/hadoop/hdfs-site.xml(hdfs配置指定塊複製數,這裏因爲是僞分佈式,因此指定多個也沒用,塊壞了仍是跳不到其餘地方的) <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>
<3>格式化NameNode
bin/hdfs namenode -format
做用:清空NameNode目錄下的全部數據,生成目錄結構,初始化一些信息到文件中
(咱們初始化的目錄默認在:/tmp/hadoop-root/dfs/name/current/;副本默認在)
<4>啓動HDFS各個進程
sbin/start-dfs.sh
或
sbin/hadoop-daemon.sh start namenode
sbin/hadoop-daemon.sh start datanode
sbin/hadoop-daemon.sh start secondarynamenode
若是報JAVA_HOME is not set and could not be found錯誤,從新指定java路徑:vim etc/hadoop/hadoop-env.sh
<5> Access via browser
http://<hostname>:50070
Note: 50070 is the HTTP port and 9000 is the RPC (TCP) port; if the web UI comes up, HDFS is installed and configured correctly. Stop the services with: sbin/stop-dfs.sh
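Another quick sanity check (assuming the JDK's jps tool is on the PATH) is to list the running Java processes:
jps   # should show NameNode, DataNode and SecondaryNameNode (plus Jps itself)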
II.YARN
<1> Configure mapred-site.xml (tell MapReduce to run on YARN instead of the default local framework)
etc/hadoop/mapred-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
<2> Configure yarn-site.xml (register the shuffle auxiliary service that MapReduce jobs need)
etc/hadoop/yarn-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
<3> Start the YARN daemons: ResourceManager and NodeManager
sbin/start-yarn.sh
or
sbin/yarn-daemon.sh start resourcemanager
sbin/yarn-daemon.sh start nodemanager
<4> Stop the YARN daemons: sbin/stop-yarn.sh
<5> Access via browser
http://<hostname>:8088
Note: 8032 is the RPC (TCP) port; 8088 is the HTTP port
5. Fully distributed setup
1) First copy the existing pseudo-distributed configuration to another directory
cp -r hadoop hadoop-pseudo (hadoop-pseudo keeps the earlier pseudo-distributed configuration as a backup; from here on we edit the configuration items in the hadoop directory for distributed mode)
2) Configure the NameNode address, the SecondaryNameNode address, the ResourceManager address, and the slave hostnames
Configure core-site.xml (same as before: point clients at the NameNode)
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://master:9000</value>
</property>
Configure hdfs-site.xml (this places the SecondaryNameNode, which offloads work from the NameNode, on the third machine, hadoop-senior03-test-com)
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop-senior03-test-com:50090</value>
</property>
Configure yarn-site.xml (register the shuffle auxiliary service and set the ResourceManager address)
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop-senior02-test-com</value>
</property>
Configure the $HADOOP_HOME/etc/hadoop/slaves file:
hadoop-senior1.test.com
hadoop-senior2.test.com
hadoop-senior3.test.com
3) With this machine configured, copy the Hadoop configuration to the other machines
scp -r * root@hadoop-senior02-test-com:/opt/module/hadoop-2.6.0/etc/hadoop
4) Set up passwordless SSH login among the three machines, for example as sketched below
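A common way to do this (host names taken from the slaves file above; run on each machine, adjusting the user if you do not log in as root):
# generate a key pair, then push the public key to all three hosts
ssh-keygen -t rsa
ssh-copy-id root@hadoop-senior1.test.com
ssh-copy-id root@hadoop-senior2.test.com
ssh-copy-id root@hadoop-senior3.test.com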
5) Configure log aggregation
a. Meaning:
I. MapReduce tasks run inside NodeManagers, and their logs land in the NodeManager's local directories:
yarn.nodemanager.log-dirs:${yarn.log.dir}/userlogs
II. Log aggregation means configuring YARN to upload these local logs to HDFS
b. Configure yarn-site.xml (once enabled, aggregated logs can be fetched with the command shown after the snippet)
<!-- Enable log aggregation -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- How long to keep aggregated logs, in seconds -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>3600</value>
</property>
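After a job finishes, its aggregated logs can be pulled back from HDFS with the yarn CLI; the application id below is just a placeholder for the id printed when the job was submitted:
yarn logs -applicationId application_1515123456789_0001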
6) History server
a. Configuration properties (a minimal mapred-site.xml sketch follows the list)
<!-- RPC address -->
mapreduce.jobhistory.address
<!-- HTTP address -->
mapreduce.jobhistory.webapp.address
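A minimal mapred-site.xml sketch for the two properties, assuming the history server runs on hadoop-senior01.test.com and keeps the default ports (10020 for RPC, 19888 for HTTP):
<property>
  <name>mapreduce.jobhistory.address</name>
  <value>hadoop-senior01.test.com:10020</value>
</property>
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>hadoop-senior01.test.com:19888</value>
</property>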
b. Start / access / stop
sbin/mr-jobhistory-daemon.sh start historyserver
linked from the main YARN UI: http://<hostname>:19888
sbin/mr-jobhistory-daemon.sh stop historyserver
三:Hadoop advanced topics
3.1 MapReduce examples
1) Deduplication and sorting
Requirement: every distinct line of the raw data appears exactly once in the output, no matter how many times it occurs in the input, and the output is sorted lexicographically
Raw data: C:\java\eclipse\workspace\hadoop課程源碼\src\demo\去重排序
Raw data: file_1, file_2 and file_3 (all three files contain exactly the same records)
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
package com.itjmd.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Deduplication and sorting
 */
public class DistictAndSort {

    /**
     * Mapper class
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * Map logic: read each line and emit it as <line, "">
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Emit the line itself as the key
            context.write(value, new Text(""));
        }
    }

    /**
     * Reducer class: deduplicates the lines
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        /**
         * Reduce logic: identical lines share one key, so writing the key once removes duplicates
         */
        @Override
        protected void reduce(Text key, Iterable<Text> value,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Emit the key once
            context.write(key, new Text(""));
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 0. Create a Job
        Configuration conf = new Configuration();
        // Connect to a Hadoop cluster
        // conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
        Job job = Job.getInstance(conf, "DistictAndSort");
        // Package into a jar by class name
        job.setJarByClass(DistictAndSort.class);
        // 1. Input files (all arguments except the last one)
        for (int i = 0; i < args.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(args[i]));
        }
        // 2. Mapper logic
        job.setMapperClass(MyMapper.class);
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(Text.class);
        // 3. Shuffle (nothing to customize here)
        // 4. Reducer logic
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 5. Output path (last argument)
        FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
        // 6. Run the job
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 1 : 0);
    }
}
2) Maximum temperature
Requirement: for each year, find which day had the highest temperature and what that temperature was
Raw data: C:\java\eclipse\workspace\hadoop課程源碼\src\demo\氣溫數據
Analysis:
a. Map splits each line of raw data and extracts the year. Output format: <year, date:temperature>
b. Reduce splits the date:temperature values, compares the temperatures one by one, and finds each year's maximum temperature and its date
c. Reduce emits the result. Output format: <date, temperature>
1990-01-01	-5
1990-06-18	35
1990-03-20	8
1989-05-04	23
1989-11-11	-3
1989-07-05	38
1989-07-05	38
1990-07-30	37
package com.itjmd.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Which day had each year's maximum temperature, and what was it
 */
public class MaxTemp {

    /**
     * Mapper class
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * Map logic: split the input line, extract the year, emit <year, date:temperature>
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Split the input line
            String line = value.toString();
            String[] lineArr = line.split("\t");
            // Extract the year
            String year = lineArr[0].substring(0, 4);
            // Output format: <year, day:temp>
            context.write(new Text(year), new Text(lineArr[0] + ":" + lineArr[1]));
        }
    }

    /**
     * Reducer class
     */
    public static class MyReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        /**
         * Reduce logic: find each year's maximum temperature and the day it occurred
         */
        @Override
        protected void reduce(Text key, Iterable<Text> value,
                Reducer<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            double maxTemp = Long.MIN_VALUE;
            String maxDay = null;
            for (Text tempVal : value) {
                // Split into [date, temperature]
                String tempValStr = tempVal.toString();
                String[] tempValArr = tempValStr.split(":");
                double temp = Double.parseDouble(tempValArr[1]);
                // Only remember the day when it actually carries a new maximum
                if (temp > maxTemp) {
                    maxTemp = temp;
                    maxDay = tempValArr[0];
                }
            }
            // Output format: <day, temp>
            context.write(new Text(maxDay), new DoubleWritable(maxTemp));
        }
    }

    // Driver
    public static void main(String[] args) throws IllegalArgumentException, IOException,
            ClassNotFoundException, InterruptedException {
        // 0. Create a Job
        Configuration conf = new Configuration();
        // Connect to a Hadoop cluster
        // conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
        Job job = Job.getInstance(conf, "max-temp");
        // Package into a jar by class name
        job.setJarByClass(MaxTemp.class);
        // 1. Input file
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 2. Mapper logic
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 3. Shuffle (nothing to customize here)
        // 4. Reducer logic
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 5. Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Run the job
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 1 : 0);
    }
}
3) Single-table join
Requirement: from the given child-parent table, derive the grandchild-grandparent relationships
Raw data: C:\java\eclipse\workspace\hadoop課程源碼\src\demo\單表關聯\單表關聯.txt
Analysis:
a. Map splits the raw data and emits the left-table records in the format <parent, child>
b. Map also emits the right-table records in the format <child, parent>
c. Reduce joins the left table's parent column with the right table's child column
child parent
Tom Luck
Tom Jack
Jone Luck
Jone Jack
Lucy Marry
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesses
Philip Terry
Philip Alma
Mark Terry
Mark Alma
package com.itjmd.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Single-table join
 */
public class SingleJoin {

    // Temporarily set the HADOOP_HOME environment variable
    static {
        System.setProperty("hadoop.home.dir", "C:\\java\\hadoop-2.6.0-cdh5.9.0");
    }

    /**
     * Mapper class
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * Map logic: turn each input row into two records: 1.<child, parent> 2.<parent, child>
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Split each line
            String line = value.toString();
            String[] lineArr = line.split("\\s+");
            // Skip the file header
            if (!"child".equals(lineArr[0])) {
                // Tag 1: upward, the parent generation; tag 2: downward, the child generation
                // Emit <child, parent>
                context.write(new Text(lineArr[0]), new Text("1:" + lineArr[1]));
                // Emit <parent, child>
                context.write(new Text(lineArr[1]), new Text("2:" + lineArr[0]));
            }
        }
    }

    /**
     * Reducer class
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        /**
         * Reduce logic: split the grouped values into a grandchild list and a grandparent list,
         * then cross the two lists to produce <grandchild, grandparent> pairs
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Grandchild list
            List<String> grandChildList = new ArrayList<String>();
            // Grandparent list
            List<String> grandParentList = new ArrayList<String>();
            for (Text tempVal : values) {
                String tempValStr = tempVal.toString();
                String[] tempValArr = tempValStr.split(":");
                if ("2".equals(tempValArr[0])) {
                    // 1. Collect the grandchildren
                    grandChildList.add(tempValArr[1]);
                } else if ("1".equals(tempValArr[0])) {
                    // 2. Collect the grandparents
                    grandParentList.add(tempValArr[1]);
                }
            }
            // 3. Cross the two lists to build <grandchild, grandparent> pairs
            for (String grandChild : grandChildList) {
                for (String grandParent : grandParentList) {
                    // Emit <grandchild, grandparent>
                    context.write(new Text(grandChild), new Text(grandParent));
                }
            }
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 0. Create a Job
        Configuration conf = new Configuration();
        // Connect to a Hadoop cluster
        // conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
        Job job = Job.getInstance(conf, "single-join");
        // Package into a jar by class name
        job.setJarByClass(SingleJoin.class);
        // 1. Input file
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 2. Mapper logic
        job.setMapperClass(MyMapper.class);
        // 3. Shuffle (nothing to customize here)
        // 4. Reducer logic
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 5. Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Run the job
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 1 : 0);
    }
}
4) Multi-table join
Requirement: join the order-detail table with the item table
Raw data: C:\java\eclipse\workspace\hadoop課程源碼\src\demo\多表關聯
detail
order_id	item_id	amount
12	sp001	2
12	sp002	4
12	sp003	3
12	sp001	2
13	sp001	2
13	sp002	4
iteminfo
item_id	item_type
sp001	type001
sp002	type002
sp003	type002
package com.itjmd.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoin {

    // Temporarily set the HADOOP_HOME environment variable
    static {
        System.setProperty("hadoop.home.dir", "C:\\java\\hadoop-2.6.0-cdh5.9.0");
    }

    /**
     * Mapper class
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * Map logic:
         * 1. Determine which table the record comes from
         * 2. Emit a differently tagged record for each table
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // 1. Determine which file the record comes from
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            // 2. Split the line
            String line = value.toString();
            if (line.contains("item_id")) return;
            String[] lineArr = line.split("\t");
            // Tag 1: order-detail table; tag 2: item table
            if ("detail.txt".equals(fileName)) {
                // Order-detail table, output format <item_id, "1:order_id:amount">
                context.write(new Text(lineArr[1]), new Text("1\t" + lineArr[0] + "\t" + lineArr[2]));
            } else if ("iteminfo.txt".equals(fileName)) {
                // Item table, output format <item_id, "2:item_type">
                context.write(new Text(lineArr[0]), new Text("2\t" + lineArr[1]));
            }
        }
    }

    /**
     * Reducer class
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        /**
         * Reduce logic:
         * 1. For one item id, split the grouped values into an order-detail list and an item-info list
         * 2. Nested-loop over the two lists to emit the joined records
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // 0. Order-detail list and item-info list
            List<String> orderDetailList = new ArrayList<String>();
            List<String> itemInfoList = new ArrayList<String>();
            // 1. Split the grouped values into the two lists
            for (Text tempVal : values) {
                String tempValStr = tempVal.toString();
                String[] tempValArr = tempValStr.split("\t");
                if ("1".equals(tempValArr[0])) {
                    // Order-detail table
                    orderDetailList.add(tempValStr.substring(2));
                } else {
                    // Item table
                    itemInfoList.add(tempValArr[1]);
                }
            }
            // 2. Nested-loop join over the two lists
            for (String itemInfo : itemInfoList) {
                for (String orderDetail : orderDetailList) {
                    context.write(key, new Text(itemInfo + "\t" + orderDetail));
                }
            }
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 0. Create a Job
        Configuration conf = new Configuration();
        // Connect to a Hadoop cluster
        // conf.set("fs.defaultFS", "hdfs://hadoop-senior01.test.com:8020");
        Job job = Job.getInstance(conf, "reduce-join");
        // Package into a jar by class name
        job.setJarByClass(ReduceJoin.class);
        // 1. Input files (all arguments except the last one)
        for (int i = 0; i < args.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(args[i]));
        }
        // 2. Mapper logic
        job.setMapperClass(MyMapper.class);
        // 3. Shuffle (nothing to customize here)
        // 4. Reducer logic
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 5. Output path (last argument)
        FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
        // 6. Run the job
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 1 : 0);
    }
}
Note: a reduce-side join like this can suffer from data skew:
the item table is evenly distributed, but the order-detail table has a very large number of rows for popular item ids.
Mitigation: in the order-detail table's map output, append a random suffix in [0, 10000) to each key so the new keys spread over different reduce tasks; the item table's map output is replicated 10,000 times (once per suffix) so that a copy reaches every reduce task.
The reducers then join the two tables' records as before and strip the suffix; a minimal sketch of this key-salting step follows.
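A minimal sketch of the salting step, reusing the detail.txt / iteminfo.txt format from the ReduceJoin example above; the class name, the "#" separator and the bucket count are illustrative choices, not part of the original source:
package com.itjmd.mapreduce;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Sketch only: salted map output keys for the skewed reduce-side join described above.
 */
public class SaltedJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final int SALT_BUCKETS = 10000; // matches the "random suffix within 10000" idea
    private final Random random = new Random();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        String line = value.toString();
        if (line.contains("item_id")) {
            return; // skip headers
        }
        String[] arr = line.split("\t");
        if ("detail.txt".equals(fileName)) {
            // Hot item ids get spread over SALT_BUCKETS different reduce keys
            int salt = random.nextInt(SALT_BUCKETS);
            context.write(new Text(arr[1] + "#" + salt), new Text("1\t" + arr[0] + "\t" + arr[2]));
        } else if ("iteminfo.txt".equals(fileName)) {
            // The small table is replicated once per bucket so every reducer can join locally
            for (int salt = 0; salt < SALT_BUCKETS; salt++) {
                context.write(new Text(arr[0] + "#" + salt), new Text("2\t" + arr[1]));
            }
        }
    }
}
The reducer then works exactly like the one in ReduceJoin, except that it splits the key on "#" and drops the salt before emitting the joined rows.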
5) Map-side join: principle and example
Principle: the small table is shipped to every map task (for example through the distributed cache) and held in memory, so the join happens entirely in the map phase and the large table never has to be shuffled to reducers.
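A minimal sketch of a map-side join under that idea, again assuming the detail.txt / iteminfo.txt layout from the ReduceJoin example; the class name and the expectation that the driver registers the small table with job.addCacheFile(...) are assumptions for illustration:
package com.itjmd.mapreduce;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Sketch only: the small iteminfo table is loaded into memory in setup(),
 * so the join is done in map() and no reducer/shuffle is needed for it.
 */
public class MapJoinSketch extends Mapper<LongWritable, Text, Text, Text> {

    private final Map<String, String> itemTypeById = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The driver is expected to register the small table via job.addCacheFile(new URI(".../iteminfo.txt"))
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null || cacheFiles.length == 0) {
            return;
        }
        FileSystem fs = FileSystem.get(context.getConfiguration());
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(cacheFiles[0]))));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.contains("item_id")) {
                    continue; // skip header
                }
                String[] arr = line.split("\t");
                itemTypeById.put(arr[0], arr[1]); // item_id -> item_type
            }
        } finally {
            reader.close();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Only the big detail table flows through map(); each row is joined against the in-memory map
        String line = value.toString();
        if (line.contains("order_id")) {
            return; // skip header
        }
        String[] arr = line.split("\t"); // order_id, item_id, amount
        String itemType = itemTypeById.get(arr[1]);
        if (itemType != null) {
            context.write(new Text(arr[1]), new Text(itemType + "\t" + arr[0] + "\t" + arr[2]));
        }
    }
}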
3.2 HDFS HA architecture deployment
3.3 HDFS Federation architecture deployment
3.4 YARN HA architecture deployment
3.5 Hadoop performance tuning