91精品新拍在线观看,91精品一区二区,91精品一区二区三区久久久久,91精品在线播放,91精品中综合久久久久婷婷

<pre id="fhfjj"><ruby id="fhfjj"><var id="fhfjj"></var></ruby></pre>
    <p id="fhfjj"><del id="fhfjj"></del></p>

    <p id="fhfjj"><mark id="fhfjj"><progress id="fhfjj"></progress></mark></p>

      當前位置: 首頁 / 技術分享 / 正文
      數據采集工具之Flume的選擇器

      2023-03-16

      channel 事件 寫入   

      數據采集工具之Flume的選擇器

        說明

        Flume中的Channel選擇器作用于source階段 ,是決定Source接受的特定事件寫入到哪個Channel的組件,他們告訴Channel處理器,然后由其將事件寫入到Channel。

        Agent中各個組件的交互

        由于Flume不是兩階段提交,事件被寫入到一個Channel,然后事件在寫入下一個Channel之前提交,如果寫入一個Channel出現異常,那么之前已經寫入到其他Channel的相同事件不能被回滾。當這樣的異常發生時,Channel處理器拋出ChannelException異常,事務失敗,如果Source試圖再次寫入相同的事件(大多數情況下,會再次寫入,只有Syslog,Exec等Source不能重試,因為沒有辦法生成相同的數據),重復的事件將寫入到Channel中,而先前的提交是成功的,這樣在Flume中就發生了重復。

        Channel選擇器的配置是通過Channel處理器完成的,Channel選擇器可以指定一組Channel是必須的,另一組的可選的。

        Flume分類兩種選擇器,如果Source配置中沒有指定選擇器,那么會自動使用復制Channel選擇器.

        ●replicating:該選擇器復制每個事件到通過Source的Channels參數指定的所有Channel中。

        ●multiplexing:是一種專門用于動態路由事件的Channel選擇器,通過選擇事件應該寫入到哪個Channel,基于一個特定的事件頭的值進行路由

        案例演示:replicating selector

        配置方案

        [root@qianfeng01 flumeconf]# vi rep.conf

        a1.sources = r1

        a1.channels = c1 c2

        a1.sinks = s1 s2

        a1.sources.r1.type=syslogtcp

        a1.sources.r1.host = qianfeng01

        a1.sources.r1.port = 6666

        a1.sources.r1.selector.type=replicating

        a1.channels.c1.type=memory

        a1.channels.c1.capacity=1000

        a1.channels.c1.transactionCapacity=100

        a1.channels.c1.keep-alive=3

        a1.channels.c1.byteCapacityBufferPercentage=20

        a1.channels.c1.byteCapacity=800000

        a1.channels.c2.type=memory

        a1.channels.c2.capacity=1000

        a1.channels.c2.transactionCapacity=100

        a1.sinks.s1.type=hdfs

        a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/rep

        a1.sinks.s1.hdfs.filePrefix=s1sink

        a1.sinks.s1.hdfs.fileSuffix=.log

        a1.sinks.s1.hdfs.inUseSuffix=.tmp

        a1.sinks.s1.hdfs.rollInterval=60

        a1.sinks.s1.hdfs.rollSize=1024

        a1.sinks.s1.hdfs.rollCount=10

        a1.sinks.s1.hdfs.idleTimeout=0

        a1.sinks.s1.hdfs.batchSize=100

        a1.sinks.s1.hdfs.fileType=DataStream

        a1.sinks.s1.hdfs.writeFormat=Text

        a1.sinks.s1.hdfs.round=true

        a1.sinks.s1.hdfs.roundValue=1

        a1.sinks.s1.hdfs.roundUnit=second

        a1.sinks.s1.hdfs.useLocalTimeStamp=true

        a1.sinks.s2.type=hdfs

        a1.sinks.s2.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/rep

        a1.sinks.s2.hdfs.filePrefix=s2sink

        a1.sinks.s2.hdfs.fileSuffix=.log

        a1.sinks.s2.hdfs.inUseSuffix=.tmp

        a1.sinks.s2.hdfs.rollInterval=60

        a1.sinks.s2.hdfs.rollSize=1024

        a1.sinks.s2.hdfs.rollCount=10

        a1.sinks.s2.hdfs.idleTimeout=0

        a1.sinks.s2.hdfs.batchSize=100

        a1.sinks.s2.hdfs.fileType=DataStream

        a1.sinks.s2.hdfs.writeFormat=Text

        a1.sinks.s2.hdfs.round=true

        a1.sinks.s2.hdfs.roundValue=1

        a1.sinks.s2.hdfs.roundUnit=second

        a1.sinks.s2.hdfs.useLocalTimeStamp=true

        a1.sources.r1.channels=c1 c2

        a1.sinks.s1.channel=c1

        a1.sinks.s2.channel=c2

        啟動agent的服務

        [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console

        測試

        [root@qianfeng01 ~]# echo "hello world hello qianfeng" | nc qianfeng01 6666

        案例演示:Multiplexing selector

        配置方案

        [root@qianfeng01 flumeconf]# vi mul.conf

        a1.sources = r1

        a1.channels = c1 c2

        a1.sinks = s1 s2

        a1.sources.r1.type=http

        a1.sources.r1.bind = qianfeng01

        a1.sources.r1.port = 6666

        a1.sources.r1.selector.type=multiplexing

        a1.sources.r1.selector.header = state

        a1.sources.r1.selector.mapping.USER = c1

        a1.sources.r1.selector.mapping.ORDER = c2

        a1.sources.r1.selector.default = c1

        a1.channels.c1.type=memory

        a1.channels.c1.capacity=1000

        a1.channels.c1.transactionCapacity=100

        a1.channels.c1.keep-alive=3

        a1.channels.c1.byteCapacityBufferPercentage=20

        a1.channels.c1.byteCapacity=800000

        a1.channels.c2.type=memory

        a1.channels.c2.capacity=1000

        a1.channels.c2.transactionCapacity=100

        a1.sinks.s1.type=hdfs

        a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/mul

        a1.sinks.s1.hdfs.filePrefix=s1sink

        a1.sinks.s1.hdfs.fileSuffix=.log

        a1.sinks.s1.hdfs.inUseSuffix=.tmp

        a1.sinks.s1.hdfs.rollInterval=60

        a1.sinks.s1.hdfs.rollSize=1024

        a1.sinks.s1.hdfs.rollCount=10

        a1.sinks.s1.hdfs.idleTimeout=0

        a1.sinks.s1.hdfs.batchSize=100

        a1.sinks.s1.hdfs.fileType=DataStream

        a1.sinks.s1.hdfs.writeFormat=Text

        a1.sinks.s1.hdfs.round=true

        a1.sinks.s1.hdfs.roundValue=1

        a1.sinks.s1.hdfs.roundUnit=second

        a1.sinks.s1.hdfs.useLocalTimeStamp=true

        a1.sinks.s2.type=hdfs

        a1.sinks.s2.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/mul

        a1.sinks.s2.hdfs.filePrefix=s2sink

        a1.sinks.s2.hdfs.fileSuffix=.log

        a1.sinks.s2.hdfs.inUseSuffix=.tmp

        a1.sinks.s2.hdfs.rollInterval=60

        a1.sinks.s2.hdfs.rollSize=1024

        a1.sinks.s2.hdfs.rollCount=10

        a1.sinks.s2.hdfs.idleTimeout=0

        a1.sinks.s2.hdfs.batchSize=100

        a1.sinks.s2.hdfs.fileType=DataStream

        a1.sinks.s2.hdfs.writeFormat=Text

        a1.sinks.s2.hdfs.round=true

        a1.sinks.s2.hdfs.roundValue=1

        a1.sinks.s2.hdfs.roundUnit=second

        a1.sinks.s2.hdfs.useLocalTimeStamp=true

        a1.sources.r1.channels=c1 c2

        a1.sinks.s1.channel=c1

        a1.sinks.s2.channel=c2

        啟動Agent的服務

        [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console

        測試

        [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://qianfeng01:6666

        [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://qianfeng01:6666


      好程序員公眾號

      • · 剖析行業發展趨勢
      • · 匯聚企業項目源碼

      好程序員開班動態

      More+
      • HTML5大前端 <高端班>

        開班時間:2021-04-12(深圳)

        開班盛況

        開班時間:2021-05-17(北京)

        開班盛況
      • 大數據+人工智能 <高端班>

        開班時間:2021-03-22(杭州)

        開班盛況

        開班時間:2021-04-26(北京)

        開班盛況
      • JavaEE分布式開發 <高端班>

        開班時間:2021-05-10(北京)

        開班盛況

        開班時間:2021-02-22(北京)

        開班盛況
      • Python人工智能+數據分析 <高端班>

        開班時間:2021-07-12(北京)

        預約報名

        開班時間:2020-09-21(上海)

        開班盛況
      • 云計算開發 <高端班>

        開班時間:2021-07-12(北京)

        預約報名

        開班時間:2019-07-22(北京)

        開班盛況
      IT培訓IT培訓
      在線咨詢
      IT培訓IT培訓
      試聽
      IT培訓IT培訓
      入學教程
      IT培訓IT培訓
      立即報名
      IT培訓

      Copyright 2011-2023 北京千鋒互聯科技有限公司 .All Right 京ICP備12003911號-5 京公網安備 11010802035720號

      91精品新拍在线观看,91精品一区二区,91精品一区二区三区久久久久,91精品在线播放,91精品中综合久久久久婷婷