首先kafka監聽不獲得數據,檢查以下mysql
@Component 犯了最不該該出差錯的問題
若是出現監聽不到數據的問題,那麼就試試更改方法一二,若是不能夠在去試試方法三,以前出現這個問題也是查過 通常查到都會說 「低版本的服務器接收不到高版本的生產者發送的消息」,可是淨由測試使用 用1.0.5RELEASE 和 2.6.3反覆測試,並無任何的問題。redis
若是按照版本一致,那麼根本就不現實,由於可能不一樣的項目,springboot版本不一致的話,可能有的springboot版本低,那麼你還得要求本身維護項目版本升級?若是出現第四種狀況就無話可說了。spring
重複數據的發送問題以下sql
目前我是使用的Redis進行的排重法,用的是Redis中的set,保證裏面不存在重複,保證Redis裏面不會存入太多的髒數據。並按期清理apache
粘貼一下個人排重(Redis排重法)bootstrap
//kafka prefix String cache = "kafka_cache"; //kafka suffix Calendar c = Calendar.getInstance(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //0點,目前是爲了設置爲這一天的固定時間。這個徹底能夠去寫個工具類本身弄,爲了看的更清楚,麻煩了一點的寫入 SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00"); String gtimeStart = sdf2.format(c.getTime()); long time = sdf.parse(gtimeStart).getTime(); //此位置爲了設置是不是新的一天,新的一天須要設置定時時間,保證redis中不會存儲太多無用數據 Boolean flag = false; //數據接收 Set<String> range = new HashSet<>(); //判斷是否存在 if (redisTemplate.hasKey(cache + time)) { //存在則取出這個set range = redisTemplate.opsForSet().members(cache + time); }else { //不存在,則爲下面過時時間的設置鋪墊 flag = true; } //判斷監聽到的數據是不是重複 if (range.contains("測試須要")) { //重複則排出,根據邏輯本身修改 continue; } else { //添加進去 redisTemplate.opsForSet().add(cache + time, i+""); if (flag){ //設置爲24小時,保證新一天使用,以前使用的存儲會消失掉 redisTemplate.expire(cache + time,24,TimeUnit.HOURS); //不會在進入這個裏面,若是屢次的存入過時時間,那麼這個key的過時時間就永遠是24小時,一直就不會過時 flag = false; } }
緣由是由於在不一樣group-id之下,kafka接收到之後,會給監聽他的每個組發送一個他所收到的消息,可是兩個消費端監聽同一個group-id,那麼就只有一個消費端能夠消費到。springboot
# 指定kafka 代理地址,能夠多個,用逗號間隔
spring.kafka.bootstrap-servers= localhost:9092
# 指定默認消費者group id
spring.kafka.consumer.group-id= test
# 是否自動提交
spring.kafka.consumer.enable-auto-commit= true
# 提交間隔的毫秒
spring.kafka.consumer.auto-commit-interval.ms=60000
# 最大輪詢的次數
spring.kafka.consumer.max-poll-records=1
# 將偏移量重置爲最新偏移量
spring.kafka.consumer.auto-offset-reset=earliest
# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
若有什麼地方錯誤或者不明白請下方評論指出,謝謝。討論解決使咱們共同進步服務器