Sunday, 31 December 2017

BigData - Data Transformations with Apache Pig

1.       Pig is an ETL tool designed to work on both structured and unstructured data.
2.       Hive is designed for storing and querying structured and unstructured data, but not for data transformations. In very simple terms, Hive is a data warehouse and Pig is the ETL tool that gets data loaded into Hive.
3.       Relations are the basic structures in Pig that hold the data on which transformations are performed. Relations are similar to variables.
4.       We use LOAD to load data into Pig relations, STORE to write data to files, and DUMP to display data on screen.
5.       The general structure of a Pig program is:
a.       Load data into Pig
b.       Data is stored in a Relation
c.       Data in the relation is transformed and updated
d.       Once transformation and updates are completed, store final data to file or display on screen
6.       Pig commands can be executed in interactive mode or batch mode
a.       To launch interactive mode, just type “pig”, which launches the Grunt shell. By default Pig works with the HDFS file system. To work with the local file system, launch the Grunt shell using “pig -x local”
b.       Batch mode is used to run Pig scripts stored in a file. This can be done by supplying the file name to the pig command, like “pig file.pig” (the -f option can be added for more clarity)
c.       Use “pig --help” for any additional information
7.       Relations in Pig are immutable. Any update to a relation creates a new relation.
8.       Relations are stored in memory and exist only during a single Pig session. As soon as the session ends, the relations are cleared from memory. To persist a relation, we must store it in a physical file.
9.       Relations are not evaluated until they are displayed (DUMP) or written to a file (STORE). This is called lazy evaluation, and it gives Pig room to optimize the execution plan.
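Lazy evaluation can be pictured with Python generators. This is only a loose analogy and not how Pig is implemented; the function names load_rows and keep_positive and the sample numbers are made up for illustration.

```python
# Loose analogy for Pig's lazy evaluation using Python generators.
# load_rows and keep_positive are hypothetical stand-ins for LOAD and FILTER.
touched = []  # records which rows were actually read


def load_rows(rows):
    for row in rows:
        touched.append(row)
        yield row


def keep_positive(rows):
    for row in rows:
        if row > 0:
            yield row


# Building the pipeline reads nothing yet, just as defining relations does.
relation = keep_positive(load_rows([3, -1, 4]))
rows_read_before = len(touched)

# Consuming the pipeline plays the role of DUMP or STORE.
result = list(relation)
rows_read_after = len(touched)
```

Nothing is read while the pipeline is being assembled; the rows are only touched once the result is requested.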
10.   PigStorage() – used for loading data into Pig relations
a.       Bulkdeals = load '/home/rajendra/IdeaProjects/BigData-Practice/data/input/01-01-2013-TO-31-12-2013_bulk.csv' using PigStorage('|'); -- the default delimiter that PigStorage uses is tab ('\t')
b.       LOAD does not remove duplicates. It also accepts a directory and loads all files in that directory; duplicate records are retained.
c.       The load in point (a) above has no schema. We can also load with a schema, as below
Bulkdeals = load '/home/rajendra/IdeaProjects/BigData-Practice/data/input/01-01-2013-TO-31-12-2013_bulk.csv' using PigStorage('|')
as
(
Tdate: datetime,
Symbol: chararray
);
d.       If we use DESCRIBE, we can see the schema of a relation
11.   FOREACH
a.       Using foreach, we can loop through a relation and generate specific elements by index (the index starts from 0). For example “tdateandsymbol = foreach Bulkdeals generate $0, $1;”
b.       We can also use field names for relations that have a defined schema
12.   SPLIT
a.       Splits a relation into multiple relations
b.       Example: “split orders into orders_1 if (order == 1), order_more if (order > 1);”
13.   FILTER
a.       Filters relations based on a condition
b.       “orders_filter = filter orders by order_id >= 'o4';”
14.   DISTINCT, LIMIT, ORDER BY
a.       Orders_no_duplicates = distinct orders;
b.       Orders_3 = limit orders 3;
c.       Orders_desc = order orders by order_id desc;
15.   LIMIT can be used to select a limited number of records, for example bulkdeals_5 = limit Bulkdeals 5; (an alias cannot start with a digit, and relation names are case sensitive)
16.   STORE can be used as “store bulkdeals_5 into 'subdir' using PigStorage();”. Note that subdir must not exist before running the store command.
17.   Case sensitivity
a.       The following are case sensitive
                                                               i.      Relation names
                                                             ii.      Field names within relations
                                                           iii.      Function names such as PigStorage(), SUM(), COUNT() etc
b.       The following are not case sensitive
                                                               i.      Keywords such as load, store, foreach, generate, group by, order by, dump
18.   Data Types
a.       Scalar or primitive that represents single entity
                                                               i.      Boolean
1.       To represent true or false values
                                                             ii.      Numeric
1.       int – 4 bytes, -2^31 to 2^31 - 1
2.       long – 8 bytes
3.       float – 4 bytes
4.       double – 8 bytes
                                                           iii.      String
1.       chararray - variable length and unbounded; the size cannot be specified, i.e. fieldname: chararray(01) is invalid whereas fieldname: chararray is valid.
                                                           iv.      Date/time
1.       datetime  - represents date and time up to millisecond precision
                                                             v.      Bytes
1.       bytearray – a BLOB of data that can represent anything. When no schema is given, this is the default data type for elements of unknown type.
b.       Complex or collection types to represent group of entities
                                                               i.      Tuple
1.       Ordered collection of fields, where each field has its own data type, for example (a,1) or (a,b,01-jan-2018,0,true)
2.       If no data type is given for an element of a tuple, the default bytearray data type is assumed
3.       A tuple can be considered as a row in a traditional relational database
4.       TOTUPLE() can be used to generate a tuple from individual fields
5.       We can define a tuple field as “fieldname: tuple(e1:chararray,e2:int,e3:boolean)”
6.       Individual fields in a tuple can be accessed using . (dot) notation. For example, if the 3rd field of a relation is a tuple, we can access its fields as $2.$0, $2.$1 etc.
                                                             ii.      Bag
1.       Unordered collection of tuples
2.       This is similar to a set in Java or Python, except that it allows duplicates
3.       Bags are enclosed by {}
4.       A relation is itself a bag of tuples, called the outer bag; a bag held in a field of a relation is called an inner bag
5.       TOBAG() to convert fields to a bag structure
                                                           iii.      Map
1.       Enclosed by []
2.       Key-value pairs; the key must always be a chararray, but the value can be of any type
3.       # is the delimiter, for example [name#jon, job#engineer], accessed as $3#'name'
4.       TOMAP() to convert fields to a map structure
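The three complex types can be mirrored with ordinary Python containers. This is a rough mental model only, and the sample record is hypothetical: a Python tuple for a Pig tuple, a Counter (multiset) for a bag, and a dict for a map.

```python
from collections import Counter

# Tuple: ordered fields, written (a,1) in Pig.
row = ("a", 1)

# Bag: unordered collection of tuples that keeps duplicates.
# A Counter behaves like a multiset, so the repeated tuple is counted twice.
bag = Counter([("a", 1), ("b", 2), ("a", 1)])

# Map: chararray keys with values of any type, written [name#jon, job#engineer].
person = {"name": "jon", "job": "engineer"}

# A hypothetical record whose 3rd field is a tuple and 4th field is a map.
record = ("x", 10, ("inner0", "inner1"), person)

# Pig's $2.$0: field 0 of the tuple stored in field 2.
first_inner = record[2][0]

# Pig's $3#'name': value for key 'name' in the map stored in field 3.
name = record[3]["name"]
```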
19.   Partial schema specification and casting
a.       While loading data, we can specify all field names along with their data types. This is called full schema specification
b.       We can specify just the field names but not the data types. This is called partial schema specification
c.       If we specify neither field names nor data types, there is no schema
d.       Pig works in all three cases
e.       We can also cast fields to different data types using cast operators, e.g. (int)$3 converts the 4th field to int.
f.        The bytearray type can be cast to any other data type, but the other data types have limitations in terms of implicit conversion and explicit casting.
20.   Pig Functions
a.       UDF – User Defined Functions
                                                               i.      PigStorage(), TOMAP(), TOBAG(), TOTUPLE() etc. are built-in UDFs
                                                             ii.      Built-in functions can be categorized into 4 groups based on what they do
1.       Load – loading data to Pig
a.       PigStorage()
b.       HBaseStorage()
c.       JsonLoader()
d.       AvroStorage()
e.       CSVExcelStorage()
2.       Store – storing data to a file
a.       Same functions as of load
3.       Evaluate – transformations on record or fields
a.       Math functions
                                                                                                                                       i.      ROUND(double) returns long type
b.       String functions
                                                                                                                                       i.      SUBSTRING(string,startpos,stoppos)
                                                                                                                                     ii.      REPLACE(String,existing,new)
c.       Date functions
                                                                                                                                       i.      ToDate(date,'dateformat')
                                                                                                                                     ii.      GetMonth(datetime)
d.       Complex type functions
                                                                                                                                       i.      TOMAP(),TOTUPLE(),TOBAG()
e.       Aggregate functions
4.       Filter – to filter individual records
21.   GROUP BY
a.       orders_grp = group orders by item;
b.       orders_cnt = foreach orders_grp generate group, SUM(orders.quantity);
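The shape of a grouped relation is easier to see in a small model: each output row holds the group key plus a bag of the original rows, and the aggregate then runs over that bag. The sketch below assumes hypothetical order rows with order_id, item and quantity fields.

```python
from collections import defaultdict


def group_by(rows, key):
    """Return {group value: list of original rows}, like Pig's (group, bag) pairs."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return dict(groups)


# Hypothetical sample data.
orders = [
    {"order_id": "o1", "item": "pen", "quantity": 2},
    {"order_id": "o2", "item": "ink", "quantity": 1},
    {"order_id": "o3", "item": "pen", "quantity": 3},
]

# orders_grp = group orders by item;
orders_grp = group_by(orders, "item")

# orders_cnt = foreach orders_grp generate group, SUM(orders.quantity);
orders_cnt = {
    item: sum(row["quantity"] for row in bag) for item, bag in orders_grp.items()
}
```

Here orders_grp["pen"] still contains both complete pen orders, which is why the Pig statement refers to orders.quantity inside SUM.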
22.   JOIN
a.       LEFT OUTER JOIN
                                                               i.      Names_trades_lo = join names by symbol left outer, trades by symbol;
b.       RIGHT OUTER JOIN
c.       FULL OUTER JOIN
d.       INNER JOIN (the default JOIN)
                                                               i.      Names_trades_jn = join names by symbol, trades by symbol;
e.       CROSS JOIN
                                                               i.      Cartesian join
                                                             ii.      Names_trades_cross =  cross names, trades;
23.   UNION
a.       Both relations should have the same number of fields and compatible schemas
b.       UNION does not preserve the order of tuples
c.       UNION preserves duplicates
d.       all_names = union names, other_names;
24.   UNION when schema is mismatched
a.       When the relations have a different number of fields, the schema of the result is null
b.       If both relations have the same number of fields but the data types do not match, Pig tries to find common ground by casting to the higher type
c.       If fields are of a complex type with incompatible inner fields, Pig does not know what to do, so the schema of the result is null
d.       UNION ONSCHEMA can be used to union relations with mismatched schema
e.       all_names_2 = union onschema names, other_names;
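A minimal model of what UNION ONSCHEMA does: fields are matched by name, and a field missing from one side is filled with null. The function and the sample schemas below are hypothetical and only illustrate the idea; unlike this list-based sketch, Pig does not guarantee the order of the resulting tuples.

```python
def union_onschema(schema_a, rows_a, schema_b, rows_b):
    """Merge two relations by field name, padding missing fields with None."""
    # Merged schema: all fields of the first relation, then any new ones.
    merged = list(schema_a) + [f for f in schema_b if f not in schema_a]

    def widen(schema, row):
        values = dict(zip(schema, row))
        return tuple(values.get(field) for field in merged)

    rows = [widen(schema_a, r) for r in rows_a] + [widen(schema_b, r) for r in rows_b]
    return merged, rows


# Hypothetical relations with partly different fields.
names_schema, names = ["symbol", "name"], [("A", "Alpha")]
other_schema, other_names = ["symbol", "sector"], [("B", "Tech")]

all_schema, all_names_2 = union_onschema(names_schema, names, other_schema, other_names)
```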
25.   FLATTEN
a.       Can be used to un-nest a bag field into rows of primitive type fields
b.       It creates a separate record for each value held in the complex field
c.       Flatten_activities = foreach student_activity_bag generate name, flatten(activities) as activity;
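The effect of FLATTEN on a bag can be sketched as a nested loop: one output row per element of the bag, with the other fields repeated. The student data below is hypothetical.

```python
def flatten_bag(rows):
    """rows: (name, activities) pairs, where activities is a list acting as a bag."""
    return [(name, activity) for name, activities in rows for activity in activities]


# Hypothetical input: each student has a bag of activities.
student_activity_bag = [
    ("ann", ["chess", "tennis"]),
    ("bob", ["chess"]),
    ("cy", []),  # an empty bag yields no output rows
]

flatten_activities = flatten_bag(student_activity_bag)
```

The student with an empty bag disappears from the output, which is worth remembering when counts do not add up after a flatten.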
26.   Nested foreach
a.       collision_grp = group collision_data all; Collision_stats_nstd_foreach = foreach collision_grp { total = SUM(collision_data.total); generate total; }
b.       A nested foreach works on the bag produced by a group. Within {}, we can apply intermediate operations such as distinct, filter, order by and limit to produce intermediate relations; the final generate then emits the required fields


Monday, 25 December 2017

BigData - Analyzing BigData with Hive

1.       Hive gives structure to unstructured data by applying a schema
2.       The Hive architecture includes a relational metastore, where Hive stores metadata such as table definitions, the location of the underlying data, data type information and how tables are partitioned. The metastore is a relational database. By default Hive installs a lightweight Derby database; however, Derby has many limitations, especially around multi-user access, so the Hive metastore is typically hosted in MySQL.
3.       The HiveQL driver is responsible for compiling, optimizing and translating HiveQL into zero or more MapReduce jobs
4.       The Web UI or HDInsight provides a web interface where we can interact with Hive using HiveQL
5.       Hive manages 2 types of data
a.       Internal
                                                               i.      The Hive warehouse (not to be confused with the term data warehouse) consists of the metastore (all metadata about objects) and the data
                                                             ii.      Hive owns and manages the data stored in the Hive warehouse, i.e. if a table is dropped, Hive drops all of its metadata as well as the actual data.
b.       External
                                                               i.      Data can be stored anywhere Hive knows how to communicate with. If data is stored outside the Hive warehouse, it is called external data. Hive still manages the metadata, even for external data.
                                                             ii.      Hive can create tables over data stored externally. Hive does not own externally stored data, i.e. if the table is dropped, only the metadata is dropped, not the actual data
                                                           iii.      The user has the flexibility to manage and store the data as they see fit
6.       Hive follows an SQL syntax similar to MySQL's
7.       From the Hive interactive CLI, we can run Linux shell commands by prefixing them with ! and ending them with ; for example, to see a directory listing in HDFS from the Hive CLI, we can run “!hdfs dfs -ls /dirname;”
8.       One subtle difference between HiveQL and SQL is that HiveQL accepts Java regular expressions in the select list. For example, to select all columns except ID and NAME, we can write select `(ID|NAME)?+.+` from tablename; (the expression is wrapped in backticks, and Hive 0.13 and later also need hive.support.quoted.identifiers set to none for this to work)
9.       Another difference between HiveQL and SQL is that some constructs are interchangeable, i.e. we can write “from tablename select fields where condition” instead of “select fields from tablename where condition”
10.   Hive keywords and identifiers are not case sensitive, and Hive expects every statement to be terminated explicitly with “;”, just like Java.
11.   Older Hive versions support subqueries only in the FROM clause, not in the SELECT column list; Hive 0.13 and later also accept IN and EXISTS subqueries in the WHERE clause
12.   When we create a database in Hive, it creates a database_name.db directory under the /hive/warehouse directory. This default location can be changed when creating the database by supplying a LOCATION clause
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
 [COMMENT comments]
[LOCATION  path]
[WITH DBPROPERTIES (propname=value, propname=value ….)];

To switch the current context so that it defaults to a given database:
USE db_name;
To drop a database:
DROP (DATABASE|SCHEMA) [IF EXISTS] db_name;

Creating a table
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name datatype [COMMENT comments])]
[COMMENT comments]
[PARTITIONED BY (col_name datatype [COMMENT comments])]
[ROW FORMAT rowformat] [STORED as file_format]
[LOCATION hdfs_path]
[TBLPROPERTIES (propname=value, propname=value ….)];

13.   LOAD DATA INPATH 'hdfs file path' [OVERWRITE] INTO TABLE tablename loads a table from an HDFS file. After this command runs, the file is no longer at its source HDFS location: because the source file is already on HDFS, Hive simply moves it into the table's location instead of making a copy of the data
14.   LOAD DATA LOCAL INPATH 'local file path' INTO TABLE tablename loads a table from a local file.
15.   If we are loading a table from HDFS files, it is better to create an external table that keeps the data in its existing location
16.   We can also use CREATE TABLE <table_name> AS SELECT <fields> FROM <table> to create a table from a query, and CREATE TABLE <tablename> LIKE <tablename> to copy a table's structure.
17.   Hive data types
a.       Hive supports basic data types and complex data types
                                                               i.      Basic datatypes
1.       Numeric
a.       TINYINT, SMALLINT, INT, BIGINT
b.       FLOAT
c.       DOUBLE
d.       DECIMAL
2.       Dates/Time
a.       DATE
b.       TIMESTAMP
3.       Others
a.       BOOLEAN
b.       STRING
c.       BINARY (for larger binary data, like a BLOB)
                                                             ii.      Complex types
1.       ARRAY
2.       MAP (key-value pairs, like a dictionary)
a.       The key must be one of the primitive data types
b.       The value can be any data type, including a complex data type. Example: nameToID=map('A',1,'B',2); accessed as nameToID['A']
3.       STRUCT
a.       A data structure type, similar to creating our own structure or class in a programming language
b.       STRUCT<field1:type,field2:type…>
c.       We use . notation to access a STRUCT element, like structname.field2
4.       UNIONTYPE
a.       A UNIONTYPE column holds exactly one of the data types listed in its definition at a time. For example, if we define a column MISC UNIONTYPE<INT, STRING, ARRAY<DOUBLE>>, each value in MISC can be any one of those 3 data types, and Hive picks the matching type from the list based on the content
18.   Hive provides implicit data type conversion, where a smaller type converts automatically to its larger counterpart; for example, SMALLINT to INT is automatic. We can also convert explicitly using CAST, for example CAST(value AS datatype). CASTs can be nested if needed.
19.   For managed tables, we can partition tables in Hive to improve performance. The partitioning concept is the same as in relational databases.
a.       Partitions are created based on column values.
b.       We have to specify PARTITIONED BY <column names> at table creation time
c.       We also have to specify the partition when running LOAD DATA, otherwise Hive throws an exception
d.       A separate folder is created in the HDFS data store for each partition
e.       Partition columns are virtual columns: they are not part of the actual table data, but they are exposed as regular columns, so we can run the same select operations on them as on actual columns.
f.        If a data column has the same name as a partition column, Hive throws an exception. If the data needs such a column, rename the data column so that the partition can still be queried by its virtual column name.
20.   For external tables, the LOCATION is optional in the CREATE statement. The CREATE statement only needs PARTITIONED BY; the location of each partition is added with an ALTER TABLE … ADD PARTITION statement
21.   If we add data to HDFS in Hive's partitioned directory structure but Hive is not aware of it, we can repair the table to register the metadata of all new partitions using “msck repair table table_name;”
22.   Hive allows multiple inserts using the syntax below
FROM <from statement>
INSERT INTO TABLE …….
INSERT OVERWRITE TABLE……
INSERT OVERWRITE DIRECTORY ………….
Example
FROM source_table
INSERT INTO TABLE target_table SELECT columns WHERE conditions (note that the FROM block has moved to the top, so the SELECT has no FROM block of its own; Hive allows this block interchange) (if we are inserting into a partition, the PARTITION clause should appear before SELECT)
1.       The advantage of multiple inserts is that Hive scans the source table only once while performing all of the insert operations
23.   We can create dynamic partitions by supplying the partition column values from the source table. For example, in PARTITION (p1='01-Jan_2013', p2, p3), p1 is a static partition column and p2 and p3 are dynamic partition columns. All dynamic partition columns must appear after all static partition columns, and at the end of the select column list
24.   The maximum number of dynamic partitions allowed is 1000 by default; this can be changed by setting the hive.exec.max.dynamic.partitions parameter.
25.   We can select a table and do GROUP BY.
a.       GROUPING SETS
                                                               i.      Grouping sets defines the groups.. for example
Select a,b,sum(c) from table group by a,b grouping sets((a,b),a) is equivalent to
Select a,b,sum(c) from table group by a,b
Union all
Select a,null,sum(c) from table group by a;
b.       WITH CUBE
                                                               i.      CUBE generates all possible grouping sets for the group by columns, i.e. group by a,b,c WITH CUBE is equal to the combination of group by a, group by b, group by c, group by (a,b), group by (a,c), group by (b,c), group by (a,b,c), group by ()
c.       WITH ROLLUP
                                                               i.      It is similar to GROUPING SETS but follows the hierarchy of the fields. The top level of the hierarchy is the first element in the group by clause, the second element is the second level, and so on, i.e. GROUP BY a,b,c WITH ROLLUP is equal to the combination of GROUP BY (a,b,c), GROUP BY (a,b), GROUP BY a, GROUP BY ()
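The grouping sets implied by WITH ROLLUP and WITH CUBE can be enumerated mechanically. The helpers below are hypothetical names used only to illustrate the two expansions described above; they list the sets and do not run any aggregation.

```python
from itertools import combinations


def rollup_sets(cols):
    """Grouping sets for GROUP BY cols WITH ROLLUP: every prefix, longest first."""
    return [tuple(cols[:n]) for n in range(len(cols), -1, -1)]


def cube_sets(cols):
    """Grouping sets for GROUP BY cols WITH CUBE: every subset of the columns."""
    return [
        combo
        for n in range(len(cols), -1, -1)
        for combo in combinations(cols, n)
    ]


rollup = rollup_sets(["a", "b", "c"])
cube = cube_sets(["a", "b", "c"])
```

For three columns, ROLLUP produces 4 grouping sets and CUBE produces 8; the empty tuple stands for the grand total, GROUP BY ().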
26.   Hive built-in functions
a.       Mathematical
                                                               i.      Rand() – generate random numbers
                                                             ii.      Pow(a,b) – a^b
                                                           iii.      Abs(double a)
                                                           iv.      Round(double a, int d)
                                                             v.      Floor(double a)
                                                           vi.      ….
b.       Collection
                                                               i.      Size, map_keys(map<K,V>), array_contains(array,'test')
c.       Type Conversion
d.       Date
                                                               i.      Unix_timestamp(), date_add(), to_date(), etc.
e.       Conditional
                                                               i.      If (cond, true, false)
                                                             ii.      Coalesce() – null-handling function, like nvl
                                                           iii.      CASE WHEN THEN END
f.        String
                                                               i.      Concat(), concat_ws() i.e. with separator, regexp_replace(), substr()
g.       Misc.
                                                               i.       
h.       xPath
27.   UDAFs (user defined aggregate functions)
a.       Count, sum, avg, min, max, variance, stddev_pop, histogram_numeric(col,b)
28.   UDTFs (User defined table functions)
a.       Explode(), posexplode(), json_tuple() etc
29.   UDF – User defined functions
a.       We can create our own functions by extending the UDF base class and implementing the evaluate() method.
b.       We can then build a JAR and add it to the Hive environment using the ADD JAR command. The DESCRIBE FUNCTION command shows the function specs given in the @Description annotation at the time of function definition
c.       We can register a temporary function under a name of our choice using “CREATE TEMPORARY FUNCTION fname AS 'class_name';”, where class_name is the fully qualified name of the Java class in the JAR
d.       This temporary function is only available in the current session. To make it available in every session, we need to put the statements in the ../bin/.hiverc file, or in an initialization file supplied with the -i option when starting the Hive session
30.   Sorting and controlling data flow
a.       In a select query, we can use the ORDER BY clause to order the results. ORDER BY triggers only one reducer and sorts globally.
b.       If we want to run multiple reducers in parallel to improve performance and use distributed computing power, ORDER BY does not help, as it triggers only one reducer
c.       We need to use SORT BY to trigger multiple reducers. The sorting is then done per reducer, i.e. we do not get a globally sorted list. Each reducer writes its own output part, i.e. part-000000, part-000001 etc., and each part is sorted.
d.       To control which rows each reducer processes under SORT BY, we can use DISTRIBUTE BY.
e.       For example, to see each user's logins ordered by time, we can write a query like SELECT userid, time FROM table DISTRIBUTE BY userid SORT BY time;
f.        If the same columns are used in both DISTRIBUTE BY and SORT BY, we can use the shorthand CLUSTER BY
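The difference between a global sort and a per-reducer sort can be simulated in a few lines. The sketch below is a simplification under stated assumptions: the login rows are hypothetical, and userid modulo the reducer count stands in for the hash partitioning Hive really uses.

```python
def distribute_and_sort(rows, num_reducers, dist_key, sort_key):
    """Model of DISTRIBUTE BY dist_key SORT BY sort_key across num_reducers."""
    reducers = [[] for _ in range(num_reducers)]
    for row in rows:
        # Assumption: integer key modulo reducer count replaces Hive's hashing.
        reducers[dist_key(row) % num_reducers].append(row)
    # Each reducer sorts only its own rows.
    return [sorted(part, key=sort_key) for part in reducers]


# Hypothetical (userid, time) login rows.
logins = [(1, 30), (2, 10), (1, 5), (2, 20), (3, 7)]

parts = distribute_and_sort(
    logins,
    num_reducers=2,
    dist_key=lambda row: row[0],  # DISTRIBUTE BY userid
    sort_key=lambda row: row[1],  # SORT BY time
)

# Reading the part files one after another is not a globally sorted list.
times_in_file_order = [row[1] for part in parts for row in part]
```

All rows of a given user land in the same part and every part is sorted by time, yet users sharing a reducer are interleaved. Sorting by userid and then time within each reducer would keep each user's rows together.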
31.   Command line interface and variable substitution
a.       hive -e 'sql' – runs HiveQL from the command line in batch mode
b.       hive -S -e 'sql' – runs HiveQL from the command line in silent mode and in batch mode
c.       hive -f filenameofHiveQL – runs HiveQL that is stored in a file
d.       For variable substitution, Hive provides 4 namespaces. Each namespace is designated for a specific purpose
                                                               i.      hivevar – the place for user-defined variables, e.g. set hivevar:variablename=value or hive -d varname=value, then use it as ${hivevar:varname}
                                                             ii.      hiveconf – where Hive stores most of its configuration, e.g. set hiveconf:propertyname=value
                                                           iii.      system – for more Hadoop related properties, e.g. set system:propertyname=value
                                                           iv.      env – for environment variables, which are read-only inside Hive and are referenced as ${env:VARNAME}
32.   Bucketing
a.       Bucketing is similar to partitioning, and it can split partitions further. Bucketing can be applied at table level or partition level.
b.       Bucketing is an approach to cluster or distribute data for better performance in map-side joins and more efficient sampling
c.       Buckets can also be sorted
d.       Example: CREATE TABLE table (field1 type, field2 type….) CLUSTERED BY (field1) INTO 256 BUCKETS; in this case Hive applies a hash function to column field1 and splits the rows into 256 buckets
e.       Another example: CREATE TABLE table (field1 type, field2 type….) PARTITIONED BY (field3 type) CLUSTERED BY (field1) SORTED BY (field2) INTO 64 BUCKETS; this is bucketing on top of partitions, sorted within each bucket. SORTED BY enables further optimization when using JOINs. It slows down writing data into the table, but this is acceptable because we write once and read many times, so slow writes are fine as long as reads are fast.
f.        Hive does not control or enforce bucketing during the data load. It is the user's responsibility to enforce bucketing when loading data. We can use one of the following 2 approaches
SET mapred.reduce.tasks=64;
INSERT OVERWRITE TABLE table SELECT columns FROM sourcetable CLUSTER BY col;
Or
SET hive.enforce.bucketing=true;
INSERT OVERWRITE TABLE table SELECT columns FROM sourcetable;
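Bucket assignment comes down to hashing the CLUSTERED BY column and taking the remainder by the number of buckets. The sketch below illustrates that idea only: zlib.crc32 is a stand-in chosen because it is deterministic, not the hash function Hive uses, and the rows are hypothetical.

```python
import zlib


def bucket_for(value, num_buckets):
    """Pick a bucket by hashing the value and taking the remainder."""
    # Assumption: crc32 replaces Hive's own hash function for illustration.
    return zlib.crc32(str(value).encode("utf-8")) % num_buckets


def write_buckets(rows, key_index, num_buckets):
    """Place every row in the bucket chosen by its clustering column."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_for(row[key_index], num_buckets)].append(row)
    return buckets


# Hypothetical (field1, field2) rows, clustered by field1 into 4 buckets.
rows = [("u1", 10), ("u2", 20), ("u1", 30), ("u3", 40)]
buckets = write_buckets(rows, key_index=0, num_buckets=4)
```

Rows with the same clustering value always end up in the same bucket, which is what lets a join or a sample read only the matching bucket files.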
33.   Data Sampling
a.       Data sampling is about working with a subset of the data from a very large data set
b.       Data sampling can be applied to any table, whether bucketed or not; however, a bucketed table improves performance
c.       The syntax uses the TABLESAMPLE keyword, for example SELECT * FROM sourcetable TABLESAMPLE(BUCKET x OUT OF y [ON name]); even though we use the BUCKET keyword, it does not mean we can only sample bucketed tables
d.       We can also sample data by block percentage, i.e. SELECT * FROM sourcetable TABLESAMPLE(n PERCENT); but this approach does not always work
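Bucket sampling can be pictured as keeping only the rows that fall into one of y hash buckets. The sketch below assumes that bucket x (counted from 1) corresponds to remainder x - 1 and uses the integer id itself as the hash; the table contents are hypothetical.

```python
def tablesample_bucket(rows, x, y, key):
    """Model of TABLESAMPLE(BUCKET x OUT OF y ON key) for integer keys."""
    # Assumption: bucket x (1-based) holds the rows whose key has remainder x - 1.
    return [row for row in rows if key(row) % y == x - 1]


# Hypothetical source table with ids 0 to 9.
sourcetable = [(i, f"row{i}") for i in range(10)]

# TABLESAMPLE(BUCKET 1 OUT OF 4 ON id)
sample = tablesample_bucket(sourcetable, x=1, y=4, key=lambda row: row[0])
```

Roughly one row in y is returned, and the same rows come back on every run because the choice depends only on the key.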
34.   JOINs
a.       JOIN (inner join) – returns only the rows that match on both sides
b.       LEFT, RIGHT, FULL [OUTER] JOIN
c.       LEFT SEMI JOIN
                                                               i.      In older Hive versions (before 0.13), IN and EXISTS subqueries do not work, so we need to use a semi join. For example, SELECT col1 FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t1.col1=t2.col1) does not work; instead we write SELECT col1 FROM t1 LEFT SEMI JOIN t2 ON t1.col1=t2.col1
d.       CROSS JOIN same as Cartesian join.
e.       Hive does not support >, <, >=, <= etc. in join conditions; it only supports = in join conditions
f.        STREAMTABLE – Hive assumes that the smaller tables come first in a multi-table join, i.e. when joining tables a, b, c Hive assumes that a<b<c. This can be changed using the STREAMTABLE hint in the select column list, e.g. SELECT /*+ STREAMTABLE(a) */ …
g.       Other ways of optimizing joins are Map Join, Map-side join (works on bucketed tables), distributed cache
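The behaviour of LEFT SEMI JOIN can be modelled as a membership test: a left row is kept if at least one matching right row exists, and it is kept only once. The tables below are hypothetical.

```python
def left_semi_join(left, right, left_key, right_key):
    """Keep each left row that has at least one match on the right."""
    right_keys = {right_key(row) for row in right}
    return [row for row in left if left_key(row) in right_keys]


# Hypothetical tables; t2 contains the key 1 twice.
t1 = [(1, "a"), (2, "b"), (3, "c")]
t2 = [(1,), (1,), (3,)]

# SELECT ... FROM t1 LEFT SEMI JOIN t2 ON t1.col1 = t2.col1
result = left_semi_join(t1, t2, left_key=lambda r: r[0], right_key=lambda r: r[0])
```

Unlike an inner join, the duplicate key in t2 does not duplicate the row from t1, and no columns from t2 appear in the output.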
35.   Windowing and analytical functions
a.       LEAD/LAG
b.       FIRST_VALUE
c.       LAST_VALUE
d.       PARTITION BY
e.       OVER Clause
f.        WINDOW
g.       RANK, ROW_NUMBER, DENSE_RANK etc

36.   Hive has a TRANSFORM clause, which sends Hive data to the STDIN of an external program such as a shell script, Python or Java, and reads the results back from its STDOUT. Example: SELECT TRANSFORM (f1,f2) USING './file.sh' AS (f1,f2) FROM table; Notice that the file is referenced in the current directory. To make the file available to Hive, we need to add it to the distributed cache using “ADD FILE /path/file.sh;”
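An external program used with TRANSFORM only has to read tab-separated rows from STDIN and write tab-separated rows to STDOUT. Below is a minimal sketch of such a program in Python; upper-casing the first field is an arbitrary example, and the function names are hypothetical.

```python
import sys


def transform_line(line):
    """Upper-case the first of two tab-separated fields and keep the second."""
    f1, f2 = line.rstrip("\n").split("\t")
    return f"{f1.upper()}\t{f2}"


def main(stdin=sys.stdin, stdout=sys.stdout):
    # Hive sends one row per line with columns separated by tabs,
    # and expects rows back in the same layout.
    for line in stdin:
        stdout.write(transform_line(line) + "\n")


# When saving this as a script for Hive, end the file with a call to main().
```

Saved under a name of our choice and registered with ADD FILE, a script like this would be invoked through the interpreter in the USING clause, in place of './file.sh' in the example above.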