This article shows how to use Apache Flink's built-in metrics system, together with Prometheus, to monitor streaming applications efficiently.

As you get to know Prometheus more deeply, you will discover a number of excellent features.
Flink already ships an official jar for integrating with Prometheus, so the integration is straightforward. Since this series focuses on Flink on Kubernetes, everything below builds on that setup.

If you are not familiar with Kubernetes, please refer to the Kubernetes documentation. Deployment itself is not the focus of this post, so here is the YAML manifest directly:
```yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: monitor
  namespace: kube-system
  labels:
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: monitor
  labels:
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - get
      - list
      - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: monitor
  labels:
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: monitor
subjects:
  - kind: ServiceAccount
    name: monitor
    namespace: kube-system
---
apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    app: monitor
  name: monitor
  namespace: kube-system
data:
  prometheus.yml: |-
    global:
      scrape_interval: 10s
      evaluation_interval: 10s
    scrape_configs:
      - job_name: kubernetes-pods
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - action: keep
            regex: true
            source_labels:
              - __meta_kubernetes_pod_annotation_prometheus_io_scrape
          - action: replace
            regex: (.+)
            source_labels:
              - __meta_kubernetes_pod_annotation_prometheus_io_path
            target_label: __metrics_path__
          - action: replace
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
            source_labels:
              - __address__
              - __meta_kubernetes_pod_annotation_prometheus_io_port
            target_label: __address__
          - action: labelmap
            regex: __meta_kubernetes_pod_label_(.+)
          - action: replace
            source_labels:
              - __meta_kubernetes_namespace
            target_label: kubernetes_namespace
          - action: replace
            source_labels:
              - __meta_kubernetes_pod_name
            target_label: kubernetes_pod_name
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: monitor
  name: monitor
  namespace: kube-system
spec:
  serviceName: monitor
  selector:
    matchLabels:
      app: monitor
  replicas: 1
  template:
    metadata:
      labels:
        app: monitor
    spec:
      containers:
        - args:
            - --config.file=/etc/prometheus/prometheus.yml
            - --storage.tsdb.path=/data/prometheus
            - --storage.tsdb.retention.time=10d
          image: prom/prometheus:v2.19.0
          imagePullPolicy: IfNotPresent
          name: prometheus
          ports:
            - containerPort: 9090
              protocol: TCP
          readinessProbe:
            httpGet:
              path: /-/ready
              port: 9090
            initialDelaySeconds: 30
            timeoutSeconds: 30
          livenessProbe:
            httpGet:
              path: /-/healthy
              port: 9090
            initialDelaySeconds: 30
            timeoutSeconds: 30
          resources:
            limits:
              cpu: 1000m
              memory: 2018Mi
            requests:
              cpu: 1000m
              memory: 2018Mi
          volumeMounts:
            - mountPath: /etc/prometheus
              name: config-volume
            - mountPath: /data
              name: monitor-persistent-storage
      restartPolicy: Always
      priorityClassName: system-cluster-critical
      serviceAccountName: monitor
      initContainers:
        - name: "init-chown-data"
          image: "busybox:latest"
          imagePullPolicy: "IfNotPresent"
          command: ["chown", "-R", "65534:65534", "/data"]
          volumeMounts:
            - name: monitor-persistent-storage
              mountPath: /data
              subPath: ""
      volumes:
        - configMap:
            defaultMode: 420
            name: monitor
          name: config-volume
  volumeClaimTemplates:
    - metadata:
        name: monitor-persistent-storage
        namespace: kube-system
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 20Gi
        storageClassName: gp2
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: nlb
  labels:
    app: monitor
  name: monitor
  namespace: kube-system
spec:
  ports:
    - name: http
      port: 9090
      protocol: TCP
      targetPort: 9090
  selector:
    app: monitor
  type: LoadBalancer
```
A brief note: because we want to use Prometheus's Kubernetes service discovery, RBAC authorization is required so that the Prometheus instance has read access to the Pods in the cluster.

Why use service discovery instead of a static configuration?

Compared with a static configuration file, service discovery is far more flexible. This matters especially with Flink on native Kubernetes, where the JobManager and TaskManager pods are created dynamically as jobs are submitted; a static configuration file simply cannot keep up with that dynamism.

Our cluster runs on EKS, so if you are on another cloud provider you will need to adjust some details accordingly.
We build on the demo from the previous article and add the monitoring pieces, so the Dockerfile looks like this:
```dockerfile
FROM flink
COPY /plugins/metrics-prometheus/flink-metrics-prometheus-1.11.0.jar /opt/flink/lib
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./examples/streaming/WordCount.jar $FLINK_HOME/usrlib/my-flink-job.jar
```
Flink's classpath is /opt/flink/lib, so the plugin jar must be placed in that directory.

Our Pods must carry certain annotations so that the Prometheus instance can discover them, so the submit command changes slightly:
```shell
./bin/flink run-application -p 8 -t kubernetes-application \
  -Dkubernetes.cluster-id=my-first-cluster \
  -Dtaskmanager.memory.process.size=2048m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dkubernetes.container.image=iyacontrol/flink-world-count:v0.0.2 \
  -Dkubernetes.container.image.pull-policy=Always \
  -Dkubernetes.namespace=stream \
  -Dkubernetes.jobmanager.service-account=flink \
  -Dkubernetes.rest-service.exposed.type=LoadBalancer \
  -Dkubernetes.rest-service.annotations=service.beta.kubernetes.io/aws-load-balancer-type:nlb,service.beta.kubernetes.io/aws-load-balancer-internal:true \
  -Dkubernetes.jobmanager.annotations=prometheus.io/scrape:true,prometheus.io/port:9249 \
  -Dkubernetes.taskmanager.annotations=prometheus.io/scrape:true,prometheus.io/port:9249 \
  -Dmetrics.reporters=prom \
  -Dmetrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter \
  local:///opt/flink/usrlib/my-flink-job.jar
```
About the Prometheus reporter:
Parameters:

- `port` - optional. The port the Prometheus exporter listens on, defaulting to 9249. To run multiple instances of the reporter on one host (for example, when a TaskManager is colocated with the JobManager), it is recommended to use a port range such as 9250-9260.
- `filterLabelValueCharacters` - optional. Specifies whether to filter label value characters. If enabled, all characters not matching [a-zA-Z0-9:_] are removed; otherwise no characters are removed. Before disabling this option, make sure your label values meet Prometheus's requirements.

After submitting the job, let's look at the actual effect.
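The same reporter can also be configured in conf/flink-conf.yaml instead of via -D flags on the submit command. A minimal sketch using the port range recommended above (values are illustrative):

```yaml
# conf/flink-conf.yaml — equivalent to the -Dmetrics.reporter.* flags used at submit time
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# a port range avoids clashes when a TaskManager is colocated with the JobManager
metrics.reporter.prom.port: 9250-9260
# keep label values Prometheus-safe (this is the default)
metrics.reporter.prom.filterLabelValueCharacters: true
```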
First, check whether Prometheus has discovered our Pods.
Then check whether the specific metrics are being scraped correctly.
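For a quick check you can run a query in the Prometheus UI against the metrics the reporter exposes. The metric names below are illustrative; inspect the /metrics endpoint on port 9249 of your pods for the exact names in your version:

```
# how many TaskManagers have registered with the JobManager
flink_jobmanager_numRegisteredTaskManagers

# incoming record rate per pod, using the labels added by the relabel_configs above
sum(rate(flink_taskmanager_job_task_numRecordsIn[1m])) by (kubernetes_pod_name)
```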
The metrics are now being collected; from here you can build Grafana dashboards on top of them, or add alerting rules. For example:
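A sketch of what such an alerting rule could look like. The metric name and threshold here are illustrative assumptions, not taken from the original post; adapt them to the metrics your job actually exposes:

```yaml
# illustrative Prometheus alerting rule for a Flink job
groups:
  - name: flink
    rules:
      - alert: FlinkJobRestarting
        # fires when the job's restart counter is increasing
        expr: rate(flink_jobmanager_job_numRestarts[5m]) > 0
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Flink job {{ $labels.job_name }} is restarting"
```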
Of course, besides having Prometheus discover the Pods and scrape metrics periodically, Flink also supports actively pushing metrics to a PushGateway.
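A sketch of the push-based setup using the PrometheusPushGatewayReporter that ships in the same flink-metrics-prometheus jar; the host and job name are placeholders for your own environment:

```yaml
# conf/flink-conf.yaml — push metrics to a PushGateway instead of being scraped
metrics.reporters: promgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: pushgateway.example.com   # placeholder
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: my-flink-job            # placeholder
# append a random suffix so parallel instances do not overwrite each other
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
```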
Flink exposes metrics to external systems through Reporters. By configuring one or more Reporters in conf/flink-conf.yaml, metrics can be published to external systems. These Reporters are instantiated on each JobManager and TaskManager at startup.
Every Reporter must have at least a class or factory.class property. Which of the two can or should be used depends on the Reporter implementation; see the configuration section of each individual Reporter for details. Some Reporters also allow specifying a reporting interval.

An example configuration that specifies multiple Reporters:
```yaml
metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes: job_id;task_attempt_num

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
```
The jar containing the reporter must be accessible when Flink starts. Reporters that support the factory.class property can be loaded as plugins; otherwise, the jar must be placed in the /lib folder. You can write your own Reporter by implementing the org.apache.flink.metrics.reporter.MetricReporter interface. If the reporter should send reports periodically, it must also implement the Scheduled interface. By additionally implementing MetricReporterFactory, your reporter can be loaded as a plugin as well.