This article takes a look at puma's ClientPositionService.
puma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java
public interface ClientPositionService {
    List<ClientPositionEntity> findAll();
    ClientPositionEntity find(String clientName);
    void update(ClientPositionEntity clientPositionEntity, boolean flush);
    void flush();
    void cleanUpTestClients();
}
puma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java
@Service
public class ClientPositionServiceImpl implements ClientPositionService {
    private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class);
    @Autowired
    private ClientPositionDao clientPositionDao;
    private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>();
    @Override
    public List<ClientPositionEntity> findAll() {
        return clientPositionDao.findAll();
    }
    @Override
    public ClientPositionEntity find(String clientName) {
        return clientPositionDao.findByClientName(clientName);
    }
    @Override
    public void update(ClientPositionEntity clientPositionEntity, boolean flush) {
        if (flush) {
            positionEntityMap.remove(clientPositionEntity.getClientName());
            insertOrUpdate(clientPositionEntity);
        } else {
            positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity);
        }
    }
    @Scheduled(fixedDelay = 5000)
    public void flush() {
        Set<String> keys = positionEntityMap.keySet();
        for (String key : keys) {
            ClientPositionEntity entity = positionEntityMap.remove(key);
            if (entity == null) {
                continue;
            }
            insertOrUpdate(entity);
        }
    }
    private void insertOrUpdate(ClientPositionEntity entity) {
        try {
            entity.setUpdateTime(new Date());
            int updateRow = clientPositionDao.update(entity);
            if (updateRow == 0) {
                clientPositionDao.insert(entity);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
    public void cleanUpTestClients() {
        List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient();
        for (ClientPositionEntity entity : clients) {
            clientPositionDao.delete(entity.getId());
        }
    }
}
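The @Scheduled(fixedDelay = 5000) annotation on flush() asks Spring to run the method again 5000 ms after the previous run has finished. The same timing can be sketched in plain JDK terms with ScheduledExecutorService.scheduleWithFixedDelay. This is only an illustration and is not taken from puma; FixedDelayFlusher and its constructor parameters are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of puma): runs a flush task repeatedly,
// waiting delayMillis between the end of one run and the start of the next.
final class FixedDelayFlusher implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    FixedDelayFlusher(final Runnable flushTask, long delayMillis) {
        Runnable guarded = new Runnable() {
            @Override
            public void run() {
                try {
                    flushTask.run();
                } catch (RuntimeException e) {
                    // An exception escaping the task would cancel all later runs,
                    // so it is swallowed here; a real service would log it.
                }
            }
        };
        scheduler.scheduleWithFixedDelay(guarded, 0L, delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        // Stops the periodic task; anything still buffered is not flushed by this call.
        scheduler.shutdownNow();
    }
}
```

With a fixed delay, a slow flush pushes the next run back instead of letting runs pile up, which suits a task that drains a shared map.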
puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java
public interface ClientPositionDao {
    List<ClientPositionEntity> findAll();
    ClientPositionEntity findByClientName(String clientName);
    int update(ClientPositionEntity entity);
    int insert(ClientPositionEntity entity);
    int delete(int id);
    List<ClientPositionEntity> findOldTestClient();
}
puma/biz/src/main/java/com/dianping/puma/biz/entity/ClientPositionEntity.java
public class ClientPositionEntity extends BaseEntity {
    private String clientName;
    private String binlogFile;
    private long binlogPosition;
    private long serverId;
    private int eventIndex;
    private long timestamp;
    //......
}
puma/biz/src/main/resources/sqlmap/ClientPositionMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dianping.puma.biz.dao.ClientPositionDao">
    <select id="findAll" resultType="ClientPositionEntity">
        SELECT * FROM ClientPosition
    </select>
    <select id="findByClientName" resultType="ClientPositionEntity">
        SELECT * FROM ClientPosition where ClientName = #{clientName}
    </select>
    <update id="update" parameterType="ClientPositionEntity">
        update ClientPosition
        set
            BinlogFile = #{binlogFile},
            BinlogPosition = #{binlogPosition},
            ServerId = #{serverId},
            EventIndex = #{eventIndex},
            Timestamp = #{timestamp},
            UpdateTime = #{updateTime}
        where
            ClientName = #{clientName}
    </update>
    <insert id="insert" parameterType="ClientPositionEntity" useGeneratedKeys="true" keyProperty="id">
        insert into ClientPosition
        (
            ClientName,
            BinlogFile,
            BinlogPosition,
            ServerId,
            EventIndex,
            Timestamp,
            UpdateTime
        )
        values
        (
            #{clientName},
            #{binlogFile},
            #{binlogPosition},
            #{serverId},
            #{eventIndex},
            #{timestamp},
            #{updateTime}
        )
    </insert>
    <select id="findOldTestClient" resultType="ClientPositionEntity">
        select * from ClientPosition
        where UpdateTime &lt; NOW() - INTERVAL 10 DAY
        and ClientName like '%test'
    </select>
    <delete id="delete" parameterType="int">
        delete from ClientPosition where id = #{id}
    </delete>
</mapper>
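ClientPositionService defines the findAll, find, update, flush, and cleanUpTestClients methods. ClientPositionServiceImpl implements the ClientPositionService interface. Its findAll method calls clientPositionDao.findAll(), and its find method calls clientPositionDao.findByClientName(clientName). Its update method, when flush is true, removes the client's entry from positionEntityMap and calls insertOrUpdate; when flush is false it only puts the entity into positionEntityMap. Its flush method, scheduled with a fixed delay of 5000 ms, iterates over the keys of positionEntityMap, removes each entry in turn, and calls insertOrUpdate(entity) on it. insertOrUpdate sets the update time, calls clientPositionDao.update, and falls back to clientPositionDao.insert when no row was updated. cleanUpTestClients deletes every record returned by clientPositionDao.findOldTestClient().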
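The buffering behaviour described above can be tried out without Spring or a database. The following is a minimal sketch, assuming a simplified entity and an in-memory store in place of ClientPositionDao; Position, InMemoryPositionStore, BufferedPositionWriter, writeCount, and pendingCount are hypothetical names introduced only for this illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ClientPositionEntity with only two fields.
final class Position {
    final String clientName;
    final long binlogPosition;

    Position(String clientName, long binlogPosition) {
        this.clientName = clientName;
        this.binlogPosition = binlogPosition;
    }
}

// Hypothetical in-memory replacement for ClientPositionDao.
final class InMemoryPositionStore {
    private final Map<String, Position> rows = new ConcurrentHashMap<String, Position>();
    private final AtomicInteger writes = new AtomicInteger();

    // Returns the number of rows affected, like the mapper's update statement.
    int update(Position p) {
        if (!rows.containsKey(p.clientName)) {
            return 0;
        }
        rows.put(p.clientName, p);
        writes.incrementAndGet();
        return 1;
    }

    int insert(Position p) {
        rows.put(p.clientName, p);
        writes.incrementAndGet();
        return 1;
    }

    Position find(String clientName) {
        return rows.get(clientName);
    }

    int writeCount() {
        return writes.get();
    }
}

// Mirrors the update/flush/insertOrUpdate logic of ClientPositionServiceImpl.
final class BufferedPositionWriter {
    private final InMemoryPositionStore store;
    private final Map<String, Position> pending = new ConcurrentHashMap<String, Position>();

    BufferedPositionWriter(InMemoryPositionStore store) {
        this.store = store;
    }

    void update(Position p, boolean flush) {
        if (flush) {
            // Write through: drop any buffered value and persist this one now.
            pending.remove(p.clientName);
            insertOrUpdate(p);
        } else {
            // Buffer only: a later position for the same client overwrites this one.
            pending.put(p.clientName, p);
        }
    }

    void flush() {
        for (String key : pending.keySet()) {
            Position p = pending.remove(key);
            if (p != null) {
                insertOrUpdate(p);
            }
        }
    }

    private void insertOrUpdate(Position p) {
        // Try an update first; insert only when no existing row matched.
        if (store.update(p) == 0) {
            store.insert(p);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because the buffer is keyed by client name, several non-flushing updates for one client between two flushes collapse into a single write of the latest position. The trade-off is that positions still sitting in the buffer are lost if the process stops before the next flush.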