How to check the number of partitions of a Spark DataFrame without incurring the cost of .rdd
There are a number of questions about how to obtain the number of partitions of an RDD and/or a DataFrame; the answers invariably are:
rdd.getNumPartitions
or
df.rdd.getNumPartitions
Unfortunately, that is an expensive operation on a DataFrame, because df.rdd requires converting the DataFrame to an RDD, which takes on the order of the time it takes to run df.count.
I am writing logic that optionally repartitions or coalesces a DataFrame, based on whether the current number of partitions is within a range of acceptable values or instead below or above it.
import org.apache.spark.sql.DataFrame

// `info` is assumed to come from a logging trait mixed into the enclosing object.
def repartition(inDf: DataFrame, minPartitions: Option[Int],
                maxPartitions: Option[Int]): DataFrame = {
  val inputPartitions = inDf.rdd.getNumPartitions // EXPENSIVE!
  val outDf = minPartitions.flatMap { minp =>
    if (inputPartitions < minp) {
      info(s"Repartition the input from $inputPartitions to $minp partitions..")
      Option(inDf.repartition(minp))
    } else {
      None
    }
  }.getOrElse(maxPartitions.map { maxp =>
    if (inputPartitions > maxp) {
      info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
      inDf.coalesce(maxp)
    } else inDf
  }.getOrElse(inDf))
  outDf
}
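Independently of how the current partition count is obtained, the min/max decision in the function above can be separated from Spark so that it can be unit-tested without a cluster. This is a minimal sketch, assuming the same semantics as the function above (repartition when below the minimum, otherwise coalesce when above the maximum, otherwise leave the DataFrame unchanged); the names PartitionAction, RepartitionTo, CoalesceTo, KeepAsIs and PartitionPolicy are hypothetical:

```scala
// Hypothetical names: a pure description of what should happen to the DataFrame.
sealed trait PartitionAction
case object KeepAsIs extends PartitionAction
final case class RepartitionTo(n: Int) extends PartitionAction
final case class CoalesceTo(n: Int) extends PartitionAction

object PartitionPolicy {
  // Same precedence as the original helper: the minimum is checked first,
  // the maximum only if the minimum did not trigger a repartition.
  def decide(current: Int,
             minPartitions: Option[Int],
             maxPartitions: Option[Int]): PartitionAction =
    (minPartitions, maxPartitions) match {
      case (Some(minp), _) if current < minp => RepartitionTo(minp)
      case (_, Some(maxp)) if current > maxp => CoalesceTo(maxp)
      case _                                 => KeepAsIs
    }
}
```

The Spark side then only has to map RepartitionTo(n) to inDf.repartition(n), CoalesceTo(n) to inDf.coalesce(n), and KeepAsIs to inDf.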
But we cannot afford to incur the cost of rdd.getNumPartitions for every DataFrame in this manner.
Is there any way to obtain this information, e.g. by querying the in-memory/temporary catalog for the registered table?
Update: The Spark UI showed the DataFrame.rdd operation taking as long as the longest SQL query in the job. I will re-run the job and attach a screenshot here shortly.
The following is just a test case: it uses a small fraction of the production data size. The longest SQL query takes only five minutes, and this one is on its way to taking about as long (note that the SQL is not helped by this: it still has to execute afterwards, effectively doubling the cumulative execution time).

We can see that the .rdd operation at DataFrameUtils line 30 (shown in the snippet above) takes 5.1 mins, and yet the save operation still took 5.2 mins afterwards, i.e. doing the .rdd did not save any time in the execution of the subsequent save.
scala apache-spark partition
edited Jan 19 at 20:15 by javadba
asked Jan 19 at 15:56 by javadba
3 Answers
There is no inherent cost to the rdd component of rdd.getNumPartitions, because the returned RDD is never evaluated.
While you can easily verify this empirically, either with a debugger (I'll leave this as an exercise for the reader) or by establishing that no jobs are triggered in the base-case scenario,
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]
scala> ds.rdd.getNumPartitions
res0: Int = 1
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true
it might not be enough to convince you. So let's approach this more systematically:
- rdd returns a MapPartitionsRDD (ds as defined above):
scala> ds.rdd.getClass
res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
- RDD.getNumPartitions invokes RDD.partitions.
- In the non-checkpointed scenario, RDD.partitions invokes getPartitions (feel free to trace the checkpoint path as well).
- RDD.getPartitions is abstract.
- So the actual implementation used in this case is MapPartitionsRDD.getPartitions, which simply delegates the call to the parent.
- There are only MapPartitionsRDDs between rdd and the source:
scala> ds.rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[3] at rdd at <console>:26
| MapPartitionsRDD[2] at rdd at <console>:26
| MapPartitionsRDD[1] at rdd at <console>:26
| FileScanRDD[0] at rdd at <console>:26
- Similarly, if the Dataset contained an exchange, we would follow the parents to the nearest shuffle:
scala> ds.orderBy("value").rdd.toDebugString
res4: String =
(67) MapPartitionsRDD[13] at rdd at <console>:26
| MapPartitionsRDD[12] at rdd at <console>:26
| MapPartitionsRDD[11] at rdd at <console>:26
| ShuffledRowRDD[10] at rdd at <console>:26
+-(1) MapPartitionsRDD[9] at rdd at <console>:26
| MapPartitionsRDD[5] at rdd at <console>:26
| FileScanRDD[4] at rdd at <console>:26
Note that this case is particularly interesting, because we actually triggered a job:
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
res5: Boolean = false
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
res6: Array[Int] = Array(0)
That's because we've encountered a scenario where the partitions cannot be determined statically (see Number of dataframe partitions after sorting? and Why does sortBy transformation trigger a Spark job?).
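The difference between a partition count that can be resolved statically and one that depends on the data can be illustrated with a toy model. This is a sketch with hypothetical classes, not Spark's RDD implementation: map-like nodes just ask their parent (as MapPartitionsRDD.getPartitions does), while a range-partitioned node has to read its input to decide how many non-empty key ranges it can form, loosely mirroring why the sorted Dataset above ends up with a data-dependent partition count.

```scala
// Toy model only: hypothetical classes, not Spark's RDD hierarchy.
object PartitionLineageModel {
  // Counts how often resolving a partition count required reading data.
  var dataScans: Int = 0

  sealed trait Node { def numPartitions: Int }

  // Leaf with a statically known split count (like a file listing).
  final case class Source(rows: Vector[Int], splits: Int) extends Node {
    def numPartitions: Int = splits
  }

  // Map-like node: delegates to its parent and never touches the rows.
  final case class Mapped(parent: Node) extends Node {
    def numPartitions: Int = parent.numPartitions
  }

  // Range-partitioned node: the number of non-empty key ranges depends on
  // the data, so it must read its input (here: all rows) before answering.
  final case class RangePartitioned(input: Source, requested: Int) extends Node {
    def numPartitions: Int = {
      dataScans += 1
      val distinctKeys = input.rows.distinct.size
      math.max(1, math.min(requested, distinctKeys))
    }
  }
}
```

Asking a chain of Mapped nodes for its partition count leaves dataScans untouched, while putting a RangePartitioned node anywhere in the chain forces a read of the data, which is the analogue of the job observed above.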
In such a scenario, getNumPartitions will also trigger a job:
scala> ds.orderBy("value").rdd.getNumPartitions
res7: Int = 67
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
res8: Array[Int] = Array(1, 0)
However, this doesn't mean that the observed cost is somehow related to the .rdd call. Instead, it is the intrinsic cost of finding partitions in cases where there is no static formula (for example, some Hadoop input formats, where a full scan of the data is required).
Please note that the points made here shouldn't be extrapolated to other applications of Dataset.rdd. For example, ds.rdd.count would indeed be expensive and wasteful.
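The job check used in the REPL session above (comparing statusTracker.getJobIdsForGroup(null) before and after) can be wrapped into a small helper for checking your own DataFrames. This is a sketch; the object and method names are hypothetical. It only sees jobs that are not in a job group, so it will miss jobs if the calling thread has called setJobGroup.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionCountProbe {
  /** Returns `df.rdd.getNumPartitions` together with the ids of jobs that
    * became visible (outside any job group) while it was being computed.
    *
    * Caveats: the status tracker is fed by an asynchronous listener, so a job
    * started by the call may not be visible yet when it is read back, and jobs
    * started concurrently by other threads would also be reported.
    * Treat the result as a hint, not a proof.
    */
  def numPartitionsWithNewJobs(spark: SparkSession, df: DataFrame): (Int, Set[Int]) = {
    val tracker = spark.sparkContext.statusTracker
    // null selects the known jobs that are not associated with any job group.
    val before = tracker.getJobIdsForGroup(null).toSet
    val n = df.rdd.getNumPartitions
    val after = tracker.getJobIdsForGroup(null).toSet
    (n, after -- before)
  }
}
```

Calling it on ds and on ds.orderBy("value") from the session above should mirror the res1 and res8 observations, subject to the caveats in the comment.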
edited Jan 19 at 19:58
answered Jan 19 at 19:18 by user10938362
In my experience, df.rdd.getNumPartitions is very fast; I have never seen it take more than a second or so.
Alternatively, you could also try
val numPartitions: Long = df
  .select(org.apache.spark.sql.functions.spark_partition_id())
  .distinct()
  .count()
which would avoid using .rdd.
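Two caveats apply to the spark_partition_id alternative, and a small side-by-side sketch makes them visible: it counts only partitions that actually contain rows, and since count() is an action it executes the whole plan of df, so it is not a cheaper substitute for df.rdd.getNumPartitions. The object and method names below are hypothetical; spark.range(0, 2, 1, 4) is used only because two rows spread over four partitions guarantees that some partitions are empty.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.spark_partition_id

object PartitionCounts {
  // Partitions of the RDD backing the physical plan, empty ones included.
  // Per the first answer, this usually does not run a job.
  def planned(df: DataFrame): Int = df.rdd.getNumPartitions

  // Distinct partition ids that hold at least one row. `count()` is an action,
  // so this executes the full plan of `df` plus a small aggregation.
  def nonEmpty(df: DataFrame): Long =
    df.select(spark_partition_id()).distinct().count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("partition-counts")
      .getOrCreate()
    try {
      // 2 rows spread over 4 requested partitions: at least 2 must be empty.
      val df = spark.range(0, 2, 1, 4).toDF("id")
      println(s"planned=${planned(df)} nonEmpty=${nonEmpty(df)}")
    } finally spark.stop()
  }
}
```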
answered Jan 19 at 19:58 by Raphael Roth
My job is running in just over half the time after removing the .rdd.getNumPartitions calls.
– javadba
Jan 19 at 20:12
I looked at the in-memory catalog and unfortunately it is quite limited. In particular, partition information is not available:
https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/catalog/Catalog.html
A subset of the contents is:

Unless another reader presents or discovers evidence to the contrary, I am going to conclude that there is no inexpensive way to check the current number of partitions of a DataFrame.
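To see concretely what the public Catalog API reports for a registered table or temporary view, here is a minimal sketch; the object, case class and method names are hypothetical, and it uses only listTables, listColumns and isCached from the Catalog API linked above. None of these calls exposes a partition count for the underlying DataFrame.

```scala
import org.apache.spark.sql.SparkSession

object CatalogInspection {
  // Everything collected here comes from the public Catalog API.
  final case class TableInfo(name: String,
                             isTemporary: Boolean,
                             columns: Seq[String],
                             isCached: Boolean)

  def describe(spark: SparkSession, tableName: String): Option[TableInfo] = {
    // listTables() also lists temporary views of the current session.
    val tables = spark.catalog.listTables().collect()
    tables.find(_.name == tableName).map { t =>
      val cols = spark.catalog.listColumns(tableName).collect().map(_.name).toSeq
      // Note: there is no catalog field for the number of partitions.
      TableInfo(t.name, t.isTemporary, cols, spark.catalog.isCached(tableName))
    }
  }
}
```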
edited Jan 19 at 16:38
answered Jan 19 at 16:24 by javadba