Spark source code reading: network (1)

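In Spark 1.6, Akka is being replaced by a Netty-based RPC framework for the whole cluster. Netty's memory management and NIO support should substantially improve the network transfer capability of a Spark cluster. To understand this code, I read two books, "Netty in Action" and "Netty權威指南" (The Definitive Guide to Netty), and studied them alongside the Spark source, which let me learn Netty while working through part of Spark's Netty code. That code is thoroughly mixed with Netty-specific details, so it is still somewhat tiring to read.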

 

The buffer module

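The network project abstracts a ManagedBuffer interface, which represents a view over binary data (that is, a portion of the data). The concrete implementation depends on where the data comes from; three sources are currently supported: a file, an NIO ByteBuffer, and a Netty ByteBuf. Note that an implementation may live outside JVM garbage-collector management. NettyManagedBuffer, for example, is reference-counted, so when such a buffer is passed to another thread you must call retain/release to increment or decrement its reference count.
ManagedBuffer exposes its data in three forms: a ByteBuffer, an InputStream, and a Netty object. The ByteBuffer form is discouraged because it can be expensive (it may require memory mapping or allocation). The interface also provides reference-count management and a query for the data size.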
```java
package org.apache.spark.network.buffer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public abstract class ManagedBuffer {

  /** Number of bytes of the data. */
  public abstract long size();

  /**
   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
   * returned ByteBuffer should not affect the content of this buffer.
   */
  // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
  public abstract ByteBuffer nioByteBuffer() throws IOException;

  /**
   * Exposes this buffer's data as an InputStream. The underlying implementation does not
   * necessarily check for the length of bytes read, so the caller is responsible for making sure
   * it does not go over the limit.
   */
  public abstract InputStream createInputStream() throws IOException;

  /**
   * Increment the reference count by one if applicable.
   */
  public abstract ManagedBuffer retain();

  /**
   * If applicable, decrement the reference count by one and deallocates the buffer if the
   * reference count reaches zero.
   */
  public abstract ManagedBuffer release();

  /**
   * Convert the buffer into an Netty object, used to write the data out.
   */
  public abstract Object convertToNetty() throws IOException;
}
```
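The retain/release contract is easiest to see in a toy model. The following is a minimal sketch in plain Java with no Spark or Netty dependency; `RefCountedBuffer` and `RefCountDemo` are hypothetical names, and the buffer only imitates the kind of reference counting that NettyManagedBuffer delegates to Netty's ByteBuf. It demonstrates the rule from the text: call retain() before handing the buffer to another thread, and have each holder call release() exactly once, so the memory is freed only when the last holder is done.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted buffer: a toy stand-in for a Netty ByteBuf, not Spark or Netty code.
class RefCountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // the creator holds the first reference
    private volatile byte[] data;

    RefCountedBuffer(byte[] data) {
        this.data = data;
    }

    RefCountedBuffer retain() {
        // Never bring a freed buffer (count 0) back to life.
        int prev = refCnt.getAndUpdate(c -> c == 0 ? 0 : c + 1);
        if (prev == 0) throw new IllegalStateException("retain() on a released buffer");
        return this;
    }

    RefCountedBuffer release() {
        int prev = refCnt.getAndUpdate(c -> c == 0 ? 0 : c - 1);
        if (prev == 0) throw new IllegalStateException("release() on a released buffer");
        if (prev == 1) data = null; // count just reached zero: "deallocate" the memory
        return this;
    }

    int refCnt() { return refCnt.get(); }

    boolean isFreed() { return data == null; }

    int size() {
        byte[] d = data;
        if (d == null) throw new IllegalStateException("buffer already released");
        return d.length;
    }
}

class RefCountDemo {
    // Passes a buffer to a worker thread: retain() before the hand-off, and each side releases once.
    static RefCountedBuffer handOffToWorker() {
        RefCountedBuffer buf = new RefCountedBuffer(new byte[16]);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            buf.retain(); // count = 2: one reference for this thread, one for the worker
            Future<Integer> size = pool.submit(() -> {
                try {
                    return buf.size(); // the worker uses the data...
                } finally {
                    buf.release(); // ...then drops its reference (count = 1)
                }
            });
            size.get(); // wait until the worker is done
            buf.release(); // this thread drops its reference: count = 0, data freed
            return buf;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        RefCountedBuffer buf = handOffToWorker();
        System.out.println(buf.refCnt() + " " + buf.isFreed()); // prints "0 true"
    }
}
```

Forgetting the retain() before the hand-off would let the worker's release() free the buffer while the calling thread might still be using it.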
ManagedBuffer has one implementation per data source. Let's start with the file-backed one.
```java
package org.apache.spark.network.buffer;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import com.google.common.base.Objects;
import com.google.common.io.ByteStreams;
import io.netty.channel.DefaultFileRegion;

import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportConf;

// A ManagedBuffer backed by the segment [offset, offset + length) of a file.
public final class FileSegmentManagedBuffer extends ManagedBuffer {
  private final TransportConf conf;
  private final File file;
  private final long offset;
  private final long length;

  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
    this.conf = conf;
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  @Override
  public long size() {
    return length;
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
      if (length < conf.memoryMapBytes()) {
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      // Build the detailed message first; throwing it from inside a try that catches and
      // ignores IOException would swallow it.
      String message = "Error in opening " + this;
      if (channel != null) {
        try {
          message = "Error in reading " + this + " (actual file length " + channel.size() + ")";
        } catch (IOException ignored) {
          // channel.size() failed too; keep the generic message
        }
      }
      throw new IOException(message, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      // Cap the stream at `length` bytes so callers cannot read past the segment.
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      JavaUtils.closeQuietly(is);
      String message = (is != null)
        ? "Error in reading " + this + " (actual file length " + file.length() + ")"
        : "Error in opening " + this;
      throw new IOException(message, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

  // Plain files are not reference-counted, so retain/release are no-ops.
  @Override
  public ManagedBuffer retain() {
    return this;
  }

  @Override
  public ManagedBuffer release() {
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      // Defer opening the file until Netty actually transfers the region.
      return new LazyFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
    }
  }

  public File getFile() { return file; }

  public long getOffset() { return offset; }

  public long getLength() { return length; }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("file", file)
      .add("offset", offset)
      .add("length", length)
      .toString();
  }
}
```
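nioByteBuffer: if the segment is smaller than spark.storage.memoryMapThreshold, the data is read from the channel into a ByteBuffer; if it is greater than or equal to that threshold, the file segment is memory-mapped instead.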
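The copy-or-map decision in nioByteBuffer can be reproduced with java.nio alone. This is a sketch, not Spark's code: `SegmentReader` is a hypothetical name, and the threshold is passed in as a plain parameter rather than read from spark.storage.memoryMapThreshold through TransportConf.

```java
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

// Sketch of the copy-or-map decision; memoryMapThreshold stands in for spark.storage.memoryMapThreshold.
class SegmentReader {
    static ByteBuffer read(File file, long offset, long length, long memoryMapThreshold)
            throws IOException {
        try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
            if (length < memoryMapThreshold) {
                // Small segment: a plain copy into a heap buffer avoids the cost of setting up a mapping.
                ByteBuffer buf = ByteBuffer.allocate((int) length);
                channel.position(offset);
                while (buf.hasRemaining()) {
                    if (channel.read(buf) == -1) {
                        throw new EOFException("EOF with " + buf.remaining() + " bytes still expected");
                    }
                }
                buf.flip();
                return buf;
            }
            // Large segment: map it read-only. The mapping stays valid after the channel is closed.
            return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
        }
    }
}
```

Closing the channel right away is safe in both branches: per the FileChannel.map documentation, a mapping does not depend on the channel that created it.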
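createInputStream returns a LimitedInputStream, which caps how many bytes can be read; before wrapping, Guava's ByteStreams.skipFully skips ahead to the segment's offset.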
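A length-capped stream like the one createInputStream returns can be sketched with java.io alone. `BoundedInputStream` and `SegmentStreams` are hypothetical names used for illustration (Spark's own class is LimitedInputStream), and `skipFully` here only mimics Guava's ByteStreams.skipFully. The point is the division of labor: skip to `offset` first, then wrap the stream so callers see EOF after `length` bytes even though the file continues.

```java
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for Spark's LimitedInputStream: returns at most `limit` bytes, then EOF.
class BoundedInputStream extends FilterInputStream {
    private long left; // bytes we are still allowed to return

    BoundedInputStream(InputStream in, long limit) {
        super(in);
        this.left = limit;
    }

    @Override
    public int read() throws IOException {
        if (left == 0) return -1;
        int b = in.read();
        if (b != -1) left--;
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) return 0;
        if (left == 0) return -1;
        int n = in.read(b, off, (int) Math.min(len, left));
        if (n != -1) left -= n;
        return n;
    }

    @Override
    public long skip(long n) throws IOException {
        if (n <= 0) return 0;
        long skipped = in.skip(Math.min(n, left));
        left -= skipped;
        return skipped;
    }

    @Override
    public int available() throws IOException {
        return (int) Math.min(in.available(), left);
    }

    // mark/reset would also have to restore `left`; this sketch does not support them.
    @Override
    public boolean markSupported() {
        return false;
    }

    @Override
    public synchronized void reset() throws IOException {
        throw new IOException("mark/reset not supported");
    }
}

class SegmentStreams {
    // Like Guava's ByteStreams.skipFully: keep skipping until n bytes are gone, or fail at EOF.
    static void skipFully(InputStream in, long n) throws IOException {
        while (n > 0) {
            long skipped = in.skip(n);
            if (skipped <= 0) {
                // skip() may return 0 without being at EOF; read one byte to find out.
                if (in.read() == -1) throw new EOFException("reached end of stream while skipping");
                skipped = 1;
            }
            n -= skipped;
        }
    }

    // Mirrors the shape of createInputStream(): position at `offset`, then cap the length.
    static InputStream openSegment(File file, long offset, long length) throws IOException {
        FileInputStream in = new FileInputStream(file);
        try {
            skipFully(in, offset);
            return new BoundedInputStream(in, length);
        } catch (IOException | RuntimeException e) {
            try {
                in.close(); // don't leak the file descriptor if positioning fails
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}
```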
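convertToNetty returns a FileRegion: a LazyFileRegion if spark.shuffle.io.lazyFD is set to true, otherwise a DefaultFileRegion. LazyFileRegion only creates the FileChannel when the region is actually transferred; according to its code comment, LazyFileRegion cannot be used when Netty runs on the epoll transport.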
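The difference between the two FileRegion choices is when the file descriptor gets opened. The sketch below models only the lazy variant, in plain Java NIO: `LazySegment` is a hypothetical class, not Netty's FileRegion interface, and FileChannel.transferTo stands in for the zero-copy transfer Netty performs. A DefaultFileRegion built in convertToNetty already holds an open FileChannel; deferring the open until transfer presumably limits how many descriptors are held open while many blocks wait to be sent.

```java
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;

// Hypothetical LazySegment: keeps only (file, offset, length) until the data is actually sent.
class LazySegment implements AutoCloseable {
    private final File file;
    private final long offset;
    private final long length;
    private FileChannel channel; // stays null (no descriptor held) until the first transfer

    LazySegment(File file, long offset, long length) {
        this.file = file;
        this.offset = offset;
        this.length = length;
    }

    boolean hasOpenDescriptor() {
        return channel != null && channel.isOpen();
    }

    // Copies the whole segment to `target`, opening the file only now.
    long transferTo(WritableByteChannel target) throws IOException {
        if (channel == null) {
            channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
        }
        long done = 0;
        while (done < length) {
            long n = channel.transferTo(offset + done, length - done, target);
            // With a blocking target, 0 means the segment runs past the end of the file.
            if (n <= 0) throw new EOFException("segment extends past end of " + file);
            done += n;
        }
        return done;
    }

    @Override
    public void close() throws IOException {
        if (channel != null) channel.close();
    }
}
```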
 
Next is the implementation whose data source is a Netty ByteBuf; it holds its data in that ByteBuf.
```java
package org.apache.spark.network.buffer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;

public final class NettyManagedBuffer extends ManagedBuffer {
  private final ByteBuf buf;

  public NettyManagedBuffer(ByteBuf buf) {
    this.buf = buf;
  }

  @Override
  public long size() {
    return buf.readableBytes();
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.nioBuffer();
  }

  @Override
  public InputStream createInputStream() throws IOException {
    return new ByteBufInputStream(buf);
  }

  @Override
  public ManagedBuffer retain() {
    buf.retain(); // delegate to Netty's reference counting
    return this;
  }

  @Override
  public ManagedBuffer release() {
    buf.release(); // the ByteBuf is deallocated when its count reaches zero
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    return buf.duplicate(); // shares the content; does not increase the reference count
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("buf", buf)
      .toString();
  }
}
```
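Turning a ByteBuf into an InputStream is done with ByteBufInputStream. Also note that ByteBuf.duplicate() returns a ByteBuf that shares the same underlying data, so a modification through either one is visible through the other. Keep the reference count in mind: the duplicate does not increment it. See http://www.maljob.com/pages/newsDetail.html?id=394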
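The sharing rules of duplicate() can be illustrated without Netty, because java.nio.ByteBuffer.duplicate() behaves analogously: shared content, independent position and limit (a ByteBuf has reader and writer indices instead). This is an analogy under that assumption, not Netty code; `DuplicateDemo` is a hypothetical name. On the reference-count side, Netty's documentation for ByteBuf.duplicate() states that it does not call retain(), so the duplicate returned by convertToNetty is only usable while the original buffer's count stays above zero.

```java
import java.nio.ByteBuffer;

// java.nio analogue of ByteBuf.duplicate(): shared content, independent position/limit.
class DuplicateDemo {
    // Writes through a duplicate and returns what the original now holds at index 0.
    static byte writeThroughDuplicate() {
        ByteBuffer original = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        ByteBuffer dup = original.duplicate();
        dup.put(0, (byte) 42); // absolute put: modifies the shared content
        return original.get(0);
    }

    // Moves the duplicate's position and returns the original's position.
    static int originalPositionAfterMovingDuplicate() {
        ByteBuffer original = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        ByteBuffer dup = original.duplicate();
        dup.position(3); // indices are per-buffer, so this does not touch `original`
        return original.position();
    }

    public static void main(String[] args) {
        System.out.println(writeThroughDuplicate());                 // prints 42
        System.out.println(originalPositionAfterMovingDuplicate()); // prints 0
    }
}
```

Independent indices are also why handing Netty a duplicate is convenient: writing it out advances the duplicate's reader index, not the original buffer's.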
 
There is one more implementation, backed by an NIO ByteBuffer.
```java
package org.apache.spark.network.buffer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import com.google.common.base.Objects;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;

public final class NioManagedBuffer extends ManagedBuffer {
  private final ByteBuffer buf;

  public NioManagedBuffer(ByteBuffer buf) {
    this.buf = buf;
  }

  @Override
  public long size() {
    return buf.remaining();
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    // Shared content, independent position/limit, as the ManagedBuffer contract requires.
    return buf.duplicate();
  }

  @Override
  public InputStream createInputStream() throws IOException {
    return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));
  }

  // A heap or direct ByteBuffer is managed by the JVM, so retain/release are no-ops.
  @Override
  public ManagedBuffer retain() {
    return this;
  }

  @Override
  public ManagedBuffer release() {
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    return Unpooled.wrappedBuffer(buf);
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("buf", buf)
      .toString();
  }
}
```
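One interesting detail here is that the ByteBuffer is converted into a ByteBuf with Netty's Unpooled.wrappedBuffer(), which wraps the existing buffer instead of copying it.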
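Unpooled.wrappedBuffer(ByteBuffer) wraps the NIO buffer's current readable region without copying it, and ByteBufInputStream then reads from that wrapper. The same zero-copy idea can be sketched with the JDK alone; `ByteBufferInputStream` is a hypothetical class for illustration, not part of Netty or Spark. Reading through a duplicate() keeps the caller's ByteBuffer position untouched, which matches NioManagedBuffer's choice of returning buf.duplicate() from nioByteBuffer().

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical ByteBufferInputStream: streams a ByteBuffer's remaining bytes without copying them up front.
class ByteBufferInputStream extends InputStream {
    private final ByteBuffer buf;

    ByteBufferInputStream(ByteBuffer source) {
        this.buf = source.duplicate(); // private indices: reading does not move the caller's position
    }

    @Override
    public int read() {
        return buf.hasRemaining() ? (buf.get() & 0xFF) : -1; // mask to return an unsigned byte
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (len == 0) return 0;
        if (!buf.hasRemaining()) return -1;
        int n = Math.min(len, buf.remaining());
        buf.get(b, off, n);
        return n;
    }

    @Override
    public int available() {
        return buf.remaining();
    }
}
```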
 
 
 
 


