How to check the number of partitions of a Spark DataFrame without incurring the cost of .rdd



























There are a number of questions about how to obtain the number of partitions of an RDD and/or a DataFrame; the answers invariably are:



 rdd.getNumPartitions


or



 df.rdd.getNumPartitions


Unfortunately, that is an expensive operation on a DataFrame because df.rdd requires conversion from the DataFrame to an RDD. This takes on the order of the time it takes to run df.count.


I am writing logic that optionally repartitions or coalesces a DataFrame, based on whether the current number of partitions is within a range of acceptable values or instead below or above it.



  import org.apache.spark.sql.DataFrame

  // `info` is assumed to be a logging method already in scope (e.g. from a logging trait).
  def repartition(inDf: DataFrame, minPartitions: Option[Int],
                  maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions = inDf.rdd.getNumPartitions // EXPENSIVE!
    // Repartition upward when below the minimum; otherwise coalesce when above the maximum.
    val outDf = minPartitions.flatMap { minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse(maxPartitions.map { maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }


But we cannot afford to incur the cost of rdd.getNumPartitions for every DataFrame in this manner.
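The decision itself does not depend on Spark, only on three numbers. As a minimal sketch, the branching in the function above could be factored into a pure function and checked without a cluster. The names PartitionPlan, PartitionAction, Repartition, Coalesce, Keep, and decide are hypothetical and not part of Spark or of the code above.

```scala
// Hypothetical helper: none of these names exist in Spark or in the original code.
object PartitionPlan {
  sealed trait PartitionAction
  final case class Repartition(target: Int) extends PartitionAction
  final case class Coalesce(target: Int) extends PartitionAction
  case object Keep extends PartitionAction

  // Mirrors the control flow of the function above: the minimum is checked
  // first, and the maximum is consulted only when no repartition was chosen.
  def decide(current: Int,
             minPartitions: Option[Int],
             maxPartitions: Option[Int]): PartitionAction = {
    val grow: Option[PartitionAction] =
      minPartitions.filter(current < _).map(Repartition(_))
    val shrink: Option[PartitionAction] =
      maxPartitions.filter(current > _).map(Coalesce(_))
    grow.orElse(shrink).getOrElse(Keep)
  }
}
```

For example, PartitionPlan.decide(4, Some(8), Some(100)) yields Repartition(8), while PartitionPlan.decide(50, Some(8), Some(100)) yields Keep. The only Spark-dependent input is the current partition count.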



Is there any way to obtain this information, e.g. by querying the online/temporary catalog for the registered table?



Update: The Spark GUI showed the DataFrame.rdd operation taking as long as the longest SQL in the job. I will re-run the job and attach the screenshot here shortly.

The following is just a test case: it uses a small fraction of the production data size. The longest SQL takes only five minutes, and this one is on its way to spending that amount of time as well (note that the SQL is not helped here: it still has to execute afterwards, effectively doubling the cumulative execution time).

[image: Spark UI screenshot]

We can see that the .rdd operation at DataFrameUtils line 30 (shown in the snippet above) takes 5.1 mins, and yet the save operation still took 5.2 mins later, i.e. we did not save any time on the subsequent save by doing the .rdd.










scala apache-spark partition

asked Jan 19 at 15:56 by javadba (edited Jan 19 at 20:15)
























          3 Answers














          There is no inherent cost to the rdd component of rdd.getNumPartitions, because the returned RDD is never evaluated.



          While you can easily determine this empirically, using a debugger (I'll leave this as an exercise for the reader), or by establishing that no jobs are triggered in the base case scenario



          Spark session available as 'spark'.
          Welcome to
                ____              __
               / __/__  ___ _____/ /__
              _\ \/ _ \/ _ `/ __/  '_/
             /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
                /_/

          Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
          Type in expressions to have them evaluated.
          Type :help for more information.


          scala> val ds = spark.read.text("README.md")
          ds: org.apache.spark.sql.DataFrame = [value: string]

          scala> ds.rdd.getNumPartitions
          res0: Int = 1

          scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
          res1: Boolean = true


          it might not be enough to convince you. So let's approach this in a more systematic way:





          • rdd returns a MapPartitionsRDD (ds as defined above):



            scala> ds.rdd.getClass
            res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD


          • RDD.getNumPartitions invokes RDD.partitions.


          • In the non-checkpointed scenario, RDD.partitions invokes getPartitions (feel free to trace the checkpoint path as well).


          • RDD.getPartitions is abstract.

          • So the actual implementation used in this case is MapPartitionsRDD.getPartitions, which simply delegates the call to the parent.


          • There are only MapPartitionsRDDs between rdd and the source:



            scala> ds.rdd.toDebugString
            res3: String =
            (1) MapPartitionsRDD[3] at rdd at <console>:26
            | MapPartitionsRDD[2] at rdd at <console>:26
            | MapPartitionsRDD[1] at rdd at <console>:26
            | FileScanRDD[0] at rdd at <console>:26


            Similarly, if the Dataset contained an exchange, we would follow the parents to the nearest shuffle:



            scala> ds.orderBy("value").rdd.toDebugString
            res4: String =
            (67) MapPartitionsRDD[13] at rdd at <console>:26
            | MapPartitionsRDD[12] at rdd at <console>:26
            | MapPartitionsRDD[11] at rdd at <console>:26
            | ShuffledRowRDD[10] at rdd at <console>:26
            +-(1) MapPartitionsRDD[9] at rdd at <console>:26
            | MapPartitionsRDD[5] at rdd at <console>:26
            | FileScanRDD[4] at rdd at <console>:26


            Note that this case is particularly interesting, because we actually triggered a job:



            scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
            res5: Boolean = false

            scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
            res6: Array[Int] = Array(0)


            That's because we've encountered a scenario where the partitions cannot be determined statically (see Number of dataframe partitions after sorting? and Why does sortBy transformation trigger a Spark job?).



            In such a scenario getNumPartitions will also trigger a job:



            scala> ds.orderBy("value").rdd.getNumPartitions
            res7: Int = 67

            scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
            res8: Array[Int] = Array(1, 0)


            However, this doesn't mean that the observed cost is somehow related to the .rdd call. Instead, it is an intrinsic cost of finding partitions in cases where there is no static formula (for example, some Hadoop input formats, where a full scan of the data is required).
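The delegation chain traced in the bullets above can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyRDD, ToySource, and ToyMapPartitions are hypothetical stand-ins rather than Spark classes, and a real source may need more work than this to list its partitions.

```scala
// Toy model only: these classes are hypothetical stand-ins, not Spark's real RDD types.
object LazyPartitionsModel {
  abstract class ToyRDD {
    // Abstract, in the same spirit as RDD.getPartitions.
    protected def getPartitions: Array[Int]
    // Partition metadata is resolved once, on first access.
    lazy val partitions: Array[Int] = getPartitions
    def getNumPartitions: Int = partitions.length
    // Stands in for actually evaluating the data.
    def compute(): Unit
  }

  // A source whose partitioning is known from metadata alone.
  class ToySource(n: Int) extends ToyRDD {
    var computed = false
    protected def getPartitions: Array[Int] = Array.range(0, n)
    def compute(): Unit = { computed = true }
  }

  // A narrow transformation: partition metadata is delegated to the parent.
  class ToyMapPartitions(parent: ToyRDD) extends ToyRDD {
    protected def getPartitions: Array[Int] = parent.partitions
    def compute(): Unit = parent.compute()
  }
}
```

Wrapping a ToySource(4) in two ToyMapPartitions layers and calling getNumPartitions on the outermost one returns 4 while the source's computed flag stays false: counting partitions walks the lineage's metadata without evaluating any data.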




          Please note that the points made here shouldn't be extrapolated to other applications of Dataset.rdd. For example, ds.rdd.count would indeed be expensive and wasteful.




















            In my experience df.rdd.getNumPartitions is very fast; I have never seen it take more than a second or so.



            Alternatively, you could also try



            val numPartitions: Long = df
            .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()


            which would avoid using .rdd, although it runs a job and only counts partitions that contain at least one row.






























            • My job is running in just over half the time after removing the .rdd.getNumPartitions calls

              – javadba
              Jan 19 at 20:12

































            I looked at the in-memory catalog and unfortunately it is quite limited. In particular, partition information is not available:



            https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/catalog/Catalog.html



            A subset of the contents is:



            [image: subset of the Catalog API contents]

            Unless another reader presents or discovers evidence to the contrary, I am going to conclude that there is no inexpensive way to check the current number of partitions of a DataFrame.






            share|improve this answer

























              Your Answer






              StackExchange.ifUsing("editor", function () {
              StackExchange.using("externalEditor", function () {
              StackExchange.using("snippets", function () {
              StackExchange.snippets.init();
              });
              });
              }, "code-snippets");

              StackExchange.ready(function() {
              var channelOptions = {
              tags: "".split(" "),
              id: "1"
              };
              initTagRenderer("".split(" "), "".split(" "), channelOptions);

              StackExchange.using("externalEditor", function() {
              // Have to fire editor after snippets, if snippets enabled
              if (StackExchange.settings.snippets.snippetsEnabled) {
              StackExchange.using("snippets", function() {
              createEditor();
              });
              }
              else {
              createEditor();
              }
              });

              function createEditor() {
              StackExchange.prepareEditor({
              heartbeatType: 'answer',
              autoActivateHeartbeat: false,
              convertImagesToLinks: true,
              noModals: true,
              showLowRepImageUploadWarning: true,
              reputationToPostImages: 10,
              bindNavPrevention: true,
              postfix: "",
              imageUploader: {
              brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
              contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
              allowUrls: true
              },
              onDemand: true,
              discardSelector: ".discard-answer"
              ,immediatelyShowMarkdownHelp:true
              });


              }
              });














              draft saved

              draft discarded


















              StackExchange.ready(
              function () {
              StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54268845%2fhow-to-check-the-number-of-partitions-of-a-spark-dataframe-without-incurring-the%23new-answer', 'question_page');
              }
              );

              Post as a guest















              Required, but never shown

























              3 Answers
              3






              active

              oldest

              votes








              3 Answers
              3






              active

              oldest

              votes









              active

              oldest

              votes






              active

              oldest

              votes









              7














              There is no inherent cost of rdd component in rdd.getNumPartitions, because returned RDD is never evaluated.



              While you can easily determine this empirically, using debugger (I'll leave this as an exercise for the reader), or establishing that no jobs are triggered in the base case scenario



              Spark session available as 'spark'.
              Welcome to
              ____ __
              / __/__ ___ _____/ /__
              _ / _ / _ `/ __/ '_/
              /___/ .__/_,_/_/ /_/_ version 2.4.0
              /_/

              Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
              Type in expressions to have them evaluated.
              Type :help for more information.


              scala> val ds = spark.read.text("README.md")
              ds: org.apache.spark.sql.DataFrame = [value: string]

              scala> ds.rdd.getNumPartitions
              res0: Int = 1

              scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
              res1: Boolean = true


              it might be not enough to convince you. So let's approach this in a more systematic way:





              • rdd returns a MapPartitionRDD (ds as defined above):



                scala> ds.rdd.getClass
                res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD


              • RDD.getNumPartitions invokes RDD.partitions.


              • In non-checkpointed scenario RDD.partitions invokes getPartitions (feel free to trace the checkpoint path as well).


              • RDD.getPartitions is abstract.

              • So the actual implementation used in this case is MapPartitionsRDD.getPartitions, which simply delegates the call to the parent.


              • There are only MapPartitionsRDD between rdd and the source.



                scala> ds.rdd.toDebugString
                res3: String =
                (1) MapPartitionsRDD[3] at rdd at <console>:26
                | MapPartitionsRDD[2] at rdd at <console>:26
                | MapPartitionsRDD[1] at rdd at <console>:26
                | FileScanRDD[0] at rdd at <console>:26


                Similarly if Dataset contained an exchange we would follow the parents to the nearest shuffle:



                scala> ds.orderBy("value").rdd.toDebugString
                res4: String =
                (67) MapPartitionsRDD[13] at rdd at <console>:26
                | MapPartitionsRDD[12] at rdd at <console>:26
                | MapPartitionsRDD[11] at rdd at <console>:26
                | ShuffledRowRDD[10] at rdd at <console>:26
                +-(1) MapPartitionsRDD[9] at rdd at <console>:26
                | MapPartitionsRDD[5] at rdd at <console>:26
                | FileScanRDD[4] at rdd at <console>:26


                Note that this case is particularly interesting, because we actually triggered a job:



                scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
                res5: Boolean = false

                scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
                res6: Array[Int] = Array(0)


                That's because we've encountered as scenario where the partitions cannot be determined statically (see Number of dataframe partitions after sorting? and Why does sortBy transformation trigger a Spark job?).



                In such scenario getNumPartitions will also trigger a job:



                scala> ds.orderBy("value").rdd.getNumPartitions
                res7: Int = 67

                scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
                res8: Array[Int] = Array(1, 0)


                however it doesn't mean that the observed cost is somehow related to .rdd call. Instead it is an intrinsic cost of finding partitions in case, where there is no static formula (some Hadoop input formats for example, where full scan of the data is required).




              Please note that the points made here shouldn't be extrapolated to other applications of Dataset.rdd. For example ds.rdd.count would be indeed expensive and wasteful.






              share|improve this answer






























                7














                There is no inherent cost of rdd component in rdd.getNumPartitions, because returned RDD is never evaluated.



                While you can easily determine this empirically, using debugger (I'll leave this as an exercise for the reader), or establishing that no jobs are triggered in the base case scenario



                Spark session available as 'spark'.
                Welcome to
                ____ __
                / __/__ ___ _____/ /__
                _ / _ / _ `/ __/ '_/
                /___/ .__/_,_/_/ /_/_ version 2.4.0
                /_/

                Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
                Type in expressions to have them evaluated.
                Type :help for more information.


                scala> val ds = spark.read.text("README.md")
                ds: org.apache.spark.sql.DataFrame = [value: string]

                scala> ds.rdd.getNumPartitions
                res0: Int = 1

                scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
                res1: Boolean = true


                it might be not enough to convince you. So let's approach this in a more systematic way:





                • rdd returns a MapPartitionRDD (ds as defined above):



                  scala> ds.rdd.getClass
                  res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD


                • RDD.getNumPartitions invokes RDD.partitions.


                • In non-checkpointed scenario RDD.partitions invokes getPartitions (feel free to trace the checkpoint path as well).


                • RDD.getPartitions is abstract.

                • So the actual implementation used in this case is MapPartitionsRDD.getPartitions, which simply delegates the call to the parent.


                • There are only MapPartitionsRDD between rdd and the source.



                  scala> ds.rdd.toDebugString
                  res3: String =
                  (1) MapPartitionsRDD[3] at rdd at <console>:26
                  | MapPartitionsRDD[2] at rdd at <console>:26
                  | MapPartitionsRDD[1] at rdd at <console>:26
                  | FileScanRDD[0] at rdd at <console>:26


                  Similarly if Dataset contained an exchange we would follow the parents to the nearest shuffle:



                  scala> ds.orderBy("value").rdd.toDebugString
                  res4: String =
                  (67) MapPartitionsRDD[13] at rdd at <console>:26
                  | MapPartitionsRDD[12] at rdd at <console>:26
                  | MapPartitionsRDD[11] at rdd at <console>:26
                  | ShuffledRowRDD[10] at rdd at <console>:26
                  +-(1) MapPartitionsRDD[9] at rdd at <console>:26
                  | MapPartitionsRDD[5] at rdd at <console>:26
                  | FileScanRDD[4] at rdd at <console>:26


                  Note that this case is particularly interesting, because we actually triggered a job:



                  scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
                  res5: Boolean = false

                  scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
                  res6: Array[Int] = Array(0)


                  That's because we've encountered as scenario where the partitions cannot be determined statically (see Number of dataframe partitions after sorting? and Why does sortBy transformation trigger a Spark job?).



                  In such scenario getNumPartitions will also trigger a job:



                  scala> ds.orderBy("value").rdd.getNumPartitions
                  res7: Int = 67

                  scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
                  res8: Array[Int] = Array(1, 0)


                  however it doesn't mean that the observed cost is somehow related to .rdd call. Instead it is an intrinsic cost of finding partitions in case, where there is no static formula (some Hadoop input formats for example, where full scan of the data is required).




                Please note that the points made here shouldn't be extrapolated to other applications of Dataset.rdd. For example ds.rdd.count would be indeed expensive and wasteful.






                share|improve this answer




























                  7












                  7








                  7







                  There is no inherent cost of rdd component in rdd.getNumPartitions, because returned RDD is never evaluated.



                  While you can easily determine this empirically, using debugger (I'll leave this as an exercise for the reader), or establishing that no jobs are triggered in the base case scenario



                  Spark session available as 'spark'.
                  Welcome to
                  ____ __
                  / __/__ ___ _____/ /__
                  _ / _ / _ `/ __/ '_/
                  /___/ .__/_,_/_/ /_/_ version 2.4.0
                  /_/

                  Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
                  Type in expressions to have them evaluated.
                  Type :help for more information.


                  scala> val ds = spark.read.text("README.md")
                  ds: org.apache.spark.sql.DataFrame = [value: string]

                  scala> ds.rdd.getNumPartitions
                  res0: Int = 1

                  scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
                  res1: Boolean = true


                  it might be not enough to convince you. So let's approach this in a more systematic way:





                  • rdd returns a MapPartitionRDD (ds as defined above):



                    scala> ds.rdd.getClass
                    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD


                  • RDD.getNumPartitions invokes RDD.partitions.


                  • In non-checkpointed scenario RDD.partitions invokes getPartitions (feel free to trace the checkpoint path as well).


                  • RDD.getPartitions is abstract.

                  • So the actual implementation used in this case is MapPartitionsRDD.getPartitions, which simply delegates the call to the parent.


                  • There are only MapPartitionsRDD between rdd and the source.



                    scala> ds.rdd.toDebugString
                    res3: String =
                    (1) MapPartitionsRDD[3] at rdd at <console>:26
                    | MapPartitionsRDD[2] at rdd at <console>:26
                    | MapPartitionsRDD[1] at rdd at <console>:26
                    | FileScanRDD[0] at rdd at <console>:26


                  There is no inherent cost to the rdd component of rdd.getNumPartitions, because the returned RDD is never evaluated.



                  While you can easily determine this empirically, either by using a debugger (I'll leave this as an exercise for the reader) or by establishing that no jobs are triggered in the base case scenario



                  Spark session available as 'spark'.
                  Welcome to
                        ____              __
                       / __/__  ___ _____/ /__
                      _\ \/ _ \/ _ `/ __/  '_/
                     /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
                        /_/

                  Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
                  Type in expressions to have them evaluated.
                  Type :help for more information.


                  scala> val ds = spark.read.text("README.md")
                  ds: org.apache.spark.sql.DataFrame = [value: string]

                  scala> ds.rdd.getNumPartitions
                  res0: Int = 1

                  scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
                  res1: Boolean = true


                  it might not be enough to convince you. So let's approach this in a more systematic way:





                  • rdd returns a MapPartitionsRDD (ds as defined above):



                    scala> ds.rdd.getClass
                    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD


                  • RDD.getNumPartitions invokes RDD.partitions.


                  • In the non-checkpointed scenario, RDD.partitions invokes getPartitions (feel free to trace the checkpoint path as well).


                  • RDD.getPartitions is abstract.

                  • So the actual implementation used in this case is MapPartitionsRDD.getPartitions, which simply delegates the call to the parent.


                  • There are only MapPartitionsRDDs between rdd and the source:



                    scala> ds.rdd.toDebugString
                    res3: String =
                    (1) MapPartitionsRDD[3] at rdd at <console>:26
                    | MapPartitionsRDD[2] at rdd at <console>:26
                    | MapPartitionsRDD[1] at rdd at <console>:26
                    | FileScanRDD[0] at rdd at <console>:26


                    Similarly, if the Dataset contained an exchange, we would follow the parents to the nearest shuffle:



                    scala> ds.orderBy("value").rdd.toDebugString
                    res4: String =
                    (67) MapPartitionsRDD[13] at rdd at <console>:26
                    | MapPartitionsRDD[12] at rdd at <console>:26
                    | MapPartitionsRDD[11] at rdd at <console>:26
                    | ShuffledRowRDD[10] at rdd at <console>:26
                    +-(1) MapPartitionsRDD[9] at rdd at <console>:26
                    | MapPartitionsRDD[5] at rdd at <console>:26
                    | FileScanRDD[4] at rdd at <console>:26


                    Note that this case is particularly interesting, because we actually triggered a job:



                    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
                    res5: Boolean = false

                    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
                    res6: Array[Int] = Array(0)


                    That's because we've encountered a scenario where the partitions cannot be determined statically (see Number of dataframe partitions after sorting? and Why does sortBy transformation trigger a Spark job?).



                    In such a scenario, getNumPartitions will also trigger a job:



                    scala> ds.orderBy("value").rdd.getNumPartitions
                    res7: Int = 67

                    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
                    res8: Array[Int] = Array(1, 0)


                    however, this doesn't mean that the observed cost is somehow related to the .rdd call. Instead, it is the intrinsic cost of finding partitions in cases where there is no static formula (for example, some Hadoop input formats, where a full scan of the data is required).
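The call chain in the list above can be modelled in a few lines of plain Scala. This is a toy sketch, not Spark's source: ToyRDD, ToyFileScanRDD, ToyMapPartitionsRDD and computeCalls are hypothetical names chosen to mirror the real classes, and the partition count of 4 is arbitrary. It only illustrates why asking for the number of partitions through a chain of narrow dependencies never touches the data.

```scala
// Toy model (NOT Spark's source): names mirror the real classes for readability only.
object PartitionLookupSketch {
  // Counts how often data would actually be materialised.
  var computeCalls = 0

  abstract class ToyRDD {
    // Abstract, like RDD.getPartitions.
    protected def getPartitions: Array[Int]
    // Resolved once and cached, like RDD.partitions in the non-checkpointed path.
    lazy val partitions: Array[Int] = getPartitions
    def getNumPartitions: Int = partitions.length
    // Only this method touches the data.
    def compute(): Seq[String]
  }

  // Source: the partition count is known from metadata (for example file splits).
  class ToyFileScanRDD(splits: Int) extends ToyRDD {
    protected def getPartitions: Array[Int] = Array.range(0, splits)
    def compute(): Seq[String] = { computeCalls += 1; Seq("row") }
  }

  // Narrow transformation: its partitions are simply the parent's partitions.
  class ToyMapPartitionsRDD(parent: ToyRDD) extends ToyRDD {
    protected def getPartitions: Array[Int] = parent.partitions
    def compute(): Seq[String] = {
      computeCalls += 1
      parent.compute().map(_.toUpperCase)
    }
  }

  def main(args: Array[String]): Unit = {
    val rdd = new ToyMapPartitionsRDD(new ToyMapPartitionsRDD(new ToyFileScanRDD(4)))
    // Prints: partitions = 4, compute calls = 0
    println(s"partitions = ${rdd.getNumPartitions}, compute calls = $computeCalls")
  }
}
```

The lookup walks the parents until it reaches a node that knows its partitioning from metadata; compute is never called along the way. The orderBy case differs because the node that answers the question has to look at the data first.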




                  Please note that the points made here shouldn't be extrapolated to other uses of Dataset.rdd. For example, ds.rdd.count would indeed be expensive and wasteful.







                  edited Jan 19 at 19:58

























                  answered Jan 19 at 19:18









                  user10938362







































                      In my experience df.rdd.getNumPartitions is very fast; I have never seen it take more than a second or so.



                      Alternatively, you could also try



                      val numPartitions: Long = df
                      .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()


                      which would avoid using .rdd
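One caveat worth keeping in mind with this alternative: count() is an action, so it runs a job over the data, and a partition that holds no rows contributes no partition id. The following toy model in plain Scala (no Spark involved; PartitionIdCountSketch, numPartitions and distinctPartitionIds are hypothetical names, and the sample data is made up) sketches how the two numbers can differ.

```scala
// Toy model in plain Scala (no Spark): a dataset is a list of partitions,
// and each partition is a list of rows.
object PartitionIdCountSketch {
  type Partition = Seq[String]

  // Analogue of df.rdd.getNumPartitions: metadata only, rows are never visited.
  def numPartitions(data: Seq[Partition]): Int = data.length

  // Analogue of select(spark_partition_id()).distinct().count():
  // every row is tagged with its partition index, then distinct indices are counted.
  def distinctPartitionIds(data: Seq[Partition]): Long =
    data.zipWithIndex
      .flatMap { case (rows, id) => rows.map(_ => id) }
      .distinct
      .size
      .toLong

  def main(args: Array[String]): Unit = {
    // Four partitions, two of which are empty.
    val data: Seq[Partition] = Seq(Seq("a", "b"), Seq.empty, Seq("c"), Seq.empty)
    println(numPartitions(data))        // 4
    println(distinctPartitionIds(data)) // 2
  }
}
```

In this model the row-based count reports only the non-empty partitions, so it is a lower bound on the real partition count rather than a drop-in replacement.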






                      answered Jan 19 at 19:58









                      Raphael Roth













                      • My job is running in just over half the time after removing the .rdd.getNumPartitions calls

                        – javadba
                        Jan 19 at 20:12



















                      I looked at the in-memory catalog and unfortunately it is quite limited. In particular, partition information is not available:



                      https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/catalog/Catalog.html



                      A subset of the contents is:



                      [image]

                      Unless another reader presents or discovers evidence to the contrary, I am going to conclude that there is no inexpensive way to check the current number of partitions of a DataFrame.






                          edited Jan 19 at 16:38

























                          answered Jan 19 at 16:24









                          javadba





























