91精品新拍在线观看,91精品一区二区,91精品一区二区三区久久久久,91精品在线播放,91精品中综合久久久久婷婷

<pre id="fhfjj"><ruby id="fhfjj"><var id="fhfjj"></var></ruby></pre>
    <p id="fhfjj"><del id="fhfjj"></del></p>

    <p id="fhfjj"><mark id="fhfjj"><progress id="fhfjj"></progress></mark></p>

      當前位置: 首頁 / 技術分享 / 正文
      數據采集工具之Flume 的攔截器

      2023-03-15

      攔截器    interceptor 事件

        在Flume運行過程中 ,Flume有能力在運行階段修改/刪除Event,這是通過攔截器(Interceptors)來實現的。攔截器有下面幾個特點:

        攔截器需要實現org.apache.flume.interceptor.Interceptor接口。

        攔截器可以修改或刪除事件基于開發者在選擇器中選擇的任何條件。

        攔截器采用了責任鏈模式,多個攔截器可以按指定順序攔截。

        一個攔截器返回的事件列表被傳遞給鏈中的下一個攔截器。

        如果一個攔截器需要刪除事件,它只需要在返回的事件集中不包含要刪除的事件即可。

        一、系統內置攔截器

        Timestamp Interceptor :時間戳攔截器,將當前時間戳(毫秒)加入到events header中,key名字為:timestamp,值為當前時間戳。用的不是很多

        Host Interceptor:主機名攔截器。將運行Flume agent的主機名或者IP地址加入到events header中,key名字為:host(也可自定義)

        Static Interceptor:靜態攔截器,用于在events header中加入一組靜態的key和value。

        二、內置攔截器的使用

        2.1. Timestamp+HTTP+File+HDFS

        通過時間攔截器,數據源為HTTP,傳送的通道模式是FileChannel,最后輸出的目的地為HDFS

        采集方案

        a1.sources = r1

        a1.channels = c1

        a1.sinks = s1

        a1.sources.r1.type=http

        a1.sources.r1.bind = qianfeng01

        a1.sources.r1.port = 6666

        a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

        a1.sources.r1.handler.nickname = JSON props

        a1.sources.r1.interceptors=i1 i2 i3

        a1.sources.r1.interceptors.i1.type=timestamp

        #如果攔截器中已經有了時間戳,直接替換成現在的

        a1.sources.r1.interceptors.i1.preserveExisting=false

        a1.sources.r1.interceptors.i2.type=host

        a1.sources.r1.interceptors.i2.preserveExisting=false

        a1.sources.r1.interceptors.i2.useIP=true

        a1.sources.r1.interceptors.i2.hostHeader=hostname

        a1.sources.r1.interceptors.i3.type=static

        a1.sources.r1.interceptors.i3.preserveExisting=false

        a1.sources.r1.interceptors.i3.key=hn

        a1.sources.r1.interceptors.i3.value=qianfeng01

        a1.channels.c1.type=memory

        a1.channels.c1.capacity=1000

        a1.channels.c1.transactionCapacity=100

        a1.channels.c1.keep-alive=3

        a1.channels.c1.byteCapacityBufferPercentage=20

        a1.channels.c1.byteCapacity=800000

        a1.sinks.s1.type=hdfs

        a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/%H%M

        a1.sinks.s1.hdfs.filePrefix=%{hostname}

        a1.sinks.s1.hdfs.fileSuffix=.log

        a1.sinks.s1.hdfs.inUseSuffix=.tmp

        a1.sinks.s1.hdfs.rollInterval=60

        a1.sinks.s1.hdfs.rollSize=1024

        a1.sinks.s1.hdfs.rollCount=10

        a1.sinks.s1.hdfs.idleTimeout=0

        a1.sinks.s1.hdfs.batchSize=100

        a1.sinks.s1.hdfs.fileType=DataStream

        a1.sinks.s1.hdfs.writeFormat=Text

        a1.sinks.s1.hdfs.round=true

        a1.sinks.s1.hdfs.roundValue=1

        a1.sinks.s1.hdfs.roundUnit=second

        a1.sinks.s1.hdfs.useLocalTimeStamp=true

        a1.sources.r1.channels=c1

        a1.sinks.s1.channel=c1

        啟動 Agent

        [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console

        測試數據

        [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"hn":"qianfeng01","pwd":"123456"},"body":"this is my content qianfeng01"}]' http://qianfeng01:6666

        三、自定義攔截器

        為了提高Flume的擴展性,用戶可以自己定義一個攔截器, 對每一組的item_type和active_time都過濾出相應的HOST和USERID

        處理數據樣例:

        log='{

        "host":"www.baidu.com",

        "user_id":"13755569427",

        "items":[

        {

        "item_type":"eat",

        "active_time":156234

        },

        {

        "item_type":"car",

        "active_time":156233

        }

        ]

        }'

        結果樣例:

        {"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}

        3.1. pom.xml

      <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-core</artifactId>
          <version>1.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.48</version>
        </dependency>
      </dependencies>

        3.2. 代碼實現

        /**

        * @Author 千鋒大數據教學團隊

        * @Company 千鋒好程序員大數據

        * @Description 自定義攔截器:對每一組的item_type和active_time都過濾出相應的HOST和USERID

        */

        public class MyInterceptor implements Interceptor {

        @Override

        public void initialize() {

        //初始化方法,寫攔截器初始化時的業務

        }

        @Override

        public void close() {

        //關閉方法,寫攔截器關閉時的代碼

        }

        /**

        * 解析單條event

        * @param event

        * @return

        */

        @Override

        public Event intercept(Event event) {

        //輸入

        String inputeBody=null;

        //輸出

        byte[] outputBoday=null;

        //解析---這里定義對單條Event處理規則

        try {

        inputeBody=new String(event.getBody(), Charsets.UTF_8);

        ArrayListtemp = new ArrayList<>();

        JSONObject bodyObj = JSON.parseObject(inputeBody);

        //1)公共字段

        String host = bodyObj.getString("host");

        String user_id = bodyObj.getString("user_id");

        JSONArray data = bodyObj.getJSONArray("items");

        //2)Json數組=>every json obj

        for (Object item : data) {

        JSONObject itemObj = JSON.parseObject(item.toString());

        HashMap<string, object=""> fields = new HashMap<>();

        fields.put("host",host);

        fields.put("user_id",user_id);

        fields.put("item_type",itemObj.getString("item_type"));

        fields.put("active_time",itemObj.getLongValue("active_time"));

        temp.add(new JSONObject(fields).toJSONString());

        }

        //3)Json obj 拼接

        outputBoday=String.join("\n",temp).getBytes();

        }catch (Exception e){

        System.out.println("輸入數據:"+inputeBody);

        e.printStackTrace();

        }

        event.setBody(outputBoday);

        return event;

        }

        /**

        * 解析一批event

        * @param events

        * @return

        */

        @Override

        public Listintercept(Listevents) {

        //輸出---一批Event

        ArrayListresult = new ArrayList<>();

        //輸入---一批Event

        try{

        for (Event event : events) {

        //一條條解析

        Event interceptedEvent = intercept(event);

        byte[] interceptedEventBody = interceptedEvent.getBody();

        if(interceptedEventBody.length!=0){

        String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);

        String[] multiEventArr = multiEvent.split("\n");

        for (String needEvent : multiEventArr) {

        SimpleEvent simpleEvent = new SimpleEvent();

        simpleEvent.setBody(needEvent.getBytes());

        result.add(simpleEvent);

        }

        }

        }

        }catch (Exception e){

        e.printStackTrace();

        }

        return result;

        }

        /**

        * 實現內部類接口

        */

        public static class Builder implements Interceptor.Builder{

        @Override

        public Interceptor build() {

        return new MyInterceptor();

        }

        @Override

        public void configure(Context context) {

        }

        }

        }

        3.3. 打包上傳

        使用maven將攔截器打包,然后把此包和依賴的fastjson一起上傳到Flume lib目錄下

        3.4. 采集方案制定

        a1.sources = s1

        a1.channels = c1

        a1.sinks = r1

        a1.sources.s1.type = TAILDIR

        #文件以JSON格式記錄inode、絕對路徑和每個跟蹤文件的最后位置

        a1.sources.s1.positionFile = /root/flume/taildir_position.json

        #以空格分隔的文件組列表。每個文件組表示要跟蹤的一組文件

        a1.sources.s1.filegroups = f1

        #文件組的絕對路徑

        a1.sources.s1.filegroups.f1=/root/flume/data/.*log

        #是否添加存儲絕對路徑文件名的標題

        a1.sources.s1.fileHeader = true

        #使用自定義攔截器

        a1.sources.s1.interceptors = i1

        a1.sources.s1.interceptors.i1.type = flume.MyInterceptor$Builder

        a1.channels.c1.type = file

        a1.channels.c1.dataDirs = /root/flume/filechannle/dataDirs

        a1.channels.c1.checkpointDir = /root/flume/filechannle/checkpointDir

        a1.channels.c1.capacity = 1000

        a1.channels.c1.transactionCapacity = 100

        a1.sinks.r1.type = hdfs

        a1.sinks.r1.hdfs.path = hdfs://qianfeng01:8020/flume/spooldir

        a1.sinks.r1.hdfs.filePrefix =

        a1.sinks.r1.hdfs.round = true

        a1.sinks.r1.hdfs.roundValue = 10

        a1.sinks.r1.hdfs.roundUnit = minute

        a1.sinks.r1.hdfs.fileSuffix= .log

        a1.sinks.r1.hdfs.rollInterval=60

        a1.sinks.r1.hdfs.fileType=DataStream

        a1.sinks.r1.hdfs.writeFormat=Text

        a1.sources.s1.channels = c1

        a1.sinks.r1.channel = c1

        3.5. 啟動 Agent

        [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console

        3.6. 測試數據

        [root@qianfeng01 ~]# vi my.sh

        #!/bin/bash

        log='{

        "host":"www.baidu.com",

        "user_id":"13755569427",

        "items":[

        {

        "item_type":"eat",

        "active_time":156234

        },

        {

        "item_type":"car",

        "active_time":156233

        }

        ]

        }'

        echo $log>> /root/flume/data/test.log

        [root@qianfeng01 ~]# bash my.sh

        執行后我們希望得到是數據格式:

        {"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}

        {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}


      好程序員公眾號

      • · 剖析行業發展趨勢
      • · 匯聚企業項目源碼

      好程序員開班動態

      More+
      • HTML5大前端 <高端班>

        開班時間:2021-04-12(深圳)

        開班盛況

        開班時間:2021-05-17(北京)

        開班盛況
      • 大數據+人工智能 <高端班>

        開班時間:2021-03-22(杭州)

        開班盛況

        開班時間:2021-04-26(北京)

        開班盛況
      • JavaEE分布式開發 <高端班>

        開班時間:2021-05-10(北京)

        開班盛況

        開班時間:2021-02-22(北京)

        開班盛況
      • Python人工智能+數據分析 <高端班>

        開班時間:2021-07-12(北京)

        預約報名

        開班時間:2020-09-21(上海)

        開班盛況
      • 云計算開發 <高端班>

        開班時間:2021-07-12(北京)

        預約報名

        開班時間:2019-07-22(北京)

        開班盛況
      IT培訓IT培訓
      在線咨詢
      IT培訓IT培訓
      試聽
      IT培訓IT培訓
      入學教程
      IT培訓IT培訓
      立即報名
      IT培訓

      Copyright 2011-2023 北京千鋒互聯科技有限公司 .All Right 京ICP備12003911號-5 京公網安備 11010802035720號

      91精品新拍在线观看,91精品一区二区,91精品一区二区三区久久久久,91精品在线播放,91精品中综合久久久久婷婷