Wednesday, 20 September 2017

Work with Spark in Windows

A quick, easy guide to setting up Spark on Windows

  • Download and install the Java 8 JDK (Windows x64 installer: jdk-8u144-windows-x64.exe, 197.78 MB).
  • Open a command prompt and type java -version
  • This should print the installed Java version. If it does, the Java installation is done.
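If you want to script this check, here is a minimal Python 3.7+ sketch. The helper names are hypothetical. It relies on the fact that `java -version` writes its banner to stderr, and that Java 8 reports itself as "1.8.x" while newer JDKs report the major version directly (Spark 2.x expects Java 8).

```python
import re
import subprocess


def java_major_version(banner):
    """Extract the major Java version from `java -version` output.

    Java 8 and older report "1.x" (e.g. "1.8.0_144" -> 8); newer JDKs
    report the major version directly (e.g. "11.0.2" -> 11).
    Returns None if no version string is found.
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', banner)
    if match is None:
        return None
    first = int(match.group(1))
    if first == 1 and match.group(2) is not None:
        return int(match.group(2))  # legacy "1.x" numbering
    return first


def installed_java_major_version():
    """Run `java -version` and return the major version, or None if java is not on PATH."""
    try:
        # `java -version` prints its banner to stderr, not stdout.
        result = subprocess.run(["java", "-version"], capture_output=True, text=True)
    except FileNotFoundError:
        return None
    return java_major_version(result.stderr)
```

With jdk-8u144 installed, `installed_java_major_version()` should return 8.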


  • Next, create the following folder structure in C:\
C:\Hadoop\bin
Download winutils.exe (a build matching the Hadoop version of your Spark package, hadoop2.7 in this guide) and copy it into the above path.
  • Create a new system environment variable HADOOP_HOME with the value C:\Hadoop
  • Now open a command prompt as administrator and enter the following commands:
C:\WINDOWS\system32>cd \
C:\>mkdir tmp
C:\>cd tmp
C:\tmp>mkdir hive
C:\tmp>c:\hadoop\bin\winutils chmod 777 \tmp\hive
C:\tmp>

  • Now download Spark from https://spark.apache.org/downloads.html
  • Once this is done, extract the downloaded file using 7-Zip. (You need to extract it twice, as it is a .tar.gz archive.)
  • Create a folder "spark" in C:\
  • Copy the contents of the extracted folder, "C:\Users\Lokesh\Downloads\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\", to C:\spark\
  • In the system environment variables, create a variable SPARK_HOME with the value C:\spark
  • Now it is time to test Spark: open cmd as administrator and type
    • C:\spark\bin\spark-shell
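If spark-shell does not start, a quick way to rule out the environment setup is a small Python check like the one below. This is a hypothetical helper, not part of Spark; it assumes the layout used in this guide (winutils.exe under %HADOOP_HOME%\bin and the spark-shell.cmd launcher under %SPARK_HOME%\bin). The `env` and `exists` parameters are injectable only so the check can be tried without a real C:\ drive.

```python
import os


def check_spark_setup(env=None, exists=os.path.exists):
    """Return a list of problems with the HADOOP_HOME / SPARK_HOME setup."""
    env = os.environ if env is None else env
    problems = []

    hadoop_home = env.get("HADOOP_HOME")
    if not hadoop_home:
        problems.append("HADOOP_HOME is not set")
    elif not exists(os.path.join(hadoop_home, "bin", "winutils.exe")):
        problems.append("winutils.exe not found in %HADOOP_HOME%\\bin")

    spark_home = env.get("SPARK_HOME")
    if not spark_home:
        problems.append("SPARK_HOME is not set")
    elif not exists(os.path.join(spark_home, "bin", "spark-shell.cmd")):
        problems.append("spark-shell.cmd not found in %SPARK_HOME%\\bin")

    return problems


if __name__ == "__main__":
    issues = check_spark_setup()
    for issue in issues:
        print(issue)
    if not issues:
        print("HADOOP_HOME and SPARK_HOME look fine")
```

Remember that new system environment variables are only visible to command prompts opened after you set them.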

Work with Kafka in Windows

If you want to try out Kafka on Windows, the following blog post explains the setup in detail:

https://dzone.com/articles/running-apache-kafka-on-windows-os

Below is also some simple code that connects Kafka and PySpark by streaming Twitter data.

NOTE:
Test.py is the Kafka producer.
Kafka_twitter.py is the PySpark consumer code.


Test.py:

#!/usr/bin/env python

# import required libraries
# NOTE: StreamListener was removed in tweepy 4.0, so this script needs tweepy 3.x.
import time

from kafka import KafkaProducer
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

# Twitter API credentials (replace with your own)
consumer_key = "xxxxxxxxxxxxx"
consumer_secret = "xxxxxxxxxxxxx"
access_token = "293636697-xxxxxxxxxxxxxxxxxx"
access_token_secret = "xxxxxxxxxxxxxxxxxxx"
topic = 'twitter-stream'

# set up the Kafka producer
# (KafkaProducer replaces the deprecated SimpleClient/SimpleProducer classes)
print("Setting up Kafka producer")
producer = KafkaProducer(bootstrap_servers='localhost:9092')
print("Kafka producer ready")

# A basic listener that just sends received tweets to the Kafka cluster.
class StdOutListener(StreamListener):
    def on_data(self, data):
        # send the raw tweet JSON to the topic and echo it to the console
        producer.send(topic, data.encode('utf-8'))
        print(data)
        return True

    def on_error(self, status):
        print(status)

#WORDS_TO_TRACK = "the to and is in it you of for on my that at with me do have just this be so are not was but out up what now new from your like good no get all about we if time as day will one how can some an am by going they go or has know today there love more work too got he back think did when see really had great off would need here thanks been still people who night want why home should well much then right make last over way does getting watching its only her post his morning very she them could first than better after tonight our again down news man looking us tomorrow best into any hope week nice show yes where take check come fun say next watch never bad free life".split()
WORDS_TO_TRACK = "modi india".split()

if __name__ == '__main__':
    print('running the twitter-stream python code')
    # This handles Twitter authentication and the connection to the Twitter Streaming API
    l = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, l)
    # Goal is to keep this process always going
    while True:
        try:
            print("Connecting to the stream and filtering")
            stream.filter(languages=["en"], track=WORDS_TO_TRACK)
        except Exception as err:
            print(str(err))
            # wait before reconnecting so repeated failures do not hammer the API
            time.sleep(5)


Kafka_twitter.py:

from __future__ import print_function

import sys
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# NOTE: pyspark.streaming.kafka (the Kafka 0.8 integration) exists only in Spark 2.x;
# it was removed in Spark 3.0.
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_twitter.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)
    print("Starting python streaming context....")
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)  # 10-second batches

    zkQuorum, topic = sys.argv[1:]
    print("Creating stream....")
    # receiver-based stream: ZooKeeper quorum, consumer group id, {topic: consumer threads}
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    print("Loading data.....")
    # each record is a (key, value) pair; the value is the raw tweet JSON
    parsed = kvs.map(lambda v: json.loads(v[1]))
    # skip non-tweet messages (e.g. delete notices), which have no 'text' field
    tweets = parsed.filter(lambda tweet: 'text' in tweet)
    #author_counts = tweets.map(lambda tweet: (tweet['user']['screen_name'], 1)).reduceByKey(lambda x, y: x + y)
    #author_counts.pprint()
    # count identical tweet texts within each batch and print up to 100 of them
    text_counts = tweets.map(lambda tweet: (tweet['text'].encode('utf-8'), 1)).reduceByKey(lambda x, y: x + y)
    text_counts.pprint(num=100)

    ssc.start()

    ssc.awaitTermination()
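Because the DStream transformations are lazy and run on the cluster, it can help to check the per-batch parse-and-count logic locally before submitting the job. Below is a minimal plain-Python sketch (no Spark needed) that mimics one batch with `collections.Counter`; `count_tweet_texts` is a hypothetical helper, and unlike the Spark job it keeps the text as a normal string instead of encoding it.

```python
import json
from collections import Counter


def count_tweet_texts(raw_values):
    """Mimic one streaming batch on a plain Python list.

    raw_values are the Kafka message values (raw tweet JSON strings).
    Records without a "text" field, such as Twitter's delete notices,
    are skipped; identical texts are counted together.
    """
    tweets = (json.loads(value) for value in raw_values)
    return Counter(tweet["text"] for tweet in tweets if "text" in tweet)
```

`count_tweet_texts(batch).most_common(100)` then gives roughly what `pprint(num=100)` shows for that batch, though not necessarily in the same order.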


Commands:

To start the ZooKeeper server: zkserver

To start the Kafka server:

C:\Users\lokesh.nanda\kafka_2.11-0.11.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

To check that tweets are reaching the topic, start a console consumer:

C:\Users\lokesh.nanda\kafka_2.11-0.11.0.0\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 --topic twitter-stream --from-beginning

Start the producer (Test.py) in another terminal, then submit the PySpark consumer:

C:\WINDOWS\system32>spark-submit --jars C:\Users\lokesh.nanda\Desktop\spark-streaming-kafka-0-8_2.11-2.0.0.jar C:\Users\lokesh.nanda\Desktop\kafka_twitter.py localhost:2181 twitter-stream
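The console-consumer check can also be done from Python. This is a sketch assuming the kafka-python package (the same one Test.py uses) and the broker on localhost:9092; `tweet_text` and `print_topic` are hypothetical helpers. Setting `auto_offset_reset="earliest"` plays roughly the role of `--from-beginning` here, since no consumer group offsets are committed.

```python
import json


def tweet_text(raw_value):
    """Return the tweet text from a raw Kafka message value, or None."""
    try:
        tweet = json.loads(raw_value)
    except ValueError:  # not valid JSON (or not valid UTF-8)
        return None
    return tweet.get("text") if isinstance(tweet, dict) else None


def print_topic(topic="twitter-stream", servers="localhost:9092"):
    """Print the text of every tweet on the topic; runs until interrupted."""
    # imported here so tweet_text() can be used without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, bootstrap_servers=servers,
                             auto_offset_reset="earliest")
    for message in consumer:
        text = tweet_text(message.value)
        if text is not None:
            print(text)
```

Call `print_topic()` from a Python shell while Test.py is running; stop it with Ctrl+C.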

Sunday, 14 May 2017

Start Ambari Services automatically on cluster startup

#!/bin/bash

set -x

# =========================================================== #
# Created By: Lokesh Nanda
# Date: 14/03/2016
# Starts ambari services
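# Configured to start HDFS, YARN, MapReduce2, ZooKeeper, Spark and Hive
# (uncomment the Kafka and Spark2 calls below to start those too)
# =========================================================== #

# At boot the Ambari server may not be answering yet, so wait until its REST API responds.
until curl -sf -u admin:admin http://localhost:8080/api/v1/clusters > /dev/null; do
    sleep 10
done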

curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start HDFS via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/mydevcluster/services/HDFS

curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start YARN via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/mydevcluster/services/YARN

curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start MapReduce2 via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/mydevcluster/services/MAPREDUCE2

curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start ZooKeeper via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/mydevcluster/services/ZOOKEEPER

#curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start Kafka via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/mydevcluster/services/KAFKA

curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start Spark via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/mydevcluster/services/SPARK

#curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start Spark2 via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/mydevcluster/services/SPARK2

curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start Hive via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/mydevcluster/services/HIVE


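1. Save the above script as start-ambari.sh and make it executable: chmod +x start-ambari.sh
2. Open your crontab in a terminal:
    crontab -e
   and add this line so the script runs at every boot:
    @reboot /home/lokesh/Scripts/start-ambari.sh
3. Save and exit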
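If you would rather drive Ambari from Python than from curl, the same PUT call can be built with the standard library. This is a sketch only: `build_start_request` is a hypothetical helper, and the defaults (cluster mydevcluster, admin/admin, localhost:8080) are simply the values used in the script above.

```python
import base64
import json
import urllib.request


def build_start_request(service, cluster="mydevcluster",
                        host="http://localhost:8080",
                        user="admin", password="admin"):
    """Build (but do not send) the Ambari REST call that sets a service to STARTED."""
    body = {
        "RequestInfo": {"context": "Start %s via REST" % service},
        "Body": {"ServiceInfo": {"state": "STARTED"}},
    }
    url = "%s/api/v1/clusters/%s/services/%s" % (host, cluster, service)
    credentials = base64.b64encode(("%s:%s" % (user, password)).encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        method="PUT",
        headers={
            "X-Requested-By": "ambari",  # Ambari rejects modifying requests without it
            "Authorization": "Basic " + credentials,
        },
    )
```

`urllib.request.urlopen(build_start_request("HDFS"))` sends the request; looping over ["HDFS", "YARN", "MAPREDUCE2", "ZOOKEEPER", "SPARK", "HIVE"] reproduces the script. Ambari queues the start operation and returns immediately, so the services come up in the background.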