Spark will replace Akka with Netty in 1.6, using Netty to implement the cluster-wide RPC framework. Netty's memory management and NIO support should noticeably improve network transfer throughout a Spark cluster. To make sense of this code I picked up two books, "Netty in Action" and "Netty權威指南" (Netty: The Definitive Guide), and read them alongside the Spark source, so I ended up learning Netty and working through Spark's Netty-related code at the same time. That code mixes in a lot of Netty internals, so it is still somewhat heavy reading. The walkthrough below starts from ManagedBuffer, the byte-buffer abstraction used by the network module, and then looks at its three implementations.
/**
 * This interface provides an immutable view over data in the form of bytes.
 */
public abstract class ManagedBuffer {

  /** Number of bytes of the data. */
  public abstract long size();

  /**
   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
   * returned ByteBuffer should not affect the content of this buffer.
   */
  // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
  public abstract ByteBuffer nioByteBuffer() throws IOException;

  /**
   * Exposes this buffer's data as an InputStream. The underlying implementation does not
   * necessarily check for the length of bytes read, so the caller is responsible for making sure
   * it does not go over the limit.
   */
  public abstract InputStream createInputStream() throws IOException;

  /**
   * Increment the reference count by one if applicable.
   */
  public abstract ManagedBuffer retain();

  /**
   * If applicable, decrement the reference count by one and deallocate the buffer if the
   * reference count reaches zero.
   */
  public abstract ManagedBuffer release();

  /**
   * Convert the buffer into a Netty object, used to write the data out.
   */
  public abstract Object convertToNetty() throws IOException;
}
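A minimal sketch of how a caller might consume this interface, assuming the spark-network-common and Netty jars are on the classpath. The class name ManagedBufferDemo is made up for illustration; it uses the NioManagedBuffer implementation shown further below. retain()/release() only change anything for reference-counted implementations such as NettyManagedBuffer, but keeping the pair balanced is the safe pattern for any ManagedBuffer.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;

public class ManagedBufferDemo {
  public static void main(String[] args) throws IOException {
    // Wrap some in-memory bytes in a ManagedBuffer.
    ByteBuffer data = ByteBuffer.wrap("hello spark".getBytes(StandardCharsets.UTF_8));
    ManagedBuffer buffer = new NioManagedBuffer(data);

    buffer.retain();                               // pin the buffer while we use it
    try {
      ByteBuffer view = buffer.nioByteBuffer();    // view of the payload; safe to consume
      byte[] copy = new byte[view.remaining()];
      view.get(copy);
      System.out.println(new String(copy, StandardCharsets.UTF_8) + ", size=" + buffer.size());
    } finally {
      buffer.release();                            // balance the retain; frees ref-counted buffers
    }
  }
}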
/**
 * A {@link ManagedBuffer} backed by a segment in a file.
 */
public final class FileSegmentManagedBuffer extends ManagedBuffer {
  private final TransportConf conf;
  private final File file;
  private final long offset;
  private final long length;

  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
    this.conf = conf;
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  @Override
  public long size() {
    return length;
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
      if (length < conf.memoryMapBytes()) {
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      try {
        if (channel != null) {
          long size = channel.size();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      }
      throw new IOException("Error in opening " + this, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      try {
        if (is != null) {
          long size = file.length();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      } finally {
        JavaUtils.closeQuietly(is);
      }
      throw new IOException("Error in opening " + this, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

  @Override
  public ManagedBuffer retain() {
    return this;
  }

  @Override
  public ManagedBuffer release() {
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new LazyFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
    }
  }

  public File getFile() { return file; }

  public long getOffset() { return offset; }

  public long getLength() { return length; }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("file", file)
      .add("offset", offset)
      .add("length", length)
      .toString();
  }
}
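The interesting method here is convertToNetty(): it hands Netty a FileRegion, so the transport layer can push the file segment out with zero-copy transferTo (sendfile) instead of pulling the bytes onto the JVM heap first. A rough sketch of what that looks like on the Netty side follows; the handler name FileSegmentWriteHandler and its wiring are made up for illustration, while DefaultFileRegion and the handler API are standard Netty.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultFileRegion;

// Hypothetical handler that writes one file segment when the connection becomes active;
// DefaultFileRegion is the same object type FileSegmentManagedBuffer.convertToNetty()
// returns when lazyFileDescriptor is disabled.
public class FileSegmentWriteHandler extends ChannelInboundHandlerAdapter {
  private final File file;
  private final long offset;
  private final long length;

  public FileSegmentWriteHandler(File file, long offset, long length) {
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws IOException {
    FileChannel channel = new FileInputStream(file).getChannel();
    // A FileRegion lets Netty use FileChannel.transferTo, so the bytes never have to be
    // copied into user-space buffers; Netty releases the region (and the channel) after the write.
    ctx.writeAndFlush(new DefaultFileRegion(channel, offset, length));
  }
}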
/**
 * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
 */
public final class NettyManagedBuffer extends ManagedBuffer {
  private final ByteBuf buf;

  public NettyManagedBuffer(ByteBuf buf) {
    this.buf = buf;
  }

  @Override
  public long size() {
    return buf.readableBytes();
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.nioBuffer();
  }

  @Override
  public InputStream createInputStream() throws IOException {
    return new ByteBufInputStream(buf);
  }

  @Override
  public ManagedBuffer retain() {
    buf.retain();
    return this;
  }

  @Override
  public ManagedBuffer release() {
    buf.release();
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    return buf.duplicate();
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("buf", buf)
      .toString();
  }
}
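Unlike the file- and NIO-backed variants, NettyManagedBuffer actually delegates retain()/release() to the ByteBuf's reference count, tying the buffer's lifetime to Netty's (possibly pooled) memory management. A small sketch of that behavior, with the hypothetical class name NettyManagedBufferDemo:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.spark.network.buffer.NettyManagedBuffer;

public class NettyManagedBufferDemo {
  public static void main(String[] args) {
    ByteBuf buf = Unpooled.copiedBuffer(new byte[]{1, 2, 3, 4});
    NettyManagedBuffer buffer = new NettyManagedBuffer(buf);

    buffer.retain();                       // refCnt: 1 -> 2
    System.out.println(buf.refCnt());      // prints 2
    buffer.release();                      // refCnt: 2 -> 1
    buffer.release();                      // refCnt: 1 -> 0, underlying memory is freed
    System.out.println(buf.refCnt());      // prints 0
  }
}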
/**
 * A {@link ManagedBuffer} backed by a {@link ByteBuffer}.
 */
public final class NioManagedBuffer extends ManagedBuffer {
  private final ByteBuffer buf;

  public NioManagedBuffer(ByteBuffer buf) {
    this.buf = buf;
  }

  @Override
  public long size() {
    return buf.remaining();
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.duplicate();
  }

  @Override
  public InputStream createInputStream() throws IOException {
    return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));
  }

  @Override
  public ManagedBuffer retain() {
    return this;
  }

  @Override
  public ManagedBuffer release() {
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    return Unpooled.wrappedBuffer(buf);
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("buf", buf)
      .toString();
  }
}
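Two details of NioManagedBuffer are worth a quick check: nioByteBuffer() returns a duplicate(), so consuming the view does not move the position of the buffer other readers hold, and convertToNetty() wraps the same memory in a ByteBuf without copying. A minimal sketch, with the hypothetical class name NioManagedBufferDemo:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.spark.network.buffer.NioManagedBuffer;

public class NioManagedBufferDemo {
  public static void main(String[] args) throws Exception {
    ByteBuffer nio = ByteBuffer.wrap("rpc payload".getBytes(StandardCharsets.UTF_8));
    NioManagedBuffer buffer = new NioManagedBuffer(nio);

    // duplicate() shares content but not position/limit, so reading from the view
    // leaves the original ByteBuffer untouched.
    ByteBuffer view = buffer.nioByteBuffer();
    view.get(new byte[3]);
    System.out.println(nio.position());        // still 0

    // convertToNetty() wraps the same bytes in a ByteBuf without copying them.
    ByteBuf netty = (ByteBuf) buffer.convertToNetty();
    System.out.println(netty.readableBytes()); // 11, the length of "rpc payload"
  }
}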