PySpark Pandas_Udf()

Pyspark Pandas Udf



PySpark ڈیٹا فریم کو تبدیل کرنا pandas_udf() فنکشن کا استعمال کرتے ہوئے ممکن ہے۔ یہ یوزر ڈیفائنڈ فنکشن ہے جو پی اسپارک ڈیٹا فریم پر تیر کے ساتھ لاگو ہوتا ہے۔ ہم pandas_udf() کا استعمال کرتے ہوئے ویکٹرائزڈ آپریشن کر سکتے ہیں۔ اسے ڈیکوریٹر کے طور پر اس فنکشن کو پاس کرکے لاگو کیا جاسکتا ہے۔ آئیے نحو، پیرامیٹرز اور مختلف مثالوں کو جاننے کے لیے اس گائیڈ میں غوطہ لگائیں۔

مواد کا موضوع:

اگر آپ PySpark DataFrame اور ماڈیول کی تنصیب کے بارے میں جاننا چاہتے ہیں، تو اسے دیکھیں مضمون .







Pyspark.sql.functions.pandas_udf()

pandas_udf () PySpark میں sql.functions ماڈیول میں دستیاب ہے جسے 'from' کلیدی لفظ استعمال کرکے درآمد کیا جاسکتا ہے۔ یہ ہمارے PySpark ڈیٹا فریم پر ویکٹرائزڈ آپریشنز کرنے کے لیے استعمال ہوتا ہے۔ یہ فنکشن تین پیرامیٹرز کو پاس کر کے ڈیکوریٹر کی طرح لاگو کیا جاتا ہے۔ اس کے بعد، ہم صارف کی طرف سے طے شدہ فنکشن بنا سکتے ہیں جو ایک تیر کا استعمال کرتے ہوئے ویکٹر فارمیٹ میں ڈیٹا واپس کرتا ہے (جیسے ہم اس کے لیے سیریز/NumPy استعمال کرتے ہیں)۔ اس فنکشن کے اندر، ہم نتیجہ واپس کرنے کے قابل ہیں۔



ساخت اور نحو:



سب سے پہلے، اس فنکشن کی ساخت اور نحو کو دیکھتے ہیں:

@pandas_udf(datatype)
def function_name(operation) -> convert_format:
واپسی کا بیان

یہاں، function_name ہمارے متعین فنکشن کا نام ہے۔ ڈیٹا کی قسم ڈیٹا کی قسم کی وضاحت کرتی ہے جو اس فنکشن کے ذریعے واپس کی جاتی ہے۔ ہم 'واپسی' کلیدی لفظ کا استعمال کرتے ہوئے نتیجہ واپس کر سکتے ہیں۔ تمام آپریشنز فنکشن کے اندر تیر اسائنمنٹ کے ساتھ کیے جاتے ہیں۔





Pandas_udf (فنکشن اور ریٹرن ٹائپ)

  1. پہلا پیرامیٹر صارف کی طرف سے طے شدہ فنکشن ہے جو اسے پاس کیا جاتا ہے۔
  2. دوسرا پیرامیٹر فنکشن سے واپسی ڈیٹا کی قسم کی وضاحت کے لیے استعمال کیا جاتا ہے۔

ڈیٹا:

اس پوری گائیڈ میں، ہم مظاہرے کے لیے صرف ایک PySpark ڈیٹا فریم استعمال کرتے ہیں۔ تمام صارف کے بیان کردہ فنکشنز جن کی ہم وضاحت کرتے ہیں اس PySpark DataFrame پر لاگو ہوتے ہیں۔ اس بات کو یقینی بنائیں کہ آپ PySpark کی تنصیب کے بعد پہلے اپنے ماحول میں یہ ڈیٹا فریم بنائیں۔



pyspark درآمد کریں۔

pyspark.sql سے SparkSession درآمد کریں۔

linuxhint_spark_app = SparkSession.builder.appName( 'لینکس اشارہ' .getOrCreate()

pyspark.sql.functions سے pandas_udf درآمد کریں۔

pyspark.sql.types درآمد سے *

پانڈا کو پانڈا کے طور پر درآمد کریں۔

# سبزیوں کی تفصیلات

سبزی =[{ 'قسم' : 'سبزی' , 'نام' : 'ٹماٹر' , 'locate_country' : 'امریکا' , 'مقدار' : 800

{ 'قسم' : 'پھل' , 'نام' : 'کیلا' , 'locate_country' : 'چین' , 'مقدار' : بیس

{ 'قسم' : 'سبزی' , 'نام' : 'ٹماٹر' , 'locate_country' : 'امریکا' , 'مقدار' : 800

{ 'قسم' : 'سبزی' , 'نام' : 'آم' , 'locate_country' : 'جاپان' , 'مقدار' : 0

{ 'قسم' : 'پھل' , 'نام' : 'لیموں' , 'locate_country' : 'انڈیا' , 'مقدار' : 1700

{ 'قسم' : 'سبزی' , 'نام' : 'ٹماٹر' , 'locate_country' : 'امریکا' , 'مقدار' : 1200

{ 'قسم' : 'سبزی' , 'نام' : 'آم' , 'locate_country' : 'جاپان' , 'مقدار' : 0

{ 'قسم' : 'پھل' , 'نام' : 'لیموں' , 'locate_country' : 'انڈیا' , 'مقدار' : 0 }

]

# اوپر والے ڈیٹا سے مارکیٹ ڈیٹا فریم بنائیں

market_df = linuxhint_spark_app.createDataFrame(سبزی)

market_df.show()

آؤٹ پٹ:

یہاں، ہم یہ ڈیٹا فریم 4 کالم اور 8 قطاروں کے ساتھ بناتے ہیں۔ اب، ہم استعمال کرتے ہیں pandas_udf() صارف کی طرف سے بیان کردہ فنکشنز بنانے اور ان کو ان کالموں پر لاگو کرنے کے لیے۔

Pandas_udf() مختلف ڈیٹا کی اقسام کے ساتھ

اس منظر نامے میں، ہم pandas_udf() کے ساتھ صارف کے طے کردہ کچھ فنکشن بناتے ہیں اور انہیں کالموں پر لاگو کرتے ہیں اور سلیکٹ() طریقہ استعمال کرکے نتائج ظاہر کرتے ہیں۔ ہر معاملے میں، ہم pandas.Series استعمال کرتے ہیں جیسا کہ ہم ویکٹرائزڈ آپریشنز کرتے ہیں۔ یہ کالم کی قدروں کو ایک جہتی صف کے طور پر سمجھتا ہے اور کالم پر آپریشن کا اطلاق ہوتا ہے۔ ڈیکوریٹر میں ہی، ہم فنکشن کی واپسی کی قسم کی وضاحت کرتے ہیں۔

مثال 1: Pandas_udf() String Type کے ساتھ

یہاں، ہم سٹرنگ کی واپسی کی قسم کے ساتھ دو صارف کے متعین فنکشنز بناتے ہیں تاکہ سٹرنگ ٹائپ کالم ویلیو کو بڑے اور چھوٹے میں تبدیل کر سکیں۔ آخر میں، ہم ان فنکشنز کو 'قسم' اور 'locate_country' کالمز پر لاگو کرتے ہیں۔

# pandas_udf کے ساتھ ٹائپ کالم کو اپر کیس میں تبدیل کریں۔

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

واپسی i.str.upper()

# locate_country کالم کو pandas_udf کے ساتھ چھوٹے حروف میں تبدیل کریں۔

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

واپسی i.str.lower()

# سلیکٹ() کا استعمال کرتے ہوئے کالم ڈسپلے کریں

market_df.select( 'قسم' ,type_upper_case( 'قسم' ) 'locate_country' ,
ملک_لوئر_کیس( 'locate_country' )) دکھائیں()

آؤٹ پٹ:

وضاحت:

StringType() فنکشن pyspark.sql.types ماڈیول میں دستیاب ہے۔ PySpark DataFrame بناتے وقت ہم نے اس ماڈیول کو پہلے ہی درآمد کیا ہے۔

  1. سب سے پہلے، UDF (یوزر ڈیفائنڈ فنکشن) str.upper() فنکشن کا استعمال کرتے ہوئے سٹرنگز کو بڑے حروف میں واپس کرتا ہے۔ str.upper() سیریز ڈیٹا سٹرکچر میں دستیاب ہے (جیسا کہ ہم فنکشن کے اندر ایک تیر کے ساتھ سیریز میں تبدیل کر رہے ہیں) جو دی گئی سٹرنگ کو بڑے حروف میں تبدیل کرتا ہے۔ آخر میں، یہ فنکشن 'ٹائپ' کالم پر لاگو ہوتا ہے جو سلیکٹ() طریقہ کے اندر مخصوص ہوتا ہے۔ پہلے، قسم کے کالم میں تمام تار چھوٹے حروف میں ہوتے ہیں۔ اب، وہ بڑے حروف میں تبدیل ہو گئے ہیں۔
  2. دوسرا، UDF str.lower() فنکشن کا استعمال کرتے ہوئے تاروں کو بڑے حروف میں واپس کرتا ہے۔ str.lower() سیریز ڈیٹا سٹرکچر میں دستیاب ہے جو دی گئی سٹرنگ کو لوئر کیس میں تبدیل کرتا ہے۔ آخر میں، یہ فنکشن 'ٹائپ' کالم پر لاگو ہوتا ہے جو سلیکٹ() طریقہ کے اندر مخصوص ہوتا ہے۔ پہلے، قسم کے کالم میں تمام سٹرنگز بڑے حروف میں ہوتے ہیں۔ اب، انہیں چھوٹے حروف میں تبدیل کر دیا گیا ہے۔

مثال 2: انٹیجر ٹائپ کے ساتھ Pandas_udf()

آئیے ایک UDF بنائیں جو PySpark DataFrame کے انٹیجر کالم کو Pandas سیریز میں تبدیل کرے اور ہر قدر میں 100 کا اضافہ کرے۔ منتخب () طریقہ کے اندر اس فنکشن میں 'مقدار' کالم کو منتقل کریں۔

# 100 شامل کریں۔

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

واپسی i+ 100

# مقدار کے کالم کو اوپر والے فنکشن اور ڈسپلے میں منتقل کریں۔

market_df.select( 'مقدار' ,add_100( 'مقدار' )) دکھائیں()

آؤٹ پٹ:

وضاحت:

UDF کے اندر، ہم تمام اقدار کو دہراتے ہیں اور انہیں سیریز میں تبدیل کرتے ہیں۔ اس کے بعد، ہم سیریز میں ہر قدر میں 100 کا اضافہ کرتے ہیں۔ آخر میں، ہم 'مقدار' کالم کو اس فنکشن میں پاس کرتے ہیں اور ہم دیکھ سکتے ہیں کہ 100 تمام اقدار میں شامل ہو گیا ہے۔

Groupby() اور Agg() کا استعمال کرتے ہوئے مختلف ڈیٹا کی اقسام کے ساتھ Pandas_udf()

آئیے یو ڈی ایف کو مجموعی کالموں میں منتقل کرنے کے لیے مثالوں کو دیکھتے ہیں۔ یہاں، کالم کی قدروں کو پہلے گروپ بائی() فنکشن کا استعمال کرتے ہوئے گروپ کیا جاتا ہے اور agg() فنکشن کا استعمال کرتے ہوئے جمع کیا جاتا ہے۔ ہم اس مجموعی فنکشن کے اندر اپنا UDF پاس کرتے ہیں۔

نحو:

pyspark_dataframe_object.groupby( 'گروپنگ_کالم' agg(UDF
(pyspark_dataframe_object[ 'کالم' ]))

یہاں، گروپنگ کالم میں اقدار کو پہلے گروپ کیا جاتا ہے۔ پھر، ہمارے UDF کے حوالے سے ہر گروپ کردہ ڈیٹا پر جمع کیا جاتا ہے۔

مثال 1: Pandas_udf() Aggregate Mean() کے ساتھ

یہاں، ہم واپسی کی قسم کے فلوٹ کے ساتھ صارف کی طرف سے طے شدہ فنکشن بناتے ہیں۔ فنکشن کے اندر، ہم اوسط () فنکشن کا استعمال کرتے ہوئے حساب لگاتے ہیں۔ یہ UDF ہر قسم کی اوسط مقدار حاصل کرنے کے لیے 'مقدار' کالم میں منتقل کیا جاتا ہے۔

# اوسط/اوسط واپس کریں۔

@pandas_udf( 'فلوٹ' )

def average_function(i: panda.Series) -> float:

واپسی i.mean()

# قسم کے کالم کو گروپ کرکے فنکشن میں مقدار کے کالم کو منتقل کریں۔

market_df.groupby( 'قسم' .agg(اوسط_فنکشن(مارکیٹ_ڈی ایف[ 'مقدار' ])) شو()

آؤٹ پٹ:

ہم 'قسم' کالم میں عناصر کی بنیاد پر گروپ بندی کر رہے ہیں۔ دو گروپ بنتے ہیں - 'پھل' اور 'سبزی'۔ ہر گروپ کے لیے، اوسط کا حساب لگایا جاتا ہے اور واپس کیا جاتا ہے۔

مثال 2: Pandas_udf() Aggregate Max() اور Min() کے ساتھ

یہاں، ہم انٹیجر (int) کی واپسی کی قسم کے ساتھ دو صارف کے متعین فنکشن بناتے ہیں۔ پہلا UDF کم از کم قیمت اور دوسرا UDF زیادہ سے زیادہ قیمت لوٹاتا ہے۔

# pandas_udf جو کم از کم قیمت لوٹاتا ہے۔

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

واپسی i.min()

# pandas_udf جو زیادہ سے زیادہ قدر لوٹاتا ہے۔

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

واپسی i.max()

# locate_country کو گروپ بنا کر مقدار کے کالم کو min_pandas_udf تک پہنچائیں۔

market_df.groupby( 'locate_country' .agg(min_(market_df[ 'مقدار' ])) شو()

# locate_country کو گروپ کرکے مقدار کے کالم کو max_pandas_udf تک پہنچائیں۔

market_df.groupby( 'locate_country' .agg(max_(market_df[ 'مقدار' ])) شو()

آؤٹ پٹ:

کم سے کم اور زیادہ سے زیادہ اقدار واپس کرنے کے لیے، ہم UDFs کی واپسی کی قسم میں min() اور max() فنکشنز کا استعمال کرتے ہیں۔ اب، ہم ڈیٹا کو 'locate_country' کالم میں گروپ کرتے ہیں۔ چار گروپ بنائے گئے ہیں ('چین'، 'انڈیا'، 'جاپان'، 'امریکہ')۔ ہر گروپ کے لیے، ہم زیادہ سے زیادہ مقدار واپس کرتے ہیں۔ اسی طرح، ہم کم از کم مقدار واپس کرتے ہیں.

نتیجہ

بنیادی طور پر، pandas_udf () ہمارے PySpark ڈیٹا فریم پر ویکٹرائزڈ آپریشنز کرنے کے لیے استعمال ہوتا ہے۔ ہم نے دیکھا ہے کہ pandas_udf() کیسے بنایا جائے اور اسے PySpark DataFrame پر لاگو کیا جائے۔ بہتر تفہیم کے لیے، ہم نے تمام ڈیٹا ٹائپس (سٹرنگ، فلوٹ، اور انٹیجر) پر غور کرکے مختلف مثالوں پر تبادلہ خیال کیا۔ pandas_udf() کو گروپ بائی() کے ساتھ agg() فنکشن کے ذریعے استعمال کرنا ممکن ہے۔