熬了四個大夜才搞明白,最晚一天熬到早上十點/(ㄒoㄒ)/~~,最后發現只要加個參數就解決了。。。抱頭痛哭
問題描述:
Hadoop集群部署在docker容器中,宿主機執行pyspark程序讀取hive表
問題一:當master('local[*]')時,docker容器返回給driver端datanode節點的內網ip地址,修改hosts只能將域名轉發到ip地址,不能將ip地址轉發給ip地址。
問題二:當master('spark://localhost:7077'),因為容器做了端口映射,這里使用的時localhost。driver端為宿主機,spark會把driver端的hostname傳到spark集群worker節點上,spark work容器無法識別宿主機hostname
解決方法:
在宿主機配置好hosts,格式為:127.0.0.1 容器hostname(eg:datanode)
問題一:SparkSession加參數config("dfs.client.use.datanode.hostname", "true")//客戶端(如 Spark Driver)通過主機名訪問 DataNode。
問題二:SparkSession加參數config("spark.driver.host", "192.168.1.5") //宿主機ip地址
就是這么簡單。。。哭死(;′??Д??`)?
當時試了好多種辦法,nginx反向代理、DNSmasq自定義DNS、NPS內網穿透、Macvlan網絡模式,SNAT、最后甚至還裝了k8s集群外加k8s監控界面,真的哭死。看看現在時間吧,已經4:03了。。。😭😭😭
最后附上完整代碼:
import string
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':# =============================== spark local模式 ===============================# 想要讀寫數據必須要配置windows下的Hadoop環境(不一定,未驗證)# 想要使用Hive元數據必須添加enableHiveSupport()這行代碼# 無論spark集群配置文件中有沒有配置spark元數據信息,都要在代碼工程中配置元數據信息,因為本地讀取不到集群中的環境變量,創建hive-site.xml文件或代碼中定義config都行# 如果不指定spark.sql.warehouse.dir信息則默認為: file:/C:/Users/yelor/spark-warehouse# 如果不知道hive.metastore.uris的值則找不到hive元數據,但不會報錯,只是無法使用hive元數據# spark.sql.warehouse.dir和hive.metastore.uris的值可以在代碼工程中配置hive-site.xml文件來指定# 容器外需要注意ip地址互通問題,需要配置hosts# 如果Hadoop集群部署在docker容器中,dfs.client.use.datanode.hostname=true在本地local模式下必須要加,不然spark會使用datanode的內網ip來通信import os# 這里可以選擇本地win系統的PySpark環境執行pySpark代碼,也可以使用虛擬機中PySpark環境,通過os可以配置。# os.environ['SPARK_HOME'] = r'D:\software2\spark-3.1.2-bin-hadoop3.2' # 暫時不寫也行PYSPARK_PYTHON = r"D:\AnacondaCache\envs\mspro\python" #python.exe或者簡寫為python都行# 當存在多個python版本環境時(如使用conda),不指定python環境會報錯os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON# 配置指定提交作業至HDFS的用戶 不然上傳文件到 HDFS 的時候會報權限錯誤 參考文章:https://zhuanlan.zhihu.com/p/538265736# os.environ["HADOOP_USER_NAME"] = "yelor" # 暫時不寫也行# 在local模式下,如果想使用hive元數據,以下參數是必須要配置的:spark.sql.warehouse.dir、hive.metastore.urisspark = SparkSession.builder.\appName('udf_define').\master('local[*]').\config('spark.sql.shuffle.partitions', 2).\config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\config('hive.metastore.uris', 'thrift://localhost:9083').\config("spark.executor.memory", "1g").\config("spark.driver.memory", "1g").\config("dfs.client.use.datanode.hostname", "true").\config("spark.driver.host", "192.168.1.5").\enableHiveSupport(). \getOrCreate() # =============================== spark master集群模式 ===============================# 如果要連接spark集群,需要保證pyspark包的版本與集群的spark版本一致# 查看spark版本:spark-submit --version# 就算是使用集群執行作業,也必須要配置hive-site.xml文件中的信息,因為還是讀取driver端的環境變量# 必須添加enableHiveSupport()這行代碼# spark.driver.host保證代碼傳到spark容器中時以指定的ip地址為driver地址,不然會使用本機的hostname# import os# os.environ['PYSPARK_PYTHON']=r"D:\\AnacondaCache\\envs\\mspro\\python.exe"# spark = SparkSession.builder.\# appName('udf_define').\# master('spark://localhost:7077').\# config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\# config('hive.metastore.uris', 'thrift://localhost:9083').\# config("spark.executor.memory", "512m").\# config("spark.driver.memory", "512m").\# config("spark.driver.host", "192.168.1.5").\# enableHiveSupport(). \# getOrCreate() # 如果spark配置文件中沒有配置spark元數據信息,就不能使用enableHiveSupport().\ 直接在代碼中配置元數據信息也能臉上hive元數據sc = spark.sparkContext# 設置日志級別為 DEBUG# sc.setLogLevel("DEBUG")# 查看表的存儲位置warehouse_dir = spark.conf.get("spark.sql.warehouse.dir")print(f"Spark SQL warehouse directory: {warehouse_dir}")# 指定要使用的數據庫database_name = "tb"spark.sql(f"USE {database_name}")# 執行 SQL 查詢# query = "show databases"# query = "SELECT * FROM orders"# query = "SELECT * FROM students"# df = spark.sql(query)# # 顯示查詢結果# df.show()# 生成 10000 條模擬數據data = []for _ in range(1000):# 生成隨機的姓名和年齡name = ''.join(random.choices(string.ascii_letters, k=5))age = random.randint(18, 60)data.append((name, age))# 定義 DataFrame 的列名columns = ["name", "age"]# 創建 DataFramedf = spark.createDataFrame(data, columns)# 創建臨時視圖# df.createOrReplaceTempView("test_table")try:# 創建持久化表(可選)df.write.saveAsTable("testaa_table", mode="overwrite")# 驗證數據插入spark.sql("SELECT * FROM testaa_table LIMIT 5").show()# 加入循環,保持 SparkSession 一直運行,方便看 Spark UIwhile True:try:import timetime.sleep(1)except KeyboardInterrupt:breakexcept Exception as e:print(f"An error occurred: {e}")finally:spark.stop()# 停止 SparkSession# spark.stop()