A Look at Storm's PartialKeyGrouping

This article takes a look at Storm's PartialKeyGrouping.

Example

    @Test
    public void testPartialKeyGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        String spoutId = "wordGenerator";
        String counterId = "counter";
        String aggId = "aggregator";
        String intermediateRankerId = "intermediateRanker";
        String totalRankerId = "finalRanker";
        int TOP_N = 5;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutId, new TestWordSpout(), 5);
        // NOTE: use partialKeyGrouping instead of fieldsGrouping to spread the load more evenly across the counter bolt
        builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
        builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
        builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
        builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
        submitRemote(builder);
    }
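  • Note that because the word-count bolt uses PartialKeyGrouping, the same word is no longer always sent to the same task, so a downstream RollingCountAggBolt, grouped with fieldsGrouping, is still needed to merge the partial counts.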
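The reason for the extra aggregation stage can be shown without Storm at all. The following is a minimal plain-Java sketch under stated assumptions: the class name PartialCountMergeSketch and the per-task counts are hypothetical, and the merge step only stands in for the role RollingCountAggBolt plays after the fieldsGrouping on "obj" (it does not model Storm's rolling windows). It shows that summing the per-key partial counts from every counter task recovers each word's true total.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartialCountMergeSketch {
    // Merges per-task partial counts into one total per word. This mimics the
    // aggregation stage that has to follow partialKeyGrouping (hypothetical helper).
    public static Map<String, Long> merge(List<Map<String, Long>> partialCountsPerTask) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> partial : partialCountsPerTask) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                totals.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical data: the hot word "the" was split across two counter tasks.
        Map<String, Long> task0 = Map.of("the", 6L, "storm", 2L);
        Map<String, Long> task1 = Map.of("the", 4L);
        Map<String, Long> task2 = Map.of("grouping", 3L);
        System.out.println(merge(List.of(task0, task1, task2)));
    }
}
```

With fieldsGrouping each word lives on exactly one counter task, so no merge is needed; with partialKeyGrouping a hot word can be counted on two tasks, which is why its partial counts must be combined downstream.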

PartialKeyGrouping (version 1.2.2)

storm-core-1.2.2-sources.jar!/org/apache/storm/grouping/PartialKeyGrouping.java

    public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
        private static final long serialVersionUID = -447379837314000353L;
        private List<Integer> targetTasks;
        private long[] targetTaskStats;
        private HashFunction h1 = Hashing.murmur3_128(13);
        private HashFunction h2 = Hashing.murmur3_128(17);
        private Fields fields = null;
        private Fields outFields = null;

        public PartialKeyGrouping() {
            //Empty
        }

        public PartialKeyGrouping(Fields fields) {
            this.fields = fields;
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            targetTaskStats = new long[this.targetTasks.size()];
            if (this.fields != null) {
                this.outFields = context.getComponentOutputFields(stream);
            }
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            List<Integer> boltIds = new ArrayList<>(1);
            if (values.size() > 0) {
                byte[] raw;
                if (fields != null) {
                    List<Object> selectedFields = outFields.select(fields, values);
                    ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                    for (Object o: selectedFields) {
                        if (o instanceof List) {
                            out.putInt(Arrays.deepHashCode(((List)o).toArray()));
                        } else if (o instanceof Object[]) {
                            out.putInt(Arrays.deepHashCode((Object[])o));
                        } else if (o instanceof byte[]) {
                            out.putInt(Arrays.hashCode((byte[]) o));
                        } else if (o instanceof short[]) {
                            out.putInt(Arrays.hashCode((short[]) o));
                        } else if (o instanceof int[]) {
                            out.putInt(Arrays.hashCode((int[]) o));
                        } else if (o instanceof long[]) {
                            out.putInt(Arrays.hashCode((long[]) o));
                        } else if (o instanceof char[]) {
                            out.putInt(Arrays.hashCode((char[]) o));
                        } else if (o instanceof float[]) {
                            out.putInt(Arrays.hashCode((float[]) o));
                        } else if (o instanceof double[]) {
                            out.putInt(Arrays.hashCode((double[]) o));
                        } else if (o instanceof boolean[]) {
                            out.putInt(Arrays.hashCode((boolean[]) o));
                        } else if (o != null) {
                            out.putInt(o.hashCode());
                        } else {
                            out.putInt(0);
                        }
                    }
                    raw = out.array();
                } else {
                    raw = values.get(0).toString().getBytes(); // assume key is the first field
                }
                int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size());
                int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size());
                int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
                boltIds.add(targetTasks.get(selected));
                targetTaskStats[selected]++;
            }
            return boltIds;
        }
    }
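  • PartialKeyGrouping is a CustomStreamGrouping. In prepare it initializes long[] targetTaskStats, which counts how many tuples this grouping instance has sent to each target task.
  • If no fields are specified, partialKeyGrouping falls back to the first value of the tuple as the key (values.get(0).toString().getBytes()).
  • It builds two HashFunctions with Guava's Hashing.murmur3_128 (seeds 13 and 17), then takes the absolute value of each hash modulo targetTasks.size() to get two candidate task indices. Nothing prevents the two indices from being the same.
  • It then picks whichever candidate has been chosen fewer times according to targetTaskStats (the first one on a tie) and increments that task's count.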
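The 1.2.2 selection step is an instance of the "power of two choices" load-balancing idea: hash the key twice, then send the tuple to the less-loaded of the two candidates. Below is a minimal, Storm-free sketch of only that step, with one substitution stated plainly: Guava's Hashing.murmur3_128(13/17) is replaced by a hypothetical hand-rolled seeded 64-bit hash (seededHash: an FNV-1a loop followed by a splitmix64-style finalizer), so the concrete indices will not match Storm's. The class name TwoChoiceSketch is also hypothetical. The sketch uses Math.floorMod instead of Math.abs(...) % n, because Math.abs(Long.MIN_VALUE) is still negative and could yield a negative index.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TwoChoiceSketch {
    // How many tuples this (sending) instance has routed to each target index.
    private final long[] targetTaskStats;

    public TwoChoiceSketch(int numTasks) {
        this.targetTaskStats = new long[numTasks];
    }

    // Hypothetical stand-in for Hashing.murmur3_128(seed).hashBytes(raw).asLong():
    // seeded FNV-1a over the bytes, then a splitmix64-style finalizer to spread the bits.
    static long seededHash(long seed, byte[] raw) {
        long h = 0xcbf29ce484222325L ^ seed;
        for (byte b : raw) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 30;
        h *= 0xbf58476d1ce4e5b9L;
        h ^= h >>> 27;
        h *= 0x94d049bb133111ebL;
        h ^= h >>> 31;
        return h;
    }

    // Two candidate indices for a key; as in 1.2.2, they are allowed to coincide.
    public int[] candidates(byte[] raw) {
        long n = targetTaskStats.length;
        int first = (int) Math.floorMod(seededHash(13, raw), n);
        int second = (int) Math.floorMod(seededHash(17, raw), n);
        return new int[] {first, second};
    }

    // Pick the candidate used less often (the first one on a tie) and count it.
    public int choose(byte[] raw) {
        int[] c = candidates(raw);
        int selected = targetTaskStats[c[0]] > targetTaskStats[c[1]] ? c[1] : c[0];
        targetTaskStats[selected]++;
        return selected;
    }

    public long[] stats() {
        return targetTaskStats.clone();
    }

    public static void main(String[] args) {
        TwoChoiceSketch grouping = new TwoChoiceSketch(4);
        byte[] hotKey = "the".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < 1000; i++) {
            grouping.choose(hotKey);
        }
        System.out.println("candidates=" + Arrays.toString(grouping.candidates(hotKey))
                + " stats=" + Arrays.toString(grouping.stats()));
    }
}
```

When a single hot key has two distinct candidates, the tie-breaking rule makes it alternate between them, so 1000 tuples split 500/500; if both hashes land on the same index, that key still goes to one task, exactly as in the 1.2.2 code.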

PartialKeyGrouping (version 2.0.0)

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send
     * Tuples from a given partition to multiple downstream tasks.
     *
     * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each
     * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
     *
     * Notes:
     * - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible
     * - the default AssignmentCreator hashes the key and produces an assignment of two tasks
     */
    public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
        private static final long serialVersionUID = -1672360572274911808L;
        private List<Integer> targetTasks;
        private Fields fields = null;
        private Fields outFields = null;

        private AssignmentCreator assignmentCreator;
        private TargetSelector targetSelector;

        public PartialKeyGrouping() {
            this(null);
        }

        public PartialKeyGrouping(Fields fields) {
            this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
        }

        public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
            this(fields, assignmentCreator, new BalancedTargetSelector());
        }

        public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
            this.fields = fields;
            this.assignmentCreator = assignmentCreator;
            this.targetSelector = targetSelector;
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            if (this.fields != null) {
                this.outFields = context.getComponentOutputFields(stream);
            }
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            List<Integer> boltIds = new ArrayList<>(1);
            if (values.size() > 0) {
                final byte[] rawKeyBytes = getKeyBytes(values);

                final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
                final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);

                boltIds.add(selectedTask);
            }
            return boltIds;
        }

        /**
         * Extract the key from the input Tuple.
         */
        private byte[] getKeyBytes(List<Object> values) {
            byte[] raw;
            if (fields != null) {
                List<Object> selectedFields = outFields.select(fields, values);
                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                for (Object o : selectedFields) {
                    if (o instanceof List) {
                        out.putInt(Arrays.deepHashCode(((List) o).toArray()));
                    } else if (o instanceof Object[]) {
                        out.putInt(Arrays.deepHashCode((Object[]) o));
                    } else if (o instanceof byte[]) {
                        out.putInt(Arrays.hashCode((byte[]) o));
                    } else if (o instanceof short[]) {
                        out.putInt(Arrays.hashCode((short[]) o));
                    } else if (o instanceof int[]) {
                        out.putInt(Arrays.hashCode((int[]) o));
                    } else if (o instanceof long[]) {
                        out.putInt(Arrays.hashCode((long[]) o));
                    } else if (o instanceof char[]) {
                        out.putInt(Arrays.hashCode((char[]) o));
                    } else if (o instanceof float[]) {
                        out.putInt(Arrays.hashCode((float[]) o));
                    } else if (o instanceof double[]) {
                        out.putInt(Arrays.hashCode((double[]) o));
                    } else if (o instanceof boolean[]) {
                        out.putInt(Arrays.hashCode((boolean[]) o));
                    } else if (o != null) {
                        out.putInt(o.hashCode());
                    } else {
                        out.putInt(0);
                    }
                }
                raw = out.array();
            } else {
                raw = values.get(0).toString().getBytes(); // assume key is the first field
            }
            return raw;
        }

        //......
    }
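  • Version 2.0.0 moves this logic into RandomTwoTaskAssignmentCreator (which picks the candidate tasks for a key) and BalancedTargetSelector (which picks one of those candidates). Both are only defaults and can be replaced through the constructor arguments.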

RandomTwoTaskAssignmentCreator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface is responsible for choosing a subset of the target tasks to use for a given key.
     *
     * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus
     * each of them needs to come up with the same assignment for a given key.
     */
    public interface AssignmentCreator extends Serializable {
        int[] createAssignment(List<Integer> targetTasks, byte[] key);
    }

    /*========== Implementations ==========*/

    /**
     * This implementation of AssignmentCreator chooses two arbitrary tasks.
     */
    public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
        /**
         * Creates a two task assignment by selecting random tasks.
         */
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
            final long seedForRandom = Arrays.hashCode(key);
            final Random random = new Random(seedForRandom);
            final int choice1 = random.nextInt(tasks.size());
            int choice2 = random.nextInt(tasks.size());
            // ensure that choice1 and choice2 are not the same task
            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
            return new int[]{ tasks.get(choice1), tasks.get(choice2) };
        }
    }
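  • Version 2.0.0 no longer uses Guava's Hashing.murmur3_128. Instead it seeds java.util.Random with the key's hash (Arrays.hashCode(key)) and draws two task indices, so a given key always gets the same pair. If both draws land on the same index, the second one is moved to the next index, so the pair is distinct whenever there are at least two target tasks. The method returns the two task IDs, and the selector then picks one of them to balance the load.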
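A small harness makes the "seeded, therefore deterministic" property visible. This sketch copies the createAssignment body above into a standalone class so it runs without storm-client; the class name RandomTwoTaskSketch and the task IDs 7 to 10 are hypothetical. It feeds the raw word bytes encoded as UTF-8, which resembles the no-fields fallback path (Storm's fallback calls getBytes() with the platform default charset, and with fields set the key bytes are the 4-byte field hashes instead).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class RandomTwoTaskSketch {
    // Same steps as RandomTwoTaskAssignmentCreator.createAssignment,
    // copied into a standalone class so it can run without storm-client.
    public static int[] createAssignment(List<Integer> tasks, byte[] key) {
        // Seeding Random from the key makes the "random" pair deterministic per key.
        final long seedForRandom = Arrays.hashCode(key);
        final Random random = new Random(seedForRandom);
        final int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        // On a collision, shift the second choice to the next index.
        choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
        return new int[] {tasks.get(choice1), tasks.get(choice2)};
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(7, 8, 9, 10); // hypothetical target task IDs
        for (String word : new String[] {"apple", "banana", "apple"}) {
            byte[] key = word.getBytes(StandardCharsets.UTF_8);
            System.out.println(word + " -> " + Arrays.toString(createAssignment(tasks, key)));
        }
    }
}
```

Both occurrences of "apple" print the same pair, which is what lets different workers agree on a key's candidate tasks without coordinating.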

BalancedTargetSelector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface chooses one element from a task assignment to send a specific Tuple to.
     */
    public interface TargetSelector extends Serializable {
        Integer chooseTask(int[] assignedTasks);
    }

    /**
     * A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
     * overall from this instance of the grouping.
     */
    public static class BalancedTargetSelector implements TargetSelector {
        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

        /**
         * Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
         */
        public Integer chooseTask(int[] assignedTasks) {
            Integer taskIdWithMinLoad = null;
            Long minTaskLoad = Long.MAX_VALUE;

            for (Integer currentTaskId : assignedTasks) {
                final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
                if (currentTaskLoad < minTaskLoad) {
                    minTaskLoad = currentTaskLoad;
                    taskIdWithMinLoad = currentTaskId;
                }
            }

            targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
            return taskIdWithMinLoad;
        }
    }
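  • Given the assigned task IDs, BalancedTargetSelector looks up each one's count in targetTaskStats, returns the least-used one as taskIdWithMinLoad (the earlier one on a tie), and increments its count. The counts are kept per grouping instance, so they reflect only the tuples this instance has sent, not the global load on each task.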
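The selection rule and its tie-breaking can be checked in isolation. This sketch copies the chooseTask logic into a standalone class; BalancedSelectorSketch, its load helper, and the task IDs are hypothetical, and a plain HashMap replaces Guava's Maps.newHashMap(). It shows that a hot key alternates between its two tasks, and that the counters are shared across keys: by the time a second key arrives, task 10 already carries load, so task 9 is picked.

```java
import java.util.HashMap;
import java.util.Map;

public class BalancedSelectorSketch {
    // Per-instance counters, like BalancedTargetSelector.targetTaskStats:
    // they only reflect tuples routed by this selector, not global load.
    private final Map<Integer, Long> targetTaskStats = new HashMap<>();

    public Integer chooseTask(int[] assignedTasks) {
        Integer taskIdWithMinLoad = null;
        long minTaskLoad = Long.MAX_VALUE;
        for (int currentTaskId : assignedTasks) {
            long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
            // Strict "<" keeps the earlier task on a tie.
            if (currentTaskLoad < minTaskLoad) {
                minTaskLoad = currentTaskLoad;
                taskIdWithMinLoad = currentTaskId;
            }
        }
        targetTaskStats.merge(taskIdWithMinLoad, 1L, Long::sum);
        return taskIdWithMinLoad;
    }

    // Hypothetical helper: how many tuples this selector sent to a task.
    public long load(int taskId) {
        return targetTaskStats.getOrDefault(taskId, 0L);
    }

    public static void main(String[] args) {
        BalancedSelectorSketch selector = new BalancedSelectorSketch();
        int[] hotKeyAssignment = {8, 10};  // hypothetical assignment for a hot key
        int[] coldKeyAssignment = {10, 9}; // hypothetical assignment for another key
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            picks.append(selector.chooseTask(hotKeyAssignment)).append(' ');
        }
        picks.append(selector.chooseTask(coldKeyAssignment));
        System.out.println(picks); // prints "8 10 8 10 9"
    }
}
```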

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class FieldsGrouper implements CustomStreamGrouping {
        private Fields outFields;
        private List<List<Integer>> targetTasks;
        private Fields groupFields;
        private int numTasks;

        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
            this.outFields = outFields;
            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<List<Integer>>();
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
            return targetTasks.get(targetTaskIndex);
        }
    }
  • FieldsGrouper's chooseTasks uses TupleUtils.chooseTaskIndex to compute the index of the single target task.

TupleUtils.chooseTaskIndex

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

    public static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        return Math.floorMod(listHashCode(keys), numTasks);
    }

    private static <T> int listHashCode(List<T> alist) {
        if (alist == null) {
            return 1;
        } else {
            return Arrays.deepHashCode(alist.toArray());
        }
    }
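  • chooseTaskIndex first computes listHashCode(keys), then applies Math.floorMod with numTasks, i.e. floor modulo, which keeps the index in [0, numTasks) even when the hash is negative.
  • listHashCode hashes the key list with Arrays.deepHashCode(alist.toArray()) and returns 1 for a null list.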
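For contrast with the two-choice approach, here is the fields-grouping arithmetic as a standalone sketch. ChooseTaskIndexSketch is a hypothetical name; the method bodies follow the TupleUtils code above, with the if/else folded into a ternary. The main method also shows why floorMod matters: Java's % keeps the sign of the dividend, so a negative hash would produce a negative index.

```java
import java.util.Arrays;
import java.util.List;

public class ChooseTaskIndexSketch {
    // Same arithmetic as TupleUtils.chooseTaskIndex / listHashCode.
    public static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        return Math.floorMod(listHashCode(keys), numTasks);
    }

    private static <T> int listHashCode(List<T> alist) {
        return alist == null ? 1 : Arrays.deepHashCode(alist.toArray());
    }

    public static void main(String[] args) {
        // Floor modulo stays in [0, numTasks) for negative values; % does not.
        System.out.println(Math.floorMod(-7, 4)); // prints 1
        System.out.println(-7 % 4);               // prints -3
        // Every tuple whose key is "the" maps to this one task index, however hot the key is.
        System.out.println(chooseTaskIndex(List.of("the"), 4));
    }
}
```

Because the index depends only on the key, a hot key concentrates all of its tuples on one task; that is the skew PartialKeyGrouping relieves by giving each key two candidate tasks.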

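Summary

  • Storm's PartialKeyGrouping addresses the skewed load that fieldsGrouping can put on bolt tasks when some keys are much more frequent than others.
  • fieldsGrouping hashes the selected fields and takes the floor modulo by the number of tasks to pick the target task index, so each key always goes to exactly one task.
  • In 1.2.2, PartialKeyGrouping hashes the key with two Guava Hashing.murmur3_128 functions and takes the absolute value of each hash modulo the number of tasks, giving two candidate task indices; in 2.0.0 it instead seeds java.util.Random with the key's hash and draws two task indices. Producing two candidates per key, rather than one, is the key difference from fieldsGrouping. Given the two candidates, PartialKeyGrouping also keeps a per-task usage count, always picks the less-used candidate, and increments the chosen task's count.
  • Because a given word is no longer sent to one fixed task, the word-count topology needs the downstream RollingCountAggBolt (with fieldsGrouping) to merge the partial counts.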


