hadoop streaming編程小demo(python版)

大數據團隊搞數據質量評測。自動化質檢和監控平臺是用django,MR也是通過python實現的。(后來發現有orc壓縮問題,python不知道怎么解決,正在改成java版本)

這里展示一個python編寫MR的例子吧。

抄一句話:Hadoop Streaming是Hadoop提供的一個編程工具,它允許用戶使用任何可執行文件或者腳本文件作為Mapper和Reducer。

?

1、首先,先介紹一下背景,我們的數據是存放在hive里的。hive建表語句如下:

我們將會解析元數據,和HDFS上的數據進行merge,方便處理。這里的partition_key用的是year/month/day。

hive (gulfstream_ods)> desc g_order;
OK
col_name        data_type       comment
order_id                bigint                  訂單id                
driver_id               bigint                  司機id,司機搶單前該值為0      
driver_phone            string                  司機電話                
passenger_id            bigint                  乘客id                
passenger_phone         string                  乘客電話                
car_id                  int                     接駕車輛id              
area                    int                     城市id                
district                string                  城市區號                
type                    int                     訂單時效,0 實時  1預約      
current_lng             decimal(19,6)           乘客發單時的經度            
current_lat             decimal(19,6)           乘客發單時的緯度            
starting_name           string                  起點名稱                
starting_lng            decimal(19,6)           起點經度                
starting_lat            decimal(19,6)           起點緯度                
dest_name               string                  終點名稱                
dest_lng                decimal(19,6)           終點經度                
dest_lat                decimal(19,6)           終點緯度                
driver_start_distance   int                     司機與出發地的路面距離,單位:米    
start_dest_distance     int                     出發地與終點的路面距離,單位:米    
departure_time          string                  出發時間(預約單的預約時間,實時單為發單時間)
strive_time             string                  搶單成功時間              
consult_time            string                  協商時間                
arrive_time             string                  司機點擊‘我已到達’的時間       
setoncar_time           string                  上車時間(暫時不用)          
begin_charge_time       string                  司機點機‘開始計費’的時間       
finish_time             string                  完成時間                
year                    string                                      
month                   string                                      
day                     string                                      # Partition Information          
# col_name              data_type               comment             year                    string                                      
month                   string                                      
day                     string              

?

2、我們解析元數據

這里是解析元數據的過程。之后我們把元數據序列化后存入文件desc.gulfstream_ods.g_order,我們將會將此配置文件連同MR腳本一起上傳到hadoop集群。

import subprocess
from subprocess import Popendef desc_table(db, table):process = Popen('hive -e "desc %s.%s"' % (db, table),shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)stdout, stderr = process.communicate()is_column = Truestructure_list = list()column_list = list()for line in stdout.split('\n'):value_list = list()if not line or len(line.split()) < 2:breakif is_column:column_list = line.split()is_column = Falsecontinueelse:value_list = line.split()structure_dict = dict(zip(column_list, value_list))structure_list.append(structure_dict)return structure_list

?

3、下面是hadoop streaming執行腳本。

#!/bin/bash
source /etc/profile
source ~/.bash_profile

#hadoop目錄
echo "HADOOP_HOME: "$HADOOP_HOME
HADOOP="$HADOOP_HOME/bin/hadoop"

DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo $DB--$TABLE--$YEAR--$MONTH--$DAY

if [ "$DB" = "gulfstream_ods" ]
then
DB_NAME="gulfstream"
else
DB_NAME=$DB
fi
TABLE_NAME=$TABLE

#輸入路徑
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
#標記文件后綴名
input_mark="_SUCCESS"
echo $input_path
#輸出路徑
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path
#性能約束參數
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"
#啟動job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"

$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-jobconf mapred.job.name="$job_name" \
-jobconf mapred.job.queue.name=$queue_name \
-jobconf mapred.map.tasks=$map_num \
-jobconf mapred.reduce.tasks=$reducer_num \
-jobconf mapred.map.capacity=$capacity_mapper \
-jobconf mapred.reduce.capacity=$capacity_reducer \
-input $input_path \
-output $output_path \
-file ./mapper.py \
-file ./reducer.py \
-file ./utils.py \
-file ./"desc.${DB}.${TABLE_NAME}" \
-mapper "$mapper" \
-reducer "$reducer"
if [ $? -ne 0 ]; then
echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run faild"
fi
$HADOOP fs -touchz "${output_path}/$output_mark"
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}

?

?4、這里是Wordcount的進階版本,第一個功能是分區域統計訂單量,第二個功能是在一天中分時段統計訂單量。

mapper腳本

# -*- coding:utf-8 -*-
#!/usr/bin/env python
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')# 將字段和元數據匹配, 返回迭代器
def read_from_input(file, separator, columns):for line in file:if line is None or line == '':continuedata_list = mapper_input(line, separator)if not data_list:continueitem = None# 最后3列, 年月日作為partitionkey, 無用if len(data_list) == len(columns) - 3:item = dict(zip(columns, data_list))elif len(data_list) == len(columns):item = dict(zip(columns, data_list))if not item:continueyield itemdef index_columns(db, table):with open('desc.%s.%s' % (db, table), 'r') as fr:structure_list = deserialize(fr.read())return [column.get('col_name') for column in structure_list]# map入口
def main(separator, columns):items = read_from_input(sys.stdin, separator, columns)mapper_result = {}for item in items:mapper_plugin_1(item, mapper_result)mapper_plugin_2(item, mapper_result)

def mapper_plugin_1(item, mapper_result):# key在現實中可以是不同appkey, 是用來分發到不同的reducer上的, 相同的route用來分發到相同的reducerkey = 'route1'area = item.get('area')district = item.get('district')order_id = item.get('order_id')if not area or not district or not order_id:returnmapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})def mapper_plugin_2(item, mapper_result):key = 'route2'strive_time = item.get('strive_time')order_id = item.get('order_id')if not strive_time or not order_id:returntry:day_hour = strive_time.split(':')[0]mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})except Exception, ex:passdef serialize(data, type='json'):if type == 'json':try:return json.dumps(data)except Exception, ex:return ''elif type == 'pickle':try:return pickle.dumps(data)except Exception, ex:return ''else:return ''def deserialize(data, type='json'):if type == 'json':try:return json.loads(data)except Exception, ex:return []elif type == 'pickle':try:return pickle.loads(data)except Exception, ex:return []else:return []def mapper_input(line, separator='\t'):try:return line.split(separator)except Exception, ex:return Nonedef mapper_output(key, data, separator='\t'):key = str(key)data = serialize(data)print '%s%s%s' % (key, separator, data)# print >> sys.stderr, '%s%s%s' % (key, separator, data)if __name__ == '__main__':db = sys.argv[1]table = sys.argv[2]columns = index_columns(db, table)main('||', columns)

reducer腳本

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetterdef read_from_mapper(file, separator):for line in file:yield reducer_input(line)def main(separator='\t'):reducer_result = {}line_list = read_from_mapper(sys.stdin, separator)for route_key, group in groupby(line_list, itemgetter(0)):if route_key is None:continuereducer_result.setdefault(route_key, {})if route_key == 'route1':reducer_plugin_1(route_key, group, reducer_result)reducer_output(route_key, reducer_result[route_key])if route_key == 'route2':reducer_plugin_2(route_key, group, reducer_result)reducer_output(route_key, reducer_result[route_key])

def reducer_plugin_1(route_key, group, reducer_result):for _, data in group:if data is None or len(data) == 0:continueif not data.get('area') or not data.get('district') or not data.get('count'):continuekey = '_'.join([data.get('area'), data.get('district')])reducer_result[route_key].setdefault(key, 0)reducer_result[route_key][key] += int(data.get('count'))# print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])def reducer_plugin_2(route_key, group, reducer_result):for _, data in group:if data is None or len(data) == 0:continueif not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):continuekey = data.get('day_hour')reducer_result[route_key].setdefault(key, {})reducer_result[route_key][key].setdefault('count', 0)reducer_result[route_key][key].setdefault('order_list', [])reducer_result[route_key][key]['count'] += int(data.get('count'))if len(reducer_result[route_key][key]['order_list']) < 100:reducer_result[route_key][key]['order_list'].append(data.get('order_id'))# print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])
def serialize(data, type='json'):if type == 'json':try:return json.dumps(data)except Exception, ex:return ''elif type == 'pickle':try:return pickle.dumps(data)except Exception, ex:return ''else:return ''def deserialize(data, type='json'):if type == 'json':try:return json.loads(data)except Exception, ex:return []elif type == 'pickle':try:return pickle.loads(data)except Exception, ex:return []else:return []def reducer_input(data, separator='\t'):data_list = data.strip().split(separator, 2)key = data_list[0]data = deserialize(data_list[1])return [key, data]def reducer_output(key, data, separator='\t'):key = str(key)data = serialize(data)print '%s\t%s' % (key, data)# print >> sys.stderr, '%s\t%s' % (key, data)if __name__ == '__main__':main()

?

5、上一個版本,遭遇了reduce慢的情況,原因有兩個:一是因為route的設置,所有相同的route都將分發到同一個reducer,造成單個reducer處理壓力大,性能下降。二是因為集群是搭建在虛擬機上的,性能本身就差。可以對這個問題進行改進。改進版本如下,方案是在mapper階段先對數據進行初步的統計,緩解reducer的計算壓力。

mapper腳本

# -*- coding:utf-8 -*-
#!/usr/bin/env python
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')# 將字段和元數據匹配, 返回迭代器
def read_from_input(file, separator, columns):for line in file:if line is None or line == '':continuedata_list = mapper_input(line, separator)if not data_list:continueitem = None# 最后3列, 年月日作為partitionkey, 無用if len(data_list) == len(columns) - 3:item = dict(zip(columns, data_list))elif len(data_list) == len(columns):item = dict(zip(columns, data_list))if not item:continueyield itemdef index_columns(db, table):with open('desc.%s.%s' % (db, table), 'r') as fr:structure_list = deserialize(fr.read())return [column.get('col_name') for column in structure_list]# map入口
def main(separator, columns):items = read_from_input(sys.stdin, separator, columns)mapper_result = {}for item in items:mapper_plugin_1(item, mapper_result)mapper_plugin_2(item, mapper_result)for route_key, route_value in mapper_result.iteritems():for key, value in route_value.iteritems():ret_dict = dict()ret_dict['route_key'] = route_keyret_dict['key'] = keyret_dict.update(value)mapper_output('route_total', ret_dict)def mapper_plugin_1(item, mapper_result):# key在現實中可以是不同appkey, 是用來分發到不同的reducer上的, 相同的route用來分發到相同的reducerkey = 'route1'area = item.get('area')district = item.get('district')order_id = item.get('order_id')if not area or not district or not order_id:returntry:# total統計
        mapper_result.setdefault(key, {})mapper_result[key].setdefault('_'.join([area, district]), {})mapper_result[key]['_'.join([area, district])].setdefault('count', 0)mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])mapper_result[key]['_'.join([area, district])]['count'] += 1if len(mapper_result[key]['_'.join([area, district])]['order_id']) < 10:mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)except Exception, ex:passdef mapper_plugin_2(item, mapper_result):key = 'route2'strive_time = item.get('strive_time')order_id = item.get('order_id')if not strive_time or not order_id:returntry:day_hour = strive_time.split(':')[0]# total統計
        mapper_result.setdefault(key, {})mapper_result[key].setdefault(day_hour, {})mapper_result[key][day_hour].setdefault('count', 0)mapper_result[key][day_hour].setdefault('order_id', [])mapper_result[key][day_hour]['count'] += 1if len(mapper_result[key][day_hour]['order_id']) < 10:mapper_result[key][day_hour]['order_id'].append(order_id)except Exception, ex:passdef serialize(data, type='json'):if type == 'json':try:return json.dumps(data)except Exception, ex:return ''elif type == 'pickle':try:return pickle.dumps(data)except Exception, ex:return ''else:return ''def deserialize(data, type='json'):if type == 'json':try:return json.loads(data)except Exception, ex:return []elif type == 'pickle':try:return pickle.loads(data)except Exception, ex:return []else:return []def mapper_input(line, separator='\t'):try:return line.split(separator)except Exception, ex:return Nonedef mapper_output(key, data, separator='\t'):key = str(key)data = serialize(data)print '%s%s%s' % (key, separator, data)# print >> sys.stderr, '%s%s%s' % (key, separator, data)if __name__ == '__main__':db = sys.argv[1]table = sys.argv[2]columns = index_columns(db, table)main('||', columns)

reducer腳本

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetterdef read_from_mapper(file, separator):for line in file:yield reducer_input(line)def main(separator='\t'):reducer_result = {}line_list = read_from_mapper(sys.stdin, separator)for route_key, group in groupby(line_list, itemgetter(0)):if route_key is None:continuereducer_result.setdefault(route_key, {})if route_key == 'route_total':reducer_total(route_key, group, reducer_result)reducer_output(route_key, reducer_result[route_key])def reducer_total(route_key, group, reducer_result):for _, data in group:if data is None or len(data) == 0:continueif data.get('route_key') == 'route1':reducer_result[route_key].setdefault(data.get('route_key'), {})reducer_result[route_key][data.get('key')].setdefault('count', 0)reducer_result[route_key][data.get('key')].setdefault('order_id', [])reducer_result[route_key][data.get('key')]['count'] += data.get('count')for order_id in data.get('order_id'):if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:reducer_result[route_key][data.get('key')]['order_id'].append(order_id)elif data.get('route_key') == 'route2':reducer_result[route_key].setdefault(data.get('route_key'), {})reducer_result[route_key][data.get('key')].setdefault('count', 0)reducer_result[route_key][data.get('key')].setdefault('order_id', [])reducer_result[route_key][data.get('key')]['count'] += data.get('count')for order_id in data.get('order_id'):if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:reducer_result[route_key][data.get('key')]['order_id'].append(order_id)else:passdef serialize(data, type='json'):if type == 'json':try:return json.dumps(data)except Exception, ex:return ''elif type == 'pickle':try:return pickle.dumps(data)except Exception, ex:return ''else:return ''def deserialize(data, type='json'):if type == 'json':try:return json.loads(data)except Exception, ex:return []elif type == 'pickle':try:return pickle.loads(data)except Exception, ex:return []else:return []def reducer_input(data, separator='\t'):data_list = data.strip().split(separator, 2)key = data_list[0]data = deserialize(data_list[1])return [key, data]def reducer_output(key, data, separator='\t'):key = str(key)data = serialize(data)print '%s\t%s' % (key, data)# print >> sys.stderr, '%s\t%s' % (key, data)if __name__ == '__main__':main()

?

遇到的問題:

1、The DiskSpace /user/bigdata/qa quota of ?is exceeded

在reducer結束后,遭遇如上問題,是因為HDFS ?路徑下的disk容量已經被沾滿,釋放容量即可;

?

轉載于:https://www.cnblogs.com/kangoroo/p/6151104.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/255780.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/255780.shtml
英文地址,請注明出處:http://en.pswp.cn/news/255780.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Asp.net在IE10、IE11下事件丟失經驗總結

asp.net4.0出生得比IE10早&#xff0c;所以asp.net4.0以前版本不認識IE10 的 User-Agent 標頭&#xff0c;導致的后果就是ASP.NET 特定功能失效&#xff0c;例如&#xff1a;頁面報錯__doPostBack找不到&#xff0c;不支援 Cookies 功能等等。這屬于.net的Bug&#xff0c;微軟也…

第6章 循環結構

循環語句: 可以讓一部分代碼,反復執行 1.1 循環語句while while循環: 編寫格式:while(條件){ 循環體 } 條件: 當條件是true,就執行循環體,執行完循環體后 程序再次執行while中的條件,如果條件還是true,繼續執行循環體 直到條件是false的時候,循環就結束 public class WhileDem…

【深度學習】——pytorch搭建模型及相關模型

目錄 1、搭建模型的流程 1&#xff09;步驟 2&#xff09;完整代碼——手寫minist數據集為例&#xff08;這里使用的數據集是自帶的&#xff09; 2、搭建模型的四種方法 1&#xff09;方法一——利用nn.Sequential&#xff08;&#xff09; 2&#xff09;方法二——利用co…

ABB robot 與 Fronius 設備 IO

ABB robot 與 Fronius 設備 IO

初次使用cocoapods注意事項

在僅僅用cocoapods時可能會遇到各種各樣的錯誤和問題 這里中總結下: 1.首先使用cocoapods有非常多優點,在github上非常多優秀的開源項目都用到了它;假設你不會使用它,那么非常多優秀的開源項目你下載下來了也發現跑不起來,假設發現有Profile,Profile.lock,Pods等cocoapods相關…

MongoDB復制集技術

為什么使用MongogDB復制集技術? mysql中:一主一從&#xff0c;一主多從結構存在的問題 1、 fileover&#xff08;故障轉移&#xff09;a) 選主投票b) 切換 2、 是否對就用透明化 3、 數據補償的問題a) 兩階段數據補償 4、 解決方法 mysql中使用MHAVIP b…

Linux文件系統的實現 (圖文并茂,比較好)

作者&#xff1a;Vamei 出處&#xff1a;http://www.cnblogs.com/vamei 歡迎轉載&#xff0c;也請保留這段聲明。謝謝&#xff01; Linux文件管理從用戶的層面介紹了Linux管理文件的方式。Linux有一個樹狀結構來組織文件。樹的頂端為根目錄(/)&#xff0c;節點為目錄&#xff0…

【深度學習】——如何處理輸入圖像大小不一樣的情況

這里一般有常見的幾種方法&#xff1a; 1&#xff09;將圖像縮放成大小一致后再輸入&#xff0c;如RCNN算法 2&#xff09;roi pooling&#xff1a;這里允許輸入圖像的大小不一樣&#xff0c;后續根據指定的固定大小來求解池化的核大小&#xff0c;以此來得到相同大小的特征圖&…

ROS探索總結(一)——ROS簡介

隨著機器人領域的快速發展和復雜化&#xff0c;代碼的復用性和模塊化的需求原來越強烈&#xff0c;而已有的開源機器人系統又不能很好的適應需求。2010年Willow Garage公司發布了開源機器人操作系統ROS&#xff08;robot operating system&#xff09;&#xff0c;很快在機器人…

微信瀏覽器取消緩存的方法

摘要:做微信公家號以及調試手機頁面的時辰&#xff0c;防止不了頁面要跳轉到微信閱讀器打開&#xff0c;調試階段&#xff0c;android版微信閱讀器一直都默許緩存html靜態資本&#xff0c;每一次靜態資本變革乃至新內容發布的時辰在微信閱讀器上都極有可能不克不及更新&#xf…

【機器視覺】——裂紋檢測筆記

目錄 傳統算法處理裂縫的基本思路&#xff1a; 第一種思路 第二種思路&#xff1a; 第三種思路 CPP代碼 halcon代碼 python代碼 Matlab代碼 深度學習缺陷檢測 裂縫檢測文獻 傳統算法處理裂縫的基本思路&#xff1a; 第一種思路 1.先轉換彩色圖為灰度圖 2.進行自適應…

利用union判斷系統的大小端

int checkCPUendian()//返回1&#xff0c;為小端&#xff1b;反之&#xff0c;為大端&#xff1b; { union{ unsigned int a; unsigned char b; }c; c.a 1; return 1 c.b; }大端模式(Big-endian)&#xff0c;是指數據的高字節保存在內存的低地址中&#xff0c;而數據…

Filter(過濾器)?和?interceptor(攔截器)的區別

Filter&#xff08;過濾器&#xff09; 和 interceptor&#xff08;攔截器&#xff09;的區別 1.攔截器是基于java反射機制的&#xff0c;而過濾器是基于函數回調的。 2.過濾器依賴于Servlet容器&#xff0c;而攔截器不依賴于Servlet容器。 3.攔截器只對Action請求起作用&#…

ROS探索總結(二)——ROS總體框架

一、 總體結構 根據ROS系統代碼的維護者和分布來標示&#xff0c;主要有兩大部分&#xff1a;&#xff08;1&#xff09;main&#xff1a;核心部分&#xff0c;主要由Willow Garage公司和一些開發者設計、提供以及維護。它提供了一些分布式計算的基本工具&#xff0c;以及整個…

python 阿貍的進階之路(4)

裝飾器 #1、開放封閉原則&#xff1a;對擴展開放&#xff0c;對修改是封閉#2、裝飾器&#xff1a;裝飾它人的&#xff0c;器指的是任意可調用對象&#xff0c;現在的場景裝飾器-》函數&#xff0c;被裝飾的對象也是-》函數#原則&#xff1a;1、不修改被裝飾對象的源代碼 2、不修…

【深度學習】——利用pytorch搭建一個完整的深度學習項目(構建模型、加載數據集、參數配置、訓練、模型保存、預測)

目錄 一、深度學習項目的基本構成 二、實戰&#xff08;貓狗分類&#xff09; 1、數據集下載 2、dataset.py文件 3、model.py 4、config.py 5、predict.py 一、深度學習項目的基本構成 一個深度學習模型一般包含以下幾個文件&#xff1a; datasets文件夾&#xff1a;存放…

二叉樹的序遍歷

時間限制: 1 s空間限制: 32000 KB題目等級 : 白銀 Silver題目描述 Description求一棵二叉樹的前序遍歷&#xff0c;中序遍歷和后序遍歷 輸入描述 Input Description第一行一個整數n&#xff0c;表示這棵樹的節點個數。 接下來n行每行2個整數L和R。第i行的兩個整數Li和Ri代表編號…

GUI登錄界面

在這次的作業中&#xff0c;我先使用單選按鈕&#xff0c;輸入框&#xff0c;復選框設計了一個簡單地登錄界面。接著我使用了MouseListener將登陸按鈕與下一個“查詢界面”連接起來。最后我使用了我們本周所學的JFrame框架與事件處理機制設計了一個簡單地界面。我所設計的登錄界…

淺談ROS操作系統及其應用趨勢

ROS操作系統是最先由斯坦福開發的開源機器人操作系統&#xff0c;目前由willowgarage公司開發和維護&#xff0c;相關的開發社區也很成熟&#xff08; http://www.ros.org &#xff0c; http://answers.ros.org, http://www.willowgarage.com), 經過幾年的發展API也逐漸穩定&a…

Raft學習傳送門

Raft官網 官方可視化動畫1 官方可視化動畫2 論文中文翻譯 論文英文地址 Paxos Made Simple論文翻譯 Raft理解 技術分享 《分布式一致性raft算法實現原理》 狀態機 MIT&#xff1a; raft實現 分布式系統學習2-Raft算法分析與實現 分布式系統MIT 6.824學習資源 知乎大神的&#…