Notes on Python bolts in Storm

Storm supports components written in multiple languages, Python among them.

First, create a Java class that wraps the Python script:

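    // Storm 1.x+ package names; releases before 1.0 use backtype.storm.* instead
    import java.util.Map;

    import org.apache.storm.task.ShellBolt;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;

    // Declared as a nested class inside the topology class, hence "static"
    public static class BasieCalculateBolt extends ShellBolt implements
            IRichBolt {

        public BasieCalculateBolt() {
            // Launches "python bolt_base_calculate.py" as a subprocess;
            // the script must be packaged in the jar's resources/ folder
            super("python", "bolt_base_calculate.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // The Python script emits tuples with a single field, "word"
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // No component-specific configuration
            return null;
        }
    }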
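
The referenced bolt_base_calculate.py must sit in the project's resources directory. In this project the .py files live under multilang/resources, so multilang has to be declared as a resource directory in Maven's pom.xml. Maven then packages multilang/resources/ into the jar as a top-level resources/ folder, which is where Storm looks for multilang scripts:
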
    <build>
        <sourceDirectory>src/jvm</sourceDirectory>
        <testSourceDirectory>test/jvm</testSourceDirectory>
        <resources>
            <resource>
                <directory>${basedir}/multilang</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
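
To confirm the packaging worked, you can list the built jar and check that the scripts ended up under a top-level resources/ folder. A minimal sketch using Python's standard zipfile module; the function name missing_multilang_files is hypothetical, and the default file names are simply the two scripts this article uses.

```python
import zipfile


def missing_multilang_files(jar_path, names=("bolt_base_calculate.py", "storm.py")):
    """Return the scripts that are NOT present under resources/ in the given jar."""
    with zipfile.ZipFile(jar_path) as jar:
        entries = set(jar.namelist())
    # Storm expects multilang scripts in the jar's top-level resources/ folder
    return [name for name in names if "resources/" + name not in entries]
```

Call it with the path of the topology jar Maven produced under target/; an empty list means both scripts were packaged where ShellBolt can find them.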

A minimal Python bolt looks like this:

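    import storm

    class SplitSentenceBolt(storm.BasicBolt):
        # BasicBolt acks each input tuple automatically after process() returns
        def process(self, tup):
            # tup.values holds the tuple's field values; the first is the sentence
            words = tup.values[0].split(" ")
            for word in words:
                storm.emit([word])

    # Start the loop that reads tuples from Storm and writes emits back over stdin/stdout
    SplitSentenceBolt().run()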
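
Because process() only reads tup.values and calls storm.emit, the splitting logic can be exercised on a development machine without a Storm cluster. A minimal sketch, assuming a hand-made stand-in module replaces storm.py; _FakeBasicBolt and FakeTuple are hypothetical helpers, not part of Storm. Note that it calls process() directly and never run(), which would block reading stdin.

```python
import sys
import types

# Hypothetical stand-in for storm.py: emit() just records what would be sent to Storm
emitted = []
fake_storm = types.ModuleType("storm")


class _FakeBasicBolt(object):
    pass


fake_storm.BasicBolt = _FakeBasicBolt
fake_storm.emit = lambda values, **kwargs: emitted.append(values)
sys.modules["storm"] = fake_storm

import storm  # now resolves to the stand-in above


class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])


class FakeTuple(object):
    """Mimics the one attribute the bolt reads from storm.Tuple."""

    def __init__(self, values):
        self.values = values


SplitSentenceBolt().process(FakeTuple(["the cow jumped"]))
print(emitted)  # [['the'], ['cow'], ['jumped']]
```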

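The resources directory must also contain the latest storm.py, downloaded from the official repository: https://github.com/apache/storm/blob/master/bin/storm.py. Make sure the file you copy is the multilang adapter that defines BasicBolt, emit and the related helpers, not the storm command-line launcher script; in recent Storm source trees the adapter lives under storm-multilang/python.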
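
storm.py implements Storm's multilang protocol over the process's stdin and stdout: each message is a JSON document followed by a line containing only end. The sketch below is a simplified re-implementation of that framing for illustration, not storm.py's actual code (send_msg and read_msg are hypothetical names). It also shows why a stray print is fatal: the extra line becomes part of the next message, which the parent can no longer parse.

```python
import io
import json


def send_msg(msg, out):
    """Write one multilang-style message: a JSON document, then a line 'end'."""
    out.write(json.dumps(msg) + "\n")
    out.write("end\n")
    out.flush()


def read_msg(stream):
    """Read lines up to the 'end' marker and decode them as one JSON message."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before 'end' marker")
        if line.rstrip("\n") == "end":
            break
        lines.append(line)
    return json.loads("".join(lines))


# A well-formed emit-style message round-trips cleanly
pipe = io.StringIO()
send_msg({"command": "emit", "tuple": ["hello"]}, pipe)
pipe.seek(0)
print(read_msg(pipe))  # {'command': 'emit', 'tuple': ['hello']}

# A stray print() lands in the same stream and corrupts the next message
pipe = io.StringIO()
pipe.write("debug: got a tuple\n")  # what print("debug: got a tuple") would add
send_msg({"command": "emit", "tuple": ["hello"]}, pipe)
pipe.seek(0)
try:
    read_msg(pipe)
except ValueError as err:  # json.JSONDecodeError is a subclass of ValueError
    print("parent cannot parse message:", type(err).__name__)
```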

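A Python bolt must not contain print statements: the Python process exchanges data with Storm (the JVM-side ShellBolt, which passes tuples on to the other bolts) through its standard input and output, so anything printed to stdout corrupts that channel. We still need log output to follow what the program is doing, so use the logging module instead, by creating a log.py: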

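    import logging
    import logging.config
    import os

    # Load logging.conf from this file's directory, regardless of the working directory
    _conf_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logging.conf')
    logging.config.fileConfig(_conf_path)
    # Create the logger; the name is your project's name and must match a qualname in logging.conf
    logger = logging.getLogger('calculateEngine')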

logging.conf can be configured as follows:

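    [loggers]
    keys=root,calculateEngine
    
    [handlers]
    keys=fileHandler,consoleHandler
    
    [formatters]
    keys=simpleFormatter
    
    [logger_root]
    level=DEBUG
    handlers=consoleHandler
    
    [logger_calculateEngine]
    level=INFO
    handlers=fileHandler
    qualname=calculateEngine
    propagate=0
    
    # Log to stderr, not stdout: stdout is reserved for the multilang protocol
    [handler_consoleHandler]
    class=StreamHandler
    level=WARN
    formatter=simpleFormatter
    args=(sys.stderr,)
    
    # fileConfig ignores maxBytes/backupCount/encoding keys, so pass them through args:
    # RotatingFileHandler(filename, mode, maxBytes, backupCount, encoding).
    # args is evaluated inside the logging module, where __file__ would point at the
    # logging package, so use a plain name; it resolves against the working directory
    # (typically the topology's resources/ folder when run by Storm).
    [handler_fileHandler]
    class=handlers.RotatingFileHandler
    level=DEBUG
    formatter=simpleFormatter
    args=('asien_calculate.log', 'a', 10485760, 20, 'utf8')
    
    [formatter_simpleFormatter]
    format=%(asctime)s - %(name)s - %(levelname)s - %(message)s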

To use it in Python, just write from log import logger and then call logger.info("...").
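
The no-print rule can also be checked mechanically before deploying. The sketch below writes a trimmed configuration of the same shape (project logger to a file, root logger to stderr) into a temporary directory, logs through it while capturing stdout, and confirms nothing reached stdout. The file name check.log, the helper run_check and the some.library logger are hypothetical, chosen only for this test.

```python
import contextlib
import io
import logging
import logging.config
import os
import tempfile

# Trimmed configuration of the same shape: project logger -> file, root -> stderr
CONF = """\
[loggers]
keys=root,calculateEngine

[handlers]
keys=fileHandler,consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=DEBUG
handlers=consoleHandler

[logger_calculateEngine]
level=INFO
handlers=fileHandler
qualname=calculateEngine
propagate=0

[handler_consoleHandler]
class=StreamHandler
level=WARN
formatter=simpleFormatter
args=(sys.stderr,)

[handler_fileHandler]
class=FileHandler
level=DEBUG
formatter=simpleFormatter
args=('check.log', 'a')

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
"""


def run_check():
    """Log through the config and return (captured stdout, log file contents)."""
    captured = io.StringIO()
    old_cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as tmpdir:
        os.chdir(tmpdir)  # relative file names resolve here, as they would under Storm
        try:
            with open("logging.conf", "w") as f:
                f.write(CONF)
            with contextlib.redirect_stdout(captured):
                logging.config.fileConfig("logging.conf")
                logging.getLogger("calculateEngine").info("processed a tuple")
                logging.getLogger("some.library").warning("this goes to stderr")
            # Close the file handler so the temporary directory can be removed
            for handler in logging.getLogger("calculateEngine").handlers:
                handler.close()
            with open("check.log") as f:
                log_text = f.read()
        finally:
            os.chdir(old_cwd)
    return captured.getvalue(), log_text


stdout_text, log_text = run_check()
print(repr(stdout_text))  # ''
print("processed a tuple" in log_text)  # True
```

If stdout_text is ever non-empty, some handler is writing to stdout and would break the bolt once it runs under Storm.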
