Pyspark.sql.DataFrameWriter.saveAsTable()
Først vil vi se, hvordan man skriver den eksisterende PySpark DataFrame ind i tabellen ved hjælp af write.saveAsTable()-funktionen. Det kræver tabelnavnet og andre valgfrie parametre som modes, partionBy osv. for at skrive DataFrame til tabellen. Den opbevares som en parketfil.
Syntaks:
dataframe_obj.write.saveAsTable(sti/tabelnavn,tilstand,partitionBy,...)
- Table_name er navnet på den tabel, der er oprettet fra dataframe_obj.
- Vi kan tilføje/overskrive tabellens data ved hjælp af tilstandsparameteren.
- PartitionBy tager de enkelte/flere kolonner til at oprette partitioner baseret på værdier i disse angivne kolonner.
Eksempel 1:
Opret en PySpark DataFrame med 5 rækker og 4 kolonner. Skriv denne dataramme til en tabel med navnet 'Agri_Table1'.
importere pyspark
fra pyspark.sql importer SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux-tip' ).getOrCreate()
# landbrugsdata med 5 rækker og 5 kolonner
agri =[{ 'Jordtype' : 'Sort' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 2500 , 'Jordstatus' : 'Tør' ,
'Land' : 'USA' },
{ 'Jordtype' : 'Sort' , 'Irrigation_availability' : 'Ja' , 'Acres' : 3500 , 'Jordstatus' : 'Våd' ,
'Land' : 'Indien' },
{ 'Jordtype' : 'Rød' , 'Irrigation_availability' : 'Ja' , 'Acres' : 210 , 'Jordstatus' : 'Tør' ,
'Land' : 'UK' },
{ 'Jordtype' : 'Andet' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 1000 , 'Jordstatus' : 'Våd' ,
'Land' : 'USA' },
{ 'Jordtype' : 'Sand' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 500 , 'Jordstatus' : 'Tør' ,
'Land' : 'Indien' }]
# opret datarammen ud fra ovenstående data
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Skriv ovenstående DataFrame til tabellen.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Tabel1' )
Produktion:
Vi kan se, at der oprettes én parketfil med den tidligere PySpark Data.
Eksempel 2:
Overvej den forrige DataFrame og skriv 'Agri_Table2' til tabellen ved at partitionere posterne baseret på værdierne i kolonnen 'Country'.
# Skriv ovenstående DataFrame til tabellen med parameteren partitionByagri_df.write.saveAsTable( 'Agri_Tabel2' ,partitionBy=[ 'Land' ])
Produktion:
Der er tre unikke værdier i kolonnen 'Land' - 'Indien', 'UK' og 'USA'. Så der oprettes tre partitioner. Hver skillevæg rummer parketfilerne.
Pyspark.sql.DataFrameReader.table()
Lad os indlæse tabellen i PySpark DataFrame ved hjælp af spark.read.table()-funktionen. Det kræver kun én parameter, som er stien/tabelnavnet. Den indlæser tabellen direkte i PySpark DataFrame, og alle de SQL-funktioner, der anvendes på PySpark DataFrame, kan også anvendes på denne indlæste DataFrame.
Syntaks:
spark_app.read.table(sti/'Tabel_navn')I dette scenarie bruger vi den forrige tabel, som blev oprettet fra PySpark DataFrame. Sørg for, at du skal implementere de tidligere scenariekodestykker i dit miljø.
Eksempel:
Indlæs 'Agri_Table1'-tabellen i DataFrame med navnet 'loaded_data'.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Tabel1' )loaded_data.show()
Produktion:
Vi kan se, at tabellen er indlæst i PySpark DataFrame.
Udførelse af SQL-forespørgsler
Nu udfører vi nogle SQL-forespørgsler på den indlæste DataFrame ved hjælp af spark.sql()-funktionen.
# Brug kommandoen SELECT til at vise alle kolonner fra ovenstående tabel.linuxhint_spark_app.sql( 'VÆLG * fra Agri_Table1' ).at vise()
# WHERE-klausul
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).at vise()
linuxhint_spark_app.sql( 'VÆLG * fra Agri_Table1 WHERE Acres > 2000' ).at vise()
Produktion:
- Den første forespørgsel viser alle kolonner og poster fra DataFrame.
- Den anden forespørgsel viser posterne baseret på kolonnen 'Jordstatus'. Der er kun tre plader med 'Dry'-elementet.
- Den sidste forespørgsel returnerer to poster med 'Acres', der er større end 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Ved at bruge funktionen insertInto() kan vi tilføje DataFrame til den eksisterende tabel. Vi kan bruge denne funktion sammen med selectExpr() til at definere kolonnenavnene og derefter indsætte den i tabellen. Denne funktion tager også tabelnavnet som en parameter.
Syntaks:
DataFrame_obj.write.insertInto('Tabel_navn')I dette scenarie bruger vi den forrige tabel, som blev oprettet fra PySpark DataFrame. Sørg for, at du skal implementere de tidligere scenariekodestykker i dit miljø.
Eksempel:
Opret en ny DataFrame med to poster og indsæt dem i 'Agri_Table1' tabellen.
importere pysparkfra pyspark.sql importer SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux-tip' ).getOrCreate()
# landbrugsdata med 2 rækker
agri =[{ 'Jordtype' : 'Sand' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 2500 , 'Jordstatus' : 'Tør' ,
'Land' : 'USA' },
{ 'Jordtype' : 'Sand' , 'Irrigation_availability' : 'Ingen' , 'Acres' : 1200 , 'Jordstatus' : 'Våd' ,
'Land' : 'Japan' }]
# opret datarammen ud fra ovenstående data
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Acres' , 'Land' , 'Irrigation_availability' , 'Jordtype' ,
'Jordstatus' ).write.insertInto( 'Agri_Tabel1' )
# Vis den endelige Agri_Table1
linuxhint_spark_app.sql( 'VÆLG * fra Agri_Table1' ).at vise()
Produktion:
Nu er det samlede antal rækker, der er til stede i DataFrame, 7.
Konklusion
Du forstår nu, hvordan du skriver PySpark DataFrame til tabellen ved hjælp af funktionen write.saveAsTable(). Det tager tabelnavnet og andre valgfrie parametre. Derefter indlæste vi denne tabel i PySpark DataFrame ved hjælp af spark.read.table()-funktionen. Det kræver kun én parameter, som er stien/tabelnavnet. Hvis du vil tilføje den nye DataFrame til den eksisterende tabel, skal du bruge funktionen insertInto() .