Předpoklad
Chcete-li číst data z Kafky, musíte nainstalovat potřebnou knihovnu pythonu. V tomto výukovém programu se Python3 používá k napsání skriptu spotřebitele a producenta. Pokud balíček pip není nainstalován dříve ve vašem operačním systému Linux, musíte nainstalovat pip před instalací knihovny Kafka pro python. python3-kafka se v tomto tutoriálu používá ke čtení dat z Kafky. Spusťte následující příkaz a nainstalujte knihovnu.
$ pip nainstalujte python3-kafkaČtení jednoduchých textových dat z Kafky
Od výrobce lze zaslat různé typy údajů o konkrétním tématu, které si spotřebitel může přečíst. Jak lze odesílat a přijímat jednoduchá textová data od společnosti Kafka pomocí producenta a spotřebitele, ukazuje tato část tohoto tutoriálu.
Vytvořte soubor s názvem producent1.py s následujícím skriptem pythonu. KafkaVýrobce modul je importován z knihovny Kafka. Seznam zprostředkovatelů je třeba definovat v době inicializace objektu producenta, aby se mohl připojit k serveru Kafka. Výchozí port Kafky je '9092''. argument bootstrap_servers se používá k definování názvu hostitele s portem. ''První_téma'je nastaven jako název tématu, kterým bude textová zpráva odeslána od producenta. Dále jednoduchá textová zpráva, 'Ahoj od Kafky„je odeslán pomocí poslat() metoda KafkaVýrobce k tématu, 'První_téma''.
producent1.py:
# Importujte produkt KafkaProducer z knihovny Kafkaz importu kafky KafkaProducer
# Definujte server s portem
bootstrap_servers = ['localhost: 9092']
# Definujte název tématu, kde bude zpráva publikována
topicName = 'První_téma'
# Inicializovat proměnnou producenta
producent = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Publikovat text v definovaném tématu
výrobce.send (topicName, b'Hello from kafka… ')
# Tisknout zprávu
print ("Zpráva odeslána")
Vytvořte soubor s názvem spotřebitel1.py s následujícím skriptem pythonu. Spotřebitel Kafka modul je importován z knihovny Kafka ke čtení dat z Kafky. sys modul se zde používá k ukončení skriptu. Ke čtení dat z Kafky se ve skriptu spotřebitele používá stejné jméno hostitele a číslo portu výrobce. Název tématu spotřebitele a výrobce musí být stejný jako „First_topic''. Dále je spotřebitelský objekt inicializován třemi argumenty. Název tématu, ID skupiny a informace o serveru. pro smyčka se zde používá ke čtení textu odeslaného od výrobce Kafka.
spotřebitel1.py:
# Importujte KafkaConsumer z knihovny Kafkaz importu kafka KafkaConsumer
# Importovat sys modul
import sys
# Definujte server s portem
bootstrap_servers = ['localhost: 9092']
# Definujte název tématu, odkud bude zpráva přijímána
topicName = 'První_téma'
# Inicializovat proměnnou spotřebitele
consumer = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Přečíst a vytisknout zprávu od spotřebitele
pro zprávu u spotřebitele:
print ("Název tématu =% s, Zpráva =% s"% (msg.téma, zpráva.hodnota))
# Ukončete skript
sys.výstup()
Výstup:
Spusťte následující příkaz z jednoho terminálu a proveďte producentský skript.
$ python3 producent1.pyPo odeslání zprávy se zobrazí následující výstup.
Spusťte následující příkaz z jiného terminálu a proveďte spotřebitelský skript.
$ python3 spotřebitel1.pyVýstup zobrazuje název tématu a textovou zprávu odeslanou od producenta.
Čtení dat ve formátu JSON z Kafky
Data ve formátu JSON mohou být zasílána výrobcem Kafka a čtena spotřebitelem Kafka pomocí JSON modul pythonu. Jak lze data JSON serializovat a de-serializovat před odesláním a příjmem dat pomocí modulu python-kafka, je ukázáno v této části tohoto tutoriálu.
Vytvořte skript pythonu s názvem producent2.py s následujícím skriptem. Importuje se další modul s názvem JSON KafkaVýrobce modul zde. value_serializer argument se používá s bootstrap_servers argument zde pro inicializaci objektu producenta Kafky. Tento argument naznačuje, že data JSON budou kódována pomocí 'utf-8'znaková sada v době odeslání. Dále se do formátu pojmenovaného tématu odešlou data ve formátu JSON JSONtopic.
producent2.py:
# Importujte produkt KafkaProducer z knihovny Kafkaz importu kafky KafkaProducer
# Importujte modul JSON k serializaci dat
importovat JSON
# Inicializovat proměnnou producenta a nastavit parametr pro kódování JSON
producent = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.skládky (v).enkódovat ('utf-8'))
# Odesílejte data ve formátu JSON
výrobce.send ('JSONtopic', 'name': 'fahmida', 'email': '[email protected]')
# Tisknout zprávu
print ("Zpráva odeslána do JSONtopic")
Vytvořte skript pythonu s názvem spotřebitel2.py s následujícím skriptem. Spotřebitel Kafka, sys a moduly JSON se importují do tohoto skriptu. Spotřebitel Kafka modul slouží ke čtení dat ve formátu JSON z Kafky. Modul JSON se používá k dekódování kódovaných dat JSON odeslaných od výrobce Kafka. Sys modul slouží k ukončení skriptu. value_deserializer argument se používá s bootstrap_servers definovat, jak budou data JSON dekódována. další, pro smyčka se používá k tisku všech záznamů o spotřebitelích a dat JSON získaných z Kafky.
spotřebitel2.py:
# Importujte KafkaConsumer z knihovny Kafkaz importu kafka KafkaConsumer
# Importovat sys modul
import sys
# Importujte modul JSON k serializaci dat
importovat JSON
# Inicializovat proměnnou spotřebitele a nastavit vlastnost pro dekódování JSON
consumer = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.zatížení (m.dekódovat ('utf-8')))
# Načíst data z kafky
pro zprávu ve spotřebiteli:
print ("Záznamy spotřebitele: \ n")
tisk (zpráva)
print ("\ nČtení z dat JSON \ n")
tisk ("Jméno:", zpráva [6] ['jméno'])
print ("Email:", zpráva [6] ['email'])
# Ukončete skript
sys.výstup()
Výstup:
Spusťte následující příkaz z jednoho terminálu a proveďte producentský skript.
$ python3 producent2.pyPo odeslání dat JSON skript vytiskne následující zprávu.
Spusťte následující příkaz z jiného terminálu a proveďte spotřebitelský skript.
$ python3 spotřebitel2.pyPo spuštění skriptu se zobrazí následující výstup.
Závěr:
Data lze odesílat a přijímat v různých formátech od Kafky pomocí pythonu. Data lze také uložit do databáze a načíst z databáze pomocí Kafky a pythonu. Já doma, tento tutoriál pomůže uživateli pythonu začít pracovat s Kafkou.