Sunday 22 January 2017

Elasticsearch - Index Routing

The purpose of routing is to tell Elasticsearch which shard contains the data, so that a query is sent only to that shard and the result comes back quickly. This can boost the performance of the system significantly.


For instance, when you run a query to get all the posts that contain the text "custom", by default the HTTP request is fanned out to all the shards on all the nodes in the cluster to look for the data, which is quite inefficient. This can be fixed with routing.

Default query execution
Defining the routing in schema

{
"mappings": {
"post": {
"_routing": {
"required": true, // a routing value must be supplied when indexing, getting, updating or deleting a post
"path": "user_id" // the field we want to route on
}, 
"properties": {
"user_id": {
"type": "integer",
"store": true
},
"post_text": {
"type": "string"
},
"post_date": {
"type":"date",
"format":"YYYY-MM-DD"
}
}
}
}
}

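With this routing mapping, Elasticsearch places all the documents that belong to the same user_id in one shard, which helps to narrow down our search significantly. Note that the path option is only available in Elasticsearch 1.x; from 2.0 onwards the routing value has to be passed explicitly with each index request (for example ?routing=2).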

GET my_blog/post/_search?routing=2&q=post_text:custom

In the above query, the routing parameter tells Elasticsearch to send the query only to the shard which owns all the data of user_id=2. This is much more efficient.
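
To make the idea concrete, here is a minimal sketch of how a routing value can be turned into a shard number: hash the routing value and take it modulo the number of primary shards. The function name shard_for is made up for this post, and zlib.crc32 is only a stand-in for the hash function Elasticsearch uses internally, so the actual shard numbers in a real cluster will differ.

```python
import zlib


def shard_for(routing_value: str, number_of_shards: int) -> int:
    """Map a routing value to a shard number (illustration only)."""
    # crc32 is an assumption: it is deterministic and in the standard
    # library, but it is NOT the hash Elasticsearch really uses.
    return zlib.crc32(routing_value.encode("utf-8")) % number_of_shards


if __name__ == "__main__":
    shards = 5
    for user_id in ("1", "2", "3"):
        # The same user_id always lands on the same shard, so a search
        # with ?routing=<user_id> only needs to visit that one shard.
        print(f"user_id={user_id} -> shard {shard_for(user_id, shards)}")
```

The important property is that the mapping is deterministic: every document indexed with routing=2 and every search sent with routing=2 end up on the same shard.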


Saturday 21 January 2017

Elasticsearch - Rolling indexes or time-based indexes

A common use case for rolling indexes is logging. Most applications generate a lot of logs, and it's convenient to be able to search them easily. It is also useful to be able to remove old logs so the cluster stays available.

Templates and Aliases

A template is a predefined schema setup.

{
// Any index created with a name starting with log_ automatically inherits the
// settings and mappings of this template, and is added to the alias log.
"template": "log_*",
"aliases": {
"log": {
}
},
"settings": {
"index": {
"number_of_shards": 5,
"number_of_replicas": 1
}
},
"mappings": {
"log_event": {
"properties": {
"log_name": {
"type": "string"
},
"log_text": {
"type":"string"
},
"log_date": {
"type": "date"
}
}
}
}
}

Any index created with a name that matches the naming scheme inherits the template above. In this example, indexes created dynamically with a name matching log_* inherit the mapping information we have defined.

An alias is a shortcut name you can put in front of multiple indexes. This way, if we write a query against the alias, the query is automatically sent to all the indexes assigned to the alias.

A dynamic index is created when you insert a document into an index that doesn't exist yet. In that situation Elasticsearch creates the index for you automatically and takes its best guess at the field types of the properties. So instead of letting Elasticsearch guess the field types, we set up a template so that it knows which field types to use. Now, if I insert a document with the correct properties into an index whose name matches the wildcard, Elasticsearch will create the index on demand.
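
As a rough illustration of the wildcard match, the sketch below checks which index names would pick up the log_* template. It uses Python's fnmatchcase as a stand-in for the matching done by Elasticsearch, and the function name template_applies is made up for this post.

```python
from fnmatch import fnmatchcase

# The pattern from the template above.
TEMPLATE_PATTERN = "log_*"


def template_applies(index_name: str, pattern: str = TEMPLATE_PATTERN) -> bool:
    """Return True when a new index name matches the template pattern."""
    return fnmatchcase(index_name, pattern)


if __name__ == "__main__":
    for name in ("log_2013", "log_2014", "my_blog"):
        print(name, "->", "uses template" if template_applies(name) else "no template")
```

So log_2013 and log_2014 would get the mapping, the shard settings and the log alias, while an index such as my_blog would not.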

In Postman, I am going to insert (POST) a log document into an index that doesn't exist yet, but whose name does match the template we set up.

http://localhost:9200/log_2013/log_event

{
"log_name": "my_app",
"log_text": "null pointer exception",
"article_date": "2015-10-13t00:00:00z"
}


output

{
  "_index": "log_2013",
  "_type": "log_event",
  "_id": "AVnES-4-bB-_0C_ORt6J",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

http://localhost:9200/log_2014/log_event

{
"log_name": "my_app",
"log_text": "null pointer exception",
"article_date": "2014-10-13t00:00:00z"
}

output

{
  "_index": "log_2014",
  "_type": "log_event",
  "_id": "AVnET4DsbB-_0C_ORt6K",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}


http://localhost:9200/log/_search

{
"took": 8,
"timed_out": false,
"_shards": {
"total": 10,
"successful": 10,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "log_2014",
"_type": "log_event",
"_id": "AVnET4DsbB-_0C_ORt6K",
"_score": 1,
"_source": {
"log_name": "my_app",
"log_text": "null pointer exception",
"article_date": "2014-10-13t00:00:00z"
}
}
,
{
"_index": "log_2013",
"_type": "log_event",
"_id": "AVnES-4-bB-_0C_ORt6J",
"_score": 1,
"_source": {
"log_name": "my_app",
"log_text": "null pointer exception",
"article_date": "2015-10-13t00:00:00z"
}
}
]
}
}



http://localhost:9200/log_2013/_search

{
"took": 3,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 1,
"hits": [
{
"_index": "log_2013",
"_type": "log_event",
"_id": "AVnES-4-bB-_0C_ORt6J",
"_score": 1,
"_source": {
"log_name": "my_app",
"log_text": "null pointer exception",
"article_date": "2015-10-13t00:00:00z"
}
}
]
}
}

This is a very powerful way of adding and removing indexes in the cluster without users noticing.
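
Here is a small sketch of the housekeeping side of rolling indexes: given the yearly index names used above, decide which ones to keep and which ones are old enough to drop. The retention rule (keep_years) and the function name are assumptions made up for this post; the expired indexes would then be removed with a DELETE request on each index, while queries keep going through the log alias.

```python
def split_by_retention(index_names, current_year, keep_years):
    """Split indexes named log_<year> into (kept, expired) lists."""
    kept, expired = [], []
    for name in sorted(index_names):
        # The year is the part after the last underscore, e.g. log_2013 -> 2013.
        year = int(name.rsplit("_", 1)[1])
        if current_year - year < keep_years:
            kept.append(name)
        else:
            expired.append(name)
    return kept, expired


if __name__ == "__main__":
    kept, expired = split_by_retention(
        ["log_2013", "log_2014", "log_2015"], current_year=2015, keep_years=2
    )
    print("keep:", kept)
    print("drop:", expired)
```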



Elasticsearch - Introduction

Elasticsearch is built on top of Lucene. It's a Java application, so you need Java installed on your machine. It's widely used for real-time analytics and to speed up search. It's a NoSQL, text-based distributed database.

Installation

Prerequisite: Java


To install the Java runtime on Ubuntu:

  1. add the repository with add-apt-repository ppa:webupd8team/java
  2. refresh the package list with apt-get update
  3. then install with apt-get install oracle-java8-installer
  4. check the version of Java installed with java -version

To install Elasticsearch on Ubuntu:

  1. use wget to download the package: wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/deb/elasticsearch/2.0.0-beta2/elasticsearch-2.0.0-beta2.deb
  2. run the installation with: sudo dpkg -i elasticsearch-2.0.0-beta2.deb
  3. make sure it's running with the curl command: curl http://localhost:9200 (you will receive a JSON response body with a few details of your Elasticsearch node)

To install Elasticsearch using the binary:

  1. download the binary with: wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz
  2. extract it with: tar -xvf elasticsearch-5.1.2.tar.gz
  3. change to the directory: cd elasticsearch-5.1.2/bin
  4. execute the command: ./elasticsearch
  5. to verify that Elasticsearch is running: curl http://localhost:9200/

That's it, you have your very first instance of Elasticsearch running successfully on your machine. By default it will act as both a master and a data node. You can fine-tune this as you want; we will see how later in this tutorial.

Getting into the theory at a basic level


It's a distributed system, which lets it scale horizontally.

Terminology to know

Indexes: How is data stored in Elasticsearch? It is stored in indexes. In relational database terms you can think of an index as a database: it's a logical container of data which you can reference by name in a query.

An index contains all the JSON documents you add to it, plus some additional metadata about each document.

Shards: An index is stored in one or more shards, each of which is itself a complete Lucene index. A shard is the smallest unit of scale in Elasticsearch. An index must have at least one shard. However, and this is an important concept to understand, an index can live in multiple shards.

For instance:

I've created an Elasticsearch server instance, which is called a node, and when I create an index I can tell it to allocate its data across several shards. Having data split across multiple shards seems cool, but what does it really buy us in terms of performance and scalability? Not much, because we are still working on one node with a finite set of resources: RAM, CPU cycles and disk capacity. This is where Elasticsearch's distributed nature comes in. When we add another node to the setup, several things happen automatically: the node joins the cluster as a peer of the other node, the nodes gossip with each other, and the new node gains all the information about the cluster. Some shards are automatically moved to the new node in order to balance out the data that is stored and served. This process is called rebalancing.

Now a single index is stored on multiple nodes.
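
The effect of rebalancing can be pictured with a toy sketch like the one below. It simply deals the shards of one index out to the available nodes in turn. This round-robin rule and the name spread_shards are assumptions for illustration only; the real allocation logic in Elasticsearch takes more factors into account.

```python
def spread_shards(number_of_shards, nodes):
    """Deal shard numbers out to nodes round-robin (toy model)."""
    layout = {node: [] for node in nodes}
    for shard in range(number_of_shards):
        layout[nodes[shard % len(nodes)]].append(shard)
    return layout


if __name__ == "__main__":
    # One node: it holds every shard of the index.
    print(spread_shards(5, ["node1"]))
    # A second node joins: the shards are spread over both nodes.
    print(spread_shards(5, ["node1", "node2"]))
```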



Replication: In addition to shards, Elasticsearch has replication. You can think of a replica as a copy of a shard. It's there for redundancy.

How does an index get created? It's created with a simple HTTP call.

Elasticsearch can be schemaless, but you can also create a schema upfront to have better control over how your information is going to be indexed.

To form a cluster


  1. cd config/ and open the configuration file: vim elasticsearch.yml
  2. change cluster.name: clusterName. When a node joins the cluster, it must have the same cluster name
  3. set node.name to identify the node for troubleshooting
  4. set path.logs. In production it would point to some log system
  5. set bootstrap.mlockall: true. This locks the memory at startup, which prevents the JVM's memory from being swapped out while running. This is important because swapping leads to performance degradation
  6. set network.host: 0.0.0.0
  7. configure discovery. By default multicast is enabled; when it's enabled, nodes can find each other and form a cluster. In production, you will disable multicast and use unicast, which allows only a predefined set of nodes to join the cluster

discovery.zen.ping.unicast.hosts: ["host1", "host2"]

Set discovery.zen.minimum_master_nodes: 3 to prevent split brain.

Elastic search architecture

Master node and Data node


The master node manages the cluster state: which nodes are in the cluster, where they are located and so on. When a master leaves, a new one is elected. At this point we need a quorum of nodes participating in the election. Otherwise you can end up with two elected masters (split brain), and it is hard to recover from this state.
This quorum is defined by the minimum_master_nodes option.

If you have a cluster of 5 nodes, a quorum would be 3 nodes. By default each node can become a master because node.master: true. If the same node also holds data, it might look unresponsive while the JVM is doing garbage collection; when it is unresponsive for a long time, it will be dropped out of the cluster and a new master is elected.
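
The quorum numbers used in this post follow the usual majority rule: half of the master-eligible nodes, rounded down, plus one. A tiny sketch (the function name quorum is just for illustration):

```python
def quorum(master_eligible_nodes: int) -> int:
    """Smallest number of nodes that forms a majority."""
    return master_eligible_nodes // 2 + 1


if __name__ == "__main__":
    for nodes in (3, 5):
        print(f"{nodes} master-eligible nodes -> minimum_master_nodes = {quorum(nodes)}")
```

With a majority required, two separate halves of a split cluster can never both elect a master at the same time.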

Typically in production we avoid data nodes being masters by setting node.master: false on them. Instead, we have dedicated master nodes with node.master: true and node.data: false.


These master nodes are not bothered with serving searches, so they are less likely to be dropped out of the cluster. They can be small, as they won't do a lot of work, so they add little cost to the overall cluster.


Load balancer node


If the cluster gets very big, you can add a load balancer node. This load balancer node would have


node.master: false

node.data: false

So it can forward your requests to the appropriate nodes.


You can set the amount of memory Elasticsearch uses with
ES_HEAP_SIZE=256m

Normally this goes in /etc/sysconfig/elasticsearch (RPM packages) or /etc/default/elasticsearch (deb packages) on the system where you installed the package. A value you can start with is half of the memory available on your system; then you will probably want to monitor it and tweak it to suit your actual node.
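
As a quick sketch of the "half of the memory" starting point, the helper below turns the total memory of a machine into an ES_HEAP_SIZE line. The function names are made up for this post, and the result is only a starting value to monitor and tune, not a recommendation for every node.

```python
def initial_heap_mb(total_memory_mb: int) -> int:
    """Starting heap size: half of the machine's memory, in megabytes."""
    return total_memory_mb // 2


def heap_setting(total_memory_mb: int) -> str:
    """Render the environment variable line used in this post."""
    return f"ES_HEAP_SIZE={initial_heap_mb(total_memory_mb)}m"


if __name__ == "__main__":
    # A small machine with 512 MB of memory gives the 256m used above.
    print(heap_setting(512))
```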


So, with the heap size selected, start your Elasticsearch node:


ES_HEAP_SIZE=256m bin/elasticsearch

cat API: to get details of the nodes:
curl localhost:9200/_cat/nodes

At this point we can set the minimum master nodes option through the API as well. In this case, it is changed as a persistent setting, which will survive a full cluster restart.


Most production setups will have 3 master nodes with minimum_master_nodes set to 2 for high availability.
Then have as many data nodes as needed.

We can also have load balancer nodes; typically we will have 2 of them to maintain high availability for serving requests.





Push notification via Urban Airship

In this tutorial, we will see how you can send push notifications through Urban Airship.

Architecture diagram




Prerequisites

  1. Android SDK
  2. IntelliJ

You can get a working copy of an Android app with Urban Airship integration at https://github.com/Minisha/Urban-Airship-Push-Notification


Create a new app in Urban Airship. Once it's created, it will take you to a quick start guide with the steps required to connect Urban Airship with the mobile app. Follow all the steps there; everything is quite straightforward except the last step, "Add Google Cloud Messaging (GCM) API key", which is explained in detail below.




Add Google Cloud Messaging (GCM) API 

Go to http://docs.urbanairship.com/reference/push-providers/gcm.html#android-gcm-setup and register your app. Enable notifications in the app by clicking on the notification tab. This step will create a bunch of keys and user ids in the cloud messaging tab and download google-services.json to your computer. From the JSON or from the project settings page, get your app key and package name and populate the Urban Airship project settings page. Once this is done, it will create a certificate to authenticate the Google server with Urban Airship.

Note: if you have a problem finding the gcmSender id, look in the cloud messaging tab.

To connect server with urban airship

Follow the steps in: https://support.urbanairship.com/hc/en-us/articles/213491663-How-do-I-find-my-App-Key-or-App-Secret-

That's it, you are good to go.










Saturday 7 January 2017

Apache Kafka on Red Hat 7 - part 2

Let's see how to set up Kafka with multiple brokers and a single-partition topic with a replication factor of 2.

Setting up a multi-broker cluster on a single machine is relatively simple. We basically have to create a separate server.properties file for each broker and edit the broker properties to make sure that each broker runs on a different port, whereas in a production environment the brokers usually run on different machines on the same port.

In each copy, set broker.id, which is the unique identifier of each Kafka broker. Then change the port (for example to 9093) and the log directory so they do not clash with the first broker.
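
The per-broker edits can be sketched as below. The helper generates the three properties that have to differ for each broker on a single machine. The function name, the port numbering (9092 plus the broker id) and the /tmp/kafka-logs-<id> directory naming are assumptions made for this illustration; adjust them to your own layout.

```python
def broker_overrides(broker_id: int, base_port: int = 9092, log_root: str = "/tmp") -> dict:
    """Properties that must be unique per broker on one machine."""
    return {
        "broker.id": str(broker_id),
        "listeners": f"PLAINTEXT://:{base_port + broker_id}",
        "log.dirs": f"{log_root}/kafka-logs-{broker_id}",
    }


def render_properties(overrides: dict) -> str:
    """Render the overrides in server.properties key=value format."""
    return "\n".join(f"{key}={value}" for key, value in overrides.items())


if __name__ == "__main__":
    # Broker 0 keeps the default port 9092, broker 1 moves to 9093.
    for broker_id in (0, 1):
        print(f"# server-{broker_id}.properties")
        print(render_properties(broker_overrides(broker_id)))
        print()
```

Each broker is then started with its own file, in the same way as the single broker: bin/kafka-server-start.sh followed by the path to that broker's properties file.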

To create a topic with replication factor 2
bin/kafka-topics.sh --create --topic my_topic_replica --zookeeper localhost:2181  --replication-factor 2 --partitions 1
bin/kafka-topics.sh --describe --topic my_topic_replica --zookeeper localhost:2181 
Output

Topic:my_topic_replica PartitionCount:1 ReplicationFactor:2 Configs:
Topic: my_topic_replica Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1

If the topic is created with multiple partitions, you will see multiple lines here.

The leader for this partition is broker 0.

The in-sync replicas (Isr) are brokers 0 and 1. Since the Isr list matches the Replicas list, we can say that the partition is in a healthy state.

When one of the brokers is down and you run the describe command, the output will be the following. Notice that the Isr list is now shorter than the Replicas list, which means the partition is not in a healthy state. When the missing broker comes back and catches up, it is added back to the Isr automatically.

Topic:my_topic_replica PartitionCount:1 ReplicationFactor:2 Configs:
Topic: my_topic_replica Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
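
The health check described above can be written down as a small sketch that parses one partition line of the describe output and compares the Replicas list with the Isr list. The function name and the returned dictionary layout are made up for this post; the input lines are the two outputs shown above.

```python
import re


def partition_status(describe_line: str) -> dict:
    """Parse a partition line from kafka-topics.sh --describe output."""
    # Collect "Key: value" pairs such as "Leader: 0" or "Isr: 0,1".
    fields = dict(re.findall(r"(\w+):\s*(\S+)", describe_line))
    replicas = set(fields["Replicas"].split(","))
    isr = set(fields["Isr"].split(","))
    return {
        "leader": fields["Leader"],
        "healthy": replicas == isr,
        "missing": sorted(replicas - isr),
    }


if __name__ == "__main__":
    healthy = "Topic: my_topic_replica Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1"
    degraded = "Topic: my_topic_replica Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1"
    print(partition_status(healthy))
    print(partition_status(degraded))
```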

To test the producer and consumer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_replica

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic_replica --from-beginning

Command to list the brokers registered with ZooKeeper
bin/zookeeper-shell.sh localhost:2181 <<< "ls /brokers/ids"





Apache Kafka installation on Red Hat 7

In this tutorial, we will see how you can install Kafka on Red Hat Enterprise Linux Server release 7.3 (Maipo) and bring up a single ZooKeeper and broker instance along with a producer and consumer.

Note: To get information about your Red Hat OS, use the command cat /etc/redhat-release

Prerequisites for basic kafka installation

1) Linux operating system
2) Java 8 JDK 
3) Scala 2.11.x (since Apache Kafka is largely written in Scala, you need its runtime)

To install Java, follow the instructions on the site below:
http://tecadmin.net/install-java-8-on-centos-rhel-and-fedora/#
To install Scala, follow the instructions on the site below:
http://backtobazics.com/scala/4-steps-to-setup-scala-on-centos/

Telnet installation

You might need telnet to check the status of ZooKeeper later. To install it, follow the instructions on the site below:
https://www.unixmen.com/installing-telnet-centosrhelscientific-linux-6-7/

Wget installation

You will need wget to download Kafka, so install it with the following command
sudo yum install wget

Kafka installation

wget http://apache.cs.utah.edu/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
tar -xvf kafka_2.11-0.10.0.1.tgz

To run zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties 
To check the status of zookeeper
telnet localhost 2181
stat
The stat command will give the following status
Received: 2606
Sent: 2608
Connections: 6
Outstanding: 0
Zxid: 0x37
Mode: standalone
Node count: 33
From the stat output you can see that we are running in standalone mode, which means only a single instance is running, for testing and development purposes.

With zookeeper started now we can run a single kafka broker 

To run Kafka server
bin/kafka-server-start.sh config/server.properties 
When starting the kafka server if you get the following error, resolve it with the following instructions:

Errors
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/ec2-user/kafka_2.11-0.10.0.1/hs_err_pid19313.log
To resolve the error

Type the below command and then start the broker
export KAFKA_HEAP_OPTS="-Xmx2048m -Xms256m"
References to know about the error in detail:
http://stackoverflow.com/questions/34966739/kafka-failed-to-map-1073741824-bytes-for-committing-reserved-memory

http://stackoverflow.com/questions/27681511/how-do-i-set-the-java-options-for-kafka

To create a topic
bin/kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181  --replication-factor 1 --partitions 1
where replication-factor 1 means the topic will be available on only one broker. For fault tolerance it's good practice to replicate the data across multiple brokers.

You have to specify the ZooKeeper instance here because there could be multiple ZooKeeper instances, each managing its own independent cluster. By specifying the ZooKeeper server here you are saying: I want the topic to be created in this specific ZooKeeper-managed cluster.

Remember, it's the ZooKeeper component that's responsible for assigning a broker as leader for the topic.

When the topic is created, a couple of interesting things happen behind the scenes.

1) ZooKeeper scanned its registry of brokers and made a decision to assign a leader for the topic

2) In the broker's log directory, /tmp/kafka-logs, there is now a directory for the topic partition (my_topic-0) containing 2 files, an index file and a log file: 00000000000000000000.index  00000000000000000000.log

These keep the record of the messages (the commit log).

To list the topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
To describe the topic
bin/kafka-topics.sh --describe --topic my_topic --zookeeper localhost:2181 
output:  
Topic:my_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: my_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

To start the producer 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
Then type the message and press enter

To start the consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning
To see the messages on disk
cd /tmp/kafka-logs/my_topic-0/
cat *.log
Here you will see the messages are persisted in the logs

Finally, to list the brokers registered with ZooKeeper, use the command

bin/zookeeper-shell.sh localhost:2181 <<< "ls /brokers/ids"
To test the performance of the producer 

bin/kafka-producer-perf-test.sh --topic my_topic --num-records 50 --record-size 1 --throughput 10 --producer-props bootstrap.servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer

Here we are producing 50 records of 1 byte each at a throughput of 10 records per second, so it takes about 5 seconds to send all the messages.

Output:

50 records sent, 10.052272 records/sec (0.00 MB/sec), 8.46 ms avg latency, 270.00 ms max latency, 2 ms 50th, 15 ms 95th, 270 ms 99th, 270 ms 99.9th.
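
The 5 second figure is simply the number of records divided by the target throughput. A minimal sketch of that arithmetic (the function name is made up for this post):

```python
def expected_duration_seconds(num_records: int, throughput_per_sec: float) -> float:
    """Time needed to send num_records at a fixed rate of records per second."""
    return num_records / throughput_per_sec


if __name__ == "__main__":
    # The values passed to kafka-producer-perf-test.sh above.
    print(expected_duration_seconds(50, 10), "seconds")
```

The measured rate in the output is slightly above 10 records/sec, so the real run finishes in a little under 5 seconds.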


To alter a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --partitions 2
To view the offsets topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets

By default it has 50 partitions.



TODO

check the status of zookeeper script