看到不少文章在提到 Hadoop 的同時,都會提到 Avro 和 Thrift 這兩個數據持久化(序列化)項目。特別是 Avro,其創建者正是 Doug Cutting。Apache 官方的序列化示例提供了 Java 版本和 Python 版本,都是非常簡單的示例,結合 Maven 很容易上手;但是關於 RPC 的示例則顯得比較簡略,摸索了半天,這裏做個備忘。
首先是 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>avro-sample</groupId>
  <artifactId>avro-sample</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <!-- Core Avro library: schemas, data files, datum readers/writers. -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.5</version>
    </dependency>
    <!-- Avro RPC support (org.apache.avro.ipc.*), used by RPCMain. -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-ipc</artifactId>
      <version>1.7.5</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!--
        Code generation: during generate-sources, the schema / protocol /
        IDL files found in src/main/resources are turned into Java classes.
        Note that the generated code is written into src/main/java, next to
        the hand-written sources.
      -->
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.7.5</version>
        <executions>
          <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
              <goal>protocol</goal>
              <goal>idl-protocol</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
              <testSourceDirectory>${project.basedir}/src/test/resources/</testSourceDirectory>
              <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- Compile for Java 1.6 (source and bytecode level). -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
注意其中設置了用於自動生成代碼的插件:將 user.avsc 和 mail.avpr 放到 src/main/resources/ 目錄下,然後運行 mvn generate-sources,相關的 protocol 等類就會自動生成。
user.avsc
{"namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
mail.avpr
{"namespace": "example.proto", "protocol": "Mail", "types": [ {"name": "Message", "type": "record", "fields": [ {"name": "to", "type": "string"}, {"name": "from", "type": "string"}, {"name": "body", "type": "string"} ] } ], "messages": { "send": { "request": [{"name": "message", "type": "Message"}], "response": "string" } } }
RPCMain.java
package example.avro;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.SocketServer;
import org.apache.avro.ipc.SocketTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.util.Utf8;

import example.proto.Mail;
import example.proto.Message;

/**
 * Minimal Avro RPC round trip: starts a Netty-based server implementing the
 * {@code Mail} protocol, connects a client to it in the same JVM, sends one
 * {@code Message} built from the command-line arguments and prints the reply.
 */
public class RPCMain {

    /** Port used by both the in-process server and the client. */
    private static final int PORT = 65111;

    /** Server-side implementation of the {@code Mail} protocol. */
    public static class MailImpl implements Mail {
        /**
         * Handles a {@code send} call. In this simple example it just returns
         * a description of the message it was given.
         *
         * @param message the message received from the client
         * @return a text summarising recipient, sender and body
         */
        @Override
        public Utf8 send(Message message) {
            System.out.println("Sending message");
            return new Utf8("Sending message to " + message.getTo().toString()
                    + " from " + message.getFrom().toString()
                    + " with body " + message.getBody().toString());
        }
    }

    private static Server server;

    /**
     * Starts the server; it implements the Mail protocol through
     * {@link MailImpl} and listens on {@link #PORT}.
     *
     * @throws IOException declared for callers; propagated from server setup
     */
    private static void startServer() throws IOException {
        server = new NettyServer(
                new SpecificResponder(Mail.class, new MailImpl()),
                new InetSocketAddress(PORT));
    }

    /**
     * Entry point.
     *
     * @param args exactly three values: {@code <to> <from> <body>}
     * @throws IOException if connecting to the server or the remote call fails
     */
    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.out.println("Usage: <to> <from> <body>");
            System.exit(1);
        }

        System.out.println("Starting server");
        // usually this would be another app, but for simplicity
        startServer();
        System.out.println("Server started");

        NettyTransceiver client = null;
        try {
            // client code - attach to the server and send a message
            client = new NettyTransceiver(new InetSocketAddress(PORT));
            Mail proxy = (Mail) SpecificRequestor.getClient(Mail.class, client);
            System.out.println("Client built, got proxy");

            // fill in the Message record and send it
            Message message = new Message();
            message.setTo(new Utf8(args[0]));
            message.setFrom(new Utf8(args[1]));
            message.setBody(new Utf8(args[2]));
            System.out.println("Calling proxy.send with message: " + message.toString());
            System.out.println("Result: " + proxy.send(message));
        } finally {
            // cleanup: always release the client and the server, even when the
            // connection or the remote call failed, so the server is not left
            // running after an error.
            try {
                if (client != null) {
                    client.close();
                }
            } finally {
                server.close();
            }
        }
    }
}
SpecificMain.java 可以參考官方示例
package example.avro;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;

/**
 * Serialization example: builds three {@code User} records, writes them to
 * {@code users.avro} with the generated (specific) classes, then reads the
 * file back as generic records and prints each one.
 */
public class SpecificMain {

    /**
     * Entry point.
     *
     * @param args unused
     * @throws IOException if writing or reading {@code users.avro} fails
     */
    public static void main(String[] args) throws IOException {
        User user1 = new User();
        user1.setName("Alyssa");
        user1.setFavoriteNumber(256);
        // Leave favorite color null

        // Alternate constructor
        User user2 = new User("Ben", 7, "red");

        // Construct via builder
        User user3 = User.newBuilder()
                .setName("Charlie")
                .setFavoriteColor("blue")
                .setFavoriteNumber(null)
                .build();

        // Serialize user1, user2 and user3 to disk
        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
        DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
        File file = new File("users.avro");
        System.out.println(file.getAbsolutePath());
        try {
            dataFileWriter.create(user1.getSchema(), file);
            dataFileWriter.append(user1);
            dataFileWriter.append(user2);
            dataFileWriter.append(user3);
        } finally {
            // Close the writer even if create/append fails.
            dataFileWriter.close();
        }

        // Deserialize users from disk
        DatumReader<GenericRecord> datumReader =
                new GenericDatumReader<GenericRecord>(user1.getSchema());
        DataFileReader<GenericRecord> dataFileReader =
                new DataFileReader<GenericRecord>(file, datumReader);
        try {
            GenericRecord user = null;
            while (dataFileReader.hasNext()) {
                // Reuse user object by passing it to next(). This saves us from
                // allocating and garbage collecting many objects for files with
                // many items.
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        } finally {
            // The original never closed the reader, leaking the file handle.
            dataFileReader.close();
        }
    }
}
此外我下載了 Avro 的 Python 包,然後用 Python 讀了一下用 Java 生成的 users.avro 文件
SpecificMain.py
# Reads the users.avro file and prints every record it contains.
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# Avro container files are binary data, so the file must be opened in "rb"
# mode; text mode can corrupt the bytes on platforms that translate newlines.
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
try:
    for user in reader:
        # print(...) with a single argument behaves the same on Python 2 and 3.
        print(user)
finally:
    # Close the reader even if decoding a record fails.
    reader.close()