車輛網項目架構設計
系統概述
本系統主要使用Spring Cloud技術,包含的模塊 Gateway(網關),Nacos(微服務註冊與發現),OpenFeign(HTTP+Restful客戶端),Hystrix(斷路器),ribbon(負載均衡),Security+OAuth2(安全認證),Kafka(消息隊列),MybatisPlus(對象關係映射),Redis(緩存數據庫),Netty等組件。html
系統架構圖
組件功能簡介
Gateway:Web服務的統一入口,進行消息的轉發和限流操做
Nacos:微服務的註冊中心和數據中心,提供服務的發現和註冊,數據的統一配置
OpenFeign:HTTP+Restfull客戶端,實現服務之間的調用
Hystrix:斷路器,實現服務的熔斷和降級
Ribbon:微服務調用的負載均衡
Security+OAuth2:提供微服務調用的認證和受權
Kafka:消息中間件,緩存終端數據,支持大併發
MybatisPlus:對象關係映射,用於訪問數據庫
Redis:數據緩存服務器,內存數據庫,併發量大
Netty:JavaNio架構,實現終端的Socket鏈接,支持更大的鏈接數
vue
項目工程結構
- obd車聯網項目
- auth 認證服務器
- cloudcore Spring cloud 核心項目
- common 工具類項目
- nettysocket 終端數據接收項目
- obd-feign-api OpenFeign接口項目
- obd-feign-client OpenFeign客戶端項目
- obd-gateway 網關
- obd-member-auth app帳戶認證中心
- obd-task 定時任務
- obd-terminal-simulator 終端模擬器
- obd-third-park 三方服務項目
- obd-zhb 真惠保項目
- 真惠保APP服務項目
- 真惠保後臺管理項目
- portal 車輛網後臺項目
- protoolanalysis 協議分析器
Authren認證服務器
- 客戶端認證配置
@EnableAuthorizationServer @Configuration public class AuthServerConfig extends AuthorizationServerConfigurerAdapter{ @Autowired AuthenticationManager authenticationManager; @Autowired RedisConnectionFactory connectionFactory; @Autowired private DataSource dataSource; @Override public void configure(ClientDetailsServiceConfigurer clients) throws Exception { clients.jdbc(dataSource); } //配置AuthorizationServer tokenServices @Override public void configure(AuthorizationServerEndpointsConfigurer endpoints)throws Exception { endpoints.tokenStore(redisTokenStore()) .accessTokenConverter(accessTokenConverter()) .authenticationManager(authenticationManager) //禁用刷新令牌 .reuseRefreshTokens(false); } //定義TokenStore 使用redis存儲token @Bean public TokenStore redisTokenStore() { RedisTokenStore redisTokenStore = new RedisTokenStore(connectionFactory); //token key生成規則 redisTokenStore.setAuthenticationKeyGenerator(oAuth2Authentication -> UUID.randomUUID().toString()); return redisTokenStore; } //token 封裝 @Bean public JwtAccessTokenConverter accessTokenConverter() { JwtAccessTokenConverter converter = new JwtAccessTokenConverter(); converter.setSigningKey("******"); return converter; } //認證請求設置 @Override public void configure(AuthorizationServerSecurityConfigurer security) throws Exception { //容許全部人請求令牌 //以驗證的客戶端才能請求check_token security.tokenKeyAccess("permitAll()") .checkTokenAccess("isAuthenticated()") .allowFormAuthenticationForClients(); } }
- 登陸帳戶認證設置
@Configuration @EnableWebSecurity public class SecurityConfig extends WebSecurityConfigurerAdapter { @Autowired private UserDetailsService userDetailsService; @Override protected void configure(AuthenticationManagerBuilder auth) throws Exception { //設置服務認證提供者 auth.authenticationProvider(authenticationProvider()); } /** * @return 封裝身份認證提供者 */ @Bean public DaoAuthenticationProvider authenticationProvider() { DaoAuthenticationProvider authenticationProvider = new DaoAuthenticationProvider(); //設置用戶加載服務類 authenticationProvider.setUserDetailsService(userDetailsService); //設置加密類 authenticationProvider.setPasswordEncoder(passwordEncoder()); return authenticationProvider; } @Bean @Override public AuthenticationManager authenticationManagerBean() throws Exception { //使用父級認證管理器 AuthenticationManager manager = super.authenticationManagerBean(); return manager; } @Override protected void configure(HttpSecurity http) throws Exception { //容許訪問/oauth受權接口 http.csrf().disable() //設置會話管理器,不是用HttpSession .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS) .and() .requestMatchers().anyRequest() .and() .formLogin().permitAll() .and() .authorizeRequests() //調用認證接口 不須要認證 .antMatchers("/oauth/*").permitAll() .and(); } //配置密碼解碼器 @Bean public BCryptPasswordEncoder passwordEncoder() { return new MyPasswordEncoder(); } }
- 用戶加載服務類設置
@Service public class MyUserDetailsService implements UserDetailsService { @Autowired private SUserInfoMapper userInfoMapper; @Override public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException { QueryWrapper<SUserInfo> qw = new QueryWrapper<SUserInfo>(); qw.eq("userName", username); qw.ne("status", 9); SUserInfo sysUser = userInfoMapper.selectOne(qw); if(sysUser == null) { throw new UsernameNotFoundException("用戶不存在"); } User user = new User(); user.setSysUser(sysUser); //不提供受權 user.setAuthorities(new ArrayList<>()); return user; } }
後臺服務器配置
- 資源服務器設置
@Configuration @EnableResourceServer//開啓資源服務器 @EnableGlobalMethodSecurity(prePostEnabled = true)//開啓方法級別的校驗 https://www.cnblogs.com/felordcn/p/12142497.html public class ResourceServerConfig extends ResourceServerConfigurerAdapter{ @Autowired RestTemplate resourceRestTemplate; //本地受權服務 @Autowired MyLocalUserAuthoritiesService userAuthoritiesService; @Override public void configure(ResourceServerSecurityConfigurer resources) { resources .tokenStore(new JwtTokenStore(accessTokenConverter())) .stateless(true); //配置RemoteTokenServices, 用於向AuthorizationServer驗證令牌 MyRemoteTokenServices tokenServices = new MyRemoteTokenServices(userAuthoritiesService); tokenServices.setAccessTokenConverter(accessTokenConverter()); //爲restTemplate配置異常處理器,忽略400錯誤 resourceRestTemplate.setErrorHandler(new DefaultResponseErrorHandler() { @Override //忽略 400 public void handleError(ClientHttpResponse response) throws IOException { if(response.getRawStatusCode() != 400) { super.handleError(response); } } }); tokenServices.setRestTemplate(resourceRestTemplate); //設置認證服務器地址 tokenServices.setCheckTokenEndpointUrl("http://"+NacosServerHostConstant.AUTH_SERVER_NAME+NacosServerHostConstant.AUTH_SERVER_ADDRESS+"/oauth/check_token"); //客戶端id tokenServices.setClientId(OAuth2ClientEnum.OBD_PORTAL.getClientId()); //客戶端密碼 tokenServices.setClientSecret(OAuth2ClientEnum.OBD_PORTAL.getPassword()); //無狀態 resources.tokenServices(tokenServices).stateless(true); //設置資源服務id resources.resourceId(OAuth2ClientEnum.OBD_PORTAL.getClientId()); } //token封裝 @Bean public JwtAccessTokenConverter accessTokenConverter() { JwtAccessTokenConverter converter = new JwtAccessTokenConverter(); converter.setSigningKey("*****"); return converter; } @LoadBalanced @Bean public RestTemplate resourceRestTemplate() { return new RestTemplate(); } //資源請求路徑設置 @Override public void configure(HttpSecurity http) throws Exception { //容許跨域 http.cors(); //配置資源服務器攔截規則 http.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS) .and() .requestMatchers().anyRequest() .and() .anonymous() .and() .authorizeRequests() //設置不須要認證的請求路徑 .antMatchers("/login/loginByPassword.vue","/**/*.vueNologin").permitAll() .anyRequest().authenticated() .and() .exceptionHandling().accessDeniedHandler(new OAuth2AccessDeniedHandler()); } }
- 本地受權服務
@Component public class MyLocalUserAuthoritiesServiceImpl implements MyLocalUserAuthoritiesService { @Autowired private SUserInfoMapper sysUserMapper; @Autowired private SMenuMapper sysMenuMapper; /** * redis客戶端 */ @Autowired private RedisClient redisClient; @SuppressWarnings("unchecked") @Override public List<String> loadUserAuthoritiesByUserName(String accessToken ,String userName) { //緩存中獲取權限列表 List<String> authorityList = (List<String>) redisClient.getValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken); //緩存中沒有 從數據庫中獲取 if(authorityList == null) { QueryWrapper<SUserInfo> qw = new QueryWrapper<SUserInfo>(); qw.eq("userName", userName); SUserInfo sysUser = sysUserMapper.selectOne(qw); if(sysUser == null) { throw new UsernameNotFoundException("用戶不存在"); } authorityList = sysMenuMapper.getPermsListByUserId(sysUser.getUserId()); //獲取token對象 AuthAccessToken saveToken = (AuthAccessToken) redisClient.getValue(RedisKeyPreConstant.USER_LOGIN_TOKEN, accessToken); if(saveToken != null) { //有效時間和token的有效時間一直 redisClient.setValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken,authorityList,saveToken.getExpires_in()-(System.currentTimeMillis()-saveToken.getCreate_time())); }else { //設置默認的有效時間 redisClient.setValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken,authorityList,Constant.AUNTH_MESSAGE_IN_REDIS_TIME); } } return authorityList; } }
- 控制類設置
@RestController @RequestMapping("sys/user") public class UserController extends BaseController{ @Autowired private UserService userService; /** * 用戶分頁查詢 * @param loginUser 登陸用戶 * @param userName 用戶名 * @param pageModel 分頁參數 * @return */ @RequestMapping("getSysUserPageList.vue") //設置求情權限 @PreAuthorize("hasAuthority('user:view')") //設置事物級別爲只讀 @Transactional(readOnly = true) public ReturnModel getUserPageList(@ModelAttribute("loginUser") LoginUser loginUser,String userName,String useMan,String linkPhone,Date beginTime,Date endTime,PageModel pageModel) { ReturnModel returnModel = new ReturnModel(); returnModel.setData(userService.getPageList(getLoginUserOrganizationFullId(loginUser),userName, useMan, linkPhone, beginTime, endTime, pageModel)); return returnModel; } /** * 新增用戶 * @param loginUser 登陸用戶 * @param user 添加用戶對象 * @return */ @PostMapping("addSysUser.vue") //開啓事物 @Transactional //設置請求權限 @PreAuthorize("hasAuthority('user:add')") public ReturnModel addRole(HttpServletRequest request,SUserInfo user,/*自動注入登陸用戶對象*/@ModelAttribute("loginUser") LoginUser loginUser) { ReturnModel returnModel = new ReturnModel(); SUserInfo oldUser = userService.getUserByUserName(user.getUserName()); if(oldUser != null) { returnModel.setResultCode(ResponseCodeConstant.EXISTED); returnModel.setResultMessage("用戶名名稱不能重複"); return returnModel; } oldUser = userService.getUser(user.getUseMan(),user.getOrganizeId()); if(oldUser != null) { returnModel.setResultCode(ResponseCodeConstant.EXISTED); returnModel.setResultMessage("使用人不能重複"); return returnModel; } //正常狀態 user.setStatus(1); user.setCreaterId(loginUser.getUser().getUserId()); user.setCreaterName(loginUser.getUser().getUseMan()); user.setOperatorId(loginUser.getUser().getUserId()); user.setOperatorName(loginUser.getUser().getUseMan()); userService.addUser(user); return returnModel; } }
- 服務類設置
@Service public class UserService { @Autowired private SUserInfoMapper userInfoMapper; /** * 添加用戶信息 * @param user 用戶對象 * @return */ //添加日誌註解 查詢功能能夠不添加 @Log("新增用戶") public int addUser(SUserInfo user) { Date now = new Date(); user.setCreateTime(now); user.setModifiedTime(now); return userInfoMapper.insert(user); } }
OpenFeign工程配置
- 接口定義
@RequestMapping("feign/location") public interface IFeignLocationService { /** * 根據矩陣獲及車輛id列表取gps信息 * * @param organizeId 機構Id * @param bounds 矩陣 右上角精度,右上角維度|左下角精度,左下角維度 * @return */ @RequestMapping("getVehicleInMap") ReturnModel getVehicleInMap(@RequestBody String vehicleIds, @RequestParam(required=false) String bounds); }
- 客戶端配置
@FeignClient(/**客戶端名稱 對應nacos註冊中心的服務名稱*/name = NacosServerHostConstant.OBD_PORTAL_NAME,/**調用接口異常,快速失敗了*/fallback= FeignLocationServiceFallback.class,/**config類*/configuration = OAuth2FeignAutoConfig.class) //實現IFeignLocationService接口 public interface FeignLocationService extends IFeignLocationService{ }
- fallback類
//註冊spring bean @Component //重載父類映射路徑 避免發生路徑衝突 @RequestMapping("feign/locationback") public class FeignLocationServiceFallback implements FeignLocationService { @Override public ReturnModel getVehicleInMap(String vehicleIds,String bounds) { return ReturnModel.feignFail(); } @Override public ReturnModel getVehicleRealTimeStatus(int vehicleId) { return ReturnModel.feignFail(); } @Override public ReturnModel getVehicleLocation(String vehicleIds) { return ReturnModel.feignFail(); } }
- config類
public class OAuth2FeignAutoConfig { //獲取Auth2token類 private CloudTokenService cloudTokenService; public OAuth2FeignAutoConfig(CloudTokenService cloudTokenService) { this.cloudTokenService = cloudTokenService; } //feign請求攔截器 @Bean public RequestInterceptor OAuth2FeignRequestInterceptor() { return new OAuth2FeignRequestInterceptor(cloudTokenService); } }
- Fegin控制器實現類(服務端)
@RestController public class FeignLocationController extends ClientBaseController implements IFeignLocationService{ @Autowired private VehicleMonitorService vehicleMonitorService; /** * 獲取矩形區域內的車輛信息 * @param organizeId 機構Id * @param bounds 矩陣 右上角精度,右上角維度|左下角精度,左下角維度 * @return */ @Override public ReturnModel getVehicleInMap(String vehicleIds, String bounds) { ReturnModel model = new ReturnModel(); model.setData(vehicleMonitorService.getVehicleInMap(JSONArray.parseArray(vehicleIds), bounds)); return model; } }
網關項目
- 路由配置
server: port: 8080 #服務端口號 tomcat: max-http-form-post-size: 20971520 spring: profiles: active: - prod application: #微服務註冊名稱 name: obd-gateway cloud: gateway: #默認過濾器 處理跨域 default-filters: - DedupeResponseHeader=Access-Control-Allow-Origin, RETAIN_UNIQUE globalcors: cors-configurations: '[/**]': allowedHeaders: "*" allowedOrigins: "*" allowedMethods: "*" discovery: locator: enabled: true routes: #基於服務發現配置 portal - id: portal #lb 負載均衡 obd-portal 調用微服務的名稱 uri: lb://obd-portal predicates: - Path=/obd-portal/** filters: - StripPrefix=1 #應用路由截掉路徑的第一部分前綴 - name: Hystrix #熔斷降級 args: name: fallbackcmd fallbackUri: forward:/hystrixFallback?a=test # 限流配置 # - name: RequestRateLimiter # args: # 速率 # redis-rate-limiter.replenishRate: 10 #容量 # redis-rate-limiter.burstCapacity: 20 #基於服務發現配置 manage #基於服務發現配置 zhb - id: zhb uri: lb://obd-zhb predicates: - Path=/obd-zhb/** filters: - StripPrefix=1 #應用路由截掉路徑的第一部分前綴 - name: Hystrix #熔斷降級 args: name: fallbackcmd fallbackUri: forward:/hystrixFallback?a=test #基於服務發現配置 zhbManage - id: zhbManage uri: lb://obd-zhb-manage predicates: - Path=/obd-zhb-manage/** filters: - StripPrefix=1 #應用路由截掉路徑的第一部分前綴
接收終端數據項目
- 啓動socket項目
@Component @Sharable @Slf4j public class OBDServer { @Autowired private MessageHandler messageHandler; @Autowired private TerminalAuthHandler terminalAuthHandler; @Autowired private TerminalCommonResponse terminalCommonResponse; @Autowired private TerminalRegisterHandler terminalRegisterHandler; public void bind(int port) { log.info("OBDNettySocket啓動 port="+port); //建立NIO線程組 其實是reactor線程組 //用於接收客戶端鏈接 EventLoopGroup boosGroup = new NioEventLoopGroup(); //用於進行SocketChnnel的網絡讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //啓用nio服務對象 ServerBootstrap b = new ServerBootstrap(); //綁定nio線程 b.group(boosGroup, workerGroup) //設置channel 爲NioServerSocketChannel 對應java nio中的ServerSocketChannel .channel(NioServerSocketChannel.class) //設置NioServerSocketChannel的TCP參數 backlog .option(ChannelOption.SO_BACKLOG, 1024) //綁定nio事件處理類 做用相似於reactor模式中的handler .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new OBDNettyMessageDecoder()); ch.pipeline().addLast(new OBDNettyMessageEncoder()); ch.pipeline().addLast(terminalRegisterHandler); ch.pipeline().addLast(terminalAuthHandler); ch.pipeline().addLast(terminalCommonResponse); ch.pipeline().addLast(new HeartBeatHandler()); //必須放到最後 ch.pipeline().addLast(messageHandler); } }); //綁定端口 同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待服務監聽端口關閉 f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { log.info("OBDNettySocket關閉 port="+port); //優雅退出 釋放線程資源 boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
- 終端註冊
@Component @Sharable @Slf4j public class TerminalRegisterHandler extends BaseHandler{ /** * 終端註冊 256 */ private final int TERMINAL_REGISTER = 0x0100; /** * 終端註冊 應答 33024 */ private final int TERMINAL_REGISTER_RESPONSE = 0x8100; @Autowired private CheckTerminalIdIsCanUseService checkTerminalIdIsCanUseService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { byte[] array = (byte[]) msg; int messageId = OBDUtil.getIntFromODBIntArray(array, 0, 2); if(messageId == TERMINAL_REGISTER) { String terminalId = OBDUtil.getTerminalNumber(array); //失敗 byte status = 1; String authCode = null; if(checkTerminalIdIsCanUseService.check(terminalId)) { status = 0; //認證碼 用於終端認證 目前沒有使用 authCode = OBDConstant.TERMINAL_REGISTER_AUTH_CODE; log.info("註冊成功:terminalId="+terminalId); }else { log.info("註冊失敗:terminalId="+terminalId); } //發送註冊結果 ctx.writeAndFlush(getTerminalRegisterResponse(array,new byte[] { array[10],array[11]}, status,authCode)); } ctx.fireChannelRead(msg); }
- 終端認證
@Component @Sharable @Slf4j public class TerminalAuthHandler extends BaseHandler{ /** * 終端鑑權 258 */ private final static int TERMINAL_AUTH = 0x0102; /** * 鑑權終端是否可用服務類 */ @Autowired private CheckTerminalIdIsCanUseService checkTerminalIdIsCanUseService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { byte[] array = (byte[]) msg; //消息id int messageId = OBDUtil.getIntFromODBIntArray(array, 0, 2); if(messageId == TERMINAL_AUTH) { //終端號 String terminalId = OBDUtil.getTerminalNumber(array); //失敗 byte status = 1; //認證成功 if(checkTerminalIdIsCanUseService.check(terminalId)) { status = 0; //沒有保存鏈路信息 if(SocketConfig.getSocketModel(terminalId) == null) { //心跳檢查定時任務 HeartBeatTask task = new HeartBeatTask(terminalId); ctx.executor().schedule(task, OBDConstant.HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS); } //保存鏈路信息 SocketConfig.putSocket(terminalId, new SocketModel(ctx)); log.info("認證成功:terminalId="+terminalId); }else { log.info("認證失敗:terminalId="+terminalId); } //返回認證結果 ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(array,new byte[] { array[10],array[11]}, status)); }else { ctx.fireChannelRead(msg); } } }
- 消息處理
@Component @Sharable @Slf4j public class MessageHandler extends BaseHandler{ @Autowired private DealMessageService dealMessageService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { byte[] array = (byte[]) msg; int messageId = OBDUtil.getIntFromODBIntArray(array, 0, 2); String terminalId = OBDUtil.getTerminalNumber(array); //log.info("收到終端消息:terminalId="+terminalId+" messageId="+messageId); SocketModel socketModel = SocketConfig.getSocketModel(terminalId); //有鏈路信息 說明終端設備認證 if(socketModel != null) { //處理消息 dealMessageService.dealMessage(String.valueOf(messageId),array); socketModel.resetHeartNoRespCount(); ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(array, new byte[] { array[10],array[11]}, (byte)0)); }else { System.out.println(messageId); log.info("終端未認證 terminalId="+terminalId); //沒有發現終端 ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(array,new byte[] { array[10],array[11]}, (byte)5)); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(ctx); ctx.close(); cause.printStackTrace(); log.error("鏈路異常關閉"); } }
軌跡解析服務
- 從kafa中獲取數據
@Slf4j @Component public class KafkaConsumer { @Autowired private GpsVehicleTraceService gpsVehicleTraceService; @Autowired private TerminalNumberVehicleService terminalNumberVehicleService; @Autowired private RedisClient redisClient; //處理軌跡上傳數據 @KafkaListener(id="gpsLocation1",topics= KafkaTopicConstant.GPS_LOCATION) public void onGPSLocation(ConsumerRecord<String, byte[]> record) { //協議解析 OBDModel obd = ProtocolAnalysis.parse(record.value()); if(obd != null) { String terminalNumber = obd.getTerminalNumber(); //獲取綁定車輛信息 Integer vehicleId = terminalNumberVehicleService.getVehicleIdByTerminalNumber(terminalNumber); if(vehicleId != null) { PositionModel positionModel = (PositionModel)obd.getBody(); if(positionModel != null) { //處理軌跡 gpsVehicleTraceService.dealGpsVehicle(vehicleId, positionModel,false); } }else { log.info("未找到綁定信息 terminalNumber ="+terminalNumber); } } } //軌跡補傳處理 @KafkaListener(id="gpsLocationBeath1",topics=KafkaTopicConstant.GPS_LOCATION_BEATCH) public void onGPSLocationBeath(ConsumerRecord<String, byte[]> record) { OBDModel obd = ProtocolAnalysis.parse(record.value()); if(obd != null) { String terminalNumber = obd.getTerminalNumber(); Integer vehicleId = terminalNumberVehicleService.getVehicleIdByTerminalNumber(terminalNumber); if(vehicleId != null) { LocationBatchModel positionBatch = (LocationBatchModel)obd.getBody(); if(positionBatch != null) { List<PositionModel> list = positionBatch.getList(); if(list != null && !list.isEmpty()) { for (PositionModel positionModel : list) { gpsVehicleTraceService.dealGpsVehicle(vehicleId, positionModel,true); } } } } } } //終端註冊 @KafkaListener(id="terminalRegister",topics=KafkaTopicConstant.TERMINAL_REGISTER) public void onTerminalRegister(ConsumerRecord<String, byte[]> record) { ProtocolAnalysis.parse(record.value()); } //終端註銷 @KafkaListener(id="terminalUnRegister",topics=KafkaTopicConstant.TERMINAL_UNREGISTER) public void onTerminalUnRegister(ConsumerRecord<String, byte[]> record) { ProtocolAnalysis.parse(record.value()); } /** * 查詢終端參數 */ @KafkaListener(id="terminalAnswer",topics=KafkaTopicConstant.TERMINAL_ANSWER) public void getTerminalAnswer(ConsumerRecord<String, byte[]> record){ OBDModel obd = ProtocolAnalysis.parse(record.value()); if(obd != null){ String terminalNumber = obd.getTerminalNumber(); TerminalAnswerData answerData = (TerminalAnswerData)obd.getBody(); List<TerminalAnswerValue> answerValue = (List<TerminalAnswerValue>)answerData.getValue(); redisClient.setValue(RedisKeyPreConstant.TERMINAL_DATA, terminalNumber, answerValue); } } }
- 協議解析類
public class ProtocolAnalysis { //協議解析類map private final static Map<Integer,DealMessageService> dealMessageServiceMap = new HashMap<>(); static { //初始化 dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_AUTH, new TerminalAuthDealMessageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.POSSITION, new PositionBaseDealMessageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_GENER_RESPONSE, new TerminalGeneralResponseDealMessageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.POSSITION_BATCH, new PossitionBatchDealMassageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_REGISTER, new TerminalRegisterDealMessageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_ANSWER, new TerminalAnswerDealMessageImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.ALARM_INFO, new AlarmDealMessageServiceImpl()); } // public static void main(String[] args) { // byte[] data = new byte[] {0, 1, 0, 5, 1, 65, 68, 120, 116, -107, 0, 33, 0, 1, -126, 7, 0, 60}; // // OBDModel obdModel =parse(data); // System.out.println(obdModel); // System.out.println(Integer.toHexString(obdModel.getMessageId())); // } public static OBDModel parse(byte[] chars) { //轉化爲整行 int[] dataArray = new int[chars.length]; for(int i=0;i<dataArray.length;i++) { if(chars[i] < 0 ) { //數據修正 傳輸數據爲無符號數 dataArray[i] = OBDUtil.obdDataCorrection(chars[i]); }else { dataArray[i] = chars[i]; } } OBDModel obdModel = new OBDModel(); //消息 ID 佔用兩個字節 高八位0 低八位 1 obdModel.setMessageId(OBDUtil.getIntFromODBIntArray(dataArray, 0, 2)); //消息體屬性 佔用兩個字節 高八位2 低八位3 obdModel.setBodyDescribe(getBodyDescribe(OBDUtil.getIntFromODBIntArray(dataArray, 2, 2))); //終端號 佔用 4~9位 obdModel.setTerminalNumber(getTerminalNumber(dataArray)); //消息流水號 佔用兩個字節 高八位10低八位11 obdModel.setSerialNumber(OBDUtil.getIntFromODBIntArray(dataArray, 10, 2)); int bodyIndex = 12; if(obdModel.getBodyDescribe().getIsSubpackage()) { //消息總包數 佔用兩個字節 高八位12低八位13 obdModel.setTotalPackage(OBDUtil.getIntFromODBIntArray(dataArray, 12, 2)); //包序號 佔用兩個字節 高八位14低八位15 obdModel.setPackageNum(OBDUtil.getIntFromODBIntArray(dataArray, 14, 2)); bodyIndex += 4; } int bodyLength = obdModel.getBodyDescribe().getBodyLength(); if(bodyLength>0) { if(OBDConstant.TOP_MIN_LENGTH + bodyLength <= dataArray.length) { int[] bodyIntArray = new int[bodyLength]; System.arraycopy(dataArray, bodyIndex, bodyIntArray, 0, bodyLength); //設置消息體 obdModel.setBody(getBody(obdModel.getMessageId(),bodyIntArray)); } } return obdModel; }; /** * 獲取body信息 */ public static Object getBody(int messageid,int[] bodyArray) { //是否支持該消息類型的解析 if(dealMessageServiceMap.containsKey(messageid)) { return dealMessageServiceMap.get(messageid).getBody(bodyArray); }else { return Arrays.toString(bodyArray); } } /** * 獲取終端號 BCD[6] 4-9 */ public static String getTerminalNumber(int[] dataArray) { String result = ""; for(int i=4;i<=9;i++) { result += OBDUtil.getBCDStr(dataArray[i]); } return result; } /** * 獲取消息體屬性 */ public static BodyDescribe getBodyDescribe(int bodyInt) { BodyDescribe bodyDescribe = new BodyDescribe(); //低9爲字節表示長度 bodyDescribe.setBodyLength(bodyInt % (1<<10)); //第10-12字節全爲0表示消息不加密 bodyDescribe.setIsEncryption((bodyInt&((1<<10)+(1<<11)+(1<<12)))!=0); //第10字節爲1表示RSA加密 bodyDescribe.setEncryptionType(OBDUtil.checkIntBitIsOne(bodyInt, 10)?1:null); //第13位爲1表示分包傳輸 bodyDescribe.setIsSubpackage(OBDUtil.checkIntBitIsOne(bodyInt, 13)); return bodyDescribe; } /** * 驗證碼校驗 */ public static boolean check(int[] dataArray) { int result = dataArray[dataArray.length-1]; int calR = dataArray[0] ^ dataArray[1]; for(int i=2;i<dataArray.length-1;i++) { calR = calR^dataArray[i]; } return calR == result; }
- 認證消息類型解析
public class TerminalAuthDealMessageServiceImpl implements DealMessageService{ @Override public Object getBody(int[] bodyArray) { return new String(bodyArray,0,bodyArray.length); } }