This article takes a look at puma's ClientPositionService.
puma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java
    public interface ClientPositionService {

        List<ClientPositionEntity> findAll();

        ClientPositionEntity find(String clientName);

        void update(ClientPositionEntity clientPositionEntity, boolean flush);

        void flush();

        void cleanUpTestClients();
    }
puma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java
    @Service
    public class ClientPositionServiceImpl implements ClientPositionService {

        private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class);

        @Autowired
        private ClientPositionDao clientPositionDao;

        private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>();

        @Override
        public List<ClientPositionEntity> findAll() {
            return clientPositionDao.findAll();
        }

        @Override
        public ClientPositionEntity find(String clientName) {
            return clientPositionDao.findByClientName(clientName);
        }

        @Override
        public void update(ClientPositionEntity clientPositionEntity, boolean flush) {
            if (flush) {
                positionEntityMap.remove(clientPositionEntity.getClientName());
                insertOrUpdate(clientPositionEntity);
            } else {
                positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity);
            }
        }

        @Scheduled(fixedDelay = 5000)
        public void flush() {
            Set<String> keys = positionEntityMap.keySet();
            for (String key : keys) {
                ClientPositionEntity entity = positionEntityMap.remove(key);
                if (entity == null) {
                    continue;
                }
                insertOrUpdate(entity);
            }
        }

        private void insertOrUpdate(ClientPositionEntity entity) {
            try {
                entity.setUpdateTime(new Date());
                int updateRow = clientPositionDao.update(entity);
                if (updateRow == 0) {
                    clientPositionDao.insert(entity);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        public void cleanUpTestClients() {
            List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient();
            for (ClientPositionEntity entity : clients) {
                clientPositionDao.delete(entity.getId());
            }
        }
    }
puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java
    public interface ClientPositionDao {

        List<ClientPositionEntity> findAll();

        ClientPositionEntity findByClientName(String clientName);

        int update(ClientPositionEntity entity);

        int insert(ClientPositionEntity entity);

        int delete(int id);

        List<ClientPositionEntity> findOldTestClient();
    }
puma/biz/src/main/java/com/dianping/puma/biz/entity/ClientPositionEntity.java
    public class ClientPositionEntity extends BaseEntity {

        private String clientName;

        private String binlogFile;

        private long binlogPosition;

        private long serverId;

        private int eventIndex;

        private long timestamp;

        // ......
    }
puma/biz/src/main/resources/sqlmap/ClientPositionMapper.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    <mapper namespace="com.dianping.puma.biz.dao.ClientPositionDao">

        <select id="findAll" resultType="ClientPositionEntity">
            SELECT * FROM ClientPosition
        </select>

        <select id="findByClientName" resultType="ClientPositionEntity">
            SELECT * FROM ClientPosition where ClientName = #{clientName}
        </select>

        <update id="update" parameterType="ClientPositionEntity">
            update ClientPosition
            set
                BinlogFile = #{binlogFile},
                BinlogPosition = #{binlogPosition},
                ServerId = #{serverId},
                EventIndex = #{eventIndex},
                Timestamp = #{timestamp},
                UpdateTime = #{updateTime}
            where ClientName = #{clientName}
        </update>

        <insert id="insert" parameterType="ClientPositionEntity" useGeneratedKeys="true" keyProperty="id">
            insert into ClientPosition (
                ClientName, BinlogFile, BinlogPosition, ServerId, EventIndex, Timestamp, UpdateTime
            ) values (
                #{clientName}, #{binlogFile}, #{binlogPosition}, #{serverId}, #{eventIndex}, #{timestamp}, #{updateTime}
            )
        </insert>

        <select id="findOldTestClient" resultType="ClientPositionEntity">
            select * from ClientPosition
            where UpdateTime &lt; NOW() - INTERVAL 10 DAY
              and ClientName like '%test'
        </select>

        <delete id="delete" parameterType="int">
            delete from ClientPosition where id = #{id}
        </delete>

    </mapper>
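The update and insert statements in this mapper are combined by insertOrUpdate in ClientPositionServiceImpl: it runs the update first and inserts only when the update reports zero affected rows. The following is a minimal sketch of that pattern, not puma's code. UpsertSketch and InMemoryPositionTable are hypothetical names, and a HashMap stands in for the ClientPosition table so the example runs without a database.

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertSketch {

    /** Hypothetical stand-in for the ClientPosition table, keyed by ClientName. */
    static class InMemoryPositionTable {
        private final Map<String, Long> rows = new HashMap<>();

        /** Mimics the mapper's update: returns the number of rows it touched. */
        int update(String clientName, long binlogPosition) {
            if (!rows.containsKey(clientName)) {
                return 0; // nothing matched "where ClientName = ..."
            }
            rows.put(clientName, binlogPosition);
            return 1;
        }

        /** Mimics the mapper's insert. */
        int insert(String clientName, long binlogPosition) {
            rows.put(clientName, binlogPosition);
            return 1;
        }

        Long positionOf(String clientName) {
            return rows.get(clientName);
        }

        int size() {
            return rows.size();
        }
    }

    /** Update first; insert only when no existing row was touched. */
    static String insertOrUpdate(InMemoryPositionTable table, String clientName, long position) {
        int updatedRows = table.update(clientName, position);
        if (updatedRows == 0) {
            table.insert(clientName, position);
            return "insert";
        }
        return "update";
    }

    public static void main(String[] args) {
        InMemoryPositionTable table = new InMemoryPositionTable();
        System.out.println(insertOrUpdate(table, "client-a", 4L));   // first write for this client
        System.out.println(insertOrUpdate(table, "client-a", 120L)); // row already exists
        System.out.println(table.positionOf("client-a"));
    }
}
```

In general, update-then-insert is not atomic: two concurrent callers can both see zero updated rows and both insert. Whether that matters here depends on the constraints of the ClientPosition table, which the source shown above does not include.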
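ClientPositionService defines the findAll, find, update, flush, and cleanUpTestClients methods. ClientPositionServiceImpl implements the ClientPositionService interface: its findAll method calls clientPositionDao.findAll(); its find method calls clientPositionDao.findByClientName(clientName); its update method calls positionEntityMap.remove and then insertOrUpdate when flush is true, and only calls positionEntityMap.put when flush is false; its flush method, scheduled with a fixed delay of 5000 ms, iterates over positionEntityMap, removes the entries one by one, and calls insertOrUpdate(entity) on each.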
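The buffering behaviour summarized above can be sketched as follows. This is an illustrative sketch under stated assumptions rather than puma's implementation: Position, RecordingStore, and BufferedPositionWriter are hypothetical names, the store only records writes instead of touching a database, and flush() is called by hand where the real class relies on Spring's @Scheduled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WriteBehindSketch {

    /** Hypothetical position record with only the fields the sketch needs. */
    static final class Position {
        final String clientName;
        final long binlogPosition;

        Position(String clientName, long binlogPosition) {
            this.clientName = clientName;
            this.binlogPosition = binlogPosition;
        }
    }

    /** Hypothetical store that records each write instead of hitting a database. */
    static final class RecordingStore {
        final List<Position> writes = new ArrayList<>();

        synchronized void save(Position position) {
            writes.add(position);
        }
    }

    /** Keeps only the latest pending position per client until flushed. */
    static final class BufferedPositionWriter {
        private final Map<String, Position> pending = new ConcurrentHashMap<>();
        private final RecordingStore store;

        BufferedPositionWriter(RecordingStore store) {
            this.store = store;
        }

        void update(Position position, boolean flush) {
            if (flush) {
                // Drop any stale buffered value, then write through immediately.
                pending.remove(position.clientName);
                store.save(position);
            } else {
                // Overwrites an earlier buffered value for the same client.
                pending.put(position.clientName, position);
            }
        }

        void flush() {
            for (String key : pending.keySet()) {
                // remove() returns null if another caller already took this entry.
                Position position = pending.remove(key);
                if (position != null) {
                    store.save(position);
                }
            }
        }
    }

    public static void main(String[] args) {
        RecordingStore store = new RecordingStore();
        BufferedPositionWriter writer = new BufferedPositionWriter(store);

        writer.update(new Position("client-a", 1L), false);
        writer.update(new Position("client-a", 2L), false); // replaces the buffered 1
        writer.update(new Position("client-b", 7L), false);
        System.out.println(store.writes.size()); // 0: nothing written yet

        writer.flush();
        System.out.println(store.writes.size()); // 2: one write per client

        writer.update(new Position("client-a", 3L), true);
        System.out.println(store.writes.size()); // 3: written immediately
    }
}
```

The trade-off of this design is that repeated position updates for one client collapse into a single write per flush cycle, at the cost of losing any still-buffered positions if the process stops before the next flush.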