Tuesday, 30 August 2016

ieee754.c.src:7:29: fatal error: npy_math_common.h: No such file or directory


To fix this:

If you are trying to set up numpy inside a Python virtualenv, do the following:

sudo apt-get remove python-virtualenv
sudo pip install virtualenv

The virtualenv package from apt-get doesn't come with its own setuptools; the pip-installed virtualenv does.


numpy/core/src/npymath/ieee754.c.src:7:29: fatal error: npy_math_common.h: No such file or directory
compilation terminated.
numpy/core/src/npymath/ieee754.c.src:7:29: fatal error: npy_math_common.h: No such file or directory
compilation terminated.

error: Setup script exited with error: Command "gcc -pthread -fno-strict-aliasing -DNDEBUG -g -fwrapv -O2 -Wall -Wstrict-prototypes -fPIC -Inumpy/core/include -Ibuild/src.linux-x86_64-2.7/numpy/core/include/numpy -I/usr/include/python2.7 -Ibuild/src.linux-x86_64-2.7/numpy/core/src/private -Ibuild/src.linux-x86_64-2.7/numpy/core/src/private -Ibuild/src.linux-x86_64-2.7/numpy/core/src/private -c build/src.linux-x86_64-2.7/numpy/core/src/npymath/ieee754.c -o build/temp.linux-x86_64-2.7/build/src.linux-x86_64-2.7/numpy/core/src/npymath/ieee754.o" failed with exit status 1

Thursday, 21 July 2016

What is an RDD? Apache Spark Resilient Distributed Datasets (RDD) Explained, Demystified

What is an RDD?

A Resilient Distributed Dataset (RDD) is a collection of elements, partitioned across the nodes of a cluster, that can be operated on in parallel.

RDD Life cycle:

How to create an RDD?

1) Starting from a file in any Hadoop-supported storage: a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

2) From an existing collection in the driver program, by calling the parallelize method on the SparkContext (see the sketch below).
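
A rough PySpark sketch of both approaches (the Scala API is analogous); the file path and sample data below are placeholders:

from pyspark import SparkContext

sc = SparkContext(appName="rdd-creation-example")

# 1) From a file in a Hadoop-supported filesystem (placeholder path).
lines = sc.textFile("hdfs:///tmp/sample.txt")

# 2) From an existing collection in the driver program, via parallelize.
numbers = sc.parallelize([1, 2, 3, 4, 5])

print lines.count(), numbers.count()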

What are the features of an RDD?

  1. Users can ask Spark to persist an RDD in memory, which allows efficient re-use across parallel operations.
  2. RDDs automatically recover from node failures.

What are the operations supported by RDD?

RDDs support two types of operations:
  1. Transformations - create a new dataset from an existing RDD.
  2. Actions - run a function on the RDD, aggregate, and return a result to the driver program.

Roughly, these operations can be linked to Map and Reduce, the exception being reduceByKey, which is a transformation that creates a new RDD rather than an action.

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. 
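
As a small PySpark illustration (assuming the SparkContext sc from the earlier sketch), nothing runs until the action at the end:

words = sc.parallelize(["spark", "rdd", "spark", "hive"])

# Transformations: Spark only records the lineage, no computation happens yet.
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)   # also a transformation, returns a new RDD

# Optionally keep the result in memory for re-use across actions.
counts.cache()

# Action: this is the point where the whole chain is actually computed.
print counts.collect()   # e.g. [('spark', 2), ('rdd', 1), ('hive', 1)]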


Apache Spark program life cycle

Spark Application -> driver program -> main function -> parallel operations on cluster -> result

Wednesday, 18 May 2016

How to convert ISO 8601 time to UTC in python?

There are many approaches; I'll try to list the ones I know.

1) Manual parsing with the standard library:

import datetime

ts = '2015-06-02T12:14:55-04:00'

# parse the timestamp without the offset, then subtract the offset to get UTC
print datetime.datetime.strptime(ts[:-6], '%Y-%m-%dT%H:%M:%S') - \
      datetime.timedelta(hours=int(ts[-5:-3]),
                         minutes=int(ts[-2:])) * int(ts[-6:-5] + '1')

would print
2015-06-02 16:14:55
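
2) Another option, assuming the third-party python-dateutil package is installed, is to let dateutil parse the offset and convert:

from dateutil import parser, tz

ts = '2015-06-02T12:14:55-04:00'

# parse() returns a timezone-aware datetime; astimezone() converts it to UTC.
print parser.parse(ts).astimezone(tz.tzutc())   # 2015-06-02 16:14:55+00:00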

Thursday, 5 May 2016

How does Map-side aggregation work in Hive?

What is map-side aggregation in hive?

Hive tries to optimize query execution by performing a map-side aggregation when certain conditions are met.

What exactly is the optimization being performed?

The optimization does a partial aggregation inside the mapper, which results in the mapper outputting fewer rows, reducing the data that Hadoop needs to sort and distribute to the reducers.

How does the optimization/map-side aggregation work?

Mappers store each map output key with its corresponding partial aggregation in a hash map. Periodically, the mappers output the pairs (#{token}, #{token_count}). The Hadoop framework then sorts these pairs and the reducers aggregate the values.

So what is the tradeoff of using map-side aggregations?

The mapper needs to keep a hash map of all tokens in memory, which increases the main memory needed to execute the mapper.

How does Hive decide whether to apply the map-side aggregation optimization?


By default, Hive will try to use the map-side aggregation optimization, but it falls back to the vanilla MapReduce approach if the hash map is not producing enough of a memory saving.

Decision policy:

After processing the first 100,000 (hive.groupby.mapaggr.checkinterval) records in the mapper, Hive checks whether the size of the hash map exceeds 50% (hive.map.aggr.hash.min.reduction) of the number of records processed. If it does, Hive aborts map-side aggregation. These configurations can be tuned to control the map-side aggregation behaviour.
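
A rough Python sketch of that decision (an illustration of the policy, not Hive's actual code):

CHECK_INTERVAL = 100000   # hive.groupby.mapaggr.checkinterval
MIN_REDUCTION = 0.5       # hive.map.aggr.hash.min.reduction

def keep_map_side_aggregation(rows_processed, entries_in_hash_map):
    """Decide whether the partial aggregation is shrinking the data enough."""
    if rows_processed < CHECK_INTERVAL:
        return True   # too early to decide
    # If the hash map holds more than 50% as many entries as rows seen,
    # the aggregation is not helping much: fall back to the plain MR plan.
    return float(entries_in_hash_map) / rows_processed <= MIN_REDUCTION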
So what is hive.map.aggr.hash.percentmemory for?

We know the trade-off with map-side aggregation is that the hash map needs to be stored in memory. To prevent an "Out of Memory" exception in the mapper caused by the aggregation hash map growing too large, hive.map.aggr.hash.percentmemory can be used to flush the hash map to the reducers whenever its size exceeds the specified percentage of the total memory available to the mapper.

This, however, is an estimate based on the number of rows and the expected size of each row, so if the memory usage per row is unexpectedly high, the mappers may run out of memory before the hash map is flushed to the reducers.

Monday, 2 May 2016

How to convert Unicode to Str in Python

Python
TypeError: expected unicode, got str

Strings are not the same as unicode in Python 2.
type("text")
>> <type 'str'>
type(u"text")
>> <type 'unicode'>
So, the conversions:
  • From unicode to str, use encode:
    u"text".encode('utf-8') == "text"
  • From str to unicode, use decode:
    "text".decode("utf-8") == u"text"

Tuesday, 12 April 2016

How to convert []byte (byte array) to JSON in golang

Converting a JSON byte array into a map object in golang.

// Declare a map variable; no map object is allocated yet.
var dat map[string]string

// Unmarshal the JSON byte slice into the map, passing a pointer as the second argument.
if err := json.Unmarshal(body, &dat); err != nil {
    panic(err)
}
fmt.Println(dat)

How to convert []byte to string in golang?

1) Print the byte array as a formatted string

fmt.Printf("%s\n",body)


2) byte array to string

buff := bytes.NewBuffer(body)
fmt.Println(buff.String())




Monday, 11 April 2016

go / golang difference between := vs =

:= is for declaration + assignment, and = is for assignment only
From the golang documentation:

Inside a function, the := short assignment statement can be used in place of a var declaration with implicit type.

Outside a function, every construct begins with a keyword (var, func, and so on) and the := construct is not available.

In the example below, the usage of both the := and = operators can be seen.

func Reverse(s string) string {
    r := []rune(s)
    for i, j := 0, len(r)-1; i < len(r)/2; i, j = i+1, j-1 {
        r[i], r[j] = r[j], r[i]
    }
    return string(r)
}


What is a rune in go?

Unlike many other programming languages, golang doesn't have a character type.

So how are characters represented in a string?

ASCII characters in a string are represented by a byte.
Unicode characters in a string are represented by a rune.

In reality they are both just aliases for integer types (uint8 and int32).

If you want to force them to be printed as characters instead of numbers, you need to use Printf("%c", x). The %c format specification works for any integer type.

In the program below, the []rune conversion is used to get the correct character representation of a string regardless of the underlying storage (uint8, int32).

package main

import "fmt"

func main() {
    fmt.Println(string([]rune("Hello, 日本語")[1])) // UTF-8
    fmt.Println(string([]rune("Hello, 日本語")[8])) // UTF-8}

Go initial errors: "cannot refer to unexported name fmt.printf"


I'm getting started with golang, just recording my initial hiccups and the solutions.

cannot refer to unexported name fmt.printf
hello/hello.go:6: undefined: fmt.printf

Golang is case-sensitive, hence the function is fmt.Printf, with a capital P.

In Go, a name is exported if it begins with a capital letter. For example, Pizza is an exported name, as is Pi, which is exported from the math package.

Friday, 8 April 2016

What are Elasticsearch Analyzers?

Analyzers are pre-processors which are executed on the text before generating the inverted index.

So why do we need Analyzers in Elasticsearch?

Consider a field for which we want to build an inverted index:

Document 1: "Hello Elasticsearch World "
Document 2: "hello Elasticsearch world"

The Inverted Index looks like this:

Term(Document,Frequency)
Hello(1,1)
World(1,1)
Elasticsearch(1,1),(2,1)
hello(2,1)
world(2,1)

It's evident that "Hello" and "hello" are the same word with a change in case. As there are two separate index entries for the same word, a query returns partial results when we need a case-insensitive search.

To fix the above issue, let's have the words converted to lowercase before creating the inverted index:

Term(Document,Frequency)
hello(1,1),(2,1)
world(1,1),(2,1)
elasticsearch(1,1),(2,1)

This is what analyzers in Elasticsearch are for. This is a simple illustration; analyzers are designed to do much more than this example shows.

In simple terms, an analyzer does the following (see the sketch below):
  1. Splits the text into individual terms or tokens, for example based on whitespace.
  2. Standardizes the individual terms so they are searchable.
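
A toy Python sketch of those two steps (a conceptual illustration only, not how Elasticsearch/Lucene actually implements analysis):

def analyze(text):
    """Toy analyzer: split on whitespace, then lowercase each token."""
    return [token.lower() for token in text.split()]

print analyze("Hello Elasticsearch World")   # ['hello', 'elasticsearch', 'world']
print analyze("hello Elasticsearch world")   # ['hello', 'elasticsearch', 'world']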



What is Inverted Index in Elasticsearch

Elasticsearch is a Full Text Search Engine built on top of Lucene. The index structure used in Lucene to enable fast search lookups is called "Inverted Index". 

An inverted index is a simple yet powerful concept that enables efficient search. It's a list of the unique words (terms) that appear in the stored documents. Each unique word maps to the list of documents in which it appears, along with how many times the word occurs in each document (frequency).
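
A minimal Python sketch of that structure, reusing the toy lowercasing analyzer idea from the previous post (conceptual only):

from collections import defaultdict

def build_inverted_index(docs):
    """Map each lowercased term to {doc_id: frequency}."""
    index = defaultdict(lambda: defaultdict(int))
    for doc_id, text in docs.items():
        for term in text.lower().split():
            index[term][doc_id] += 1
    return index

docs = {1: "Hello Elasticsearch World", 2: "hello Elasticsearch world"}
for term, postings in build_inverted_index(docs).items():
    print term, dict(postings)   # e.g. hello {1: 1, 2: 1}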

Thursday, 17 March 2016

Orchestrating Hadoop Workflows - What your workflow tool should look like.

This post illustrates the most important qualities to look for when choosing a big data workflow tool.


  1. Schedule the ETL process to ingest incremental data from various streams.

    The ability to schedule is the most elementary requirement of any orchestration tool, and the tool
    must be able to ingest and process data simultaneously from various sources to various endpoints. It should support the most common sources and endpoints out of the box.

  2. Notifications on failure of ETL process.

    Notifications and alerts are a must: a failure can set you back to backfilling and can drastically increase infrastructure costs (especially if you pay by the hour for the cluster). A good, reliable notification mechanism is essential.

  3. Optimization of resource usage for daily ingestion.

    Not directly provided by the workflow tool, but the tool should persist the execution times and trends for each task, which can be used to optimize your tasks. This data becomes important once you have run the workflow long enough to understand the time trends.

  4. Retry on failures; make your tasks idempotent incremental loads

    The ownership of making the workflow idempotent is on the developer and not the tool. The tool should support retries.

  5. Zero/Minimal Manual intervention

  6. Self recovering/ rescheduling system

  7. Pause the workflow

    The tool should let you pause execution after the currently running task when needed. This enables you to fix any issue with the next task in your workflow without having to fail the whole workflow.

  8. Automatic data audits for incrementally loaded data.

    The tool should support audits on tasks and help drive decisions in the workflow.



Friday, 4 March 2016

Submit Hive queries to different Queues

Hive Execution Engine is MapReduce 
set mapreduce.job.queuename=<queuename>; 

Hive Execution Engine is Tez
set tez.queue.name=<queuename>; 

Hive Default Field Delimiter

\u0001 (the Ctrl+A control character, often shown as ^A)
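
For example, a raw line from a text-format Hive table can be split on that character in Python (the sample row below is made up):

# '\x01' is the \u0001 / Ctrl+A character Hive uses as its default field delimiter.
line = 'john\x0125\x01bangalore'
print line.split('\x01')   # ['john', '25', 'bangalore']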

Hive RuntimeException

"Failed with exception java.io.IOException:java.lang.RuntimeException: serious problem"

If the Hive cluster is Kerberos-secured, disable the fetch-task optimization:

set hive.fetch.task.conversion=none;

Friday, 26 February 2016

Configuring SMTP with Gmail

  
Exception: File "/usr/lib/python2.7/smtplib.py", line 366, in getreply
    raise SMTPServerDisconnected("Connection unexpectedly closed")

smtplib.SMTPServerDisconnected: Connection unexpectedly closed


Check the port you have configured for Gmail.
If you're using SSL (465), change to TLS (587), or vice versa.
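
A minimal Python sketch using TLS on port 587 (the addresses and credentials below are placeholders; Gmail may also require an app-specific password):

import smtplib
from email.mime.text import MIMEText

msg = MIMEText("test mail body")
msg['Subject'] = 'SMTP test'
msg['From'] = 'sender@gmail.com'      # placeholder
msg['To'] = 'receiver@example.com'    # placeholder

server = smtplib.SMTP('smtp.gmail.com', 587)   # TLS port
server.ehlo()
server.starttls()
server.login('sender@gmail.com', 'app-password')   # placeholder credentials
server.sendmail(msg['From'], [msg['To']], msg.as_string())
server.quit()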

Thursday, 4 February 2016

Bucket Map Join in Hive

Shuffle joins and map joins are the most common joins used in Hive, but they might not be the best solution in all cases.

Understanding bucket joins can be handy for improving join execution speed when joining on bucketed columns.

A bucket map join, as the name indicates, is a mapper-only join (not to be confused with map joins/broadcast joins).

So when do we use Bucket Map join?

  1. We have large tables as part of our n-way join.
  2. The tables are bucketed on the join columns.
  3. The data is unsorted.
  4. The numbers of buckets in the tables are multiples of each other.


So, large unsorted tables which have been bucketed on the join columns are good candidates. The other way is to design the tables to be bucketed based on the use case.

Bucket map joins are not the default behaviour of Hive, hence we need to hint Hive by setting hive.optimize.bucketmapjoin to true.

set hive.optimize.bucketmapjoin = true;
Select T1.key, T1.value FROM T1 JOIN T2 ON T1.key = T2.key;


Here we want to bucket map join tables T1 and T2. The tables must be bucketed on the column "key", and the numbers of buckets in the two tables should be multiples of each other.

The bucket join now has the advantage of reading only the required buckets from T2 instead of the entire table.

The mapper will access only the required buckets from the other n-1 tables in the n-way join. This results in the join happening in the mapper, and the expensive shuffle phase is skipped.