If you are looking to test out kafka in windows OS:
Below is a blog, which explains the same in detail:
https://dzone.com/articles/running-apache-kafka-on-windows-os
Also below is simple code for connecting kafka and pyspark by streaming twitter data.
NOTE:
Test.py == kafkaproducer
Test.py:
#!/usr/bin/env python
# import required libraries
from kafka import SimpleProducer, SimpleClient
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
print("I am at line 9")
consumer_key = "xxxxxxxxxxxxx"
consumer_secret = "xxxxxxxxxxxxx"
access_token = "293636697-xxxxxxxxxxxxxxxxxx"
access_token_secret = "xxxxxxxxxxxxxxxxxxx"
topic = 'twitter-stream'
# setting up Kafka producer
print("Trying to set kafka producer")
kafka = SimpleClient('localhost:9092')
print("Done with setting up of producer")
producer = SimpleProducer(kafka)
print("###############")
#This is a basic listener that just put received tweets to kafka cluster.
class StdOutListener(StreamListener):
def on_data(self, data):
#print("****Inside ON DATA function****")
producer.send_messages(topic, data.encode('utf-8'))
#print("Printing the message on console")
print(data)
return True
def on_error(self, status):
print status
#WORDS_TO_TRACK = "the to and is in it you of for on my that at with me do have just this be so are not was but out up what now new from your like good no get all about we if time as day will one how can some an am by going they go or has know today there love more work too got he back think did when see really had great off would need here thanks been still people who night want why home should well much then right make last over way does getting watching its only her post his morning very she them could first than better after tonight our again down news man looking us tomorrow best into any hope week nice show yes where take check come fun say next watch never bad free life".split()
WORDS_TO_TRACK = "modi india".split()
if __name__ == '__main__':
print 'running the twitter-stream python code'
#This handles Twitter authetification and the connection to Twitter Streaming API
l = StdOutListener()
print("Trying to connect to Handler")
auth = OAuthHandler(consumer_key, consumer_secret)
print("Auth basic done")
auth.set_access_token(access_token, access_token_secret)
print("auth final done")
stream = Stream(auth, l)
print(stream)
# Goal is to keep this process always going
while True:
try:
print("Let us get the response and filter")
stream.filter(languages=["en"], track=WORDS_TO_TRACK)
except Exception as err:
#pass
print(str(err))
Kafka_twitter.py
from __future__ import print_function
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_twitter.py <zk> <topic>", file=sys.stderr)
exit(-1)
print("Starting python streaming context....")
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 10)
zkQuorum, topic = sys.argv[1:]
print("Creating stream....")
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
print("Loading data.....")
parsed = kvs.map(lambda v: json.loads(v[1]))
#print("Printing the records below....")
#print(parsed)
#text_counts = parsed.map(lambda tweet: (tweet['text'], 1)).reduceByKey(lambda x, y: x + y)
#text_counts.pprint()
try:
#author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'], 1)).reduceByKey(lambda x, y: x + y)
text_counts = parsed.map(lambda tweet: (tweet['text'].encode('utf-8'), 1)).reduceByKey(lambda x, y: x + y)
text_counts.pprint(num=100)
#author_counts.pprint()
#print("end")
except Exception as err:
pass
ssc.start()
ssc.awaitTermination()
Below is a blog, which explains the same in detail:
https://dzone.com/articles/running-apache-kafka-on-windows-os
Also below is simple code for connecting kafka and pyspark by streaming twitter data.
NOTE:
Test.py == kafkaproducer
Kafka_twitter
== pysparkconsumercode
#!/usr/bin/env python
# import required libraries
from kafka import SimpleProducer, SimpleClient
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
print("I am at line 9")
consumer_key = "xxxxxxxxxxxxx"
consumer_secret = "xxxxxxxxxxxxx"
access_token = "293636697-xxxxxxxxxxxxxxxxxx"
access_token_secret = "xxxxxxxxxxxxxxxxxxx"
topic = 'twitter-stream'
# setting up Kafka producer
print("Trying to set kafka producer")
kafka = SimpleClient('localhost:9092')
print("Done with setting up of producer")
producer = SimpleProducer(kafka)
print("###############")
#This is a basic listener that just put received tweets to kafka cluster.
class StdOutListener(StreamListener):
def on_data(self, data):
#print("****Inside ON DATA function****")
producer.send_messages(topic, data.encode('utf-8'))
#print("Printing the message on console")
print(data)
return True
def on_error(self, status):
print status
#WORDS_TO_TRACK = "the to and is in it you of for on my that at with me do have just this be so are not was but out up what now new from your like good no get all about we if time as day will one how can some an am by going they go or has know today there love more work too got he back think did when see really had great off would need here thanks been still people who night want why home should well much then right make last over way does getting watching its only her post his morning very she them could first than better after tonight our again down news man looking us tomorrow best into any hope week nice show yes where take check come fun say next watch never bad free life".split()
WORDS_TO_TRACK = "modi india".split()
if __name__ == '__main__':
print 'running the twitter-stream python code'
#This handles Twitter authetification and the connection to Twitter Streaming API
l = StdOutListener()
print("Trying to connect to Handler")
auth = OAuthHandler(consumer_key, consumer_secret)
print("Auth basic done")
auth.set_access_token(access_token, access_token_secret)
print("auth final done")
stream = Stream(auth, l)
print(stream)
# Goal is to keep this process always going
while True:
try:
print("Let us get the response and filter")
stream.filter(languages=["en"], track=WORDS_TO_TRACK)
except Exception as err:
#pass
print(str(err))
Kafka_twitter.py
from __future__ import print_function
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_twitter.py <zk> <topic>", file=sys.stderr)
exit(-1)
print("Starting python streaming context....")
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 10)
zkQuorum, topic = sys.argv[1:]
print("Creating stream....")
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
print("Loading data.....")
parsed = kvs.map(lambda v: json.loads(v[1]))
#print("Printing the records below....")
#print(parsed)
#text_counts = parsed.map(lambda tweet: (tweet['text'], 1)).reduceByKey(lambda x, y: x + y)
#text_counts.pprint()
try:
#author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'], 1)).reduceByKey(lambda x, y: x + y)
text_counts = parsed.map(lambda tweet: (tweet['text'].encode('utf-8'), 1)).reduceByKey(lambda x, y: x + y)
text_counts.pprint(num=100)
#author_counts.pprint()
#print("end")
except Exception as err:
pass
ssc.start()
ssc.awaitTermination()
Commands:
To start a zookeeper server: zkserver
C:\Users\lokesh.nanda\kafka_2.11-0.11.0.0>.\bin\windows\kafka-server-start.bat
.\config\server.properties
C:\Users\lokesh.nanda\kafka_2.11-0.11.0.0\bin\windows>kafka-console-consumer.bat
--zookeeper localhost:2181 --topic twitter-stream --from-beginning
C:\WINDOWS\system32>spark-submit
--jars C:\Users\lokesh.nanda\Desktop\spark-streaming-kafka-0-8_2.11-2.0.0.jar
C:\Users\lokesh.nanda\Desktop\kafka_twitter.py localhost:2181 twitter-stream
No comments:
Post a Comment