ازگر میں ریئل ٹائم ڈیٹا اسٹریمنگ کو کیسے نافذ کیا جائے۔

Azgr My Ryyl Aym Y A As Rymng Kw Kys Nafdh Kya Jay



Python میں ریئل ٹائم ڈیٹا سٹریمنگ کے نفاذ میں مہارت حاصل کرنا آج کی ڈیٹا سے منسلک دنیا میں ایک ضروری مہارت کے طور پر کام کرتا ہے۔ یہ گائیڈ ازگر میں صداقت کے ساتھ ریئل ٹائم ڈیٹا سٹریمنگ کو استعمال کرنے کے لیے بنیادی اقدامات اور ضروری ٹولز کو دریافت کرتا ہے۔ Apache Kafka یا Apache Pulsar جیسے موزوں فریم ورک کو منتخب کرنے سے لے کر ڈیٹا کے آسان استعمال، پروسیسنگ، اور مؤثر تصور کے لیے Python کوڈ لکھنے تک، ہم چست اور موثر ریئل ٹائم ڈیٹا چینلز کی تعمیر کے لیے ضروری مہارتیں حاصل کریں گے۔

مثال 1: ازگر میں ریئل ٹائم ڈیٹا سٹریمنگ کا نفاذ

Python میں ریئل ٹائم ڈیٹا سٹریمنگ کو نافذ کرنا آج کے ڈیٹا سے چلنے والے دور اور دنیا میں بہت ضروری ہے۔ اس تفصیلی مثال میں، ہم Google Colab میں Apache Kafka اور Python کا استعمال کرتے ہوئے ریئل ٹائم ڈیٹا اسٹریمنگ سسٹم بنانے کے عمل سے گزریں گے۔







کوڈنگ شروع کرنے سے پہلے مثال کو شروع کرنے کے لیے، Google Colab میں ایک مخصوص ماحول بنانا ضروری ہے۔ سب سے پہلے ہمیں ضروری لائبریریوں کو انسٹال کرنے کی ضرورت ہے۔ ہم کافکا کے انضمام کے لیے 'کافکا ازگر' لائبریری کا استعمال کرتے ہیں۔



! pip انسٹال کریں کافکا ازگر


یہ کمانڈ 'kafka-python' لائبریری کو انسٹال کرتی ہے جو Apache Kafka کے لیے Python فنکشنز اور بائنڈنگز فراہم کرتی ہے۔ اگلا، ہم اپنے پروجیکٹ کے لیے مطلوبہ لائبریریاں درآمد کرتے ہیں۔ 'کافکا پروڈیوسر' اور 'کافکا کنزیومر' سمیت مطلوبہ لائبریریوں کو درآمد کرنا 'کافکا پائتھون' لائبریری کی کلاسیں ہیں جو ہمیں کافکا بروکرز کے ساتھ بات چیت کرنے کی اجازت دیتی ہیں۔ JSON Python لائبریری ہے جو JSON ڈیٹا کے ساتھ کام کرتی ہے جسے ہم پیغامات کو سیریلائز اور ڈی سیریلائز کرنے کے لیے استعمال کرتے ہیں۔



کافکا درآمد کافکا پروڈیوسر، کافکا کنزیومر سے
json درآمد کریں۔


کافکا پروڈیوسر کی تخلیق





یہ اہم ہے کیونکہ کافکا پروڈیوسر ڈیٹا کو کافکا کے موضوع پر بھیجتا ہے۔ ہماری مثال میں، ہم ایک پروڈیوسر بناتے ہیں تاکہ 'ریئل ٹائم ٹاپک' نامی موضوع پر نقلی ریئل ٹائم ڈیٹا بھیج سکے۔

ہم ایک 'کافکا پروڈیوسر' مثال بناتے ہیں جو کافکا بروکر کے ایڈریس کو 'لوکل ہوسٹ: 9092' کے طور پر بیان کرتا ہے۔ پھر، ہم 'value_serializer' کا استعمال کرتے ہیں، ایک ایسا فنکشن جو ڈیٹا کو کافکا کو بھیجنے سے پہلے اسے سیریلائز کرتا ہے۔ ہمارے معاملے میں، ایک لیمبڈا فنکشن ڈیٹا کو UTF-8-encoded JSON کے بطور انکوڈ کرتا ہے۔ اب، آئیے کچھ ریئل ٹائم ڈیٹا کی نقل کرتے ہیں اور اسے کافکا کے موضوع پر بھیجتے ہیں۔



پروڈیوسر = کافکا پروڈیوسر ( bootstrap_servers = 'localhost:9092' ،
قدر_سیریلائزر =lambda v: json.dumps ( میں ) .encode ( 'utf-8' ) )
# نقلی ریئل ٹائم ڈیٹا
ڈیٹا = { 'sensor_id' : 1 ، 'درجہ حرارت' : 25.5 ، 'نمی' : 60.2 }
# موضوع پر ڈیٹا بھیجنا
پروڈیوسر بھیجیں۔ ( 'حقیقی وقت کا موضوع' ، ڈیٹا )


ان لائنوں میں، ہم ایک 'ڈیٹا' لغت کی وضاحت کرتے ہیں جو ایک مصنوعی سینسر ڈیٹا کی نمائندگی کرتا ہے۔ اس کے بعد ہم اس ڈیٹا کو 'ریئل ٹائم ٹاپک' پر شائع کرنے کے لیے 'بھیجیں' کا طریقہ استعمال کرتے ہیں۔

پھر، ہم ایک کافکا صارف بنانا چاہتے ہیں، اور ایک کافکا صارف کافکا کے موضوع سے ڈیٹا پڑھتا ہے۔ ہم 'ریئل ٹائم ٹاپک' میں پیغامات کو استعمال کرنے اور اس پر کارروائی کرنے کے لیے ایک صارف بناتے ہیں۔ ہم ایک 'کافکا کنزیومر' مثال بناتے ہیں، اس موضوع کی وضاحت کرتے ہوئے جسے ہم استعمال کرنا چاہتے ہیں، جیسے، (ریئل ٹائم ٹاپک) اور کافکا بروکر کا پتہ۔ پھر، 'value_deserializer' ایک فنکشن ہے جو کافکا سے موصول ہونے والے ڈیٹا کو ڈی سیریلائز کرتا ہے۔ ہمارے معاملے میں، ایک لیمبڈا فنکشن ڈیٹا کو UTF-8-encoded JSON کے طور پر ڈی کوڈ کرتا ہے۔

صارف = کافکا صارف ( 'حقیقی وقت کا موضوع' ،
bootstrap_servers = 'localhost:9092' ،
قدر_ڈیسیریلائزر =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


ہم موضوع کے پیغامات کو مسلسل استعمال کرنے اور اس پر کارروائی کرنے کے لیے ایک تکراری لوپ کا استعمال کرتے ہیں۔

# ریئل ٹائم ڈیٹا کو پڑھنا اور اس پر کارروائی کرنا
کے لیے پیغام میں صارف:
ڈیٹا = message.value
پرنٹ کریں ( f 'موصول شدہ ڈیٹا: {data}' )


ہم لوپ کے اندر ہر پیغام کی قدر اور اپنے مصنوعی سینسر ڈیٹا کو بازیافت کرتے ہیں اور اسے کنسول پر پرنٹ کرتے ہیں۔ کافکا پروڈیوسر اور صارف کو چلانے میں اس کوڈ کو Google Colab میں چلانا اور کوڈ سیلز کو انفرادی طور پر چلانا شامل ہے۔ پروڈیوسر نقلی ڈیٹا کو کافکا کے موضوع پر بھیجتا ہے، اور صارف موصول ہونے والے ڈیٹا کو پڑھتا اور پرنٹ کرتا ہے۔


کوڈ کے چلتے ہی آؤٹ پٹ کا تجزیہ

ہم ریئل ٹائم ڈیٹا کا مشاہدہ کریں گے جو تیار اور استعمال کیا جا رہا ہے۔ ڈیٹا فارمیٹ ہمارے نقلی یا اصل ڈیٹا ماخذ کے لحاظ سے مختلف ہو سکتا ہے۔ اس تفصیلی مثال میں، ہم Google Colab میں Apache Kafka اور Python کا استعمال کرتے ہوئے ریئل ٹائم ڈیٹا اسٹریمنگ سسٹم ترتیب دینے کے پورے عمل کا احاطہ کرتے ہیں۔ ہم اس نظام کی تعمیر میں کوڈ کی ہر سطر اور اس کی اہمیت کی وضاحت کریں گے۔ ریئل ٹائم ڈیٹا سٹریمنگ ایک طاقتور صلاحیت ہے، اور یہ مثال زیادہ پیچیدہ حقیقی دنیا کی ایپلی کیشنز کی بنیاد کے طور پر کام کرتی ہے۔

مثال 2: اسٹاک مارکیٹ ڈیٹا کا استعمال کرتے ہوئے ازگر میں ریئل ٹائم ڈیٹا سٹریمنگ کو نافذ کرنا

آئیے ایک مختلف منظر نامے کا استعمال کرتے ہوئے پائتھون میں ریئل ٹائم ڈیٹا اسٹریمنگ کو لاگو کرنے کی ایک اور منفرد مثال کرتے ہیں۔ اس بار، ہم اسٹاک مارکیٹ کے اعداد و شمار پر توجہ مرکوز کریں گے. ہم ایک ریئل ٹائم ڈیٹا اسٹریمنگ سسٹم بناتے ہیں جو اسٹاک کی قیمتوں میں ہونے والی تبدیلیوں کو پکڑتا ہے اور Google Colab میں Apache Kafka اور Python کا استعمال کرتے ہوئے ان پر کارروائی کرتا ہے۔ جیسا کہ پچھلی مثال میں دکھایا گیا ہے، ہم گوگل کولاب میں اپنے ماحول کو ترتیب دے کر شروع کرتے ہیں۔ سب سے پہلے، ہم مطلوبہ لائبریریوں کو انسٹال کرتے ہیں:

! pip انسٹال کریں kafka-python yfinance


یہاں، ہم 'yfinance' لائبریری شامل کرتے ہیں جو ہمیں ریئل ٹائم اسٹاک مارکیٹ ڈیٹا حاصل کرنے کی اجازت دیتی ہے۔ اگلا، ہم ضروری لائبریریاں درآمد کرتے ہیں۔ ہم کافکا کے تعامل کے لیے 'کافکا ازگر' لائبریری سے 'کافکا پروڈیوسر' اور 'کافکا کنزیومر' کلاسز کا استعمال جاری رکھتے ہیں۔ ہم JSON ڈیٹا کے ساتھ کام کرنے کے لیے JSON درآمد کرتے ہیں۔ ہم ریئل ٹائم اسٹاک مارکیٹ ڈیٹا حاصل کرنے کے لیے 'yfinance' کا بھی استعمال کرتے ہیں۔ ہم ریئل ٹائم اپ ڈیٹس کی تقلید کے لیے وقت میں تاخیر شامل کرنے کے لیے 'ٹائم' لائبریری بھی درآمد کرتے ہیں۔

کافکا درآمد کافکا پروڈیوسر، کافکا کنزیومر سے
json درآمد کریں۔
درآمد yfinance کے طور پر yf
درآمد وقت


اب، ہم اسٹاک ڈیٹا کے لیے کافکا پروڈیوسر بناتے ہیں۔ ہمارے کافکا پروڈیوسر کو ریئل ٹائم اسٹاک ڈیٹا ملتا ہے اور اسے کافکا کے موضوع پر بھیجتا ہے جس کا نام 'اسٹاک کی قیمت' ہے۔

پروڈیوسر = کافکا پروڈیوسر ( bootstrap_servers = 'localhost:9092' ،
قدر_سیریلائزر =lambda v: json.dumps ( میں ) .encode ( 'utf-8' ) )

جبکہ سچ:
اسٹاک = yf.Ticker ( 'AAPL' ) # مثال: Apple Inc. اسٹاک
stock_data = stock.history ( مدت = '1d' )
آخری_قیمت = اسٹاک_ڈیٹا [ 'بند کریں' ] .iloc [ - 1 ]
ڈیٹا = { 'علامت' : 'AAPL' ، 'قیمت' : آخری قیمت }
پروڈیوسر بھیجیں۔ ( 'اسٹاک کی قیمت' ، ڈیٹا )
وقت، نیند ( 10 ) # ہر 10 سیکنڈ میں ریئل ٹائم اپڈیٹس کی تقلید کریں۔


ہم اس کوڈ میں کافکا بروکر کے ایڈریس کے ساتھ ایک 'کافکا پروڈیوسر' مثال بناتے ہیں۔ لوپ کے اندر، ہم Apple Inc. ('AAPL') کے لیے تازہ ترین اسٹاک کی قیمت حاصل کرنے کے لیے 'yfinance' کا استعمال کرتے ہیں۔ پھر، ہم آخری اختتامی قیمت نکالتے ہیں اور اسے 'اسٹاک کی قیمت' کے عنوان پر بھیج دیتے ہیں۔ آخر کار، ہم ہر 10 سیکنڈ میں ریئل ٹائم اپڈیٹس کی تقلید کے لیے وقت میں تاخیر متعارف کرواتے ہیں۔

آئیے 'اسٹاک کی قیمت' کے عنوان سے اسٹاک کی قیمت کے ڈیٹا کو پڑھنے اور اس پر کارروائی کرنے کے لیے ایک کافکا صارف بنائیں۔

صارف = کافکا صارف ( 'اسٹاک کی قیمت' ،
bootstrap_servers = 'localhost:9092' ،
قدر_ڈیسیریلائزر =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

کے لیے پیغام میں صارف:
stock_data = message.value
پرنٹ کریں ( f 'موصول شدہ اسٹاک ڈیٹا: {stock_data['symbol']} - قیمت: {stock_data['price']}' )


یہ کوڈ پچھلی مثال کے صارف سیٹ اپ جیسا ہے۔ یہ 'اسٹاک کی قیمت' کے عنوان سے پیغامات کو مسلسل پڑھتا اور اس پر کارروائی کرتا ہے اور اسٹاک کی علامت اور قیمت کو کنسول پر پرنٹ کرتا ہے۔ ہم پروڈیوسر اور کنزیومر کو چلانے کے لیے گوگل کولاب میں ایک ایک کرکے کوڈ سیلز کو ترتیب وار انجام دیتے ہیں۔ پروڈیوسر ریئل ٹائم سٹاک کی قیمت کی اپ ڈیٹس حاصل کرتا ہے اور بھیجتا ہے جبکہ صارف اس ڈیٹا کو پڑھتا اور ڈسپلے کرتا ہے۔

! pip انسٹال کریں kafka-python yfinance
کافکا درآمد کافکا پروڈیوسر، کافکا کنزیومر سے
json درآمد کریں۔
درآمد yfinance کے طور پر yf
درآمد وقت
پروڈیوسر = کافکا پروڈیوسر ( bootstrap_servers = 'localhost:9092' ،
قدر_سیریلائزر =lambda v: json.dumps ( میں ) .encode ( 'utf-8' ) )

جبکہ سچ:
اسٹاک = yf.Ticker ( 'AAPL' ) # Apple Inc. اسٹاک
stock_data = stock.history ( مدت = '1d' )
آخری_قیمت = اسٹاک_ڈیٹا [ 'بند کریں' ] .iloc [ - 1 ]

ڈیٹا = { 'علامت' : 'AAPL' ، 'قیمت' : آخری قیمت }

پروڈیوسر بھیجیں۔ ( 'اسٹاک کی قیمت' ، ڈیٹا )

وقت، نیند ( 10 ) # ہر 10 سیکنڈ میں ریئل ٹائم اپڈیٹس کی تقلید کریں۔
صارف = کافکا صارف ( 'اسٹاک کی قیمت' ،
bootstrap_servers = 'localhost:9092' ،
قدر_ڈیسیریلائزر =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

کے لیے پیغام میں صارف:
stock_data = message.value
پرنٹ کریں ( f 'موصول شدہ اسٹاک ڈیٹا: {stock_data['symbol']} - قیمت: {stock_data['price']}' )


کوڈ کے چلنے کے بعد آؤٹ پٹ کے تجزیے میں، ہم ایپل انکارپوریشن کے لیے ریئل ٹائم سٹاک کی قیمتوں کی تازہ کاریوں کا مشاہدہ کریں گے جو تیار اور استعمال کیے جا رہے ہیں۔

نتیجہ

اس منفرد مثال میں، ہم نے اسٹاک مارکیٹ کے ڈیٹا کو حاصل کرنے اور اس پر کارروائی کرنے کے لیے اپاچی کافکا اور 'yfinance' لائبریری کا استعمال کرتے ہوئے Python میں ریئل ٹائم ڈیٹا سٹریمنگ کے نفاذ کا مظاہرہ کیا۔ ہم نے کوڈ کی ہر سطر کی اچھی طرح وضاحت کی۔ فنانس، IoT اور مزید میں حقیقی دنیا کی ایپلی کیشنز بنانے کے لیے ریئل ٹائم ڈیٹا سٹریمنگ کو مختلف شعبوں میں لاگو کیا جا سکتا ہے۔