Below is a simple Python-based Kafka producer that reads tweets from Twitter and writes them to a Kafka topic.
You will have to register with Twitter to get tweets streamed into this app.
After registration you will have your own access_token, access_token_secret, consumer_key, and consumer_secret.
Install the tweepy and python-twitter libraries using the commands below:
pip install tweepy
pip install python-twitter
Create the Kafka topic by executing the command below:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic twitter_topic
# Twitter -> Kafka producer script.
#
# Opens a filtered tweet stream and publishes the text of every received
# tweet to the Kafka topic "twitter_topic", echoing it to stdout as well.
#
# NOTE(review): StreamListener (tweepy) and SimpleProducer/KafkaClient
# (kafka) look like the legacy APIs of those libraries -- confirm that the
# installed versions still provide them.
import json

from kafka import SimpleProducer, KafkaClient
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

# Credentials issued when the application is registered with Twitter.
access_token = "replace with your own access_token"
access_token_secret = "replace with your own access_token_secret"
consumer_key = "replace with your own consumer_key"
consumer_secret = "replace with your own consumer_secret"


class StdOutListener(StreamListener):
    """Stream listener that prints tweet text and forwards it to Kafka."""

    def on_data(self, data):
        """Handle one raw message delivered by the stream.

        The payload is parsed as JSON. When it carries a ``text`` field the
        text is printed and sent, UTF-8 encoded, to the ``twitter_topic``
        Kafka topic through the module-level ``producer``; otherwise a
        notice is printed.

        Args:
            data: JSON-encoded message received from the stream.

        Returns:
            True in every case (presumably signals the stream to keep
            running -- confirm against the tweepy version in use).

        Raises:
            ValueError: if ``data`` is not valid JSON (from ``json.loads``).
        """
        json_data = json.loads(data)
        # Send twitter text to kafka topic "twitter_topic"
        if 'text' in json_data:
            print(json_data["text"])
            producer.send_messages("twitter_topic", json_data["text"].encode('utf-8'))
        else:
            print("text not found")
        return True

    def on_error(self, status):
        """Print the error status reported by the stream.

        Args:
            status: error status passed in by the stream.
        """
        print(status)


# kafka installed in localhost
kafka = KafkaClient("localhost:9092")
# `producer` is a module-level name on purpose: StdOutListener.on_data
# looks it up as a global when publishing.
producer = SimpleProducer(kafka)

l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, l)

# Filter tweets which have the word climate
stream.filter(track=["climate"])
Run the client by executing the command below:
python3 TwitterKafkaProducer.py