1、前言java
在完成了前面的理論學習後,如今能夠從源碼角度來解析Zookeeper的細節,首先筆者想從序列化入手,由於在網絡通訊、數據存儲中都用到了序列化,下面開始分析。express
2、序列化apache
序列化主要在zookeeper.jute包中,其中涉及的主要接口以下數組
· InputArchive網絡
· OutputArchiveapp
· Indexless
· Recordide
2.1 InputArchive函數
其是全部反序列化器都須要實現的接口,其方法以下 學習
public interface InputArchive { // 讀取byte類型 public byte readByte(String tag) throws IOException; // 讀取boolean類型 public boolean readBool(String tag) throws IOException; // 讀取int類型 public int readInt(String tag) throws IOException; // 讀取long類型 public long readLong(String tag) throws IOException; // 讀取float類型 public float readFloat(String tag) throws IOException; // 讀取double類型 public double readDouble(String tag) throws IOException; // 讀取String類型 public String readString(String tag) throws IOException; // 經過緩衝方式讀取 public byte[] readBuffer(String tag) throws IOException; // 開始讀取記錄 public void readRecord(Record r, String tag) throws IOException; // 開始讀取記錄 public void startRecord(String tag) throws IOException; // 結束讀取記錄 public void endRecord(String tag) throws IOException; // 開始讀取向量 public Index startVector(String tag) throws IOException; // 結束讀取向量 public void endVector(String tag) throws IOException; // 開始讀取Map public Index startMap(String tag) throws IOException; // 結束讀取Map public void endMap(String tag) throws IOException; }
InputArchive的類結構以下
1. BinaryInputArchive
public class BinaryInputArchive implements InputArchive { // DataInput接口,用於從二進制流中讀取字節 private DataInput in; // 靜態方法,用於獲取Archive static public BinaryInputArchive getArchive(InputStream strm) { return new BinaryInputArchive(new DataInputStream(strm)); } // 內部類,對應BinaryInputArchive索引 static private class BinaryIndex implements Index { private int nelems; BinaryIndex(int nelems) { this.nelems = nelems; } public boolean done() { return (nelems <= 0); } public void incr() { nelems--; } } /** Creates a new instance of BinaryInputArchive */ // 構造函數 public BinaryInputArchive(DataInput in) { this.in = in; } // 讀取字節 public byte readByte(String tag) throws IOException { return in.readByte(); } // 讀取boolean類型 public boolean readBool(String tag) throws IOException { return in.readBoolean(); } // 讀取int類型 public int readInt(String tag) throws IOException { return in.readInt(); } // 讀取long類型 public long readLong(String tag) throws IOException { return in.readLong(); } // 讀取float類型 public float readFloat(String tag) throws IOException { return in.readFloat(); } // 讀取double類型 public double readDouble(String tag) throws IOException { return in.readDouble(); } // 讀取String類型 public String readString(String tag) throws IOException { // 肯定長度 int len = in.readInt(); if (len == -1) return null; byte b[] = new byte[len]; // 從輸入流中讀取一些字節,並將它們存儲在緩衝區數組b中 in.readFully(b); return new String(b, "UTF8"); } // 最大緩衝值 static public final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff); // 讀取緩衝 public byte[] readBuffer(String tag) throws IOException { // 肯定長度 int len = readInt(tag); if (len == -1) return null; // Since this is a rough sanity check, add some padding to maxBuffer to // make up for extra fields, etc. (otherwise e.g. clients may be able to // write buffers larger than we can read from disk!) if (len < 0 || len > maxBuffer + 1024) { // 檢查長度是否合理 throw new IOException("Unreasonable length = " + len); } byte[] arr = new byte[len]; // 從輸入流中讀取一些字節,並將它們存儲在緩衝區數組arr中 in.readFully(arr); return arr; } // 讀取記錄 public void readRecord(Record r, String tag) throws IOException { // 反序列化,動態調用 r.deserialize(this, tag); } // 開始讀取記錄,實現爲空 public void startRecord(String tag) throws IOException {} // 結束讀取記錄,實現爲空 public void endRecord(String tag) throws IOException {} // 開始讀取向量 public Index startVector(String tag) throws IOException { // 肯定長度 int len = readInt(tag); if (len == -1) { return null; } // 返回索引 return new BinaryIndex(len); } // 結束讀取向量 public void endVector(String tag) throws IOException {} // 開始讀取Map public Index startMap(String tag) throws IOException { // 返回索引 return new BinaryIndex(readInt(tag)); } // 結束讀取Map,實現爲空 public void endMap(String tag) throws IOException {} }
2. CsvInputArchive
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.jute; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PushbackReader; import java.io.UnsupportedEncodingException; /** * */ class CsvInputArchive implements InputArchive { // 推回字節流 private PushbackReader stream; // 內部類,對應CsvInputArchive索引 private class CsvIndex implements Index { public boolean done() { char c = '\0'; try { c = (char) stream.read(); stream.unread(c); } catch (IOException ex) { } return (c == '}') ? true : false; } public void incr() {} } // 私有方法,拋出異常 private void throwExceptionOnError(String tag) throws IOException { throw new IOException("Error deserializing "+tag); } // 私有方法,讀取字段 private String readField(String tag) throws IOException { try { StringBuilder buf = new StringBuilder(); while (true) { // 讀取並轉化爲字符 char c = (char) stream.read(); switch (c) { // 判斷字符 case ',': // 讀取字段完成,可直接返回 return buf.toString(); case '}': case '\n': case '\r': // 推回緩衝區 stream.unread(c); return buf.toString(); default: // 默認添加至buf中 buf.append(c); } } } catch (IOException ex) { throw new IOException("Error reading "+tag); } } // 獲取CsvInputArchive static CsvInputArchive getArchive(InputStream strm) throws UnsupportedEncodingException { return new CsvInputArchive(strm); } /** Creates a new instance of CsvInputArchive */ // 構造函數 public CsvInputArchive(InputStream in) throws UnsupportedEncodingException { // 初始化stream屬性 stream = new PushbackReader(new InputStreamReader(in, "UTF-8")); } // 讀取byte類型 public byte readByte(String tag) throws IOException { return (byte) readLong(tag); } // 讀取boolean類型 public boolean readBool(String tag) throws IOException { String sval = readField(tag); return "T".equals(sval) ? true : false; } // 讀取int類型 public int readInt(String tag) throws IOException { return (int) readLong(tag); } // 讀取long類型 public long readLong(String tag) throws IOException { // 讀取字段 String sval = readField(tag); try { // 轉化 long lval = Long.parseLong(sval); return lval; } catch (NumberFormatException ex) { throw new IOException("Error deserializing "+tag); } } // 讀取float類型 public float readFloat(String tag) throws IOException { return (float) readDouble(tag); } // 讀取double類型 public double readDouble(String tag) throws IOException { // 讀取字段 String sval = readField(tag); try { // 轉化 double dval = Double.parseDouble(sval); return dval; } catch (NumberFormatException ex) { throw new IOException("Error deserializing "+tag); } } // 讀取String類型 public String readString(String tag) throws IOException { // 讀取字段 String sval = readField(tag); // 轉化 return Utils.fromCSVString(sval); } // 讀取緩衝類型 public byte[] readBuffer(String tag) throws IOException { // 讀取字段 String sval = readField(tag); // 轉化 return Utils.fromCSVBuffer(sval); } // 讀取記錄 public void readRecord(Record r, String tag) throws IOException { // 反序列化 r.deserialize(this, tag); } // 開始讀取記錄 public void startRecord(String tag) throws IOException { if (tag != null && !"".equals(tag)) { // 讀取並轉化爲字符 char c1 = (char) stream.read(); // 讀取並轉化爲字符 char c2 = (char) stream.read(); if (c1 != 's' || c2 != '{') { // 進行判斷 throw new IOException("Error deserializing "+tag); } } } // 結束讀取記錄 public void endRecord(String tag) throws IOException { // 讀取並轉化爲字符 char c = (char) stream.read(); if (tag == null || "".equals(tag)) { if (c != '\n' && c != '\r') { // 進行判斷 throw new IOException("Error deserializing record."); } else { return; } } if (c != '}') { // 進行判斷 throw new IOException("Error deserializing "+tag); } // 讀取並轉化爲字符 c = (char) stream.read(); if (c != ',') { // 推回緩衝區 stream.unread(c); } return; } // 開始讀取vector public Index startVector(String tag) throws IOException { char c1 = (char) stream.read(); char c2 = (char) stream.read(); if (c1 != 'v' || c2 != '{') { throw new IOException("Error deserializing "+tag); } return new CsvIndex(); } // 結束讀取vector public void endVector(String tag) throws IOException { char c = (char) stream.read(); if (c != '}') { throw new IOException("Error deserializing "+tag); } c = (char) stream.read(); if (c != ',') { stream.unread(c); } return; } // 開始讀取Map public Index startMap(String tag) throws IOException { char c1 = (char) stream.read(); char c2 = (char) stream.read(); if (c1 != 'm' || c2 != '{') { throw new IOException("Error deserializing "+tag); } return new CsvIndex(); } // 結束讀取Map public void endMap(String tag) throws IOException { char c = (char) stream.read(); if (c != '}') { throw new IOException("Error deserializing "+tag); } c = (char) stream.read(); if (c != ',') { stream.unread(c); } return; } }
3. XmlInputArchive
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.jute; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; import org.xml.sax.Attributes; import org.xml.sax.SAXException; import org.xml.sax.helpers.DefaultHandler; /** * */ class XmlInputArchive implements InputArchive { // 內部類,值(包含類型和值) static private class Value { private String type; private StringBuffer sb; public Value(String t) { type = t; sb = new StringBuffer(); } // 添加chars public void addChars(char[] buf, int offset, int len) { sb.append(buf, offset, len); } // 返回value public String getValue() { return sb.toString(); } // 返回type public String getType() { return type; } } // 內部類,XML解析器 private static class XMLParser extends DefaultHandler { private boolean charsValid = false; private ArrayList<Value> valList; private XMLParser(ArrayList<Value> vlist) { valList = vlist; } // 文檔開始,空的實現 public void startDocument() throws SAXException {} // 文檔結束,空的實現 public void endDocument() throws SAXException {} // 開始解析元素 public void startElement(String ns, String sname, String qname, Attributes attrs) throws SAXException { // charsValid = false; if ("boolean".equals(qname) || // boolean類型 "i4".equals(qname) || // 四個字節 "int".equals(qname) || // int類型 "string".equals(qname) || // String類型 "double".equals(qname) || // double類型 "ex:i1".equals(qname) || // 一個字節 "ex:i8".equals(qname) || // 八個字節 "ex:float".equals(qname)) { // 基本類型 // charsValid = true; // 添加至列表 valList.add(new Value(qname)); } else if ("struct".equals(qname) || "array".equals(qname)) { // 結構體或數組類型 // 添加至列表 valList.add(new Value(qname)); } } // 結束解析元素 public void endElement(String ns, String sname, String qname) throws SAXException { charsValid = false; if ("struct".equals(qname) || "array".equals(qname)) { // 結構體或數組類型 // 添加至列表 valList.add(new Value("/"+qname)); } } public void characters(char buf[], int offset, int len) throws SAXException { if (charsValid) { // 是否合法 // 從列表獲取value Value v = valList.get(valList.size()-1); // 將buf添加至value v.addChars(buf, offset,len); } } } // 內部類,對應XmlInputArchive private class XmlIndex implements Index { // 是否已經完成 public boolean done() { // 根據索引獲取value Value v = valList.get(vIdx); if ("/array".equals(v.getType())) { // 類型爲/array // 設置開索引值爲null valList.set(vIdx, null); // 增長索引值 vIdx++; return true; } else { return false; } } // 增長索引值,空的實現 public void incr() {} } // 值列表 private ArrayList<Value> valList; // 值長度 private int vLen; // 索引 private int vIdx; // 下一項 private Value next() throws IOException { if (vIdx < vLen) { // 當前索引值小於長度 // 獲取值 Value v = valList.get(vIdx); // 設置索引值爲null valList.set(vIdx, null); // 增長索引值 vIdx++; return v; } else { throw new IOException("Error in deserialization."); } } // 獲取XmlInputArchive static XmlInputArchive getArchive(InputStream strm) throws ParserConfigurationException, SAXException, IOException { return new XmlInputArchive(strm); } /** Creates a new instance of BinaryInputArchive */ // 構造函數 public XmlInputArchive(InputStream in) throws ParserConfigurationException, SAXException, IOException { // 初始化XmlInputArchive的相應字段 valList = new ArrayList<Value>(); DefaultHandler handler = new XMLParser(valList); SAXParserFactory factory = SAXParserFactory.newInstance(); SAXParser parser = factory.newSAXParser(); parser.parse(in, handler); vLen = valList.size(); vIdx = 0; } // 讀取byte類型 public byte readByte(String tag) throws IOException { Value v = next(); if (!"ex:i1".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return Byte.parseByte(v.getValue()); } // 讀取Boolean類型 public boolean readBool(String tag) throws IOException { Value v = next(); if (!"boolean".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return "1".equals(v.getValue()); } // 讀取int類型 public int readInt(String tag) throws IOException { Value v = next(); if (!"i4".equals(v.getType()) && !"int".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return Integer.parseInt(v.getValue()); } // 讀取long類型 public long readLong(String tag) throws IOException { Value v = next(); if (!"ex:i8".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return Long.parseLong(v.getValue()); } // 讀取float類型 public float readFloat(String tag) throws IOException { Value v = next(); if (!"ex:float".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return Float.parseFloat(v.getValue()); } // 讀取double類型 public double readDouble(String tag) throws IOException { Value v = next(); if (!"double".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return Double.parseDouble(v.getValue()); } // 讀取String類型 public String readString(String tag) throws IOException { Value v = next(); if (!"string".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return Utils.fromXMLString(v.getValue()); } // 讀取Buffer類型 public byte[] readBuffer(String tag) throws IOException { Value v = next(); if (!"string".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return Utils.fromXMLBuffer(v.getValue()); } // 讀取Record類型 public void readRecord(Record r, String tag) throws IOException { r.deserialize(this, tag); } // 開始讀取Record public void startRecord(String tag) throws IOException { Value v = next(); if (!"struct".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } } // 結束讀取Record public void endRecord(String tag) throws IOException { Value v = next(); if (!"/struct".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } } // 開始讀取vector public Index startVector(String tag) throws IOException { Value v = next(); if (!"array".equals(v.getType())) { throw new IOException("Error deserializing "+tag+"."); } return new XmlIndex(); } // 結束讀取vector public void endVector(String tag) throws IOException {} // 開始讀取Map public Index startMap(String tag) throws IOException { return startVector(tag); } // 中止讀取Map public void endMap(String tag) throws IOException { endVector(tag); } }
2.2 OutputArchive
其是全部序列化器都須要實現此接口,其方法以下。
public interface OutputArchive { // 寫Byte類型 public void writeByte(byte b, String tag) throws IOException; // 寫boolean類型 public void writeBool(boolean b, String tag) throws IOException; // 寫int類型 public void writeInt(int i, String tag) throws IOException; // 寫long類型 public void writeLong(long l, String tag) throws IOException; // 寫float類型 public void writeFloat(float f, String tag) throws IOException; // 寫double類型 public void writeDouble(double d, String tag) throws IOException; // 寫String類型 public void writeString(String s, String tag) throws IOException; // 寫Buffer類型 public void writeBuffer(byte buf[], String tag) throws IOException; // 寫Record類型 public void writeRecord(Record r, String tag) throws IOException; // 開始寫Record public void startRecord(Record r, String tag) throws IOException; // 結束寫Record public void endRecord(Record r, String tag) throws IOException; // 開始寫Vector public void startVector(List v, String tag) throws IOException; // 結束寫Vector public void endVector(List v, String tag) throws IOException; // 開始寫Map public void startMap(TreeMap v, String tag) throws IOException; // 結束寫Map public void endMap(TreeMap v, String tag) throws IOException; }
OutputArchive的類結構以下
1. BinaryOutputArchive
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.jute; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.List; import java.util.TreeMap; /** * */ public class BinaryOutputArchive implements OutputArchive { // 字節緩衝 private ByteBuffer bb = ByteBuffer.allocate(1024); // DataInput接口,用於從二進制流中讀取字節 private DataOutput out; // 靜態方法,用於獲取Archive public static BinaryOutputArchive getArchive(OutputStream strm) { return new BinaryOutputArchive(new DataOutputStream(strm)); } /** Creates a new instance of BinaryOutputArchive */ // 構造函數 public BinaryOutputArchive(DataOutput out) { this.out = out; } // 寫Byte類型 public void writeByte(byte b, String tag) throws IOException { out.writeByte(b); } // 寫boolean類型 public void writeBool(boolean b, String tag) throws IOException { out.writeBoolean(b); } // 寫int類型 public void writeInt(int i, String tag) throws IOException { out.writeInt(i); } // 寫long類型 public void writeLong(long l, String tag) throws IOException { out.writeLong(l); } // 寫float類型 public void writeFloat(float f, String tag) throws IOException { out.writeFloat(f); } // 寫double類型 public void writeDouble(double d, String tag) throws IOException { out.writeDouble(d); } /** * create our own char encoder to utf8. This is faster * then string.getbytes(UTF8). * @param s the string to encode into utf8 * @return utf8 byte sequence. */ // 將String類型轉化爲ByteBuffer類型 final private ByteBuffer stringToByteBuffer(CharSequence s) { // 清空ByteBuffer bb.clear(); // s的長度 final int len = s.length(); for (int i = 0; i < len; i++) { // 遍歷s if (bb.remaining() < 3) { // ByteBuffer剩餘大小小於3 // 再進行一次分配(擴大一倍) ByteBuffer n = ByteBuffer.allocate(bb.capacity() << 1); // 切換方式 bb.flip(); // 寫入bb n.put(bb); bb = n; } char c = s.charAt(i); if (c < 0x80) { // 小於128,直接寫入 bb.put((byte) c); } else if (c < 0x800) { // 小於2048,則進行相應處理 bb.put((byte) (0xc0 | (c >> 6))); bb.put((byte) (0x80 | (c & 0x3f))); } else { // 大於2048,則進行相應處理 bb.put((byte) (0xe0 | (c >> 12))); bb.put((byte) (0x80 | ((c >> 6) & 0x3f))); bb.put((byte) (0x80 | (c & 0x3f))); } } // 切換方式 bb.flip(); return bb; } // 寫String類型 public void writeString(String s, String tag) throws IOException { if (s == null) { writeInt(-1, "len"); return; } ByteBuffer bb = stringToByteBuffer(s); writeInt(bb.remaining(), "len"); out.write(bb.array(), bb.position(), bb.limit()); } // 寫Buffer類型 public void writeBuffer(byte barr[], String tag) throws IOException { if (barr == null) { out.writeInt(-1); return; } out.writeInt(barr.length); out.write(barr); } // 寫Record類型 public void writeRecord(Record r, String tag) throws IOException { r.serialize(this, tag); } // 開始寫Record public void startRecord(Record r, String tag) throws IOException {} // 結束寫Record public void endRecord(Record r, String tag) throws IOException {} // 開始寫Vector public void startVector(List v, String tag) throws IOException { if (v == null) { writeInt(-1, tag); return; } writeInt(v.size(), tag); } // 結束寫Vector public void endVector(List v, String tag) throws IOException {} // 開始寫Map public void startMap(TreeMap v, String tag) throws IOException { writeInt(v.size(), tag); } // 結束寫Map public void endMap(TreeMap v, String tag) throws IOException {} }
2. CsvOutputArchive
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.jute; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.TreeMap; /** * */ public class CsvOutputArchive implements OutputArchive { // PrintStream爲其餘輸出流添加了功能,使它們可以方便地打印各類數據值表示形式 private PrintStream stream; // 默認爲第一次 private boolean isFirst = true; // 獲取Archive static CsvOutputArchive getArchive(OutputStream strm) throws UnsupportedEncodingException { return new CsvOutputArchive(strm); } // 私有函數,拋出異常 private void throwExceptionOnError(String tag) throws IOException { if (stream.checkError()) { throw new IOException("Error serializing "+tag); } } // 私有函數,除第一次外,均打印"," private void printCommaUnlessFirst() { if (!isFirst) { stream.print(","); } isFirst = false; } /** Creates a new instance of CsvOutputArchive */ // 構造函數 public CsvOutputArchive(OutputStream out) throws UnsupportedEncodingException { stream = new PrintStream(out, true, "UTF-8"); } // 寫Byte類型 public void writeByte(byte b, String tag) throws IOException { writeLong((long)b, tag); } // 寫boolean類型 public void writeBool(boolean b, String tag) throws IOException { // 打印"," printCommaUnlessFirst(); String val = b ? "T" : "F"; // 打印值 stream.print(val); // 拋出異常 throwExceptionOnError(tag); } // 寫int類型 public void writeInt(int i, String tag) throws IOException { writeLong((long)i, tag); } // 寫long類型 public void writeLong(long l, String tag) throws IOException { printCommaUnlessFirst(); stream.print(l); throwExceptionOnError(tag); } // 寫float類型 public void writeFloat(float f, String tag) throws IOException { writeDouble((double)f, tag); } // 寫double類型 public void writeDouble(double d, String tag) throws IOException { printCommaUnlessFirst(); stream.print(d); throwExceptionOnError(tag); } // 寫String類型 public void writeString(String s, String tag) throws IOException { printCommaUnlessFirst(); stream.print(Utils.toCSVString(s)); throwExceptionOnError(tag); } // 寫Buffer類型 public void writeBuffer(byte buf[], String tag) throws IOException { printCommaUnlessFirst(); stream.print(Utils.toCSVBuffer(buf)); throwExceptionOnError(tag); } // 寫Record類型 public void writeRecord(Record r, String tag) throws IOException { if (r == null) { return; } r.serialize(this, tag); } // 開始寫Record public void startRecord(Record r, String tag) throws IOException { if (tag != null && !"".equals(tag)) { printCommaUnlessFirst(); stream.print("s{"); isFirst = true; } } // 結束寫Record public void endRecord(Record r, String tag) throws IOException { if (tag == null || "".equals(tag)) { stream.print("\n"); isFirst = true; } else { stream.print("}"); isFirst = false; } } // 開始寫Vector public void startVector(List v, String tag) throws IOException { printCommaUnlessFirst(); stream.print("v{"); isFirst = true; } // 結束寫Vector public void endVector(List v, String tag) throws IOException { stream.print("}"); isFirst = false; } // 開始寫Map public void startMap(TreeMap v, String tag) throws IOException { printCommaUnlessFirst(); stream.print("m{"); isFirst = true; } // 結束寫Map public void endMap(TreeMap v, String tag) throws IOException { stream.print("}"); isFirst = false; } }
3. XmlOutputArchive
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.jute; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; import java.util.List; import java.util.Stack; import java.util.TreeMap; /** * */ class XmlOutputArchive implements OutputArchive { // PrintStream爲其餘輸出流添加了功能,使它們可以方便地打印各類數據值表示形式 private PrintStream stream; // 縮進個數 private int indent = 0; // 棧結構 private Stack<String> compoundStack; // 存放縮進 private void putIndent() { StringBuilder sb = new StringBuilder(""); for (int idx = 0; idx < indent; idx++) { sb.append(" "); } stream.print(sb.toString()); } // 添加縮進 private void addIndent() { indent++; } // 減小縮進 private void closeIndent() { indent--; } // 打印文件頭格式 private void printBeginEnvelope(String tag) { if (!compoundStack.empty()) { String s = compoundStack.peek(); if ("struct".equals(s)) { putIndent(); stream.print("<member>\n"); addIndent(); putIndent(); stream.print("<name>"+tag+"</name>\n"); putIndent(); stream.print("<value>"); } else if ("vector".equals(s)) { stream.print("<value>"); } else if ("map".equals(s)) { stream.print("<value>"); } } else { stream.print("<value>"); } } // 打印文件尾格式 private void printEndEnvelope(String tag) { if (!compoundStack.empty()) { String s = compoundStack.peek(); if ("struct".equals(s)) { stream.print("</value>\n"); closeIndent(); putIndent(); stream.print("</member>\n"); } else if ("vector".equals(s)) { stream.print("</value>\n"); } else if ("map".equals(s)) { stream.print("</value>\n"); } } else { stream.print("</value>\n"); } } // private void insideVector(String tag) { printBeginEnvelope(tag); compoundStack.push("vector"); } private void outsideVector(String tag) throws IOException { String s = compoundStack.pop(); if (!"vector".equals(s)) { throw new IOException("Error serializing vector."); } printEndEnvelope(tag); } private void insideMap(String tag) { printBeginEnvelope(tag); compoundStack.push("map"); } private void outsideMap(String tag) throws IOException { String s = compoundStack.pop(); if (!"map".equals(s)) { throw new IOException("Error serializing map."); } printEndEnvelope(tag); } private void insideRecord(String tag) { printBeginEnvelope(tag); compoundStack.push("struct"); } private void outsideRecord(String tag) throws IOException { String s = compoundStack.pop(); if (!"struct".equals(s)) { throw new IOException("Error serializing record."); } printEndEnvelope(tag); } // 獲取Archive static XmlOutputArchive getArchive(OutputStream strm) { return new XmlOutputArchive(strm); } /** Creates a new instance of XmlOutputArchive */ // 構造函數 public XmlOutputArchive(OutputStream out) { stream = new PrintStream(out); compoundStack = new Stack<String>(); } // 寫Byte類型 public void writeByte(byte b, String tag) throws IOException { printBeginEnvelope(tag); stream.print("<ex:i1>"); stream.print(Byte.toString(b)); stream.print("</ex:i1>"); printEndEnvelope(tag); } // 寫boolean類型 public void writeBool(boolean b, String tag) throws IOException { printBeginEnvelope(tag); stream.print("<boolean>"); stream.print(b ? "1" : "0"); stream.print("</boolean>"); printEndEnvelope(tag); } // 寫int類型 public void writeInt(int i, String tag) throws IOException { printBeginEnvelope(tag); stream.print("<i4>"); stream.print(Integer.toString(i)); stream.print("</i4>"); printEndEnvelope(tag); } // 寫long類型 public void writeLong(long l, String tag) throws IOException { printBeginEnvelope(tag); stream.print("<ex:i8>"); stream.print(Long.toString(l)); stream.print("</ex:i8>"); printEndEnvelope(tag); } // 寫float類型 public void writeFloat(float f, String tag) throws IOException { printBeginEnvelope(tag); stream.print("<ex:float>"); stream.print(Float.toString(f)); stream.print("</ex:float>"); printEndEnvelope(tag); } // 寫double類型 public void writeDouble(double d, String tag) throws IOException { printBeginEnvelope(tag); stream.print("<double>"); stream.print(Double.toString(d)); stream.print("</double>"); printEndEnvelope(tag); } // 寫String類型 public void writeString(String s, String tag) throws IOException { printBeginEnvelope(tag); stream.print("<string>"); stream.print(Utils.toXMLString(s)); stream.print("</string>"); printEndEnvelope(tag); } // 寫Buffer類型 public void writeBuffer(byte buf[], String tag) throws IOException { printBeginEnvelope(tag); stream.print("<string>"); stream.print(Utils.toXMLBuffer(buf)); stream.print("</string>"); printEndEnvelope(tag); } // 寫Record類型 public void writeRecord(Record r, String tag) throws IOException { r.serialize(this, tag); } // 開始寫Record類型 public void startRecord(Record r, String tag) throws IOException { insideRecord(tag); stream.print("<struct>\n"); addIndent(); } // 結束寫Record類型 public void endRecord(Record r, String tag) throws IOException { closeIndent(); putIndent(); stream.print("</struct>"); outsideRecord(tag); } // 開始寫Vector類型 public void startVector(List v, String tag) throws IOException { insideVector(tag); stream.print("<array>\n"); addIndent(); } // 結束寫Vector類型 public void endVector(List v, String tag) throws IOException { closeIndent(); putIndent(); stream.print("</array>"); outsideVector(tag); } // 開始寫Map類型 public void startMap(TreeMap v, String tag) throws IOException { insideMap(tag); stream.print("<array>\n"); addIndent(); } // 結束寫Map類型 public void endMap(TreeMap v, String tag) throws IOException { closeIndent(); putIndent(); stream.print("</array>"); outsideMap(tag); } }
2.3 Index
其用於迭代反序列化器的迭代器。
public interface Index { // 是否已經完成 public boolean done(); // 下一項 public void incr(); }
Index的類結構以下
1. BinaryIndex
static private class BinaryIndex implements Index { // 元素個數 private int nelems; // 構造函數 BinaryIndex(int nelems) { this.nelems = nelems; } // 是否已經完成 public boolean done() { return (nelems <= 0); } // 移動一項 public void incr() { nelems--; } }
2. CsxIndex
private class CsvIndex implements Index { // 是否已經完成 public boolean done() { char c = '\0'; try { // 讀取字符 c = (char) stream.read(); // 推回緩衝區 stream.unread(c); } catch (IOException ex) { } return (c == '}') ? true : false; } // 什麼都不作 public void incr() {} }
3. XmlIndex
private class XmlIndex implements Index { // 是否已經完成 public boolean done() { // 根據索引獲取值 Value v = valList.get(vIdx); if ("/array".equals(v.getType())) { // 判斷是否值的類型是否爲/array // 設置索引的值 valList.set(vIdx, null); // 索引加1 vIdx++; return true; } else { return false; } } // 什麼都不作 public void incr() {} }
2.4 Record
全部用於網絡傳輸或者本地存儲的類型都實現該接口,其方法以下
public interface Record { // 序列化 public void serialize(OutputArchive archive, String tag) throws IOException; // 反序列化 public void deserialize(InputArchive archive, String tag) throws IOException; }
全部的實現類都須要實現seriallize和deserialize方法。
3、示例
下面經過一個示例來理解OutputArchive和InputArchive的搭配使用。
package com.leesf.zookeeper_samples; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Set; import java.util.TreeMap; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Index; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; public class ArchiveTest { public static void main( String[] args ) throws IOException { String path = "F:\\test.txt"; // write operation OutputStream outputStream = new FileOutputStream(new File(path)); BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(outputStream); binaryOutputArchive.writeBool(true, "boolean"); byte[] bytes = "leesf".getBytes(); binaryOutputArchive.writeBuffer(bytes, "buffer"); binaryOutputArchive.writeDouble(13.14, "double"); binaryOutputArchive.writeFloat(5.20f, "float"); binaryOutputArchive.writeInt(520, "int"); Person person = new Person(25, "leesf"); binaryOutputArchive.writeRecord(person, "leesf"); TreeMap<String, Integer> map = new TreeMap<String, Integer>(); map.put("leesf", 25); map.put("dyd", 25); Set<String> keys = map.keySet(); binaryOutputArchive.startMap(map, "map"); int i = 0; for (String key: keys) { String tag = i + ""; binaryOutputArchive.writeString(key, tag); binaryOutputArchive.writeInt(map.get(key), tag); i++; } binaryOutputArchive.endMap(map, "map"); // read operation InputStream inputStream = new FileInputStream(new File(path)); BinaryInputArchive binaryInputArchive = BinaryInputArchive.getArchive(inputStream); System.out.println(binaryInputArchive.readBool("boolean")); System.out.println(new String(binaryInputArchive.readBuffer("buffer"))); System.out.println(binaryInputArchive.readDouble("double")); System.out.println(binaryInputArchive.readFloat("float")); System.out.println(binaryInputArchive.readInt("int")); Person person2 = new Person(); binaryInputArchive.readRecord(person2, "leesf"); System.out.println(person2); Index index = binaryInputArchive.startMap("map"); int j = 0; while (!index.done()) { String tag = j + ""; System.out.println("key = " + binaryInputArchive.readString(tag) + ", value = " + binaryInputArchive.readInt(tag)); index.incr(); j++; } } static class Person implements Record { private int age; private String name; public Person() { } public Person(int age, String name) { this.age = age; this.name = name; } public void serialize(OutputArchive archive, String tag) throws IOException { archive.startRecord(this, tag); archive.writeInt(age, "age"); archive.writeString(name, "name"); archive.endRecord(this, tag); } public void deserialize(InputArchive archive, String tag) throws IOException { archive.startRecord(tag); age = archive.readInt("age"); name = archive.readString("name"); archive.endRecord(tag); } public String toString() { return "age = " + age + ", name = " + name; } } }
運行結果:
true leesf 13.14 5.2 520 age = 25, name = leesf key = dyd, value = 25 key = leesf, value = 25
4、總結
本篇博文分析了序列化中涉及到的類,主要是org.zookeeper.jute包下的類,相對來講仍是相對簡單,也謝謝各位園友的觀看~