Spark SQL filter multiple similar fields

Is there a better way to write a filter made of multiple similar conditions on a Spark DataFrame?

Assume df is a Spark DataFrame with timestamp columns t1, t2, t3, t4:

    val filteredDF = df.filter(
      col("t1").lt(current_date() - expr("INTERVAL 30 DAYS")) ||
      col("t2").lt(current_date() - expr("INTERVAL 30 DAYS")) ||
      col("t3").lt(current_date() - expr("INTERVAL 30 DAYS")) ||
      col("t4").lt(current_date() - expr("INTERVAL 30 DAYS")))

Is there a better way to write the same thing? I'm new to Scala, so I don't yet know its best practices. Any help is appreciated.

apache-spark apache-spark-sql










asked Jan 19 at 6:24 by Vikas J
























2 Answers
































    import df.sparkSession.implicits._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions._

    // Build a single predicate from any number of columns: compare each
    // against (current_date - 30 days), then OR the comparisons together.
    def filterDates(dates: Column*): Column =
      dates
        .map(_.lt(current_date() - expr("INTERVAL 30 DAYS")))
        .reduce(_ or _)

    val filteredDF = df.filter(filterDates($"t1", $"t2", $"t3", $"t4"))


          I didn't even check if it compiles, but give or take a few typos it should do the job.
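
A variant worth considering: compute the cutoff date once with date_sub instead of repeating the interval arithmetic per column. A minimal sketch along the same lines, assuming df is in scope (untested, like the above):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{col, current_date, date_sub}

    // Compute the 30-day cutoff once, then reuse it for every column.
    val cutoff: Column = date_sub(current_date(), 30)

    val filteredDF = df.filter(
      Seq("t1", "t2", "t3", "t4")
        .map(name => col(name) < cutoff) // one predicate per column
        .reduce(_ || _))                 // OR the predicates together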






answered Jan 19 at 7:29 by shay__














Check this out:

    scala> import java.sql.Timestamp
    import java.sql.Timestamp

    scala> val df = Seq( (Timestamp.valueOf("2019-01-01 01:02:03"), Timestamp.valueOf("2019-01-10 01:02:03"), Timestamp.valueOf("2019-01-15 01:02:03"), Timestamp.valueOf("2019-02-22 01:02:03")) ).toDF("t1","t2","t3","t4")
    df: org.apache.spark.sql.DataFrame = [t1: timestamp, t2: timestamp ... 2 more fields]

    scala> df.show(false)
    +-------------------+-------------------+-------------------+-------------------+
    |t1                 |t2                 |t3                 |t4                 |
    +-------------------+-------------------+-------------------+-------------------+
    |2019-01-01 01:02:03|2019-01-10 01:02:03|2019-01-15 01:02:03|2019-02-22 01:02:03|
    +-------------------+-------------------+-------------------+-------------------+

    scala> val ts_cols = df.dtypes.filter(_._2 == "TimestampType").map(_._1)
    ts_cols: Array[String] = Array(t1, t2, t3, t4)

    scala> val exp1 = ts_cols.map( x => col(x).lt(current_date() - expr("INTERVAL 30 DAYS")) ).reduce(_ || _)
    exp1: org.apache.spark.sql.Column = ((((t1 < (current_date() - interval 4 weeks 2 days)) OR (t2 < (current_date() - interval 4 weeks 2 days))) OR (t3 < (current_date() - interval 4 weeks 2 days))) OR (t4 < (current_date() - interval 4 weeks 2 days)))

    scala> df.select(col("*"), exp1.as("ts_comp")).show(false)
    +-------------------+-------------------+-------------------+-------------------+-------+
    |t1                 |t2                 |t3                 |t4                 |ts_comp|
    +-------------------+-------------------+-------------------+-------------------+-------+
    |2019-01-01 01:02:03|2019-01-10 01:02:03|2019-01-15 01:02:03|2019-02-22 01:02:03|false  |
    +-------------------+-------------------+-------------------+-------------------+-------+

And a case where the condition fires (all timestamps older than 30 days):

    scala> val df2 = Seq( (Timestamp.valueOf("2018-01-01 01:02:03"), Timestamp.valueOf("2018-01-10 01:02:03"), Timestamp.valueOf("2018-01-15 01:02:03"), Timestamp.valueOf("2018-02-22 01:02:03")) ).toDF("t1","t2","t3","t4")
    df2: org.apache.spark.sql.DataFrame = [t1: timestamp, t2: timestamp ... 2 more fields]

    scala> df2.select(col("*"), exp1.as("ts_comp")).show(false)
    +-------------------+-------------------+-------------------+-------------------+-------+
    |t1                 |t2                 |t3                 |t4                 |ts_comp|
    +-------------------+-------------------+-------------------+-------------------+-------+
    |2018-01-01 01:02:03|2018-01-10 01:02:03|2018-01-15 01:02:03|2018-02-22 01:02:03|true   |
    +-------------------+-------------------+-------------------+-------------------+-------+
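
To actually filter rather than just display the flag, the assembled Column should drop straight into filter; a minimal sketch, assuming the same REPL session:

    scala> val filteredDF = df2.filter(exp1)

Because ts_cols is derived from df.dtypes, the predicate also picks up any timestamp columns added to the schema later, with no code change.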





answered Jan 19 at 13:45 by stack0114106