域嵌套太深
In our adventures trying to build a data lake, we are using dynamically generated spark cluster to ingest some data from MongoDB, our production database, to BigQuery. In order to do that, we use PySpark data frames and since mongo doesn’t have schemas, we try to infer the schema from the data.
在尝试建立数据湖的冒险中,我们使用动态生成的火花集群将一些数据从生产数据库MongoDB提取到BigQuery。 为此,我们使用PySpark数据帧,并且由于mongo没有架构,因此我们尝试从数据中推断出架构。
collection_schema = spark.read.format(“mongo”) \
.option(“database”, db) \
.option(“collection”, coll) \
.option(‘sampleSize’, 50000) \
.load() \
.schema ingest_df = spark.read.format(“mongo”) \
.option(“database”, db) \
.option(“collection”, coll) \ .load(schema=fix_spark_schema(collection_schema))
Our fix_spark_schema method just converts NullType columns to String.
我们的fix_spark_schema方法仅将NullType列转换为String。
In the users collection, we have the groups field, which is an array, because users can join multiple groups.
在users集合中,我们拥有groups字段,它是一个数组,因为用户可以加入多个group。
root
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- programs: struct (nullable = true)
| | | |-- **{ program id }**: struct (nullable = true)
| | | | |-- Date: timestamp (nullable = true)
| | | | |-- Name: string (nullable = true)
| | | | |-- Some_Flags: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = true)
| | | | | |-- def: boolean (nullable = true)
| | | | | |-- ghi: boolean (nullable = true)
| | | | | |-- xyz: boolean (nullable = true)
Also, each different group has some different programs the users can join. So under the programs, we store a JSON with keys the program ids the user has joined and values some extra data about the date they joined etc. The data looks like this
此外,每个不同的组都有一些用户可以加入的不同程序。 因此,在这些程序下,我们存储了一个JSON,其中包含用户已加入的程序ID以及其加入日期等额外数据的键值。数据看起来像这样
“groups” : [
{… some other fields …
“programs” : {
“123c12b123456c1d76a4f265f10f20a0” : {
“name” : “test_program_1”,
“some_flags” : {
“abc” : true,
“def” : true,
“ghi” : false,
“xyz” : true
},
“date” : ISODate(“2019–11–16T03:29:00.000+0000”)
}
}
]
As a result of the above, BigQuery creates a new column for each program_id and we end up with hundreds of columns, most of them empty for most of the users. So, how can we fix that? We can convert programs from a struct to string and store the whole json in there. That would create some extra friction if someone wants to access those fields, but it would make our columns much cleaner.
由于上述原因,BigQuery为每个program_id创建了一个新列,最后我们得到了数百个列,其中大多数对于大多数用户而言都是空的。 那么,我们该如何解决呢? 我们可以将程序从结构转换为字符串,然后将整个json存储在其中。 如果有人要访问这些字段,那会产生一些额外的摩擦,但这会使我们的色谱柱更加整洁。
Attempt 1:
尝试1:
So, if the field wasn’t nested we could easily just cast it to string.
因此,如果未嵌套该字段,则可以轻松地将其转换为字符串。
ingest_df
but since it’s nested this doesn’t work. The following command works only for root-level fields, so it could work if we wanted to convert the whole groups field, or move programs at the root level
但由于它是嵌套的,因此不起作用。 以下命令仅适用于根级别的字段,因此如果我们要转换整个组字段或在根级别移动程序 ,则该命令可以使用
ingest_df
Attempt 2:
尝试2:
After a lot of research and many different tries. I realized that if we want to change the type, edit, rename, add or remove a nested field we need to modify the schema. The steps we have to follow are these:
经过大量研究和许多尝试。 我意识到,如果要更改类型,编辑,重命名,添加或删除嵌套字段,则需要修改架构。 我们必须遵循的步骤是:
- Iterate through the schema of the nested Struct and make the changes we want 遍历嵌套的Struct的架构并进行所需的更改
Create a JSON version of the root level field, in our case groups, and name it for example groups_json and drop groups
在我们的案例组中,创建根级别字段的JSON版本,并将其命名为groups_json和drop groups
Then convert the groups_json field to groups again using the modified schema we created in step 1.
然后使用在步骤1中创建的修改后的架构再次将groups_json字段转换为组 。
If we know the schema and we’re sure that it’s not going to change, we could hardcode it but … we can do better. We can write (search on StackOverflow and modify) a dynamic function that would iterate through the whole schema and change the type of the field we want. The following method would convert the fields_to_change into Strings, but you can modify it to whatever you want
如果我们知道该模式并且确定它不会改变,则可以对其进行硬编码,但是…我们可以做得更好。 我们可以编写(搜索StackOverflow并进行修改)动态函数,该函数将遍历整个架构并更改所需字段的类型。 以下方法会将fields_to_change转换为字符串,但是您可以将其修改为所需的任何值
def change_nested_field_type(schema, fields_to_change, parent=""):
new_schema = []
if isinstance(schema, StringType):
return schema
for field in schema:
full_field_name = field.name
if parent:
full_field_name = parent + "." + full_field_name
if full_field_name not in fields_to_change:
if isinstance(field.dataType, StructType):
inner_schema = change_nested_field_type(field.dataType, fields_to_change, full_field_name)
new_schema.append(StructField(field.name, inner_schema))
elif isinstance(field.dataType, ArrayType):
inner_schema = change_nested_field_type(field.dataType.elementType, fields_to_change, full_field_name)
new_schema.append(StructField(field.name, ArrayType(inner_schema)))
else:
new_schema.append(StructField(field.name, field.dataType))
else:
# Here we change the field type to Stringnew_schema.append(StructField(field.name, StringType()))
return StructType(new_schema)
and now we can do the conversion like this:
现在我们可以像这样进行转换:
new_schema = ArrayType(change_nested_field_type(df.schema["groups"].dataType.elementType, ["programs"]))
df = df.withColumn("
df = df.withColumn("groups", from_json("
and voila! groups.programs is converted to a string.
和瞧! groups.programs将转换为字符串。
翻译自: https://medium.com/swlh/pyspark-how-to-modify-a-nested-struct-field-8105ebe83d09
域嵌套太深
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/389609.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!