KAFKA-TWITTER IN HORTONWORKS
- Make sure the Kafka and ZooKeeper services are running; check their status at yourhostname.cloudapp.net:8080
- Check that the ZooKeeper port is 2181 and the Kafka broker port is 6667
- Create a Topic :
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper yourhostname.cloudapp.net:2181 --replication-factor 1 --partitions 1 --topic twitter-topic
- Verify if topic is created
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper yourhostname.cloudapp.net:2181
- Create a Java Maven project:
package SampleTwitterKafka;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
/**
 * Streams tweets that match a fixed list of track terms from the Twitter
 * streaming API (through the Hosebird Client) and publishes each raw message
 * to the Kafka topic {@code twitter-topic}.
 */
public class TwitterKafkaProducer {
    /** Kafka topic the tweets are published to. */
    private static final String topic = "twitter-topic";

    /** Number of messages to forward before shutting down. */
    private static final int MAX_MESSAGES = 1000;

    /** Capacity of the queue that buffers messages between the Twitter client and Kafka. */
    private static final int QUEUE_CAPACITY = 10000;

    /**
     * Connects to the Twitter stream, forwards {@code MAX_MESSAGES} messages to
     * Kafka, then closes the Kafka producer and stops the Twitter client.
     *
     * @param consumerKey    Twitter OAuth consumer key
     * @param consumerSecret Twitter OAuth consumer secret
     * @param token          Twitter OAuth access token
     * @param secret         Twitter OAuth access token secret
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the next message from the stream
     */
    public static void run(String consumerKey, String consumerSecret,
            String token, String secret) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "yourhostname.cloudapp.net:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("client.id", "camus");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        try {
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(QUEUE_CAPACITY);
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            // add some track terms
            endpoint.trackTerms(Lists.newArrayList("#ALDUB14thWeeksary",
                    "#MagpasikatAnneKimEruption", "#happydussehra", "ItsShowtime DARREN"));
            Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);
            // Create a new BasicClient. By default gzip is enabled.
            Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
                    .endpoint(endpoint).authentication(auth)
                    .processor(new StringDelimitedProcessor(queue)).build();
            try {
                // Establish a connection
                client.connect();
                for (int msgRead = 0; msgRead < MAX_MESSAGES; msgRead++) {
                    // take() blocks until a message arrives. An interrupt is allowed
                    // to propagate to the caller instead of being swallowed, which
                    // previously resulted in producer.send(null).
                    producer.send(new KeyedMessage<String, String>(topic, queue.take()));
                }
            } finally {
                // Always stop the streaming client, even on failure or interrupt.
                client.stop();
            }
        } finally {
            // Always release the Kafka producer's resources.
            producer.close();
        }
    }

    /**
     * Entry point. Replace the placeholder credentials with real Twitter OAuth values.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            TwitterKafkaProducer.run("XXXXXXXXXXXXXX", "XXXXXXXXXXXXXX", "XXXXXXXXXXXXXX", "XXXXXXXXXXXXXX");
        } catch (InterruptedException e) {
            System.out.println(e);
            // Restore the interrupt status so it is not lost.
            Thread.currentThread().interrupt();
        }
    }
}
***********************************************************************************
POM File :
<!-- Libraries required by the TwitterKafkaProducer example. -->
<dependencies>
<!-- Hosebird Client: supplies the com.twitter.hbc.* classes imported by the producer. -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
<version>2.2.0</version> <!-- or whatever the latest version is -->
</dependency>
<!-- Kafka library: supplies kafka.producer.* and kafka.javaapi.producer.Producer. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.0</artifactId>
<version>0.8.1.1</version>
</dependency>
<!-- log4j, with its transitive javax.jms:jms dependency excluded. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
<exclusions>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- SLF4J "simple" binding. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
<!-- Guava: supplies com.google.common.collect.Lists, used to build the track-term list. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
</dependencies>
***********************************************************
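- Change the placeholder fields in the Java code (highlighted in yellow in the original post), such as the broker hostname and the XXXXXXXXXXXXXX Twitter credentials, then build a runnable jar whose manifest names the main class as its entry point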
- Copy the jar to the linux machine : yourhostname.cloudapp.net
- Run jar in linux terminal:
java -jar twitter-snapshotv1.jar
- Verify the producer's output by running the console consumer in a new terminal:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper yourhostname.cloudapp.net:2181 --topic twitter-topic --from-beginning
BINGO !!!
- Make sure the Kafka and ZooKeeper services are running; check their status at yourhostname.cloudapp.net:8080
- Check that the ZooKeeper port is 2181 and the Kafka broker port is 6667
- Create a Topic :
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper yourhostname.cloudapp.net:2181 --replication-factor 1 --partitions 1 --topic twitter-topic
- Verify if topic is created
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper yourhostname.cloudapp.net:2181
- Create a Java Maven project:
package SampleTwitterKafka;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
/**
 * Streams tweets that match a fixed list of track terms from the Twitter
 * streaming API (through the Hosebird Client) and publishes each raw message
 * to the Kafka topic {@code twitter-topic}.
 */
public class TwitterKafkaProducer {
    /** Kafka topic the tweets are published to. */
    private static final String topic = "twitter-topic";

    /** Number of messages to forward before shutting down. */
    private static final int MAX_MESSAGES = 1000;

    /** Capacity of the queue that buffers messages between the Twitter client and Kafka. */
    private static final int QUEUE_CAPACITY = 10000;

    /**
     * Connects to the Twitter stream, forwards {@code MAX_MESSAGES} messages to
     * Kafka, then closes the Kafka producer and stops the Twitter client.
     *
     * @param consumerKey    Twitter OAuth consumer key
     * @param consumerSecret Twitter OAuth consumer secret
     * @param token          Twitter OAuth access token
     * @param secret         Twitter OAuth access token secret
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the next message from the stream
     */
    public static void run(String consumerKey, String consumerSecret,
            String token, String secret) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "yourhostname.cloudapp.net:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("client.id", "camus");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        try {
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(QUEUE_CAPACITY);
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            // add some track terms
            endpoint.trackTerms(Lists.newArrayList("#ALDUB14thWeeksary",
                    "#MagpasikatAnneKimEruption", "#happydussehra", "ItsShowtime DARREN"));
            Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);
            // Create a new BasicClient. By default gzip is enabled.
            Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
                    .endpoint(endpoint).authentication(auth)
                    .processor(new StringDelimitedProcessor(queue)).build();
            try {
                // Establish a connection
                client.connect();
                for (int msgRead = 0; msgRead < MAX_MESSAGES; msgRead++) {
                    // take() blocks until a message arrives. An interrupt is allowed
                    // to propagate to the caller instead of being swallowed, which
                    // previously resulted in producer.send(null).
                    producer.send(new KeyedMessage<String, String>(topic, queue.take()));
                }
            } finally {
                // Always stop the streaming client, even on failure or interrupt.
                client.stop();
            }
        } finally {
            // Always release the Kafka producer's resources.
            producer.close();
        }
    }

    /**
     * Entry point. Replace the placeholder credentials with real Twitter OAuth values.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            TwitterKafkaProducer.run("XXXXXXXXXXXXXX", "XXXXXXXXXXXXXX", "XXXXXXXXXXXXXX", "XXXXXXXXXXXXXX");
        } catch (InterruptedException e) {
            System.out.println(e);
            // Restore the interrupt status so it is not lost.
            Thread.currentThread().interrupt();
        }
    }
}
***********************************************************************************
POM File :
<!-- Libraries required by the TwitterKafkaProducer example. -->
<dependencies>
<!-- Hosebird Client: supplies the com.twitter.hbc.* classes imported by the producer. -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
<version>2.2.0</version> <!-- or whatever the latest version is -->
</dependency>
<!-- Kafka library: supplies kafka.producer.* and kafka.javaapi.producer.Producer. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.0</artifactId>
<version>0.8.1.1</version>
</dependency>
<!-- log4j, with its transitive javax.jms:jms dependency excluded. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
<exclusions>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- SLF4J "simple" binding. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
<!-- Guava: supplies com.google.common.collect.Lists, used to build the track-term list. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
</dependencies>
***********************************************************
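- Change the placeholder fields in the Java code (highlighted in yellow in the original post), such as the broker hostname and the XXXXXXXXXXXXXX Twitter credentials, then build a runnable jar whose manifest names the main class as its entry point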
- Copy the jar to the linux machine : yourhostname.cloudapp.net
- Run jar in linux terminal:
java -jar twitter-snapshotv1.jar
- Verify the producer's output by running the console consumer in a new terminal:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper yourhostname.cloudapp.net:2181 --topic twitter-topic --from-beginning
BINGO !!!
No comments:
Post a Comment