在語義的世界裏，可以近似地說：萬事萬物都是特徵提取。你只要找到特徵，事情就好辦。……你指望畢其功於一役嗎？自然語言處理的真實應用裏，很難有什麼場景能找到一個通吃的特徵，都是一層一層特徵疊加的：一層特徵去掉一部分垃圾數據，如此反覆，終成正果。注意方法論。
統計粗且糙，乃大錘；規則細而精，乃小錘。先大場，後細棋。
KafkaSink.java
|
import kafka.javaapi.producer.Producer;
……
public class KafkaSink extends AbstractSink implements Configurable {
……
private Producer<String, byte[]> producer;
……
@Override
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction tx = channel.getTransaction();
try {
tx.begin();
Event e = channel.take();
if (e == null) {
tx.rollback();
return Status.BACKOFF;
}
producer.send(new KeyedMessage<String, byte[]>(topic, e.getBody()));
tx.commit();
return Status.READY;
} catch (Exception e) {
|
KafkaSpout.java
|
public abstract class KafkaSpout implements IRichSpout {
……
@Override
public void activate() {
……
for (final KafkaStream<byte[], byte[]> stream : streamList) {
executor.submit(new Runnable() {
@Override
public void run() {
    // Walk the captured Kafka stream, turning each message into a tuple
    // and emitting it while the spoutPending counter is still positive.
    ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    while (messages.hasNext()) {
        if (spoutPending.get() > 0) {
            byte[] payload = messages.next().message();

            // A conversion failure is printed and the message is treated
            // like any other unusable tuple (dropped below).
            List<Object> values;
            try {
                values = generateTuple(payload);
            } catch (Exception ex) {
                ex.printStackTrace();
                values = null;
            }

            // Emit only tuples with exactly the declared number of fields;
            // the counter is decremented only for tuples actually emitted.
            if (values != null && values.size() == outputFieldsLength) {
                collector.emit(values);
                spoutPending.decrementAndGet();
            }
        } else {
            // Counter exhausted: back off without consuming a message, then
            // re-evaluate the loop condition. sleep(1000) is presumably a
            // one-second pause (milliseconds) — helper not visible here.
            sleep(1000);
        }
    }
}
|
EvaluateBolt.java
|
public class EvaluateBolt extends BaseBasicBolt {
……
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
……
if (LogWebsiteSpout.PAGE_EVENT_BROWSE.equals(event)) {
if (LogWebsiteSpout.PAGE_TYPE_GOODS.equals(pageType)) {
incrBaseStatistics(baseKeyMap, BROWSE_ALL, 1);
} else if (LogWebsiteSpout.PAGE_TYPE_PAY1.equals(pageType)) {
incrBaseStatistics(baseKeyMap, ORDER_ALL, 1);
}
String recDisplay = input.getStringByField(LogWebsiteSpout.FIELD_REC_DISPLAY);
recDisplayStatistics(recDisplay, time, pageType, baseKeyMap);
} else if (LogWebsiteSpout.PAGE_EVENT_CLICK.equals(event)) {
String recType = input.getStringByField(LogWebsiteSpout.FIELD_REC_TYPE);
|