Sådan læser og skriver du en tabeldata i PySpark

Sadan Laeser Og Skriver Du En Tabeldata I Pyspark



Databehandling i PySpark er hurtigere, hvis dataene indlæses i form af tabel. Med dette, ved hjælp af SQL-udtryk, vil behandlingen være hurtig. Så at konvertere PySpark DataFrame/RDD til en tabel, før den sendes til behandling, er den bedre tilgang. I dag vil vi se, hvordan du læser tabeldataene ind i PySpark DataFrame, skriver PySpark DataFrame til tabellen og indsætter ny DataFrame i den eksisterende tabel ved hjælp af de indbyggede funktioner. Lad os gå!

Pyspark.sql.DataFrameWriter.saveAsTable()

Først vil vi se, hvordan man skriver den eksisterende PySpark DataFrame ind i tabellen ved hjælp af write.saveAsTable()-funktionen. Det kræver tabelnavnet og andre valgfrie parametre som modes, partionBy osv. for at skrive DataFrame til tabellen. Den opbevares som en parketfil.

Syntaks:







dataframe_obj.write.saveAsTable(sti/tabelnavn,tilstand,partitionBy,...)
  1. Table_name er navnet på den tabel, der er oprettet fra dataframe_obj.
  2. Vi kan tilføje/overskrive tabellens data ved hjælp af tilstandsparameteren.
  3. PartitionBy tager de enkelte/flere kolonner til at oprette partitioner baseret på værdier i disse angivne kolonner.

Eksempel 1:

Opret en PySpark DataFrame med 5 rækker og 4 kolonner. Skriv denne dataramme til en tabel med navnet 'Agri_Table1'.



importere pyspark

fra pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux-tip' ).getOrCreate()

# landbrugsdata med 5 rækker og 5 kolonner

agri =[{ 'Jordtype' : 'Sort' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 2500 , 'Jordstatus' : 'Tør' ,
'Land' : 'USA' },

{ 'Jordtype' : 'Sort' , 'Irrigation_availability' : 'Ja' , 'Acres' : 3500 , 'Jordstatus' : 'Våd' ,
'Land' : 'Indien' },

{ 'Jordtype' : 'Rød' , 'Irrigation_availability' : 'Ja' , 'Acres' : 210 , 'Jordstatus' : 'Tør' ,
'Land' : 'UK' },

{ 'Jordtype' : 'Andet' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 1000 , 'Jordstatus' : 'Våd' ,
'Land' : 'USA' },

{ 'Jordtype' : 'Sand' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 500 , 'Jordstatus' : 'Tør' ,
'Land' : 'Indien' }]



# opret datarammen ud fra ovenstående data

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Skriv ovenstående DataFrame til tabellen.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Tabel1' )

Produktion:







Vi kan se, at der oprettes én parketfil med den tidligere PySpark Data.



Eksempel 2:

Overvej den forrige DataFrame og skriv 'Agri_Table2' til tabellen ved at partitionere posterne baseret på værdierne i kolonnen 'Country'.

# Skriv ovenstående DataFrame til tabellen med parameteren partitionBy

agri_df.write.saveAsTable( 'Agri_Tabel2' ,partitionBy=[ 'Land' ])

Produktion:

Der er tre unikke værdier i kolonnen 'Land' - 'Indien', 'UK' og 'USA'. Så der oprettes tre partitioner. Hver skillevæg rummer parketfilerne.

Pyspark.sql.DataFrameReader.table()

Lad os indlæse tabellen i PySpark DataFrame ved hjælp af spark.read.table()-funktionen. Det kræver kun én parameter, som er stien/tabelnavnet. Den indlæser tabellen direkte i PySpark DataFrame, og alle de SQL-funktioner, der anvendes på PySpark DataFrame, kan også anvendes på denne indlæste DataFrame.

Syntaks:

spark_app.read.table(sti/'Tabel_navn')

I dette scenarie bruger vi den forrige tabel, som blev oprettet fra PySpark DataFrame. Sørg for, at du skal implementere de tidligere scenariekodestykker i dit miljø.

Eksempel:

Indlæs 'Agri_Table1'-tabellen i DataFrame med navnet 'loaded_data'.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Tabel1' )

loaded_data.show()

Produktion:

Vi kan se, at tabellen er indlæst i PySpark DataFrame.

Udførelse af SQL-forespørgsler

Nu udfører vi nogle SQL-forespørgsler på den indlæste DataFrame ved hjælp af spark.sql()-funktionen.

# Brug kommandoen SELECT til at vise alle kolonner fra ovenstående tabel.

linuxhint_spark_app.sql( 'VÆLG * fra Agri_Table1' ).at vise()

# WHERE-klausul

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).at vise()

linuxhint_spark_app.sql( 'VÆLG * fra Agri_Table1 WHERE Acres > 2000' ).at vise()

Produktion:

  1. Den første forespørgsel viser alle kolonner og poster fra DataFrame.
  2. Den anden forespørgsel viser posterne baseret på kolonnen 'Jordstatus'. Der er kun tre plader med 'Dry'-elementet.
  3. Den sidste forespørgsel returnerer to poster med 'Acres', der er større end 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Ved at bruge funktionen insertInto() kan vi tilføje DataFrame til den eksisterende tabel. Vi kan bruge denne funktion sammen med selectExpr() til at definere kolonnenavnene og derefter indsætte den i tabellen. Denne funktion tager også tabelnavnet som en parameter.

Syntaks:

DataFrame_obj.write.insertInto('Tabel_navn')

I dette scenarie bruger vi den forrige tabel, som blev oprettet fra PySpark DataFrame. Sørg for, at du skal implementere de tidligere scenariekodestykker i dit miljø.

Eksempel:

Opret en ny DataFrame med to poster og indsæt dem i 'Agri_Table1' tabellen.

importere pyspark

fra pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux-tip' ).getOrCreate()

# landbrugsdata med 2 rækker

agri =[{ 'Jordtype' : 'Sand' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 2500 , 'Jordstatus' : 'Tør' ,
'Land' : 'USA' },

{ 'Jordtype' : 'Sand' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 1200 , 'Jordstatus' : 'Våd' ,
'Land' : 'Japan' }]

# opret datarammen ud fra ovenstående data

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Acres' , 'Land' , 'Irrigation_availability' , 'Jordtype' ,
'Jordstatus' ).write.insertInto( 'Agri_Tabel1' )

# Vis den endelige Agri_Table1

linuxhint_spark_app.sql( 'VÆLG * fra Agri_Table1' ).at vise()

Produktion:

Nu er det samlede antal rækker, der er til stede i DataFrame, 7.

Konklusion

Du forstår nu, hvordan du skriver PySpark DataFrame til tabellen ved hjælp af funktionen write.saveAsTable(). Det tager tabelnavnet og andre valgfrie parametre. Derefter indlæste vi denne tabel i PySpark DataFrame ved hjælp af spark.read.table()-funktionen. Det kræver kun én parameter, som er stien/tabelnavnet. Hvis du vil tilføje den nye DataFrame til den eksisterende tabel, skal du bruge funktionen insertInto() .