How can I efficiently transpose a 67 GB file/Dask DataFrame without loading it entirely into memory?

I have three rather large files (67 GB, 36 GB, 30 GB) that I need to train models on. However, the features are rows and the samples are columns. Since Dask hasn't implemented DataFrame transpose and stores DataFrames partitioned by row, I need to write something to do this myself. Is there a way I can efficiently transpose without loading everything into memory?



I've got 16 GB of RAM at my disposal and am using a Jupyter notebook. I have written some rather slow code, but would really appreciate a faster solution: at its current speed, the code below would take about a month to finish all the files. The slowest step, by a few orders of magnitude, is the awk call.



import dask.dataframe as dd
import pandas as pd
import subprocess
import os
from IPython.display import clear_output, display

df = dd.read_csv('~/VeryLarge.tsv', sep='\t')

with open('output.csv', 'wb') as fout:
    for i in range(1, len(df.columns) + 1):
        print('AWKing')
        # read a single column from the original data and store it elsewhere
        x = "awk '{print $" + str(i) + "}' ~/VeryLarge.tsv > ~/file.temp"
        subprocess.check_call([x], shell=True)

        print('Reading')
        # load and transpose the column
        col = pd.read_csv('~/file.temp')
        row = col.T
        display(row)

        print('Deleting')
        # remove the temporary file created
        os.remove(os.path.expanduser('~/file.temp'))

        print('Storing')
        # store the row in its own csv just to be safe; not strictly necessary
        row.to_csv('~/columns/col_{:09d}'.format(i), header=False)

        print('Appending')
        # append the row (transposed column) to the new file
        with open(os.path.expanduser('~/columns/col_{:09d}'.format(i)), 'rb') as fin:
            for line in fin:
                fout.write(line)

        clear_output()
        # just a measure of progress
        print(i / len(df.columns))


The data itself is 10 million rows (features) and 2000 columns (samples). It just needs to be transposed. Currently it looks like this: [screenshot of the DataFrame]

python dataframe dask

asked Jan 15 at 23:29, edited Jan 16 at 1:25 – Joe B

  • Can you post a sample of input to desired output?
    – Mike, Jan 16 at 1:09

  • Just edited to show a sample of the data. The data is real, but I changed the names of features and samples because of company privacy.
    – Joe B, Jan 16 at 1:20

  • at risk of wandering off topic, if your data set has a high proportion of zeros you might consider using a sparse matrix representation. Many common matrix operations can be much more efficient that way.
    – smarchese, Jan 16 at 1:59

  • So you want 10m columns and 2000 rows?
    – Mike, Jan 16 at 2:48

  • Try docs.dask.org/en/latest/array-api.html#dask.array.transpose
    – mdurant, Jan 16 at 15:58
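
Following up on mdurant's pointer, a minimal sketch of what the Dask array route might look like, assuming the file is purely numeric (any label column set aside first) and using placeholder paths; this illustrates the suggestion rather than a tested solution at this scale:

import dask.array as da
import dask.dataframe as dd

# Lazily read the tab-separated file; assumes every column is numeric.
df = dd.read_csv('~/VeryLarge.tsv', sep='\t')

# View the DataFrame as a 2-D Dask array; lengths=True computes the
# partition lengths so the array gets known chunk sizes.
arr = df.to_dask_array(lengths=True)

# Transposition is lazy on Dask arrays. Rechunk so each block of the
# transposed result is a manageable size (here roughly 160 MB of float64).
transposed = arr.T.rechunk((2000, 10_000))

# Write to a chunked binary store instead of CSV (requires the zarr package).
da.to_zarr(transposed, 'transposed.zarr')
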
2 Answers

Answer from Mike (answered Jan 16 at 5:30):

I would create an intermediate file and use fp.seek to write the values out in binary format, in the new order, before converting it back into a new CSV. Given that (row, column) becomes (column, row), sys.float_info will give you the size of each element, and the position of each element is (column * old_row_count + row) * size_of_float.

You then re-combine them into a CSV by converting the values back into text, reading old_row_count values per line.
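
A rough sketch of what this seek-based approach could look like, assuming all values are float64, the header row has been handled separately, and placeholder file paths:

import csv
import struct

IN_PATH = 'VeryLarge_noheader.tsv'   # placeholder: numeric values only
BIN_PATH = 'transposed.bin'          # intermediate binary scratch file
OUT_PATH = 'transposed.csv'          # final transposed CSV
ITEM = struct.Struct('<d')           # one little-endian float64 per value

# Pass 1: count the rows so we know the scatter stride
# (= number of values per line of the transposed output).
with open(IN_PATH) as f:
    n_rows = sum(1 for _ in f)

# Pass 2: stream the input and write each value at its transposed offset,
# (column * n_rows + row) * itemsize, in the intermediate binary file.
n_cols = 0
with open(IN_PATH, newline='') as fin, open(BIN_PATH, 'wb') as fbin:
    for r, fields in enumerate(csv.reader(fin, delimiter='\t')):
        n_cols = len(fields)
        for c, value in enumerate(fields):
            fbin.seek((c * n_rows + r) * ITEM.size)
            fbin.write(ITEM.pack(float(value)))

# Pass 3: read the binary file back sequentially; every n_rows values
# become one line of the transposed CSV.
with open(BIN_PATH, 'rb') as fbin, open(OUT_PATH, 'w', newline='') as fout:
    writer = csv.writer(fout)
    for _ in range(n_cols):
        chunk = fbin.read(n_rows * ITEM.size)
        writer.writerow(v for (v,) in ITEM.iter_unpack(chunk))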






Answer from Joe B, the asker (answered Jan 17 at 7:21, edited Jan 18 at 17:41):

I've modified my original script so it can be deployed on any number of CPUs. It ran much faster since I could use multiple processes, and I deployed it on AWS: a 96-core machine completed the task in about 8 hours. I was quite surprised, since that is nearly linear scaling! The idea is to make the repetitive task distributable so that chunks of work can be assigned to separate CPUs; here the parallelization is done with pool.map().

Usage from the command line is simple:

python3 transposer.py -i largeFile.tsv

You can specify other arguments as well if required.



import argparse
import os
import subprocess
from contextlib import closing
from multiprocessing import Pool

import pandas as pd

parser = argparse.ArgumentParser(description='Transpose csv')
parser.add_argument('-i', '--infile', help='Path to input file',
                    default=None)
parser.add_argument('-s', '--sep', help='Input separator',
                    default='\t')

args = parser.parse_args()
infile = args.infile
sep = args.sep

# Only the header is needed here, to find out how many columns there are.
df = pd.read_csv(infile, sep=sep, nrows=3)


def READ_COL(item):
    """Extract column `item` with awk, transpose it, and save it as its own CSV."""
    print(item)
    outfile = 'outfile{}.temp'.format(item)
    if item != 0:
        # pull a single column out of the original file
        x = "awk '{print $" + str(item) + "}' " + infile + " > " + outfile
        subprocess.check_call([x], shell=True)
        # load the column, transpose it into a single row, and store it
        col = pd.read_csv(outfile)
        row = col.T
        row.to_csv('col_{:09d}.csv'.format(item), header=False)
        # clean up the temporary column file and report progress
        os.remove(outfile)
        print(item / len(df.columns))


if __name__ == '__main__':
    # one task per column, spread across all available CPUs
    with closing(Pool(processes=os.cpu_count())) as pool:
        pool.map(READ_COL, list(range(1, len(df.columns) + 1)))
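
The per-column files still need to be stitched together into the final transposed file, like the appending step in the serial version above. Once all workers have finished, something along these lines would do it (the output name is a placeholder; the zero-padded file names ensure the sorted order matches column order):

import glob

# Concatenate the per-column row files, in column order, into one CSV.
with open('transposed.csv', 'wb') as fout:
    for part in sorted(glob.glob('col_*.csv')):
        with open(part, 'rb') as fin:
            for line in fin:
                fout.write(line)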




