Sådan implementeres realtidsdatastreaming i Python

Sadan Implementeres Realtidsdatastreaming I Python



At mestre implementeringen af ​​realtidsdatastreaming i Python fungerer som en væsentlig færdighed i nutidens data-involverede verden. Denne vejledning udforsker de centrale trin og væsentlige værktøjer til at bruge realtidsdatastreaming med ægthed i Python. Fra at vælge en passende ramme som Apache Kafka eller Apache Pulsar til at skrive en Python-kode til ubesværet dataforbrug, behandling og effektiv visualisering, vil vi tilegne os de nødvendige færdigheder til at konstruere de agile og effektive datakanaler i realtid.

Eksempel 1: Implementering af realtidsdatastreaming i Python

Implementering af en realtidsdatastreaming i Python er afgørende i nutidens datadrevne tidsalder og verden. I dette detaljerede eksempel vil vi gennemgå processen med at bygge et datastreamingsystem i realtid ved hjælp af Apache Kafka og Python i Google Colab.







For at initialisere eksemplet, før vi begynder at kode, er det vigtigt at bygge et specifikt miljø i Google Colab. Den første ting, vi skal gøre, er at installere de nødvendige biblioteker. Vi bruger 'kafka-python'-biblioteket til Kafka-integration.



! pip installere kafka-python


Denne kommando installerer 'kafka-python'-biblioteket, som giver Python-funktionerne og bindingerne til Apache Kafka. Dernæst importerer vi de nødvendige biblioteker til vores projekt. Import af de påkrævede biblioteker inklusive 'KafkaProducer' og 'KafkaConsumer' er klasserne fra 'kafka-python'-biblioteket, der giver os mulighed for at interagere med Kafka-mæglere. JSON er Python-biblioteket til at arbejde med JSON-dataene, som vi bruger til at serialisere og deserialisere meddelelserne.



fra kafka import KafkaProducer, KafkaConsumer
importer json


Oprettelse af en Kafka-producent





Dette er vigtigt, fordi en Kafka-producent sender dataene til et Kafka-emne. I vores eksempel opretter vi en producent til at sende simulerede realtidsdata til et emne kaldet 'realtidsemne.'

Vi opretter en 'KafkaProducer'-instans, som angiver Kafka-mæglerens adresse som 'localhost:9092'. Derefter bruger vi 'value_serializer', en funktion, der serialiserer dataene, før de sendes til Kafka. I vores tilfælde koder en lambda-funktion dataene som UTF-8-kodet JSON. Lad os nu simulere nogle realtidsdata og sende dem til Kafka-emnet.



producer = KafkaProducer ( bootstrap_servere = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( i ) .encode ( 'utf-8' ) )
# Simulerede realtidsdata
data = { 'sensor_id' : 1 , 'temperatur' : 25,5 , 'fugtighed' : 60,2 }
# Sender data til emnet
producer.send ( 'realtids-emne' , data )


I disse linjer definerer vi en 'data'-ordbog, der repræsenterer simulerede sensordata. Vi bruger derefter 'send'-metoden til at publicere disse data til 'real-time-emnet'.

Så vil vi skabe en Kafka-forbruger, og en Kafka-forbruger læser data fra et Kafka-emne. Vi skaber en forbruger til at forbruge og behandle budskaberne i 'real-time-emnet.' Vi opretter en 'KafkaConsumer'-instans, der specificerer det emne, vi ønsker at forbruge, f.eks. (real-time-emne) og Kafka-mæglerens adresse. Så er 'value_deserializer' en funktion, der deserialiserer de data, der modtages fra Kafka. I vores tilfælde afkoder en lambda-funktion dataene som UTF-8-kodet JSON.

forbruger = KafkaForbruger ( 'realtids-emne' ,
bootstrap_servere = 'localhost:9092' ,
værdi_deserializer =lambda x: json.loads ( x.afkode ( 'utf-8' ) ) )


Vi bruger en iterativ loop til løbende at forbruge og behandle budskaberne fra emnet.

# Læsning og behandling af realtidsdata
til besked i forbruger:
data = besked.værdi
Print ( f 'Modtaget data: {data}' )


Vi henter hver beskeds værdi og vores simulerede sensordata inde i løkken og udskriver den til konsollen. At køre Kafka-producenten og forbrugeren involverer at køre denne kode i Google Colab og eksekvere kodecellerne individuelt. Producenten sender de simulerede data til Kafka-emnet, og forbrugeren læser og udskriver de modtagne data.


Analyse af output, mens koden kører

Vi vil observere en realtidsdata, der produceres og forbruges. Dataformatet kan variere afhængigt af vores simulering eller faktiske datakilde. I dette detaljerede eksempel dækker vi hele processen med at opsætte et datastreamingsystem i realtid ved hjælp af Apache Kafka og Python i Google Colab. Vi vil forklare hver linje kode og dens betydning i opbygningen af ​​dette system. Datastreaming i realtid er en kraftfuld funktion, og dette eksempel tjener som grundlag for mere komplekse applikationer i den virkelige verden.

Eksempel 2: Implementering af en realtidsdatastreaming i Python ved hjælp af aktiemarkedsdata

Lad os tage endnu et unikt eksempel på implementering af en datastreaming i realtid i Python ved hjælp af et andet scenarie; denne gang vil vi fokusere på aktiemarkedsdata. Vi skaber et datastreaming-system i realtid, der fanger aktiekursændringerne og behandler dem ved hjælp af Apache Kafka og Python i Google Colab. Som vist i det foregående eksempel starter vi med at konfigurere vores miljø i Google Colab. Først installerer vi de nødvendige biblioteker:

! pip installere kafka-python yfinance


Her tilføjer vi 'yfinance'-biblioteket, som giver os mulighed for at få aktiemarkedsdata i realtid. Dernæst importerer vi de nødvendige biblioteker. Vi fortsætter med at bruge klasserne 'KafkaProducer' og 'KafkaConsumer' fra 'kafka-python'-biblioteket til Kafka-interaktion. Vi importerer JSON for at arbejde med JSON-dataene. Vi bruger også 'yfinance' til at få aktiemarkedsdata i realtid. Vi importerer også 'tids'-biblioteket for at tilføje en tidsforsinkelse for at simulere realtidsopdateringerne.

fra kafka import KafkaProducer, KafkaConsumer
importer json
import yfinance som yf
importere tid


Nu opretter vi en Kafka-producent til lagerdata. Vores Kafka-producent får en aktiedata i realtid og sender den til et Kafka-emne kaldet 'aktiepris'.

producer = KafkaProducer ( bootstrap_servere = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( i ) .encode ( 'utf-8' ) )

mens Rigtigt:
aktie = yf.Ticker ( 'AAPL' ) # Eksempel: Apple Inc. aktie
stock_data = stock.history ( periode = '1d' )
sidste_pris = lager_data [ 'Tæt' ] .iloc [ - 1 ]
data = { 'symbol' : 'AAPL' , 'pris' : sidste pris }
producer.send ( 'lagerpris' , data )
tid.søvn ( 10 ) # Simuler opdateringer i realtid hvert 10. sekund


Vi opretter en 'KafkaProducer'-instans med Kafka-mæglerens adresse i denne kode. Inde i løkken bruger vi 'yfinance' til at få den seneste aktiekurs for Apple Inc. ('AAPL'). Derefter udtrækker vi den sidste lukkekurs og sender den til emnet 'aktiekurs'. Til sidst introducerer vi en tidsforsinkelse for at simulere realtidsopdateringerne hvert 10. sekund.

Lad os skabe en Kafka-forbruger til at læse og behandle aktiekursdata fra emnet 'aktiekurs'.

forbruger = KafkaForbruger ( 'lagerpris' ,
bootstrap_servere = 'localhost:9092' ,
værdi_deserializer =lambda x: json.loads ( x.afkode ( 'utf-8' ) ) )

til besked i forbruger:
stock_data = message.value
Print ( f 'Modtaget lagerdata: {stock_data['symbol']} - Pris: {stock_data['price']}' )


Denne kode ligner det forrige eksempels forbrugeropsætning. Den læser og behandler løbende beskederne fra emnet 'aktiepris' og udskriver aktiesymbolet og prisen til konsollen. Vi udfører kodecellerne sekventielt, f.eks. én efter én i Google Colab for at køre producenten og forbrugeren. Producenten får og sender aktiekursopdateringerne i realtid, mens forbrugeren læser og viser disse data.

! pip installere kafka-python yfinance
fra kafka import KafkaProducer, KafkaConsumer
importer json
import yfinance som yf
importere tid
producer = KafkaProducer ( bootstrap_servere = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( i ) .encode ( 'utf-8' ) )

mens Rigtigt:
aktie = yf.Ticker ( 'AAPL' ) # Apple Inc. aktie
stock_data = stock.history ( periode = '1d' )
sidste_pris = lager_data [ 'Tæt' ] .iloc [ - 1 ]

data = { 'symbol' : 'AAPL' , 'pris' : sidste pris }

producer.send ( 'lagerpris' , data )

tid.søvn ( 10 ) # Simuler opdateringer i realtid hvert 10. sekund
forbruger = KafkaForbruger ( 'lagerpris' ,
bootstrap_servere = 'localhost:9092' ,
værdi_deserializer =lambda x: json.loads ( x.afkode ( 'utf-8' ) ) )

til besked i forbruger:
stock_data = message.value
Print ( f 'Modtaget lagerdata: {stock_data['symbol']} - Pris: {stock_data['price']}' )


I analysen af ​​outputtet, efter at koden er kørt, vil vi observere aktiekursopdateringerne i realtid for Apple Inc., der produceres og forbruges.

Konklusion

I dette unikke eksempel demonstrerede vi implementeringen af ​​realtidsdatastreaming i Python ved hjælp af Apache Kafka og 'yfinance'-biblioteket til at fange og behandle aktiemarkedsdataene. Vi forklarede grundigt hver linje i koden. Datastreaming i realtid kan anvendes på forskellige områder for at bygge de virkelige applikationer inden for finans, IoT og mere.