PySpark Pandas_Udf()

Pyspark Pandas Udf



Det er muligt at transformere PySpark DataFrame ved hjælp af pandas_udf()-funktionen. Det er en brugerdefineret funktion, som anvendes på PySpark DataFrame med pil. Vi kan udføre de vektoriserede operationer ved hjælp af pandas_udf(). Det kan implementeres ved at bestå denne funktion som dekoratør. Lad os dykke ned i denne guide for at kende syntaks, parametre og forskellige eksempler.

Emne for indhold:

Hvis du vil vide mere om PySpark DataFrame og modulinstallation, skal du gennemgå dette artikel .







Pyspark.sql.functions.pandas_udf()

pandas_udf () er tilgængelig i sql.functions-modulet i PySpark, som kan importeres ved hjælp af nøgleordet 'fra'. Det bruges til at udføre de vektoriserede operationer på vores PySpark DataFrame. Denne funktion implementeres som en dekoratør ved at sende tre parametre. Derefter kan vi oprette en brugerdefineret funktion, der returnerer dataene i vektorformatet (som vi bruger serier/NumPy til dette) ved hjælp af en pil. Inden for denne funktion er vi i stand til at returnere resultatet.



Struktur og syntaks:



Lad os først se på strukturen og syntaksen for denne funktion:

@pandas_udf(datatype)
def funktionsnavn(operation) -> konverter_format:
returopgørelse

Her er funktionsnavn navnet på vores definerede funktion. Datatypen angiver den datatype, der returneres af denne funktion. Vi kan returnere resultatet ved at bruge nøgleordet 'retur'. Alle handlinger udføres inde i funktionen med piletildelingen.





Pandas_udf (Function og ReturnType)

  1. Den første parameter er den brugerdefinerede funktion, der sendes til den.
  2. Den anden parameter bruges til at angive returdatatypen fra funktionen.

Data:

I hele denne guide bruger vi kun én PySpark DataFrame til demonstration. Alle de brugerdefinerede funktioner, som vi definerer, anvendes på denne PySpark DataFrame. Sørg for at oprette denne DataFrame i dit miljø først efter installationen af ​​PySpark.



importere pyspark

fra pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux-tip' ).getOrCreate()

fra pyspark.sql.functions importer pandas_udf

fra pyspark.sql.types import *

importer pandaer som panda

# grøntsagsdetaljer

grøntsag =[{ 'type' : 'grøntsag' , 'navn' : 'tomat' , 'lokalisere_land' : 'USA' , 'antal' : 800 },

{ 'type' : 'frugt' , 'navn' : 'banan' , 'lokalisere_land' : 'KINA' , 'antal' : tyve },

{ 'type' : 'grøntsag' , 'navn' : 'tomat' , 'lokalisere_land' : 'USA' , 'antal' : 800 },

{ 'type' : 'grøntsag' , 'navn' : 'Mango' , 'lokalisere_land' : 'JAPAN' , 'antal' : 0 },

{ 'type' : 'frugt' , 'navn' : 'citron' , 'lokalisere_land' : 'INDIEN' , 'antal' : 1700 },

{ 'type' : 'grøntsag' , 'navn' : 'tomat' , 'lokalisere_land' : 'USA' , 'antal' : 1200 },

{ 'type' : 'grøntsag' , 'navn' : 'Mango' , 'lokalisere_land' : 'JAPAN' , 'antal' : 0 },

{ 'type' : 'frugt' , 'navn' : 'citron' , 'lokalisere_land' : 'INDIEN' , 'antal' : 0 }

]

# opret markedsdatarammen ud fra ovenstående data

market_df = linuxhint_spark_app.createDataFrame(grøntsag)

market_df.show()

Produktion:

Her opretter vi denne DataFrame med 4 kolonner og 8 rækker. Nu bruger vi pandas_udf() til at oprette de brugerdefinerede funktioner og anvende dem på disse kolonner.

Pandas_udf() med forskellige datatyper

I dette scenarie opretter vi nogle brugerdefinerede funktioner med pandas_udf() og anvender dem på kolonner og viser resultaterne ved hjælp af select()-metoden. I hvert tilfælde bruger vi pandas.Series, når vi udfører de vektoriserede operationer. Dette betragter kolonneværdierne som et endimensionelt array, og handlingen anvendes på kolonnen. I selve dekoratoren angiver vi funktionen returtype.

Eksempel 1: Pandas_udf() med strengtype

Her opretter vi to brugerdefinerede funktioner med strengreturtypen for at konvertere strengtypekolonneværdierne til store og små bogstaver. Til sidst anvender vi disse funktioner på kolonnerne 'type' og 'locate_country'.

# Konverter type kolonne til store bogstaver med pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

returner i.str.upper()

# Konverter kolonnen locate_country til små bogstaver med pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

returner i.str.lower()

# Vis kolonnerne ved hjælp af select()

market_df.select( 'type' ,type_store_bogstav( 'type' ), 'lokaliser_land' ,
land_små bogstaver( 'lokaliser_land' )).at vise()

Produktion:

Forklaring:

StringType()-funktionen er tilgængelig i pyspark.sql.types-modulet. Vi har allerede importeret dette modul, mens vi oprettede PySpark DataFrame.

  1. Først returnerer UDF (brugerdefineret funktion) strengene med store bogstaver ved hjælp af funktionen str.upper(). Str.upper() er tilgængelig i seriedatastrukturen (da vi konverterer til serier med en pil inde i funktionen), som konverterer den givne streng til store bogstaver. Til sidst anvendes denne funktion på kolonnen 'type', som er angivet i select()-metoden. Tidligere var alle strengene i typekolonnen med små bogstaver. Nu er de ændret til store bogstaver.
  2. For det andet returnerer UDF strengene med store bogstaver ved hjælp af str.lower()-funktionen. Str.lower() er tilgængelig i seriedatastrukturen, som konverterer den givne streng til små bogstaver. Til sidst anvendes denne funktion på kolonnen 'type', som er angivet i select()-metoden. Tidligere stod alle strengene i typekolonnen med store bogstaver. Nu er de ændret til små bogstaver.

Eksempel 2: Pandas_udf() med heltalstype

Lad os oprette en UDF, der konverterer PySpark DataFrame-heltalskolonnen til Pandas-serien og tilføjer 100 til hver værdi. Send kolonnen 'mængde' til denne funktion inde i select()-metoden.

# Tilføj 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

returner i+ 100

# Send mængdekolonnen til ovenstående funktion og display.

market_df.select( 'antal' ,tilføj_100( 'antal' )).at vise()

Produktion:

Forklaring:

Inde i UDF'en itererer vi alle værdierne og konverterer dem til serier. Derefter tilføjer vi 100 til hver værdi i serien. Til sidst sender vi kolonnen 'mængde' til denne funktion, og vi kan se, at 100 tilføjes til alle værdierne.

Pandas_udf() med forskellige datatyper ved hjælp af Groupby() & Agg()

Lad os se på eksemplerne for at overføre UDF til de aggregerede kolonner. Her grupperes kolonneværdierne først ved hjælp af groupby()-funktionen og aggregering udføres ved hjælp af agg()-funktionen. Vi sender vores UDF inde i denne aggregerede funktion.

Syntaks:

pyspark_dataframe_object.groupby( 'gruppering_kolonne' ).agg(UDF
(pyspark_dataframe_object[ 'kolonne' ]))

Her er værdierne i grupperingskolonnen grupperet først. Derefter foretages aggregeringen på hver grupperet data med hensyn til vores UDF.

Eksempel 1: Pandas_udf() med Aggregate Mean()

Her opretter vi en brugerdefineret funktion med en returtype float. Inde i funktionen beregner vi gennemsnittet ved hjælp af mean()-funktionen. Denne UDF overføres til kolonnen 'mængde' for at få den gennemsnitlige mængde for hver type.

# returner middelværdien/gennemsnittet

@pandas_udf( 'flyde' )

def average_function(i: panda.Series) -> float:

returner i.mean()

# Send mængdekolonnen til funktionen ved at gruppere typekolonnen.

market_df.groupby( 'type' ).agg(gennemsnitlig_funktion(marked_df[ 'antal' ])).at vise()

Produktion:

Vi grupperer baseret på elementer i kolonnen 'type'. Der dannes to grupper - 'frugt' og 'grøntsag'. For hver gruppe beregnes gennemsnittet og returneres.

Eksempel 2: Pandas_udf() med Aggregate Max() og Min()

Her opretter vi to brugerdefinerede funktioner med heltal (int) returtype. Den første UDF returnerer minimumsværdien, og den anden UDF returnerer maksimumværdien.

# pandas_udf, der returnerer minimumsværdien

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

returner i.min()

# pandas_udf, der returnerer den maksimale værdi

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

returner i.max()

# Send mængdekolonnen til min_ pandas_udf ved at gruppere locate_country.

market_df.groupby( 'lokaliser_land' ).agg(min_(marked_df[ 'antal' ])).at vise()

# Send mængdekolonnen til max_ pandas_udf ved at gruppere locate_country.

market_df.groupby( 'lokaliser_land' ).agg(max_(marked_df[ 'antal' ])).at vise()

Produktion:

For at returnere minimum- og maksimumværdier bruger vi min()- og max()-funktionerne i returtypen af ​​UDF'er. Nu grupperer vi dataene i kolonnen 'locate_country'. Der dannes fire grupper ('KINA', 'INDIEN', 'JAPAN', 'USA'). For hver gruppe returnerer vi den maksimale mængde. Tilsvarende returnerer vi minimumsmængden.

Konklusion

Grundlæggende bruges pandas_udf () til at udføre de vektoriserede operationer på vores PySpark DataFrame. Vi har set, hvordan man opretter pandas_udf() og anvender det på PySpark DataFrame. For bedre forståelse diskuterede vi de forskellige eksempler ved at overveje alle datatyperne (streng, float og heltal). Det kan være muligt at bruge pandas_udf() med groupby() gennem agg()-funktionen.