Kafka在大型網站中應用普遍,主要用來日誌收集和消息系統。
Kafka是一種發佈-訂閱的消息系統,生產者稱發佈者,消費者稱訂閱者。
下面先介紹Kafka實例類屬性及配置:
@ConfigurationProperties(prefix = "kafka.producer")
@Data // Lombok: generates getters/setters/equals/hashCode/toString for all fields
public class KafkaProducerProperties {

    /** Comma-separated broker list for {@code bootstrap.servers}. */
    private String servers;

    /** Send retry count, mapped to the producer {@code retries} setting. */
    private int retries;

    /** Producer {@code batch.size} in bytes. */
    private int batchSize;

    /** Producer {@code buffer.memory} in bytes. */
    private int bufferMemory;

    // NOTE(review): the fields below mirror consumer-side settings
    // (auto-commit interval, session timeout, offset reset, group id,
    // concurrency, poll timeout) and are not read by the producer
    // configuration — confirm whether they can be removed from this class.

    /** Consumer-style {@code auto.commit.interval.ms} value. */
    private String autoCommitInterval;

    /** Consumer-style {@code session.timeout.ms} value. */
    private String sessionTimeout;

    /** Consumer-style {@code auto.offset.reset} policy (earliest/latest/none). */
    private String autoOffsetReset;

    /** Consumer group id. */
    private String groupId;

    /** Listener-container concurrency (number of consumer threads). */
    private int concurrency;

    /** Listener-container poll timeout in milliseconds. */
    private int pollTimeout;
}
Kafka生產者的實例:
@Configuration
@EnableConfigurationProperties(KafkaProducerProperties.class)
@EnableKafka
public class KafkaProducerConfig {

    /** Externalized producer settings bound from the {@code kafka.producer.*} prefix. */
    @Autowired
    KafkaProducerProperties properties;

    /** Listener that records the outcome (success/failure) of every send. */
    @Autowired
    private KafkaSendResultHandler kafkaSendResultHandler;

    /**
     * Builds the raw Kafka producer configuration map from the bound properties.
     * Uses acks=all so a send is only acknowledged once all in-sync replicas
     * have the record, and String serializers for both key and value.
     *
     * @return mutable map of producer configuration entries
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getServers());
        config.put(ProducerConfig.RETRIES_CONFIG, properties.getRetries());
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, properties.getBatchSize());
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, properties.getBufferMemory());
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return config;
    }

    /**
     * Creates the producer factory backed by {@link #producerConfigs()}.
     *
     * @return factory that produces Kafka producers with the shared configuration
     */
    private ProducerFactory<Object, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Exposes the shared {@link KafkaTemplate} bean, wired with the send-result
     * listener so every send outcome is reported to {@code kafkaSendResultHandler}.
     *
     * @return configured Kafka template
     */
    @Bean(name = "kafkaTemplate")
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        KafkaTemplate<Object, Object> template = new KafkaTemplate<>(producerFactory());
        template.setProducerListener(kafkaSendResultHandler);
        return template;
    }
}
@Component
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    /** Persistence mapper for the kafka send-error table. */
    @Autowired
    private CmbsKafkaSendErrMapper cmbsKafkaSendErrMapper;

    /**
     * Success callback. A non-null key means this send was a retry of a
     * previously failed record whose id is carried in the key: mark that
     * record SUCCESS and bump its send count, keeping the original error
     * row for audit. First-time sends (null key) need no bookkeeping.
     *
     * All mapper/parse work is guarded: this callback runs on the Kafka
     * producer's I/O thread and must never throw.
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        if (CommonUtil.isNotNull(key)) {
            try {
                // 更新狀態爲成功,次數+1,再次發送成功不更新原有錯誤,留記錄
                CmbsKafkaSendErr kafkaErr = new CmbsKafkaSendErr();
                kafkaErr.setId(Long.parseLong(String.valueOf(key)));
                kafkaErr.setSendStatus(KafkaSendErrStatus.SUCCESS.getStatus());
                cmbsKafkaSendErrMapper.updateStatusAndSendTimesById(kafkaErr);
            } catch (Exception e) {
                // Never let a bad key or a DB failure escape into the producer thread.
                LOGGER.error("KafkaSendResultHandler.onSuccess error:", e);
            }
        }
    }

    /**
     * Failure callback. A null key means a first-time failure: insert a new
     * error row holding the topic, payload and truncated failure reason so
     * the record can be re-sent later. A non-null key means a retry failed
     * again: update the existing row's status, count and failure reason.
     */
    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        String json = "";
        if (CommonUtil.isNotNull(value)) {
            json = String.valueOf(value);
        }
        if (CommonUtil.isNull(key)) {
            try {
                Date now = new Date();
                CmbsKafkaSendErr cmbsKafkaSendErr = new CmbsKafkaSendErr();
                cmbsKafkaSendErr.setTopic(topic);
                cmbsKafkaSendErr.setMsgContent(json);
                // TEXT-column limit on insert; the update path below truncates
                // at 256 — NOTE(review): confirm the intended column sizes.
                cmbsKafkaSendErr.setFailDesc(StringUtils.substring(exception.getMessage(), 0, 65535));
                cmbsKafkaSendErr.setSendStatus(KafkaSendErrStatus.FAIL.getStatus());
                cmbsKafkaSendErr.setSendTimes(MktCampaignConst.Number.ZERO);
                cmbsKafkaSendErr.setGmtCreate(now);
                cmbsKafkaSendErr.setGmtModified(now);
                cmbsKafkaSendErr.setState(MktCampaignConst.DataStates.NORMAL_STATE);
                cmbsKafkaSendErr.setType(KafkaErrTypeStatus.SEND_ERR.getStatus());
                cmbsKafkaSendErrMapper.insert(cmbsKafkaSendErr);
            } catch (Exception e) {
                LOGGER.error("KafkaSendResultHandler.onError error:", e);
            }
        } else {
            try {
                // 更新狀態爲失敗,次數+1
                CmbsKafkaSendErr kafkaErr = new CmbsKafkaSendErr();
                kafkaErr.setId(Long.parseLong(String.valueOf(key)));
                kafkaErr.setSendStatus(KafkaSendErrStatus.FAIL.getStatus());
                kafkaErr.setFailDesc(StringUtils.substring(exception.getMessage(), 0, 256));
                cmbsKafkaSendErrMapper.updateStatusAndSendTimesById(kafkaErr);
            } catch (Exception e) {
                // Previously unguarded: a non-numeric key or DB failure here would
                // have propagated into the producer callback thread.
                LOGGER.error("KafkaSendResultHandler.onError update error:", e);
            }
        }
    }

    /**
     * @return true so the container also invokes {@link #onSuccess}.
     */
    @Override
    public boolean isInterestedInSuccess() {
        return true;
    }
}
下面是消費者實例:
@ConfigurationProperties(prefix = "kafka.consumer")
@Data
public class KafkaConsumerProperties {
/**
 * Comma-separated broker list for bootstrap.servers.
 */
private String servers;
/**
 * Retry count. NOTE(review): retries is a producer-side setting and is not
 * read by the consumer configuration — confirm whether it is needed here.
 */
private int retries;
/**
 * Batch size. NOTE(review): producer-side setting, apparently unused by the
 * consumer configuration — confirm.
 */
private int batchSize;
/**
 * Buffer memory. NOTE(review): producer-side setting, apparently unused by
 * the consumer configuration — confirm.
 */
private int bufferMemory;
/**
 * Value for auto.commit.interval.ms.
 */
private String autoCommitInterval;
/**
 * Value for session.timeout.ms.
 */
private String sessionTimeout;
/**
 * Offset reset policy for auto.offset.reset (earliest/latest/none).
 */
private String autoOffsetReset;
/**
 * Consumer group id (group.id).
 */
private String groupId;
/**
 * Listener-container concurrency (number of consumer threads).
 */
private int concurrency;
/**
 * Listener-container poll timeout in milliseconds.
 */
private int pollTimeout;
}
@Configuration
@EnableConfigurationProperties(KafkaConsumerProperties.class)
@EnableKafka
public class KafkaConsumerConfig {

    /** Externalized consumer settings bound from the {@code kafka.consumer.*} prefix. */
    @Autowired
    KafkaConsumerProperties properties;

    /**
     * Builds the consumer factory. Auto-commit is disabled because the
     * listener container below uses MANUAL ack mode; both key and value are
     * deserialized as Strings.
     *
     * @return factory creating consumers with the shared configuration
     */
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = new HashMap<String, Object>();
        // Fixed: original used ProducerConfig.BOOTSTRAP_SERVERS_CONFIG here —
        // same string value, but the consumer-side constant is the correct one.
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getServers());
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // NOTE(review): auto-commit is disabled above, so this interval is
        // ignored by the broker client — confirm whether it can be dropped.
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getAutoCommitInterval());
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getSessionTimeout());
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutoOffsetReset());
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    /**
     * Listener-container factory: batch listening, MANUAL ack mode (listeners
     * must acknowledge explicitly), with concurrency and poll timeout taken
     * from the bound properties.
     *
     * @return configured concurrent listener container factory
     */
    @Bean(name = "kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(properties.getConcurrency());
        factory.getContainerProperties().setPollTimeout(properties.getPollTimeout());
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        factory.setBatchListener(true);
        return factory;
    }
}
@Bean(name = "kafkaTemplate")
public KafkaTemplate<Object, Object> kafkaTemplate() {
    // Template backed by the shared producer factory; the send-result
    // listener records the outcome of every send.
    KafkaTemplate<Object, Object> template = new KafkaTemplate<>(producerFactory());
    template.setProducerListener(kafkaSendResultHandler);
    return template;
}
依賴注入,使用實例。
拓展:Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上就能夠達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;支持Hadoop數據並行加載,對於像Hadoop同樣的日誌數據和離線分析系統,但又要求實時處理的場景,這是一個可行的解決方案。