日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

storm mysql druid_Druid 集成

發布時間:2023/12/2 数据库 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 storm mysql druid_Druid 集成 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

title: Storm Druid 集成 layout: documentation documentation: true

Storm Druid Bolt 和 TridentState

該模塊提供了將數據寫入Druid 數據存儲的核心Strom和Trident bolt(螺栓)的實現。 該實現使用Druid's的Tranquility庫向druid發送消息。

一些實施細節從現有的借用 Tranquility Storm Bolt. 這個新的Bolt(螺栓)增加了支持最新的storm釋放,并保持在storm回購的bolt(螺栓)。

Core Bolt

下面的例子描述了使用 org.apache.storm.druid.bolt.DruidBeamBolt的核心bolt(螺栓)默認情況下,該bolt(螺栓)希望收到元組,其中"事件"字段提供您的事件類型。可以通過實現ITupleDruidEventMapper接口來更改此邏輯。

DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap());

DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();

ITupleDruidEventMapper> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);

DruidBeamBolt> druidBolt = new DruidBeamBolt>(druidBeamFactory, eventMapper, druidConfig);

topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");

topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());

Trident State

DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap());

ITupleDruidEventMapper> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);

final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));

stream.peek(new Consumer() {

@Override

public void accept(TridentTuple input) {

LOG.info("########### Received tuple: [{}]", input);

}

}).partitionPersist(new DruidBeamStateFactory>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());

樣品工廠實現

Druid bolt 必須配置一個 BeamFactory. 您可以使用它們其中一個來實現 [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method. See the Configuration documentation for details. For more details refer Tranquility library docs.

public class SampleDruidBeamFactoryImpl implements DruidBeamFactory> {

@Override

public Beam> makeBeam(Map, ?> conf, IMetricsContext metrics) {

final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.

final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path

final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.

final List dimensions = ImmutableList.of("publisher", "advertiser");

List aggregators = ImmutableList.of(

new CountAggregatorFactory(

"click"

)

);

// Tranquility needs to be able to extract timestamps from your object type (in this case, Map).

final Timestamper> timestamper = new Timestamper>()

{

@Override

public DateTime timestamp(Map theMap)

{

return new DateTime(theMap.get("timestamp"));

}

};

// Tranquility uses ZooKeeper (through Curator) for coordination.

final CuratorFramework curator = CuratorFrameworkFactory

.builder()

.connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf

.retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))

.build();

curator.start();

// The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,

// Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.

final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);

// Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is

// done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.

// In this case, we won't provide one, so we're just using Jackson.

final Beam> beam = DruidBeams

.builder(timestamper)

.curator(curator)

.discoveryPath(discoveryPath)

.location(DruidLocation.create(indexService, dataSource))

.timestampSpec(timestampSpec)

.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))

.tuning(

ClusteredBeamTuning

.builder()

.segmentGranularity(Granularity.HOUR)

.windowPeriod(new Period("PT10M"))

.partitions(1)

.replicants(1)

.build()

)

.druidBeamConfig(

DruidBeamConfig

.builder()

.indexRetryPeriod(new Period("PT10M"))

.build())

.buildBeam();

return beam;

}

}

Example code is available here.

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的storm mysql druid_Druid 集成的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。