Kafka setup and usage

If your message volume is not extremely high, Kafka is not recommended; Redis or Beanstalkd is simpler to use from PHP.

Recommended Beanstalkd client:
composer require pda/pheanstalk
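
A minimal producer/worker sketch (assuming Pheanstalk's v3-era API; the tube name and payload are made-up examples):

<?php
// Minimal Beanstalkd sketch using pda/pheanstalk (v3-era API assumed).
require 'vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('127.0.0.1');

// Producer: put a job onto the "emails" tube (tube name is an example).
$pheanstalk->useTube('emails')->put(json_encode(array('to' => 'user@example.com')));

// Worker: reserve a job, process it, then delete it.
$job = $pheanstalk->watch('emails')->ignore('default')->reserve();
$payload = json_decode($job->getData(), true);
// ... process $payload ...
$pheanstalk->delete($job);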

If you do not have a Java JDK, download and install one first.

Add a JAVA_HOME variable to .profile:

export JAVA_HOME=/usr/java/jdk1.8.0_31

Download:
http://gradle.org/downloads
gradle-2.3-all.zip (binaries, sources and documentation)
Unpack to:
/usr/local/gradle-2.3/
Add a GRADLE_HOME variable to .profile (below the PATH line):

export GRADLE_HOME=/usr/local/gradle-2.3
PATH=$PATH:$GRADLE_HOME/bin

Download:
http://kafka.apache.org/downloads.html
Source download: kafka-0.8.2.0-src.tgz (asc, md5)
Unpack to:
/usr/local/kafka/
Run:

gradle
./gradlew

# If you have no standalone ZooKeeper, running the build below fetches one; if you want a ZooKeeper cluster, install ZooKeeper separately.

 ./gradlew jar

# Start ZooKeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

Configure: config/server.properties
Start Kafka:

bin/kafka-server-start.sh config/server.properties &

# Testing

Create the topic "test":

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

--replication-factor: the number of replicas (the number of Kafka brokers holding a copy)
--partitions: the number of partitions
Note: a topic can have multiple partitions; a producer can write each message to a partition chosen at random or by a specified algorithm.
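
The PHP producer example later in this post picks a partition at random; a keyed alternative might look like this (an illustrative helper, not part of kafka-php):

<?php
// Illustrative only: map a message key onto one of the available partition
// ids, so messages with the same key always land in the same partition.
function pickPartition($key, array $partitionIds)
{
    $ids = array_values($partitionIds);
    $hash = crc32($key) & 0x7fffffff; // force a non-negative hash on 32-bit builds
    return $ids[$hash % count($ids)];
}

// e.g. pickPartition('user42', array(0, 1, 2)) always yields the same id for 'user42'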

List topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe a topic:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Send messages to the topic (handled by the leader Kafka broker):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
this is message
^C

Start a consumer on the topic:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
# the consumed messages are printed here
^C

# For clustering, configuration, and other details, see: http://kafka.apache.org/documentation.html#quickstart
# Note: ZooKeeper provides a unified interface to external services and handles leader election internally; Kafka also relies on ZooKeeper for part of its functionality.

 

Install the management tool: https://github.com/yahoo/kafka-manager

Download, unpack, and build it:

Edit config/application.conf, then run:

./sbt clean dist

When the build completes, go back into ./

Unpack the generated archive and move it to /usr/local/

Start it:

./bin/kafka-manager -Dconfig.file=./config/application.conf -Dhttp.port=9009

Configure an NGINX proxy in front of this service (to add basic-auth user verification).

Under the NGINX configuration directory /conf, run:

htpasswd ./pwd username

        proxy_buffering    off;
        proxy_set_header   X-Real-IP $remote_addr;
        proxy_set_header   X-Forwarded-Proto $scheme;
        proxy_set_header   X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header   Host $http_host;
        proxy_http_version 1.1;
        upstream my-backend {
            server 127.0.0.1:9009;
        }
        server {
             listen       192.168.4.200:8080;
             charset utf-8;
             auth_basic "kafka panel!!";
             auth_basic_user_file pwd;
             location / {
                proxy_pass http://my-backend;
             }
        }

 

PHP client:
https://github.com/nmred/kafka-php
#Produce.php

for ($i = 0; $i < 10; $i++) {
    $produce = \Kafka\Produce::getInstance('localhost:2181', 300);
    $partitions = $produce->getAvailablePartitions('testp');

    if (count($partitions) == 1) {
        $partition = array_pop($partitions);
    } else {
        $partition = rand(0, count($partitions) - 1);
    }
    $produce->setRequireAck(-1); // -1: wait for acknowledgement from all replicas

    // Arguments: topic name, partition id, array of messages
    $produce->setMessages('testp', $partition, array('test' . $i));
    $result = $produce->send();
    var_dump($result);
}

  

#Consumer.php

$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$group = 'testgroup'; // consumer group: each message is consumed once per group
$consumer->setGroup($group);
//$consumer->setPartition('testp', 0); // consume one specific partition of the topic
$consumer->setTopic('testp'); // iterate over all partitions
$result = $consumer->fetch();
foreach ($result as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
        var_dump($partition->getHighOffset());    // highest available offset
        var_dump($partition->getMessageOffset()); // current offset in this partition
        foreach ($messageSet as $k => $message) {
            var_dump($message);
            flush();
        }
    }
}

# The other examples in the repo just split the Produce/Consumer tests apart; consult them as needed.

 

For Hadoop-related material, see http://hadoop.apache.org/ ; start with the HDFS, MapReduce, and Hive implementations.

 

How to get PHP notified:

Implement a background daemon in C that listens to Kafka and, whenever a message arrives, launches PHP to process it.

C client:

https://github.com/edenhill/librdkafka

A simple implementation:

#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <sys/time.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"  /* for Kafka driver */
#include "rdkafka_int.h"

#include <zookeeper.h>
#include <zookeeper.jute.h>
#include <jansson.h>

#define BROKER_PATH "/brokers/ids"

static int run = 1;
static rd_kafka_t *rk;
static int exit_eof = 0;
static int quiet = 0;

//signal
static void sig_stop (int sig) {
    run = 0;
    fclose(stdin); /* abort fgets() */
}

static void sig_usr1 (int sig) {
    rd_kafka_dump(stdout, rk);
}

static void sig_child_stop(int sig){
   pid_t t;
   while((t=waitpid(-1,NULL,WNOHANG))>0){ /* reap exited PHP children */
       if(quiet)
           printf("stop child:%d \n",t);
   }
}

/**
 * Kafka logger callback (optional)
 */
static void logger (const rd_kafka_t *rk, int level,
            const char *fac, const char *buf) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
        (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
        level, fac, rd_kafka_name(rk), buf);
}


static void notify_php(
        const char *site_dir,const char *php,const char *bootstarp,
        const char * topic,int partition,rd_kafka_message_t *rkmessage
        ){
    if (rkmessage->err) {
        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            fprintf(stderr,
                "%% Consumer reached end of %s [%"PRId32"] "
                   "message queue at offset %"PRId64"\n",
                   rd_kafka_topic_name(rkmessage->rkt),
                   rkmessage->partition, rkmessage->offset);

            if (exit_eof)
                run = 0;

            return;
        }

        fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
               "offset %"PRId64": %s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition,
               rkmessage->offset,
               rd_kafka_message_errstr(rkmessage));
        return;
    }

    pid_t pid = fork();
    if(pid==0){
        chdir(site_dir);
        char _topic[120];
        char _partition[20];
        sprintf(_topic,"--topic=%s",topic);
        sprintf(_partition,"--partition=%d",partition);
        execl(php,"php",bootstarp,"--task=tips",_topic,_partition,NULL);
        exit(errno);
    }
}


static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers)
{
    if (zzh)
    {
        struct String_vector brokerlist;
        if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK)
        {
            fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH);
            return;
        }

        int i;
        char *brokerptr = brokers;
        for (i = 0; i < brokerlist.count; i++)
        {
            char path[255], cfg[1024];
            sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);
            int len = sizeof(cfg);
            zoo_get(zzh, path, 0, cfg, &len, NULL);

            if (len > 0)
            {
                cfg[len] = '\0';
                json_error_t jerror;
                json_t *jobj = json_loads(cfg, 0, &jerror);
                if (jobj)
                {
                    json_t *jhost = json_object_get(jobj, "host");
                    json_t *jport = json_object_get(jobj, "port");

                    if (jhost && jport)
                    {
                        const char *host = json_string_value(jhost);
                        const int   port = json_integer_value(jport);
                        sprintf(brokerptr, "%s:%d", host, port);

                        brokerptr += strlen(brokerptr);
                        if (i < brokerlist.count - 1)
                        {
                            *brokerptr++ = ',';
                        }
                    }
                    json_decref(jobj);
                }
            }
        }
        deallocate_String_vector(&brokerlist);
        printf("Found brokers %s\n", brokers);
    }
}


static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    char brokers[1024];
    if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
    {
        brokers[0] = '\0';
        set_brokerlist_from_zookeeper(zh, brokers);
        if (brokers[0] != '\0' && rk != NULL)
        {
            rd_kafka_brokers_add(rk, brokers);
            rd_kafka_poll(rk, 10);
        }
    }
}


static zhandle_t* initialize_zookeeper(const char * zookeeper, const int debug)
{
    zhandle_t *zh;
    if (debug)
    {
        zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
    }
    zh = zookeeper_init(zookeeper, watcher, 10000, 0, 0, 0);
    if (zh == NULL)
    {
        fprintf(stderr, "Zookeeper connection not established.");
        exit(1);
    }
    return zh;
}


int main (int argc, char **argv) {
    rd_kafka_topic_t *rkt;

    char *site_dir="/usr/local/nginx/html";
    char *php="/usr/local/php/bin/php";
    char *bootstarp="index.php";

    char *zookeeper = "localhost:2181";
    zhandle_t *zh = NULL;
    char brokers[1024];
    char *topic = NULL;
    int partition = RD_KAFKA_PARTITION_UA;
    int opt;
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    char errstr[512];
    const char *debug = NULL;
    int64_t start_offset = RD_KAFKA_OFFSET_STORED;
    int do_conf_dump = 0;

    memset(brokers, 0, sizeof(brokers));
    quiet = !isatty(STDIN_FILENO);

    /* Kafka configuration */
    conf = rd_kafka_conf_new();

    /* Topic configuration */
    topic_conf = rd_kafka_topic_conf_new();

    while ((opt = getopt(argc, argv, "t:p:k:z:qd:o:eX:s:r:b:")) != -1) {
        switch (opt) {
        case 's':
            site_dir = optarg;
            break;
        case 'r':
            php = optarg;
            break;
        case 'b':
            bootstarp = optarg;
            break;
        case 't':
            topic = optarg;
            break;
        case 'p':
            partition = atoi(optarg);
            break;
        case 'k':
            zookeeper = optarg;
            break;
        case 'z':
            if (rd_kafka_conf_set(conf, "compression.codec",
                          optarg,
                          errstr, sizeof(errstr)) !=
                RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
            }
            break;
        case 'o':
            if (!strcmp(optarg, "end"))
                start_offset = RD_KAFKA_OFFSET_END;
            else if (!strcmp(optarg, "beginning"))
                start_offset = RD_KAFKA_OFFSET_BEGINNING;
            else if (!strcmp(optarg, "stored"))
                start_offset = RD_KAFKA_OFFSET_STORED;
            else
                start_offset = strtoll(optarg, NULL, 10);
            break;
        case 'e':
            exit_eof = 1;
            break;
        case 'd':
            debug = optarg;
            break;
        case 'q':
            quiet = 1;
            break;
        case 'X':
        {
            char *name, *val;
            rd_kafka_conf_res_t res;

            if (!strcmp(optarg, "list") ||
                !strcmp(optarg, "help")) {
                rd_kafka_conf_properties_show(stdout);
                exit(0);
            }

            if (!strcmp(optarg, "dump")) {
                do_conf_dump = 1;
                continue;
            }

            name = optarg;
            if (!(val = strchr(name, '='))) {
                fprintf(stderr, "%% Expected "
                    "-X property=value, not %s\n", name);
                exit(1);
            }

            *val = '\0';
            val++;

            res = RD_KAFKA_CONF_UNKNOWN;
            /* Try "topic." prefixed properties on topic
             * conf first, and then fall through to global if
             * it didnt match a topic configuration property. */
            if (!strncmp(name, "topic.", strlen("topic.")))
                res = rd_kafka_topic_conf_set(topic_conf,
                                  name+
                                  strlen("topic."),
                                  val,
                                  errstr,
                                  sizeof(errstr));

            if (res == RD_KAFKA_CONF_UNKNOWN)
                res = rd_kafka_conf_set(conf, name, val,
                            errstr, sizeof(errstr));

            if (res != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
            }
        }
        break;

        default:
            goto usage;
        }
    }


    if (do_conf_dump) {
        const char **arr;
        size_t cnt;
        int pass;

        for (pass = 0 ; pass < 2 ; pass++) {
            int i;

            if (pass == 0) {
                arr = rd_kafka_conf_dump(conf, &cnt);
                printf("# Global config\n");
            } else {
                printf("# Topic config\n");
                arr = rd_kafka_topic_conf_dump(topic_conf,
                                   &cnt);
            }

            for (i = 0 ; i < cnt ; i += 2)
                printf("%s = %s\n",
                       arr[i], arr[i+1]);

            printf("\n");

            rd_kafka_conf_dump_free(arr, cnt);
        }

        exit(0);
    }


    if (optind != argc || !topic) {
    usage:
        fprintf(stderr,
            "Usage: %s -C|-P|-L -t <topic> "
            "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
            "\n"
            "librdkafka version %s (0x%08x)\n"
            "\n"
            " Options:\n"
            "  -t <topic>      Topic to fetch / produce\n"
            "  -p <num>        Partition (random partitioner)\n"
            "  -k <zookeepers> Zookeeper address (localhost:2181)\n"
            "  -z <codec>      Enable compression:\n"
            "                  none|gzip|snappy\n"
            "  -o <offset>     Start offset (consumer)\n"
            "  -e              Exit consumer when last message\n"
            "                  in partition has been received.\n"
            "  -d [facs..]     Enable debugging contexts:\n"
            "  -q              Be quiet\n"
            "  -X <prop=name> Set arbitrary librdkafka "
            "configuration property\n"
            "               Properties prefixed with \"topic.\" "
            "will be set on topic object.\n"
            "               Use '-X list' to see the full list\n"
            "               of supported properties.\n"
            "\n"
            " In Consumer mode:\n"
            "  writes fetched messages to stdout\n"
            " In Producer mode:\n"
            "  reads messages from stdin and sends to broker\n"
                        " In List mode:\n"
                        "  queries broker for metadata information, "
                        "topic is optional.\n"
            "\n"
            "\n"
            "\n",
            argv[0],
            rd_kafka_version_str(), rd_kafka_version(),
            RD_KAFKA_DEBUG_CONTEXTS);
        exit(1);
    }


    signal(SIGINT, sig_stop);
    signal(SIGUSR1, sig_usr1);
    signal(SIGCHLD,sig_child_stop);

    if (debug &&
        rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
        RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% Debug configuration failed: %s: %s\n",
            errstr, debug);
        exit(1);
    }

    /** Initialize zookeeper */
    zh = initialize_zookeeper(zookeeper, debug != NULL);

    /* Add brokers */
    set_brokerlist_from_zookeeper(zh, brokers);
    if (rd_kafka_conf_set(conf, "metadata.broker.list",
                          brokers, errstr, sizeof(errstr)) !=
        RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% Failed to set brokers: %s\n", errstr);
        exit(1);
    }

    if (debug) {
        printf("Broker list from zookeeper cluster %s: %s\n", zookeeper, brokers);
    }

    /*
     * Consumer
     */

    /* Create Kafka handle */
    if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                errstr, sizeof(errstr)))) {
        fprintf(stderr,
            "%% Failed to create new consumer: %s\n",
            errstr);
        exit(1);
    }

    /* Set logger */
    rd_kafka_set_logger(rk, logger);
    rd_kafka_set_log_level(rk, LOG_DEBUG);

    /* Create topic */
    rkt = rd_kafka_topic_new(rk, topic, topic_conf);

    /* Start consuming */
    if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){
        fprintf(stderr, "%% Failed to start consuming: %s\n",
            rd_kafka_err2str(rd_kafka_errno2err(errno)));
        exit(1);
    }

    while (run) {
        rd_kafka_message_t *rkmessage;

        /* Consume single message.
         * See rdkafka_performance.c for high speed
         * consuming of messages. */
        rkmessage = rd_kafka_consume(rkt, partition, RD_POLL_INFINITE);
        if (!rkmessage) /* timeout */
            continue;

        notify_php(site_dir,php,bootstarp,topic,partition,rkmessage);

        /* Return message to rdkafka */
        rd_kafka_message_destroy(rkmessage);
    }

    /* Stop consuming */
    rd_kafka_consume_stop(rkt, partition);

    rd_kafka_topic_destroy(rkt);

    rd_kafka_destroy(rk);

    /* Let background threads clean up and terminate cleanly. */
    rd_kafka_wait_destroyed(2000);

    /** Free the zookeeper data. */
    zookeeper_close(zh);

    return 0;
}
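
On the PHP side, the execl() above runs index.php with --task/--topic/--partition flags. A minimal handler sketch (hypothetical: the task name and arguments come from the C code above, and the fetch logic mirrors Consumer.php):

<?php
// Hypothetical index.php, invoked by the C daemon as:
//   php index.php --task=tips --topic=<topic> --partition=<id>
$opts = getopt('', array('task:', 'topic:', 'partition:'));
if (!isset($opts['task'], $opts['topic'], $opts['partition']) || $opts['task'] !== 'tips') {
    exit(1);
}

$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('notify'); // group name is an example
$consumer->setPartition($opts['topic'], (int)$opts['partition']);

foreach ($consumer->fetch() as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
        foreach ($messageSet as $message) {
            // ... application-specific message handling ...
            var_dump($message);
        }
    }
}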