Saturday 26 December 2015

Running a Process in the Background in a Linux Environment for an Indefinite Time

Sometimes we need to keep a process running continuously in the background after we invoke it, even if we log out of the session. Linux has a command for that: nohup.

Example :

nohup command-to-execute

nohup jupyter-notebook
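
To keep the output and run the process in the background in one go, a typical invocation looks like this (the log file name is just an example). nohup itself only makes the process ignore the hangup signal when you log out; the trailing & is what sends it to the background:

nohup jupyter-notebook > notebook.log 2>&1 &

tail -f notebook.log    # follow the output later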

Thursday 24 December 2015

How to create an External Table in Hive with Partitions and Load Data?

Start with the CREATE TABLE statement:


CREATE EXTERNAL TABLE Countries(
Id TINYINT,
Country STRING,
udate STRING,
UPDATE_DT STRING,
ACTIVE_FLAG STRING)
PARTITIONED BY (INSERT_DT STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/training/test/';
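
You can optionally verify the table definition and its storage location with a standard Hive command:

DESCRIBE FORMATTED Countries;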

Now the table is created in Hive, but the data is still not in it.

Data can be loaded into a partitioned table in two ways:

1) Static Partition Insert
2) Dynamic Partition Insert

1. Static Partition Insert

Load data into the partitioned table by adding a partition that points to the path of the data file:

ALTER TABLE Countries ADD PARTITION(INSERT_DT='25-12-2015')
LOCATION '/training/test/25-12-2015';
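
You can confirm that the partition was registered with:

SHOW PARTITIONS Countries;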


In case you want to change the location of a partition in an existing table:

ALTER TABLE Countries PARTITION(INSERT_DT='25-12-2015')
SET LOCATION '/training/test/25th';

Note: this won't delete the existing data; it simply changes the location the partition points to.
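
If you actually want to drop a partition, the standard Hive syntax is shown below; for an external table this removes only the partition metadata, not the underlying files:

ALTER TABLE Countries DROP IF EXISTS PARTITION (INSERT_DT='25-12-2015');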

2. Dynamic Partitions

Create a new Hive table:

CREATE EXTERNAL TABLE Countries_dynamic(
Id TINYINT,
Country STRING,
udate STRING,
UPDATE_DT STRING,
ACTIVE_FLAG STRING)
PARTITIONED BY (INSERT_DT STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/training/test_dynamic/';

Set the following properties:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;

Now, to load data into the partitions, run the following query. The dynamic partition column must be named in the PARTITION clause exactly as in the table definition (INSERT_DT) and must come last in the SELECT list:

INSERT OVERWRITE TABLE Countries_dynamic
PARTITION (INSERT_DT)
SELECT Id, Country, udate, UPDATE_DT, ACTIVE_FLAG, INSERT_DT
FROM Countries;
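
Each distinct INSERT_DT value in the source data becomes its own partition directory under /training/test_dynamic/; you can list the partitions that were created with:

SHOW PARTITIONS Countries_dynamic;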





Tuesday 22 December 2015

Execute a Sqoop Job using Windows PowerShell

#region - provide the following values

$subscriptionID = "XXXXXXXXXXXXXXXXXXXXXXXXXXX"

#region - variables

# Resource group variables
$resourceGroupName = "XXXXXXXXXXX"
$location = "XXXXXXXXXXX" # used by all Azure services defined in this tutorial


# HDInsight variables
$hdinsightClusterName = "XXXXXXXXXXXXXXXXX"
$defaultStorageAccountName = "XXXXXXXXXXXXXXXXXXX"
$defaultBlobContainerName = "XXXXXXXXXXXXXXXXXXXX"
$defaultStorageAccountKey=Get-AzureRmStorageAccountKey -ResourceGroupName $resourceGroupName -Name $defaultStorageAccountName | %{ $_.Key1 }


$username = "XXXXXXXXXX"
$password = "XXXXXXXXXX" | ConvertTo-SecureString -AsPlainText -Force
$httpCredential = New-Object -TypeName System.Management.Automation.PSCredential -ArgumentList $username, $password
#endregion

#region - Connect to Azure subscription
Write-Host "`nConnecting to your Azure subscription ..." -ForegroundColor Green
try{Get-AzureRmContext}
catch{Login-AzureRmAccount}
#endregion

#region - Create Azure resource group
Write-Host "`nCreating an Azure resource group ..." -ForegroundColor Green
try{
    Get-AzureRmResourceGroup -Name $resourceGroupName
}
catch{
    New-AzureRmResourceGroup -Name $resourceGroupName -Location $location
}
#endregion

# Enter Table Name
$tableName_log4j = "XXXXXXXXXXXXXX"

# Connection string for Remote SQL Database.

$connectionString = "jdbc:sqlserver://sqlserver-vm1.cloudapp.net:1433;database=XXXXXXXXXXXXX;username=XXXXXXX;password=XXXXXXXX"

# Submit a Sqoop job
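# Options used in the import command below:
#   --num-mappers 32          run the import with 32 parallel map tasks
#   --null-string '\\N'       write \N for NULL values in string columns
#   --null-non-string '\\N'   write \N for NULL values in non-string columns
#   --target-dir              directory in the cluster's default storage for the imported files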
$sqoopDef = New-AzureRmHDInsightSqoopJobDefinition `
    -Command "import --connect $connectionString --table $tableName_log4j --num-mappers 32 --null-string '\\N' --null-non-string '\\N' --target-dir  /sqlserverdump/XXXXXXXXXXX"
$sqoopJob = Start-AzureRmHDInsightJob `
                -ClusterName $hdinsightClusterName `
                -HttpCredential $httpCredential `
                -JobDefinition $sqoopDef #-Debug -Verbose
Wait-AzureRmHDInsightJob `
    -ResourceGroupName $resourceGroupName `
    -ClusterName $hdinsightClusterName `
    -HttpCredential $httpCredential `
    -JobId $sqoopJob.JobId

Write-Host "Standard Error" -BackgroundColor Green
Get-AzureRmHDInsightJobOutput -ResourceGroupName $resourceGroupName -ClusterName $hdinsightClusterName -DefaultStorageAccountName $defaultStorageAccountName -DefaultStorageAccountKey $defaultStorageAccountKey -DefaultContainer $defaultBlobContainerName -HttpCredential $httpCredential -JobId $sqoopJob.JobId -DisplayOutputType StandardError
Write-Host "Standard Output" -BackgroundColor Green
Get-AzureRmHDInsightJobOutput -ResourceGroupName $resourceGroupName -ClusterName $hdinsightClusterName -DefaultStorageAccountName $defaultStorageAccountName -DefaultStorageAccountKey $defaultStorageAccountKey -DefaultContainer $defaultBlobContainerName -HttpCredential $httpCredential -JobId $sqoopJob.JobId -DisplayOutputType StandardOutput

#endregion

Thursday 17 December 2015

Working with Sqoop


  • Used to import data from a traditional RDBMS to HDFS/Hive/HBase, etc., and vice versa
  • Best approach for filtering :
    • Run Query in RDBMS -> Create a temp table there -> Import this temp table using Sqoop.
  • Passing a password in a Sqoop query?
    • Use -P : prompts the user to enter the password.
    • Save the password in a file -> in the query mention: --password-file
  • By default, Sqoop outputs CSV (text) files in HDFS after import; other formats:
    • Avro support : --as-avrodatafile
    • SequenceFile : --as-sequencefile
  • Compression Support :
    • --compress --compression-codec..............
    • Splittable : Bzip2, LZO
    • Not Splittable: GZip,Snappy
  • For faster transfer:
    • --direct : supported for MySQL, PostgreSQL
  • --map-column-java col1=String,col2=Float  (change column types while importing from the RDBMS)
  • The CSV output file does not handle NULL values well, so:
    • If colType = VARCHAR,CHAR,NCHAR,TEXT 
      • --null-string '\\N'
    • If any other colType
      • --null-non-string '\\N'
  • Import all tables from a DB? 
    • sqoop import-all-tables 
    • Tables imported in sequential order
    • option of --exclude-tables
    • Cannot use --target-dir; use --warehouse-dir instead.
  • Incremental in Sqoop:
    • When new rows are added and existing rows are not changed:
      • Use --incremental append --check-column id --last-value 0
    • When data is changed:
      • Use --incremental lastmodified --check-column  --last-value
  • Create a Sqoop job for automatic pickup of last-value (see the sketch after this list):
    • sqoop job --create name_of_job -- import --connect...............
    • sqoop job --list
    • sqoop job --exec name_of_job
      • Sqoop will serialize the last imported value back to the metastore after each successful incremental job
  • Use boundary query for optimization:
    • --username sqoop --password sqoop --query 'SELECT normcities.id, countries.country, normcities.city FROM normcities JOIN countries USING(country_id) WHERE $CONDITIONS' --split-by id --target-dir cities --boundary-query "select min(id), max(id) from normcities"
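
A minimal sketch of a saved incremental-import job that pulls the pieces above together (the connection string, table name, and paths are placeholders):

sqoop job --create orders_incremental -- import \
  --connect jdbc:mysql://dbhost:3306/salesdb \
  --username sqoop --password-file /user/sqoop/.db-password \
  --table orders \
  --incremental append --check-column id --last-value 0 \
  --target-dir /data/orders

sqoop job --exec orders_incremental    # the metastore supplies the latest last-value on each run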

Monday 14 December 2015

Load CSV data to HBase Using Pig


  1. Open hbase-shell
  2. Create a table:
    • create 'mydata1','mycf'
  3. Open pig shell
  4. A = LOAD '/lokesh/hbasetest.txt' USING PigStorage(',') as (strdata:chararray, intdata:long);
  5. STORE A INTO 'hbase://mydata1' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('mycf:intdata');
  6. Done!!!!!!
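
Note that HBaseStorage treats the first field of the relation (strdata here) as the HBase row key, which is why only mycf:intdata is listed in the column spec. To verify the load, back in the HBase shell:

scan 'mydata1'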


Wednesday 9 December 2015

Load data from MySQL and dump to S3 using Spark


RDBMS IMPORT



***** Using Python *******

pyspark --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar,/mnt/resource/lokeshtest/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar --packages com.databricks:spark-csv_2.10:1.2.0

from pyspark.sql import SQLContext

sqlcontext=SQLContext(sc)

dataframe_mysql = sqlcontext.read.format("jdbc").options(url="jdbc:mysql://YOUR_PUBLIC_IP:3306/DB_NAME", driver="com.mysql.jdbc.Driver", dbtable="TBL_NAME", user="sqluser", password="sqluser").load()

dataframe_mysql.show()



****** Using Scala *******

sudo -u root spark-shell --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar,/mnt/resource/lokeshtest/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar --packages com.databricks:spark-csv_2.10:1.2.0

import org.apache.spark.sql.SQLContext

val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

val dataframe_mysql = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://YOUR_PUBLIC_IP:3306/DB_NAME").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "TBL_NAME").option("user", "sqluser").option("password", "sqluser").load()

dataframe_mysql.show()


**********************************************************************************************************************************************************

****** Using Scala *******

Persist in Mem Cache:

dataframe_mysql.cache

Perform some transformation or filter on df using map, etc.

val filter_gta = dataframe_mysql.filter(dataframe_mysql("date") === "20151129")

Optional: repartition the data. Note that repartition returns a new DataFrame rather than modifying filter_gta in place, so capture the result and write that DataFrame if you want to control the number of output files:

val filter_gta_1 = filter_gta.repartition(1)

Save to S3 as CSV:

filter_gta.write.format("com.databricks.spark.csv").option("header","true").save("s3n://YOUR_KEY:YOUR_SECRET@BUCKET_NAME/resources/spark-csv/mysqlimport1.csv")


************************************************************************************************************************************************************

Spark and AWS Sample Code

Load CSV data to Amazon S3

***** Using Python *******

pyspark --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar --packages com.databricks:spark-csv_2.10:1.2.0

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('s3n://YOUR_KEY:YOUR_SECRET@BUCKET_NAME/resources/spark-csv/sparksamplecsv.csv')

df.show()



****** Using Scala *******

sudo -u root spark-shell --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar --packages com.databricks:spark-csv_2.10:1.2.0

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("s3n://YOUR_KEY:YOUR_SECRET@BUCKET_NAME/resources/spark-csv/sparksamplecsv.csv")

df.show()


**********************************************************************************************************************************************************

Monday 7 December 2015

For using AWS in Spark

Add the following jars at runtime to the Spark classpath:

1. hadoop-aws-2.6.0.jar
2. aws-java-sdk-1.7.3.jar
3. guava-12.0.1.jar

spark-shell --jars  hadoop-aws-2.6.0.jar,aws-java-sdk-1.7.3.jar,guava-12.0.1.jar  (for spark with scala)
pyspark --jars  hadoop-aws-2.6.0.jar,aws-java-sdk-1.7.3.jar,guava-12.0.1.jar  (for spark with python)
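
With these jars on the classpath you can read from S3 directly. As an alternative to embedding the key and secret in the s3n URL (as in the earlier posts), here is a minimal Scala sketch that sets the credentials via the Hadoop configuration and reads the file as raw lines; the credentials, bucket, and path are placeholders:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET")
val lines = sc.textFile("s3n://BUCKET_NAME/resources/spark-csv/sparksamplecsv.csv")
lines.take(5).foreach(println)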