Wednesday, 20 September 2017

Working with Kafka on Windows

If you want to try out Kafka on Windows:

The following blog post explains the setup in detail:

https://dzone.com/articles/running-apache-kafka-on-windows-os

Below is also some simple code that connects Kafka and PySpark by streaming Twitter data.

NOTE:
Test.py is the Kafka producer.
Kafka_twitter.py is the PySpark consumer.


Test.py:

#!/usr/bin/env python

# import required libraries
import os
import time

from kafka import SimpleProducer, SimpleClient
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

# Twitter API credentials. Each value can be supplied through the named
# environment variable so real secrets need not be hard-coded in this file;
# the placeholder string is used when the variable is not set.
consumer_key = os.environ.get("TWITTER_CONSUMER_KEY", "xxxxxxxxxxxxx")
consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "xxxxxxxxxxxxx")
access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "293636697-xxxxxxxxxxxxxxxxxx")
access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "xxxxxxxxxxxxxxxxxxx")
# Kafka topic that every received tweet is published to.
topic = 'twitter-stream'

# Set up the Kafka producer against the broker on localhost:9092.
# NOTE(review): SimpleClient/SimpleProducer are the legacy kafka-python API;
# newer kafka-python releases use KafkaProducer instead - verify the version.
print("Trying to set kafka producer")
kafka = SimpleClient('localhost:9092')
producer = SimpleProducer(kafka)
# Report success only after both the client and the producer exist.
print("Done with setting up of producer")

#This is a basic listener that just put received tweets to kafka cluster.
class StdOutListener(StreamListener):
    """Stream listener that forwards every raw payload to Kafka.

    Each payload received from the Twitter stream is published to the
    module-level ``topic`` through the module-level ``producer`` and is
    also echoed to stdout.
    """

    def on_data(self, data):
        """Publish *data* to Kafka, print it, and keep the stream open.

        Returns True so the stream keeps running (in tweepy's listener
        contract, returning False stops the stream).
        """
        # Encode to UTF-8 bytes before handing the payload to the producer.
        producer.send_messages(topic, data.encode('utf-8'))
        print(data)
        return True

    def on_error(self, status):
        """Print the error status reported by the stream.

        Returns None rather than False, so tweepy does not treat the error
        as a request to disconnect.
        """
        # Function-call form works on both Python 2 and Python 3.
        print(status)

# Alternative: a broad list of common English words, for sampling the general
# stream instead of specific keywords.
#WORDS_TO_TRACK = "the to and is in it you of for on my that at with me do have just this be so are not was but out up what now new from your like good no get all about we if time as day will one how can some an am by going they go or has know today there love more work too got he back think did when see really had great off would need here thanks been still people who night want why home should well much then right make last over way does getting watching its only her post his morning very she them could first than better after tonight our again down news man looking us tomorrow best into any hope week nice show yes where take check come fun say next watch never bad free life".split()
# Keywords passed to stream.filter(track=...) below.
WORDS_TO_TRACK = ["modi", "india"]

if __name__ == '__main__':
    print('running the twitter-stream python code')
    # Authenticate with Twitter and build the connection to the Streaming API.
    listener = StdOutListener()
    print("Trying to connect to Handler")
    auth = OAuthHandler(consumer_key, consumer_secret)
    print("Auth basic done")
    auth.set_access_token(access_token, access_token_secret)
    print("auth final done")
    stream = Stream(auth, listener)
    print(stream)

    # Keep the process running. If filter() raises, log the error and
    # reconnect, waiting progressively longer (capped at 60 s) between
    # consecutive failures so a persistent problem (bad credentials,
    # broker down) does not spin in a tight retry loop.
    retry_delay = 1
    while True:
        try:
            print("Let us get the response and filter")
            stream.filter(languages=["en"], track=WORDS_TO_TRACK)
            # filter() returned normally: reset the backoff.
            retry_delay = 1
        except Exception as err:
            print(str(err))
            print("Retrying in %d seconds" % retry_delay)
            time.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, 60)


Kafka_twitter.py:

from __future__ import print_function

import sys
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def _loads_or_none(raw):
    """Parse *raw* as JSON, returning None instead of raising on bad input.

    Without this, a single malformed or empty Kafka message would raise
    inside a Spark task.
    """
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return None


def _is_tweet(record):
    """Return True when *record* is a parsed JSON object with a 'text' field."""
    return isinstance(record, dict) and 'text' in record


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_twitter.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)
    print("Starting python streaming context....")
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    # Micro-batch interval of 10 seconds.
    ssc = StreamingContext(sc, 10)

    zkQuorum, topic = sys.argv[1:]
    print("Creating stream....")
    # Receiver-based Kafka stream located through ZooKeeper; {topic: 1}
    # consumes the topic with a single consumer thread.
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    print("Loading data.....")
    # Records arrive as (key, value) pairs; the value is the JSON string the
    # producer published. Unparseable payloads and JSON objects without a
    # 'text' field are dropped instead of failing the job with a KeyError.
    # NOTE(review): the Twitter stream may also deliver non-tweet notices
    # (e.g. deletes/limits) that lack 'text' - these are filtered out here.
    parsed = kvs.map(lambda v: _loads_or_none(v[1])).filter(_is_tweet)
    # Count identical tweet texts within each batch and print up to 100 of them.
    text_counts = (
        parsed.map(lambda tweet: (tweet['text'].encode('utf-8'), 1))
        .reduceByKey(lambda x, y: x + y)
    )
    text_counts.pprint(num=100)

    ssc.start()
    ssc.awaitTermination()


Commands:

To start the ZooKeeper server: zkserver

To start the Kafka server:

C:\Users\lokesh.nanda\kafka_2.11-0.11.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

To read the topic from the console and check that tweets are arriving:

C:\Users\lokesh.nanda\kafka_2.11-0.11.0.0\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 --topic twitter-stream --from-beginning


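Start the producer by running Test.py (python Test.py), then submit the PySpark consumer, passing the Spark Streaming Kafka connector jar with --jars:

C:\WINDOWS\system32>spark-submit --jars C:\Users\lokesh.nanda\Desktop\spark-streaming-kafka-0-8_2.11-2.0.0.jar C:\Users\lokesh.nanda\Desktop\kafka_twitter.py localhost:2181 twitter-stream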

No comments:

Post a Comment