package com.demo.netty;node
import org.junit.Before;
import org.junit.Test;算法
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;數據庫
public class Client {
private int port;
@Before
public void init() {
this.port=8088;
}
@Test
public void run() throws Exception {
NioEventLoopGroup boosAndWorkerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(boosAndWorkerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {bootstrap
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//設置讀取的字節大小爲1kb也就是1024字節,同時禁止緩存類加載器
ch.pipeline().addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture sync = bootstrap.connect("127.0.0.1", port).sync();
sync.channel().closeFuture().sync();
}緩存
}服務器
package com.demo.netty;數據結構
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;socket
public class ClientHandler extends ChannelHandlerAdapter{分佈式
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg.toString());
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//鏈接服務器後自動發出
Person person = new Person();
person.setName("客戶端");
person.setAge(1);
ctx.writeAndFlush(person);
}
}ide
package com.demo.netty;
import org.junit.Before;
import org.junit.Test;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class Server {
private int port;
@Before
public void init() {
this.port = 8088;
}
@Test
public void run() throws Exception {
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_SNDBUF, 1024).option(ChannelOption.SO_RCVBUF, 1024).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//發送pojo必定要調用解碼處理器再調用編碼處理器,這裏編碼處理器的參數的意思是,接收1m大小的字節,同時對類加載器進行緩存,當虛擬機緩存不足時,會自動釋放緩存,爲了防止異常碼流和解碼錯位而致使的內存溢出
ch.pipeline().addLast(new ObjectDecoder(1024*1024,ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture sync = serverBootstrap.bind(port).sync();
//對Server端進行阻塞,等待服務端監聽端口關閉後,主線程才退出
sync.channel().closeFuture().sync();
}
}
package com.demo.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Person per = (Person)msg;
System.out.println(per.toString());
Person person = new Person();
person.setName("服務端");
person.setAge(1);
ctx.writeAndFlush(person);
}
}
Zookeeper是一個開源的集羣管理服務,雖然它本質上只提供兩個功能,存儲客戶端發來的數據並將其以樹形結構進行存儲,還有對保存數據節點的監聽,可是咱們能夠靠代碼設計來經過這兩個
功能來解決一系列的問題;
Zookeeper典型應用場景:
配置管理
集羣管理
發佈與訂閱
數據庫切換
分佈式日誌的收集
分佈式鎖,隊列管理等等
Zookeeper數據類型與結構:
Zookeeper的所存儲的數據結構咱們成爲Znode,它是經過路勁進行引用也就是/開頭,頂層節點就是/,而且它兼具文件和目錄兩種特色,既能夠保存數據,源信息,ACL,時間戳等數據結構,
又像目錄同樣能夠做爲路勁標識的一部分,每個節點都是Znode,每一個持久節點均可以擁有子節點(臨時節點不能擁有),Znode由三個部分組成:
stat:此爲狀態信息描述該Znode的版本,權限等信息;
data:與該Znode關聯的數據;
children:該節點下的子節點;
Zookeeper本質上數據類型只有兩種,持久節點和臨時節點,但由於它們都各自有另外一種生成方式,也就是帶有序列號的持久節點和臨時節點,因此變成了4種(序列號的生成由
Zookeeper根據保存數據時事務成功時的順序進行遞增),同時每一個節點又由於保存着版本號而擁有着不一樣的數據;
Zookeeper集羣選舉原理 Zookeeper集羣分爲三種角色,主(Leader),從(follower),obServer(Learner,不多用到,就是複製客戶端的數據,只負責讀,不須要leader主動分發任務,而且也不參與選舉) 首先,要組成Zookeeper必需要三臺或以上的服務器,第一臺開機後,首先給本身投票,以後判斷是否有其它機器,沒有就進入Looking狀態,若是有就發給其它服務器反饋信息讓它們給本身投票;第二臺同理,只有到第三臺的時候,纔會使用選舉算法根據服務器ID,越大投票份量越重(也就是咱們在建立集羣是,在zookeeper保存數據的地方新建一個myid的文件並存儲值(這個值根據你搭建集羣時給每一個服務定的值,具體百度Zookeeper集羣搭建)),邏輯時鐘(指的時每臺服務器投票的次數),每臺服務器都會把本身的ID和邏輯時鐘的保存的次數發給其它服務器,其實還有關於服務器中保存數據的版本信息之類,不過這個佔用的投票權利不大;以後一旦選舉出leader,就算後面新增的服務器,它的ID再怎麼大,都只能是follower; 選舉完成後由Leader來分發而且也只能由Leader任務給其它Follower,就算客戶端是把保存數據的請求發送給了Follower,Follower也會轉發給Leader,以後由Leader先保存這個數據,而後分派任務,讓其它Follower也所有保存這個數據,這樣就保證了數據的一致性,當有大於通常數量的集羣服務器保存了這個數據後,Leader就會返回消息給客戶端,告知其保存成功;Zookeeper增刪改都是原子性的,也就是說,只要你修改了某個節點中保存的數據,那麼相應的,這條數據的版本信息,源信息都會被作相應處理;同時Zookeeper是有事物支撐的;