Rename key in a nested Spark DataFrame Schema (Scala)
I have a use case where I need to read a nested JSON schema and write it back out as Parquet.
(My schema changes based on the day I am reading the data, so I don't know the exact schema in advance.) Since some of my nested keys contain characters such as spaces, when I try to save as Parquet I get an exception complaining about the special characters ,;{}()\n\t=
This is a sample schema; it is not the real one, since the keys are dynamic and change day by day:
val nestedSchema = StructType(Seq(
  StructField("event_time", StringType),
  StructField("event_id", StringType),
  StructField("app", StructType(Seq(
    StructField("environment", StringType),
    StructField("name", StringType),
    StructField("type", StructType(Seq(
      StructField("word tier", StringType), // this field causes the problem when saving as Parquet
      StructField("level", StringType)
    )))
  )))
))
val myDF = spark.createDataFrame(sc.emptyRDD[Row], nestedSchema)
myDF.printSchema
Output
root
|-- event_time: string (nullable = true)
|-- event_id: string (nullable = true)
|-- app: struct (nullable = true)
| |-- environment: string (nullable = true)
| |-- name: string (nullable = true)
| |-- type: struct (nullable = true)
| | |-- word tier: string (nullable = true)
| | |-- level: string (nullable = true)
Trying to save it as Parquet:
myDF.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("PATH/TO/DESTINATION")
I found a solution like this:
myDF.toDF(myDF
    .schema
    .fieldNames
    .map(name => "[ ,;{}()\n\t=]+".r.replaceAllIn(name, "_")): _*)
  .write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("PATH/TO/DESTINATION")
But it only works on the top-level keys, not on the nested ones. Is there a recursive solution for this?
My question is not a duplicate of this question, since my schema is dynamic and I don't know what my keys are in advance; they change based on the data I am reading. The solution needs to be generic: I somehow need to recursively recreate the same schema structure, but with corrected key names.
scala apache-spark schema parquet
1 Answer
Basically, you have to construct a Column expression which would cast your input to a type with sanitized field names. To do this, you can use the org.apache.spark.sql.functions.struct function, which allows you to combine other Columns to construct a column of structural type. Something like this should work:
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{functions => f}

def sanitizeName(s: String): String = s.replace(" ", "_")

def sanitizeFieldNames(st: StructType, context: String => Column): Column = f.struct(
  st.fields.map { sf =>
    val sanitizedName = sanitizeName(sf.name)
    val sanitizedField = sf.dataType match {
      case struct: StructType =>
        val subcontext = context(sf.name)
        sanitizeFieldNames(struct, subcontext(_))
      case _ => context(sf.name)
    }
    sanitizedField.as(sanitizedName)
  }: _*
)
You use it like this:
val df: DataFrame = ...
val appFieldType = df.schema("app").dataType.asInstanceOf[StructType] // or otherwise obtain the field type
df.withColumn(
  "app",
  sanitizeFieldNames(appFieldType, df("app")(_))
)
For your type, this recursive function would return a column like
f.struct(
  df("app")("environment").as("environment"),
  df("app")("name").as("name"),
  f.struct(
    df("app")("type")("word tier").as("word_tier"),
    df("app")("type")("level").as("level")
  ).as("type")
)
which then gets assigned to the "app" field, replacing what is present there.
There is a limitation to this solution, though: it does not support nested arrays or maps. If you have structs inside arrays or maps, this method won't convert them. That being said, Spark 2.4 added functions which perform operations on collections, so it is possible that in Spark 2.4 this function could be generalized to support nested arrays and maps as well.
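As a rough, hedged illustration of that generalization (not part of the original answer): Spark 2.4's SQL higher-order functions can rebuild structs inside arrays via expr and transform. The column name items and its fields below are hypothetical, just to show the shape of the expression:

import org.apache.spark.sql.functions.expr

// Hypothetical schema: items: array<struct<`word tier`: string, level: string>>.
// transform() rebuilds each array element as a new struct with sanitized field names.
val fixed = df.withColumn(
  "items",
  expr("transform(items, x -> named_struct('word_tier', x.`word tier`, 'level', x.level))")
)

Generalizing sanitizeFieldNames to emit such expressions automatically would mean generating the lambda body from the element's StructType, which is doable but more involved.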
Finally, it is possible to do what you want with mapPartitions. First, you write a recursive method which sanitizes only the StructType of your field:
def sanitizeType(dt: DataType): DataType = dt match {
  case st: StructType => ... // rename fields and invoke recursively
  case at: ArrayType => ...  // invoke recursively
  case mt: MapType => ...    // invoke recursively
  case _ => dt               // simple types do not have anything to sanitize
}
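A fuller version of that sketch might look like the following. This is my own illustrative completion (it reuses the sanitizeName helper from above), not the answer author's exact code:

import org.apache.spark.sql.types._

def sanitizeType(dt: DataType): DataType = dt match {
  case st: StructType =>
    // Rename each field and recurse into its type.
    StructType(st.fields.map(field =>
      field.copy(name = sanitizeName(field.name), dataType = sanitizeType(field.dataType))))
  case at: ArrayType =>
    at.copy(elementType = sanitizeType(at.elementType))
  case mt: MapType =>
    mt.copy(keyType = sanitizeType(mt.keyType), valueType = sanitizeType(mt.valueType))
  case _ => dt // simple types have nothing to sanitize
}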
Second, you apply the sanitized schema to your dataframe. There are basically two ways to do it: a safe one, using mapPartitions, and one which relies on internal Spark APIs.
With mapPartitions, it is simple:
df.mapPartitions(identity)(RowEncoder(sanitizeType(df.schema).asInstanceOf[StructType]))
Here, we apply a mapPartitions operation and explicitly specify the output encoder. Remember that schemas in Spark are not intrinsic to the data: they are always associated with a particular dataframe. All data inside the dataframe is represented as rows with no labels on individual fields, just positions. As long as your schema has exactly the same types in the same positions (but with potentially different names), it should work as you expect.
mapPartitions does result in several additional nodes in the logical plan. To avoid them, it is possible to construct a Dataset[Row] instance directly with a specific encoder:
new Dataset[Row](df.sparkSession, df.queryExecution.logical, RowEncoder(sanitizeType(df.schema).asInstanceOf[StructType]))
This would avoid the unnecessary mapPartitions (which, in general, results in deserialize-map-serialize steps in the query execution plan), but it might be unsafe; I personally did not check it, but it could work for you.
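For completeness, here is a hedged sketch of how the sanitizeType approach could be wired end to end, assuming the fuller sanitizeType shown above; the output path is just a placeholder:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

// Re-encode the rows unchanged under the sanitized schema, then write Parquet.
val cleanSchema = sanitizeType(myDF.schema).asInstanceOf[StructType]
val cleanDF = myDF.mapPartitions(identity)(RowEncoder(cleanSchema))

cleanDF.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("PATH/TO/DESTINATION")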
Thanks for your answer, but I am a bit lost. In the first step, when we apply sanitizeName like this: val appFieldType = df.schema("app").asInstanceOf[StructType], what's going to happen to the rest of the fields? I need to keep the whole schema. And what is f in the sanitizeName definition? – Am1rr3zA, Jan 19 at 15:46
And does it preserve the old schema? What's going to happen to the app field added to the replaced schema? – Am1rr3zA, Jan 19 at 15:47
f is the functions object, my bad - forgot to add an import. In the approach with f.struct, I use the Dataset.withColumn method, which adds or replaces the specified column with the provided definition (in this case, it replaces the app column). Other columns are not affected. – Vladimir Matveev, Jan 20 at 3:19