Apache Kafka

Jak číst data z Kafky pomocí Pythonu

Jak číst data z Kafky pomocí Pythonu
Kafka je distribuovaný systém zasílání zpráv s otevřeným zdrojovým kódem, který umožňuje odesílat zprávy rozdělené na různá témata. Streamování dat v reálném čase lze implementovat pomocí Kafky pro příjem dat mezi aplikacemi. Má tři hlavní části. Jedná se o producenta, spotřebitele a témata. Producent se používá k odeslání zprávy k určitému tématu a ke každé zprávě je připojen klíč. Spotřebitel se používá ke čtení zprávy o konkrétním tématu ze sady oddílů. Data přijatá od výrobce a uložená v oddílech na základě konkrétního tématu. Mnoho knihoven existuje v pythonu, aby vytvořily producenta a spotřebitele, aby vytvořily systém zasílání zpráv pomocí Kafky. Jak lze data z Kafky číst pomocí pythonu, ukazuje tento návod.

Předpoklad

Chcete-li číst data z Kafky, musíte nainstalovat potřebnou knihovnu pythonu. V tomto výukovém programu se Python3 používá k napsání skriptu spotřebitele a producenta. Pokud balíček pip není nainstalován dříve ve vašem operačním systému Linux, musíte nainstalovat pip před instalací knihovny Kafka pro python. python3-kafka se v tomto tutoriálu používá ke čtení dat z Kafky. Spusťte následující příkaz a nainstalujte knihovnu.

$ pip nainstalujte python3-kafka

Čtení jednoduchých textových dat z Kafky

Od výrobce lze zaslat různé typy údajů o konkrétním tématu, které si spotřebitel může přečíst. Jak lze odesílat a přijímat jednoduchá textová data od společnosti Kafka pomocí producenta a spotřebitele, ukazuje tato část tohoto tutoriálu.

Vytvořte soubor s názvem producent1.py s následujícím skriptem pythonu. KafkaVýrobce modul je importován z knihovny Kafka. Seznam zprostředkovatelů je třeba definovat v době inicializace objektu producenta, aby se mohl připojit k serveru Kafka. Výchozí port Kafky je '9092''. argument bootstrap_servers se používá k definování názvu hostitele s portem. ''První_téma'je nastaven jako název tématu, kterým bude textová zpráva odeslána od producenta. Dále jednoduchá textová zpráva, 'Ahoj od Kafky„je odeslán pomocí poslat() metoda KafkaVýrobce k tématu, 'První_téma''.

producent1.py:

# Importujte produkt KafkaProducer z knihovny Kafka
z importu kafky KafkaProducer
# Definujte server s portem
bootstrap_servers = ['localhost: 9092']
# Definujte název tématu, kde bude zpráva publikována
topicName = 'První_téma'
# Inicializovat proměnnou producenta
producent = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Publikovat text v definovaném tématu
výrobce.send (topicName, b'Hello from kafka… ')
# Tisknout zprávu
print ("Zpráva odeslána")

Vytvořte soubor s názvem spotřebitel1.py s následujícím skriptem pythonu. Spotřebitel Kafka modul je importován z knihovny Kafka ke čtení dat z Kafky. sys modul se zde používá k ukončení skriptu. Ke čtení dat z Kafky se ve skriptu spotřebitele používá stejné jméno hostitele a číslo portu výrobce. Název tématu spotřebitele a výrobce musí být stejný jako „First_topic''.  Dále je spotřebitelský objekt inicializován třemi argumenty. Název tématu, ID skupiny a informace o serveru. pro smyčka se zde používá ke čtení textu odeslaného od výrobce Kafka.

spotřebitel1.py:

# Importujte KafkaConsumer z knihovny Kafka
z importu kafka KafkaConsumer
# Importovat sys modul
import sys
# Definujte server s portem
bootstrap_servers = ['localhost: 9092']
# Definujte název tématu, odkud bude zpráva přijímána
topicName = 'První_téma'
# Inicializovat proměnnou spotřebitele
consumer = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Přečíst a vytisknout zprávu od spotřebitele
pro zprávu u spotřebitele:
print ("Název tématu =% s, Zpráva =% s"% (msg.téma, zpráva.hodnota))
# Ukončete skript
sys.výstup()

Výstup:

Spusťte následující příkaz z jednoho terminálu a proveďte producentský skript.

$ python3 producent1.py

Po odeslání zprávy se zobrazí následující výstup.

Spusťte následující příkaz z jiného terminálu a proveďte spotřebitelský skript.

$ python3 spotřebitel1.py

Výstup zobrazuje název tématu a textovou zprávu odeslanou od producenta.

Čtení dat ve formátu JSON z Kafky

Data ve formátu JSON mohou být zasílána výrobcem Kafka a čtena spotřebitelem Kafka pomocí JSON modul pythonu. Jak lze data JSON serializovat a de-serializovat před odesláním a příjmem dat pomocí modulu python-kafka, je ukázáno v této části tohoto tutoriálu.

Vytvořte skript pythonu s názvem producent2.py s následujícím skriptem. Importuje se další modul s názvem JSON KafkaVýrobce modul zde. value_serializer argument se používá s bootstrap_servers argument zde pro inicializaci objektu producenta Kafky. Tento argument naznačuje, že data JSON budou kódována pomocí 'utf-8'znaková sada v době odeslání. Dále se do formátu pojmenovaného tématu odešlou data ve formátu JSON JSONtopic.

producent2.py:

# Importujte produkt KafkaProducer z knihovny Kafka
z importu kafky KafkaProducer
# Importujte modul JSON k serializaci dat
importovat JSON
# Inicializovat proměnnou producenta a nastavit parametr pro kódování JSON
producent = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.skládky (v).enkódovat ('utf-8'))
# Odesílejte data ve formátu JSON
výrobce.send ('JSONtopic', 'name': 'fahmida', 'email': '[email protected]')
 
# Tisknout zprávu
print ("Zpráva odeslána do JSONtopic")

Vytvořte skript pythonu s názvem spotřebitel2.py s následujícím skriptem. Spotřebitel Kafka, sys a moduly JSON se importují do tohoto skriptu. Spotřebitel Kafka modul slouží ke čtení dat ve formátu JSON z Kafky. Modul JSON se používá k dekódování kódovaných dat JSON odeslaných od výrobce Kafka. Sys modul slouží k ukončení skriptu. value_deserializer argument se používá s bootstrap_servers definovat, jak budou data JSON dekódována. další, pro smyčka se používá k tisku všech záznamů o spotřebitelích a dat JSON získaných z Kafky.

spotřebitel2.py:

# Importujte KafkaConsumer z knihovny Kafka
z importu kafka KafkaConsumer
# Importovat sys modul
import sys
# Importujte modul JSON k serializaci dat
importovat JSON
# Inicializovat proměnnou spotřebitele a nastavit vlastnost pro dekódování JSON
consumer = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.zatížení (m.dekódovat ('utf-8')))
# Načíst data z kafky
pro zprávu ve spotřebiteli:
print ("Záznamy spotřebitele: \ n")
tisk (zpráva)
print ("\ nČtení z dat JSON \ n")
tisk ("Jméno:", zpráva [6] ['jméno'])
print ("Email:", zpráva [6] ['email'])
# Ukončete skript
sys.výstup()

Výstup:

Spusťte následující příkaz z jednoho terminálu a proveďte producentský skript.

$ python3 producent2.py

Po odeslání dat JSON skript vytiskne následující zprávu.

Spusťte následující příkaz z jiného terminálu a proveďte spotřebitelský skript.

$ python3 spotřebitel2.py

Po spuštění skriptu se zobrazí následující výstup.

Závěr:

Data lze odesílat a přijímat v různých formátech od Kafky pomocí pythonu. Data lze také uložit do databáze a načíst z databáze pomocí Kafky a pythonu. Já doma, tento tutoriál pomůže uživateli pythonu začít pracovat s Kafkou.

Hry OpenTTD vs Simutrans
OpenTTD vs Simutrans
Vytvoření vlastní simulace dopravy může být zábavné, uvolňující a mimořádně lákavé. Proto se musíte ujistit, že vyzkoušíte co nejvíce her, abyste našl...
Hry Výukový program OpenTTD
Výukový program OpenTTD
OpenTTD je jednou z nejpopulárnějších her pro obchodní simulaci. V této hře musíte vytvořit skvělé dopravní podnikání. Začnete však na začátku kolem r...
Hry SuperTuxKart pro Linux
SuperTuxKart pro Linux
SuperTuxKart je skvělý titul navržený tak, aby vám ve vašem systému Linux přinesl zážitek Mario Kart zdarma. Je to docela náročné a zábavné hrát, je n...