Fields nested too deep
In our adventures trying to build a data lake, we are using dynamically generated Spark clusters to ingest data from MongoDB, our production database, into BigQuery. To do that, we use PySpark DataFrames, and since Mongo doesn't enforce schemas, we try to infer the schema from the data.
collection_schema = spark.read.format("mongo") \
    .option("database", db) \
    .option("collection", coll) \
    .option("sampleSize", 50000) \
    .load() \
    .schema

ingest_df = spark.read.format("mongo") \
    .option("database", db) \
    .option("collection", coll) \
    .load(schema=fix_spark_schema(collection_schema))
Our fix_spark_schema method just converts NullType columns to String.
In the users collection, we have the groups field, which is an array, because users can join multiple groups.
root
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- programs: struct (nullable = true)
 |    |    |    |-- { program id }: struct (nullable = true)
| | | | |-- Date: timestamp (nullable = true)
| | | | |-- Name: string (nullable = true)
| | | | |-- Some_Flags: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = true)
| | | | | |-- def: boolean (nullable = true)
| | | | | |-- ghi: boolean (nullable = true)
| | | | | |-- xyz: boolean (nullable = true)
Also, each group has some different programs the users can join. So under programs, we store a JSON object whose keys are the program ids the user has joined and whose values hold some extra data, such as the date they joined. The data looks like this:
"groups" : [
    { ... some other fields ...
        "programs" : {
            "123c12b123456c1d76a4f265f10f20a0" : {
                "name" : "test_program_1",
                "some_flags" : {
                    "abc" : true,
                    "def" : true,
                    "ghi" : false,
                    "xyz" : true
                },
                "date" : ISODate("2019-11-16T03:29:00.000+0000")
            }
        }
    }
]
As a result of the above, BigQuery creates a new column for each program_id and we end up with hundreds of columns, most of them empty for most of the users. So, how can we fix that? We can convert programs from a struct to string and store the whole json in there. That would create some extra friction if someone wants to access those fields, but it would make our columns much cleaner.
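To make that friction concrete: once programs is stored as a plain JSON string, every consumer has to parse it before touching any field. A small illustration with Python's json module (the id and values are made up to match the sample document above):

```python
import json

# One row's "programs" value after the conversion: a JSON string
# instead of a typed struct.
programs_json = (
    '{"123c12b123456c1d76a4f265f10f20a0": '
    '{"name": "test_program_1", "some_flags": {"abc": true}}}'
)

# Field access now requires an explicit parse step.
programs = json.loads(programs_json)
name = programs["123c12b123456c1d76a4f265f10f20a0"]["name"]
print(name)  # prints: test_program_1
```

In BigQuery the equivalent would be a JSON extraction function instead of a simple dotted column reference.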
Attempt 1:
So, if the field weren't nested, we could easily just cast it to string:
ingest_df.withColumn("programs", col("programs").cast(StringType()))
but since it's nested, this doesn't work. The following command works only for root-level fields, so it could work if we wanted to convert the whole groups field, or move programs to the root level:
ingest_df.withColumn("groups", col("groups").cast(StringType()))
Attempt 2:
After a lot of research and many different tries, I realized that if we want to change the type of a nested field, or edit, rename, add, or remove one, we need to modify the schema. The steps we have to follow are these:
- Iterate through the schema of the nested struct and make the changes we want
- Create a JSON version of the root-level field, in our case groups, name it for example groups_json, and drop groups
- Then convert the groups_json field back to groups using the modified schema we created in step 1
If we know the schema and we're sure it's not going to change, we could hardcode it, but … we can do better. We can write (i.e., search on StackOverflow and modify) a dynamic function that iterates through the whole schema and changes the type of the fields we want. The following method converts the fields_to_change into Strings, but you can modify it to do whatever you want:
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

def change_nested_field_type(schema, fields_to_change, parent=""):
    new_schema = []
    if isinstance(schema, StringType):
        return schema
    for field in schema:
        full_field_name = field.name
        if parent:
            full_field_name = parent + "." + full_field_name
        if full_field_name not in fields_to_change:
            if isinstance(field.dataType, StructType):
                inner_schema = change_nested_field_type(field.dataType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType):
                inner_schema = change_nested_field_type(field.dataType.elementType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            else:
                new_schema.append(StructField(field.name, field.dataType))
        else:
            # Here we change the field type to String
            new_schema.append(StructField(field.name, StringType()))
    return StructType(new_schema)
and now we can do the conversion like this:
new_schema = ArrayType(change_nested_field_type(df.schema["groups"].dataType.elementType, ["programs"]))
df = df.withColumn("groups_json", to_json("groups")).drop("groups")
df = df.withColumn("groups", from_json("groups_json", new_schema)).drop("groups_json")
and voilà! groups.programs is converted to a string.
Translated from: https://medium.com/swlh/pyspark-how-to-modify-a-nested-struct-field-8105ebe83d09