Rename key in a nested Spark DataFrame Schema (Scala)

I have a use case where I need to read nested JSON data and write it back as Parquet. The schema changes based on the day I am reading the data, so I don't know the exact schema in advance. Some of my nested keys contain characters such as spaces, and when I try to save the data as Parquet I get an exception complaining about the special characters ,;{}()\n\t=



This is a sample schema; it is not the real one, since the keys are dynamic and change day by day:



import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val nestedSchema = StructType(Seq(
  StructField("event_time", StringType),
  StructField("event_id", StringType),
  StructField("app", StructType(Seq(
    StructField("environment", StringType),
    StructField("name", StringType),
    StructField("type", StructType(Seq(
      StructField("word tier", StringType), // this field causes the problem when saving as Parquet
      StructField("level", StringType)
    )))
  )))
))

val nestedDF = spark.createDataFrame(sc.emptyRDD[Row], nestedSchema)

nestedDF.printSchema


Output



root
 |-- event_time: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- app: struct (nullable = true)
 |    |-- environment: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: struct (nullable = true)
 |    |    |-- word tier: string (nullable = true)
 |    |    |-- level: string (nullable = true)


Trying to save it as Parquet:



myDF.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("PATH/TO/DESTINATION")


I found a solution like this:



myDF.toDF(myDF
  .schema
  .fieldNames
  .map(name => "[ ,;{}()\n\t=]+".r.replaceAllIn(name, "_")): _*)
  .write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("PATH/TO/DESTINATION")


But it only renames the top-level keys, not the nested ones. Is there a recursive solution for this?



My question is not a duplicate of this question, since my schema is dynamic and I don't know in advance what my keys are. They change based on the data I am reading, so the solution needs to be generic: I need to recursively recreate the same schema structure, but with corrected key names.










Tags: scala apache-spark schema parquet






asked Jan 18 at 18:16 by Am1rr3zA (edited Jan 18 at 21:28)
1 Answer






Basically, you have to construct a Column expression which casts your input to a type with sanitized field names. To do this, you can use the org.apache.spark.sql.functions.struct function, which allows you to combine other Columns into a column of struct type. Something like this should work:



import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType

def sanitizeName(s: String): String = s.replace(" ", "_")

def sanitizeFieldNames(st: StructType, context: String => Column): Column = f.struct(
  st.fields.map { sf =>
    val sanitizedName = sanitizeName(sf.name)
    val sanitizedField = sf.dataType match {
      case struct: StructType =>
        val subcontext = context(sf.name)
        sanitizeFieldNames(struct, subcontext(_))
      case _ => context(sf.name)
    }
    sanitizedField.as(sanitizedName)
  }: _*
)


          You use it like this:



val df: DataFrame = ...

// df.schema("app") returns a StructField, so take its dataType (or otherwise obtain the field type)
val appFieldType = df.schema("app").dataType.asInstanceOf[StructType]
df.withColumn(
  "app",
  sanitizeFieldNames(appFieldType, df("app")(_))
)


          For your type, this recursive function would return a column like



f.struct(
  df("app")("environment").as("environment"),
  df("app")("name").as("name"),
  f.struct(
    df("app")("type")("word tier").as("word_tier"),
    df("app")("type")("level").as("level")
  ).as("type")
)


          which then gets assigned to the "app" field, replacing what is present there.
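For completeness, here is a hedged end-to-end sketch built from the snippets above and the question's sample; it assumes the DataFrame is called myDF and reuses the placeholder path from the question:

import org.apache.spark.sql.types.StructType

// Sketch: sanitize the nested "app" struct, then write the result as Parquet.
// "myDF" and "PATH/TO/DESTINATION" are the placeholders used in the question.
val appType = myDF.schema("app").dataType.asInstanceOf[StructType]
val cleanedDF = myDF.withColumn("app", sanitizeFieldNames(appType, myDF("app")(_)))

cleanedDF.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("PATH/TO/DESTINATION")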



There is a limitation to this solution, though: it does not support nested arrays or maps. If you have a schema with structs inside arrays or maps, this method won't convert those structs. That said, Spark 2.4 added functions which perform operations on collections, so it may be possible to generalize this function in Spark 2.4 to support nested arrays and maps as well.



Finally, it is possible to do what you want with mapPartitions. First, you write a recursive method which sanitizes only the schema (the DataType) itself:



def sanitizeType(dt: DataType): DataType = dt match {
  case st: StructType => ... // rename fields and invoke recursively
  case at: ArrayType => ... // invoke recursively
  case mt: MapType => ... // invoke recursively
  case _ => dt // simple types do not have anything to sanitize
}
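A fully worked-out version of that recursion might look like the sketch below. It is an illustration rather than the author's exact code: it reuses the sanitizeName helper from above and descends into arrays and maps so that structs nested inside them are renamed too.

import org.apache.spark.sql.types._

// Sketch of the recursive schema rewrite: rename struct fields and recurse
// into struct, array and map types; primitive types are returned unchanged.
def sanitizeType(dt: DataType): DataType = dt match {
  case st: StructType =>
    StructType(st.fields.map { field =>
      field.copy(name = sanitizeName(field.name), dataType = sanitizeType(field.dataType))
    })
  case at: ArrayType =>
    at.copy(elementType = sanitizeType(at.elementType))
  case mt: MapType =>
    mt.copy(keyType = sanitizeType(mt.keyType), valueType = sanitizeType(mt.valueType))
  case other => other
}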


Second, you apply the sanitized schema to your DataFrame. There are basically two ways to do it: a safe one using mapPartitions, and one which relies on internal Spark APIs.



          With mapPartitions, it is simple:



df.mapPartitions(identity)(RowEncoder(sanitizeType(df.schema).asInstanceOf[StructType])) // cast back to StructType, since sanitizeType returns DataType


          Here, we apply a mapPartitions operation and explicitly specify the output encoder. Remember that schemas in Spark are not intrinsic to the data: they are always associated with a particular dataframe. All data inside the dataframe is represented as rows with no labels on individual fields, just positions. As long as your schema has exactly the same types on same positions (but with potentially different names), it should work as you expect.
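Spelled out with the required imports, the mapPartitions route could look roughly like the following sketch; RowEncoder lives in the org.apache.spark.sql.catalyst.encoders package, and the final write reuses the question's placeholder path:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

// Sketch: build the sanitized schema, re-encode the rows under it, and write.
val sanitizedSchema = sanitizeType(df.schema).asInstanceOf[StructType]
val sanitizedDF = df.mapPartitions(identity)(RowEncoder(sanitizedSchema))

sanitizedDF.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("PATH/TO/DESTINATION")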



          mapPartitions does result in several additional nodes in the logical plan. To avoid it, it is possible to construct a Dataset[Row] instance directly with a specific encoder:



new Dataset[Row](df.sparkSession, df.queryExecution.logical, RowEncoder(sanitizeType(df.schema).asInstanceOf[StructType]))


This avoids the unnecessary mapPartitions (which, in general, results in deserialize-map-serialize steps in the query execution plan), but it relies on internal Spark APIs and might be unsafe; I have not verified it, but it could work for you.






answered Jan 19 at 1:11 by Vladimir Matveev (edited Jan 20 at 3:16)
• Thanks for your answer, but I am a bit lost. In the first step, when we apply it like this: val appFieldType = df.schema("app").asInstanceOf[StructType], what happens to the rest of the fields? I need to keep the whole schema. And what is f in the sanitizeFieldNames definition? – Am1rr3zA, Jan 19 at 15:46

• And does it preserve the old schema? What happens to the app column: is it added, or does it replace the existing one? – Am1rr3zA, Jan 19 at 15:47

• f is the functions object; my bad, I forgot to add the import. In the approach with f.struct, I use the Dataset.withColumn method, which adds or replaces the specified column with the provided definition (in this case, it replaces the app column). Other columns are not affected. – Vladimir Matveev, Jan 20 at 3:19