Thursday, 27 October 2016

Message sync protocol for a messaging service

Using conventional back-end systems for a messaging service like WhatsApp causes performance lag and excessive data usage, especially on networks with costly data plans and limited bandwidth. To fix this, we need to completely re-imagine how data is synchronized to the device and change the way data is processed in the back end.

In this entry we will discuss a new sync protocol for the messaging service that will decrease non-media data usage by roughly 40%. By reducing congestion on the network, we should also see an approximately 20% decrease in the number of people who experience errors when trying to send messages.

Initially we started with a pull-based protocol for getting data down to the client. When the client receives a message, it first receives a lightweight push notification indicating a new message is available. This triggers the app to send the server a complicated HTTPS request, and it receives a very large JSON response with the updated conversation view.

Instead of the above model, we can move to a push-based snapshot-and-delta model. In this model, the client retrieves an initial snapshot of its messages using an HTTPS pull request and then subscribes to delta updates, which are pushed to the client immediately through MQTT (a low-power, low-bandwidth protocol) as messages are received. As a result, the client can quickly display an up-to-date view without ever making the HTTPS request. We can also drop the JSON-based encoding for messages and delta updates. JSON is great if we need a flexible, human-readable format for transferring data without a lot of developer overhead, but we can replace it with Apache Thrift. Switching from JSON to Thrift allows us to reduce our payload size by roughly 50%.
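To make the snapshot-and-delta flow concrete, here is a minimal client-side sketch in Java. The class and method names are hypothetical and the HTTPS/MQTT plumbing is stubbed out; the point is how deltas identified by sequence ids are applied on top of a pulled snapshot, with a fresh snapshot pull as the fallback when a gap is detected.

import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: apply pushed deltas on top of a pulled snapshot.
class ConversationView {
    private final Map<Long, String> messagesBySeqId = new TreeMap<>();
    private long lastSeqId; // sequence id of the last applied update

    // Called once with the result of the initial HTTPS snapshot pull.
    void loadSnapshot(Map<Long, String> snapshot, long snapshotSeqId) {
        messagesBySeqId.clear();
        messagesBySeqId.putAll(snapshot);
        lastSeqId = snapshotSeqId;
    }

    // Called for every delta update pushed over MQTT.
    void applyDelta(long seqId, String message) {
        if (seqId <= lastSeqId) {
            return;            // duplicate push: already applied
        }
        if (seqId != lastSeqId + 1) {
            refetchSnapshot(); // gap detected: fall back to an HTTPS pull
            return;
        }
        messagesBySeqId.put(seqId, message);
        lastSeqId = seqId;     // view is now up to date, no HTTPS needed
    }

    private void refetchSnapshot() {
        // placeholder for the HTTPS snapshot request
    }
}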

On the server side, messaging data has traditionally been stored on spinning disks. In the pull-based model, we would write to disk before sending a trigger to the client to read from disk. This giant storage tier would serve real-time message data as well as the full conversation history. But one large storage tier does not scale well for synchronizing recent messages to the app in real time. To support synchronization at scale, we need a faster sync protocol that maintains consistency between the app client and long-term storage. To do this, we need to be able to stream the same sequence of updates in real time to the app client and to the storage tier in parallel, on a per-user basis.

Our new sync protocol will be a totally ordered queue of messaging updates (new message, state change for messages read, etc.) with separate pointers into the queue indicating the last update sent to your app client and to the traditional storage tier. When a message is successfully sent to disk or to your phone, the corresponding pointer is advanced. When your phone (client) is offline, or there is a disk outage, the pointer stays in place while new messages are still enqueued and the other pointers advance. As a result, long disk-write latencies do not hinder the client's real-time communication, and we can keep the client and the traditional storage tier in sync at independent rates.
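A minimal Java sketch of this pointer idea, assuming an in-memory list stands in for the queue's backing store; the two drain methods mirror the disk and app pointers described above, advancing independently so a slow disk or an offline phone never blocks the other consumer.

import java.util.ArrayList;
import java.util.List;

// Hypothetical totally ordered update queue with independent
// per-consumer pointers (storage tier vs. app client).
class SyncQueue {
    private final List<String> updates = new ArrayList<>();
    private int diskPointer = 0; // index of the next update for disk storage
    private int appPointer = 0;  // index of the next update for the app client

    void enqueue(String update) {
        updates.add(update);     // new message appended; pointers stay put
    }

    // Deliver pending updates to the storage tier, then advance its pointer.
    List<String> drainForDisk() {
        List<String> pending = new ArrayList<>(updates.subList(diskPointer, updates.size()));
        diskPointer = updates.size();
        return pending;
    }

    // Deliver pending updates to the app (e.g., when the phone comes online).
    List<String> drainForApp() {
        List<String> pending = new ArrayList<>(updates.subList(appPointer, updates.size()));
        appPointer = updates.size();
        return pending;
    }
}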

The following is a common sequence of operations using our brand-new message sync protocol.


1. Our ordered queue contains 5 updates and has 2 pointers:

  • The disk pointer signals that traditional disk storage is up to date, having received the update with sequence id 104.
  • The app/client pointer indicates our app is offline; the last update it received had sequence id 101.


2. Shagun sends me a new message, which is enqueued at the head of my queue and assigned sequence id 105.


3. Our message sync protocol then sends the new message to traditional disk storage for long-term persistence, and the disk pointer is advanced.



4. Some time later my phone comes online, and the client/app pings the queue to activate the app pointer.



5. The message sync protocol sends all missing updates to the client/app, and the app pointer is advanced to indicate our app is up to date.



Effectively, this queue-based model ensures:
  • The most recent messages are sent immediately to online apps and to the disk storage tier from the protocol's memory.
  • A week's worth of messages is served by the queue's backing store in case of a disk outage or the app being offline for a while.
  • Older conversation history and full inbox snapshot fetches are served from the traditional disk storage tier.

Tuesday, 25 October 2016

Design a messaging service, like WhatsApp



This entry will discuss a high-level design for a messaging service like WhatsApp. The following are the features our messaging service is going to support.

List of features:

1. At WhatsApp scale, let's say we need to handle around 10 billion message sends per day and 300 million users. Assuming each message averages 160 characters, that implies 10B * 160 bytes = 1.6TB of data per day, not including message metadata. If the messaging service provisions for 10 years, we are looking at 1.6TB * 365 * 10 ≈ 5.8PB, i.e., roughly 6 petabytes.

2. The messaging service will support only 1:1 plain-text conversations, and should be extensible to group conversations.

3. The messaging service will only handle messages less than 64KB in size.

4. The messaging service will have low latency, high consistency and high availability; consistency has higher priority than availability.

Design Strategy:

The messaging service will expose the following APIs to clients.

1. SendStatus sendMessage(senderId, recipientIds, message, clientMessageId): This API will be idempotent: if the client retries a message, it will not be added twice. One way to handle this is to generate a random, timestamp-based ID on the client, which the server can use to avoid the same message being sent repeatedly. Ordering of messages will be maintained: if I send message A and then message B, A should always appear before B. We can use a server-side timestamp-based approach to handle this. The parameters of the sendMessage API are self-descriptive.

Returns: SendStatus -> the status of the sendMessage request, as an enum.
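One possible way to build the client-side ID mentioned above: a timestamp plus a random component, generated once per logical message and reused on every retry so the server can deduplicate. The helper below is hypothetical.

import java.util.UUID;

// Hypothetical helper: create clientMessageId once per logical message
// and reuse it on retries, so the server can detect duplicates.
class MessageIdFactory {
    static String newClientMessageId() {
        return System.currentTimeMillis() + "-" + UUID.randomUUID();
    }
}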

2. ConversationList fetchConversation(userId, offset, messageCount, lastUpdatedTimeStamp):
This API will be used to fetch and show the conversations in a thread. Think of this as the view you see when you open WhatsApp. For a user, we only want to fetch a few messages per API call (lazy loading); the offset and messageCount parameters handle this.

In most cases, our API calls are made by users who are active on the messaging service. As such, they have already viewed their conversations up to a certain timestamp and are only looking for updates after that timestamp. The lastUpdatedTimeStamp parameter handles this and helps data-sensitive clients.

Returns: ConversationList {
             List<Conversation> conversations;
             boolean isDBupdate;
         }

         Conversation {
             int conversationId;
             List<UUID> participantIds;
             String snippet;
             long lastUpdatedTimeStamp;
         }
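A hedged usage sketch of the lazy-loading parameters: pull one page of conversations at a time until a short page signals the end. The MessagingApi interface and the page size are illustrative assumptions; ConversationList is the type defined above.

import java.util.UUID;

// Hypothetical client-side interface matching the API described above.
interface MessagingApi {
    ConversationList fetchConversation(UUID userId, int offset,
                                       int messageCount, long lastUpdatedTimeStamp);
}

class InboxLoader {
    void loadInbox(UUID userId, MessagingApi api) {
        final int pageSize = 20; // illustrative messageCount per call
        int offset = 0;
        long lastSync = 0L;      // 0 = full fetch; else the last viewed timestamp
        ConversationList page;
        do {
            page = api.fetchConversation(userId, offset, pageSize, lastSync);
            offset += pageSize;
            // append page.conversations to the thread view here
        } while (page.conversations.size() == pageSize);
    }
}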

The following is the high-level component diagram for the messaging system.

             

The client (mobile app, browser, etc.) calls the sendMessage API to write a message. The application server interprets the API call and asks the database to do the following:
  •  Record the server timestamp, to handle ordering of messages.
  •  Figure out the conversation to which the message should be appended, based on the participants.
  •  Check whether a recent message already exists with the same clientMessageId.
  •  Store the message in the database.
Similarly, to read conversations, the client calls the fetchConversation API; the application server interprets the call and queries the database for the recent conversations.
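A compact Java sketch of the write path just listed; the SendStatus values, MessageStore and its methods are placeholders for the real database layer, not a committed API.

import java.util.List;
import java.util.UUID;

// Hypothetical application-server handler for sendMessage, following the
// four steps above: timestamp, resolve conversation, dedupe, store.
class SendMessageHandler {
    private final MessageStore store; // placeholder for the database layer

    SendMessageHandler(MessageStore store) {
        this.store = store;
    }

    SendStatus sendMessage(UUID senderId, List<UUID> recipientIds,
                           String message, String clientMessageId) {
        long serverTimestamp = System.currentTimeMillis();  // step 1: ordering
        UUID conversationId =
                store.findOrCreateConversation(senderId, recipientIds); // step 2
        if (store.recentMessageExists(conversationId, clientMessageId)) {
            return SendStatus.DUPLICATE;                    // step 3: idempotency
        }
        store.append(conversationId, senderId, message,
                     clientMessageId, serverTimestamp);     // step 4
        return SendStatus.ACCEPTED;
    }
}

enum SendStatus { ACCEPTED, DUPLICATE, FAILED } // illustrative states

interface MessageStore { // placeholder for the NoSQL storage layer
    UUID findOrCreateConversation(UUID senderId, List<UUID> recipientIds);
    boolean recentMessageExists(UUID conversationId, String clientMessageId);
    void append(UUID conversationId, UUID senderId, String message,
                String clientMessageId, long serverTimestamp);
}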

Messaging is going to be very write-heavy. Unlike photos or videos, which are written once and consumed many times by many clients, messages are written once and consumed by the participants once. For a write-heavy system with a lot of data, an RDBMS usually does not perform well: every write is not just an append to a table but also an update to multiple indices, which might require locking and hence interfere with reads and other writes. However, there are NoSQL databases like HBase and Cassandra where writes are cheaper.

With NoSQL, we need to store data in denormalized form: every user has his/her own copy of the message box. That means for every message sent in a 1:1 conversation we store two copies, one per participant.

To increase efficiency we can use caching. This is not as easy as it seems, however, because one of our features is high consistency: most distributed caching systems offer good availability and eventual consistency, but not strong consistency.

Consider the situation where a user's messages and conversations are spread across machines; this causes trouble because changes are no longer atomic. Say messages for a user are on one machine and conversations are on another. When a message is added, the update request is sent both to the server holding the messages and to the server holding the conversations for this user. There could be a period when one server has processed the update but the other has not. If changes across servers are not atomic, the whole system is no longer atomic.

One way to resolve this is to make sure that the cache for a user resides entirely on one server. The same server can also host other users, but each user is assigned to exactly one server for caching. To further ensure consistency, all writes for that user should be directed through this server, and this server updates its cache only when the write has succeeded on the database, before confirming success.

For a messaging service, every wasted byte has a very real impact on the experience of the application. By sending less data and reducing HTTPS fetches, the messaging service receives updates with low latency and high reliability. In a subsequent entry we will extend the design to improve performance and data usage. Meanwhile, take care and enjoy learning!!





URL shortening service, like bit.ly - 2

This is a continuation of the URL shortening design. In this entry we will explore different aspects of our proposed design.

In the proposed design, we are optimizing for consistency over availability, and we agreed to choose a NoSQL database that is highly consistent, like HBase or Cassandra. HBase maintains a meta table that records which keys live on which machines, and it distributes keys based on the load on machines (which it calls region servers). Cassandra inherently uses consistent hashing for sharding. Since the total data in our case is a few TBs, we don't need to think about sharding across data centers. NoSQL databases have single-machine failure handling built in, although a NoSQL database like HBase may be unavailable for a few seconds for the affected keys, as it tries to maintain strong consistency.

We can also use an LRU caching strategy to improve performance, since most URLs are accessed for only a small window of time after creation. Also, if a shortened URL goes viral, serving it through the cache reduces the chances of overloading the database. We can go with Redis or Memcached for caching.

To shard the data across databases, we can use the integer encoding of the shortened URL to distribute data among our database shards. Say we assign the values 0 to 61 to the characters a-z, A-Z and 0-9; then we can compute the integer encoding of any shortened URL. We will use consistent hashing so that we do not have to rehash all the data if we add a new shard in the future.
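One way to realize that integer encoding, sketched in Java. The alphabet order (a-z, A-Z, 0-9 mapped to 0-61) follows the text, and the final modulo is only a stand-in for placing the value on a consistent-hashing ring.

// Hypothetical shard selection via base-62 decoding of the short code.
class ShardRouter {
    private static final String ALPHABET =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

    // Interpret the 6-character short code as a base-62 integer.
    static long decodeBase62(String shortCode) {
        long value = 0;
        for (char c : shortCode.toCharArray()) {
            value = value * 62 + ALPHABET.indexOf(c);
        }
        return value;
    }

    // Simple modulo placement; a production system would map this value
    // onto a consistent-hashing ring instead, as described above.
    static int shardFor(String shortCode, int shardCount) {
        return (int) (decodeBase62(shortCode) % shardCount);
    }
}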

To add fault tolerance, we can use a master-slave scheme for shards, wherein one machine, the master, processes all writes, while slave machines subscribe to all the writes and keep updating themselves. In the event that the master goes down, one of the slaves can take over and start responding to write queries. The slaves also serve read requests.

Saturday, 8 October 2016

URL shortening service, like bit.ly

In this entry we will design a URL shortening service. Let's begin with the list of features.


Features:

The URL shortening service will take a URL and return a much smaller URL that redirects to the original URL. Users can also pick a custom URL. What happens if two users try to shorten the same URL? We will handle that case in the design.

Assume 100 million new URLs are added to the system each month. The average lifetime of a shortened URL is 2 weeks, and per the 80/20 rule, 20% of URLs generate 80% of the traffic. Under these assumptions we may receive around 1 billion queries a month, i.e., ~400 queries per second: 360 read requests and 40 write requests. In 5 years we will need to shorten 100 million * 12 * 5 = 6 billion URLs.

We will use (a-z, A-Z, 0-9) to encode our URLs. Let x be the minimum number of characters needed to represent 6 billion URLs; then x is the smallest integer such that 62^x > 6 * 10^9 (where 62 is the total number of characters in a-z, A-Z, 0-9). That gives x = ceil(log62(6 * 10^9)) = 6, since 62^5 ≈ 9.2 * 10^8 falls short while 62^6 ≈ 5.7 * 10^10 does not. So x = 6.

The data flow for a read/write request contains a shortened URL and the original URL. The shortened URL takes 6 bytes, whereas the original URL takes around 500 bytes.

Based on the above data:
Write bandwidth per second = 40 * (500 + 6) bytes ≈ 20KB
Read bandwidth per second = 360 * (500 + 6) bytes ≈ 180KB



The URL shortening service is latency-sensitive: high latency on the URL shortener is as bad as failing to resolve the request. The shortening service should have high consistency and availability.

Design Strategy:

We will define two APIs:

shortURL(url) - Store the URL mapping and return hash(url).
redirectURL(hash) - Redirect to URL_MAPPING[hash]

Basically, we are trying to build a system that serves a huge HashMap.
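Stripped of sharding and persistence, a single-machine sketch of those two APIs really is just a map. The hash function below is a placeholder; collisions and salting are discussed further down.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Single-machine sketch of the two APIs; storage and hashing are simplified.
class UrlShortener {
    private final Map<String, String> urlMapping = new ConcurrentHashMap<>();

    // Store the URL mapping and return hash(url).
    String shortURL(String url) {
        String hash = hashOf(url);
        urlMapping.put(hash, url);
        return hash;
    }

    // Redirect to URL_MAPPING[hash]; null if the code is unknown.
    String redirectURL(String hash) {
        return urlMapping.get(hash);
    }

    private String hashOf(String url) {
        // placeholder: a real service derives a 6-character base-62 code,
        // with randomization to avoid collisions (see below)
        return Integer.toHexString(url.hashCode());
    }
}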

The high-level design for the write operation is as follows:



And the high-level design for the read operation is similar, as follows:


We can use a list of salts to handle the case where two separate URLs get shortened to the same URL (a collision). Directly encoding a URL to base 62 would allow a user to check whether a URL has already been shortened, and reverse engineering could lead the user to the exact function used; this should be avoided. To avoid it, we introduce randomization: if two users shorten the same URL, it should result in two separate shortened URLs. Deterministic base-62 encoding would also be unsuitable for a production environment because it leaks information about the database. For example, patterns can be learned to compute the growth rate (new URLs added per day), and in the worst case someone could copy the whole database.
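A hedged sketch of that randomization: draw a random 6-character base-62 code and retry on collision, so identical long URLs get distinct short codes and the codes leak nothing about insertion order. The in-memory map stands in for the database.

import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical randomized code generation with collision retry.
class RandomCodeGenerator {
    private static final String ALPHABET =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    private static final int CODE_LENGTH = 6;
    private final SecureRandom random = new SecureRandom();
    private final Map<String, String> codeToUrl = new ConcurrentHashMap<>();

    String shorten(String url) {
        while (true) {
            String code = randomCode();
            // putIfAbsent returns null only if the code was free (no collision).
            if (codeToUrl.putIfAbsent(code, url) == null) {
                return code;
            }
        }
    }

    private String randomCode() {
        StringBuilder sb = new StringBuilder(CODE_LENGTH);
        for (int i = 0; i < CODE_LENGTH; i++) {
            sb.append(ALPHABET.charAt(random.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }
}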

To add high availability, the simplest thing we can do is have multiple stateless app servers. They do not store any data (stateless), and all of them behave exactly the same way when up. If one of them goes down, we still have other app servers to keep the site running.

The load balancer is a black box that stores metadata about the app servers and delegates each client request to the least-loaded active server.

Our URL shortening service is a simple key-value lookup (given the hash, return the corresponding URL). If the data is small enough to fit in a single machine's main memory, SQL with indexing is the clear winner, with next to zero maintenance: if your index fits in RAM, it's best to go with a standard SQL solution. In our case:

Write requests per month = 100 million
Average size of a URL = 500 bytes
System provisioned for ~5 years
Implies space required: 500 bytes * 100M * 12 * 5 = 3TB

3TB of data can fit on a single machine's hard disk, but the index might not fit in RAM. Storing all data on a single machine would make our read/write operations very slow because of page swaps for the indices: when we access a SQL index frequently, its pages get cached in RAM, and with limited RAM on a single server the number of page swaps (disk traversals) grows and performance degrades. Given that read latency is critical for the shortening service, we cannot store the data on a single machine, so a SQL solution would carry sharding overhead. Most NoSQL solutions, however, are built on the assumption that data does not fit on a single machine and support sharding built in.

Since we are building a consistent shortening service with high availability, we will choose a NoSQL database like HBase.

We will discuss the database schema and other details of the shortening service in a subsequent entry. See you soon.


Friday, 7 October 2016

Design an E-commerce website like flipkart.com

In this entry we will discuss a design for an e-commerce website like Flipkart. Before going into the design details, following is the list of features.

List of Features:

1. Users can sign up, sign in, search for a product, view categories, view products, add a product to the cart, check out products, add a product to a wishlist, share a wishlist, and what not... Here we will divide the list of features into the following high-level user stories:

  • Customer/user-related stories, e.g., cart, wishlist, etc.
  • Admin-related stories, e.g., turning a product ON/OFF, canceling an order, adding offers, etc. All these features will be wrapped into an admin portal.
  • Product search and categorization-related stories.
  • The system should support 1 million requests per second.
  • The system should be highly scalable and available.
In this design our focus will be on the customer/user-related tasks and the product search and categorization tasks. Our system should support 10,000 customers/second and should be able to serve 100 million products across different categories.

For product search, we can use Solr, an open-source enterprise search platform, with a cache. Whenever a product request comes in, the load balancer looks the product up in Solr; if the product is there, its details are returned without querying the database. We are going to cache 40% of our products in Solr. Assume each of our products takes 10MB including images, description and other metadata; 100 million products would then be roughly 1,000TB, so we need to cache 400TB of data in Solr. Considering that a machine can store 10TB on hard disk, we will need about 40 machines for the data, plus more machines for redundancy.

Design Strategy:

To build a highly scalable application with ~zero downtime and high availability, one strategy is to use a microservice-based approach. Microservices is an approach to application development where a large application is built as a suite of granular services. Each service supports a specific business goal and uses a simple, well-defined interface to communicate with other services. The high-level design is as follows:


In the above design, we store the session state at the client and the session data in an in-memory database via a session service. The shopping cart for each user is stored as a RESTful resource on the server.

If your data is sharded across multiple databases, with a lot of HTTP servers and cache servers responding to requests, you need a provisioning server to auto-scale capacity dynamically, setting up servers as needed or removing them when load drops. In this case it is better to go fully stateless and leave the shopping cart with the client session; once the user hits checkout, the shopping cart is persisted to the database. The key point about REST is that every request the client makes has to contain all the data needed to identify itself. I would suggest reading about OAuth.



Tuesday, 4 October 2016

Design a taxi-aggregator like Ola

This entry will discuss a high-level design for an app like Ola. Following are the features our application is going to support.

List of Features:
1. A cab will be visible to the user only if the distance between the cab and the user is < 2km.
2. A maximum of 10 cabs will be visible to the user.
3. Number of cabs our system will integrate: 1 million.
4. Users per second: 10 million.
5. Low latency and high availability are our primary goals.

Design Strategy:

To scale the application and handle all the load, one strategy is to use a microservice-based design approach. Microservices is an approach to application development where a large application is built as a suite of granular services. Each service supports a specific business goal and uses a simple, well-defined interface to communicate with other services. The high-level design is as follows:



Using this approach, our design inherently has all the benefits of microservices. For details regarding microservices, do visit our entry on microservices.


Algorithm

 "The cab will be visible to user only if the distance between cab and user is <2Kms."



Distributed key-value Cache design

In this entry we will discuss the design of a distributed key-value cache. The app will use this cache as intermediate storage: on a cache miss, data is read from the database into the cache, and from the cache to the application. The application is not supposed to hit the database directly for reads.

There are three kinds of caching strategies (a sketch of the first follows the list):
  1. Write-through cache: writes go through the cache and succeed only when the writes to the cache and the database BOTH succeed. This is useful for applications that write and quickly re-read information, but write latency is higher because we write to two separate systems.
  2. Write-around cache: writes go directly to the database, and the caching system reads information from the database on a cache miss. This keeps the write load on the cache low and writes fast, but it is not useful for applications that write data and re-read it quickly.
  3. Write-back cache: writes go directly to the cache, and the write is confirmed as soon as the cache write is done; the cache asynchronously syncs the written data to the database. This gives low write latency and high throughput, but there is a risk of losing data if the caching layer dies. We can replicate the cache data across multiple cache layers to avoid that kind of data loss.
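To make the trade-offs concrete, here is a minimal write-through sketch (strategy 1 above) in Java; KeyValueStore is a placeholder for the real database, and failure handling is omitted.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal write-through sketch: a write succeeds only after both the
// database and the cache accept it; reads fall through on a miss.
class WriteThroughCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final KeyValueStore database;

    WriteThroughCache(KeyValueStore database) {
        this.database = database;
    }

    void put(String key, String value) {
        database.put(key, value); // write the database first...
        cache.put(key, value);    // ...then the cache; both must succeed
    }

    String get(String key) {
        String value = cache.get(key);
        if (value == null) {
            value = database.get(key); // cache miss: read from the database
            if (value != null) {
                cache.put(key, value); // populate the cache for next time
            }
        }
        return value;
    }
}

interface KeyValueStore { // placeholder for the real database
    void put(String key, String value);
    String get(String key);
}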

Before going into the details of the design, we will first list the features our system is going to support.

List of Features:

1. Amount of data to cache: 30TB. Evict one or more entries when memory is not available.

2. Queries per second: 10 million. A caching system should be inherently low-latency, so we need to keep all cached data in main memory. A production-grade machine has 72GB or 144GB of main memory, so the minimum number of machines required for 30TB of data = 30TB / 72GB ≈ 420 machines.

3. Important metrics: low latency, high availability and eventual consistency.

Design Strategy:

Let's start with a single-machine caching system and then extend it to a distributed one. For a single-machine, single-threaded caching system, a HashMap and a doubly linked list are sufficient to implement a cache. The doubly linked list (DLL) stores the data, and the HashMap maps each key to the address of its node in the DLL. Recently accessed data is moved to the front of the DLL (with its address updated in the HashMap accordingly), so the least recently used entry sits at the tail and is evicted first when the cache is full.

For a single-machine, multi-threaded system we can use the java.util.concurrent data structures (e.g., ConcurrentHashMap and ConcurrentLinkedDeque) for the hash map and deque.

In Java we can implement an LRU cache using LinkedHashMap by overriding one of its methods:

  @Override
  protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
      return size() > this.capacity; // evict the eldest entry once capacity is exceeded
  }

Implement an LRU cache using LinkedHashMap
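For completeness, a self-contained version of that idea, assuming a fixed capacity supplied to the constructor; the third constructor argument (accessOrder = true) is what makes LinkedHashMap track least-recently-used order.

import java.util.LinkedHashMap;
import java.util.Map;

// Self-contained LRU cache on top of LinkedHashMap: accessOrder = true keeps
// entries in least-recently-used order, and removeEldestEntry evicts the
// stalest entry once the capacity is exceeded.
class LRUCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    LRUCache(int capacity) {
        super(capacity, 0.75f, true); // true = access order, not insertion order
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity;
    }
}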

To cache 30TB (internet-scale data) we cannot store everything on one machine; we need to distribute it across multiple machines. To do that we will shard (partition) the data and store it on different machines. To read about distributed sharding of data, please see the entry "Consistent hashing".


Saturday, 1 October 2016

Your own High available Database - 2

In the previous entry, Your own High available Database, we designed a simple highly available database. That design carried some risk of losing data in case of hardware failure of a master shard. In this entry, we will add some fault tolerance to the old system.

Initially we started with a master and multiple slaves in each shard. That leaves a single point of failure (the master node) for each shard.

What about a peer-to-peer system, where every shard (node) is equally privileged and any two nodes can communicate with each other? It eliminates the single point of failure, and theoretically our system stays available even in the presence of dying DB machines. Dynamo and Cassandra work on a similar principle.

How will we shard data in a peer-to-peer system? To add redundancy, we may choose P (replication factor = P) clockwise consecutive nodes, starting from the node to which data D maps through our consistent hashing algorithm. The client can ask any of those P nodes for the data it wants. This is essential in creating a highly available system.

Read- Write Strategy:

Now the data is distributed across various shards, and each shard's data is replicated around the ring. A client can send a put() request to any node in the ring to write data; that node acts as the coordinating node for the put() request. The coordinating node forwards the request to the nodes the data maps to and waits for acknowledgement signals. Once it receives W acknowledgements, it returns a write-accepted signal to the client. W is the minimum number of nodes from which the coordinating node must get an acknowledgement before declaring a write successful.
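A hedged Java sketch of the coordinating node's write path, assuming replica calls run on a thread pool; real systems such as Dynamo and Cassandra do this with asynchronous messaging and extra machinery like hinted handoff.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical coordinator: forward a put() to all P replicas and report
// success once W acknowledgements have arrived.
class WriteCoordinator {
    private final ExecutorService pool = Executors.newCachedThreadPool();

    boolean put(String key, String value, List<Replica> replicas, int w,
                long timeoutMillis) throws InterruptedException {
        CountDownLatch acks = new CountDownLatch(w);
        for (Replica replica : replicas) { // replicas.size() == P
            pool.submit(() -> {
                if (replica.write(key, value)) {
                    acks.countDown();      // one acknowledgement received
                }
            });
        }
        // Write accepted once W replicas acknowledge; the remaining
        // replica writes keep going in the background.
        return acks.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

interface Replica { // placeholder for a node in the ring
    boolean write(String key, String value);
}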

For a get() request, the client can connect to any node in the ring, which again becomes the coordinating node for the request. The coordinating node asks all replica nodes for the key and returns the consolidated data to the client once R of the replicas have responded. R is the minimum number of nodes from which the coordinating node must get the value back before returning it to the client.

The R and W values are very important here. For a read to be consistent and return the previously written value, we need W + R > P: the set of nodes that answer a read then always overlaps, in at least one node, the set that acknowledged the latest write.

We can adjust the values of W and R based on the functionality needed. Example: for very fast writes we can set W = 1 and R = P; similarly, for a read-heavy load we can set R = 1 and W = P. For an even split we keep both R and W at ceil((P + 1) / 2), so that W + R > P still holds.

In the current design, no single node is completely responsible for a piece of data, so if a node goes down it does not affect READs or WRITEs: as long as W of the P nodes for a key are available, the data can still be updated. If fewer than W nodes are active for the data, we can relax the consistency requirement in favor of availability. The same holds for read requests.

If we keep W == P, we can provide high consistency, but we become unavailable for WRITEs if even one node goes down. Therefore we will build our system with W < P. Our writes then propagate gradually, meaning some nodes will have an updated view of the data and some will not (inconsistency). The best we can guarantee is eventual consistency: in due time, all changes are applied to all nodes, and in the end all of them hold a consistent view of the data.

How can we achieve eventual consistency?

To achieve eventual consistency, we need to resolve differences between the data held on two nodes. Conflicts can be detected and resolved in different ways, as follows.

1. If the data (key-value) is such that the value is just a single column, we can use the simple last-write-wins criterion to resolve conflicts. If two nodes have inconsistent views of the data, in the resolve step we overwrite the stale copy with the newer data, and the data becomes consistent.

2. The value may instead be composed of more than one independent column. In this case we store augmented data for each row, recording all the coordinating nodes that have handled the row so far. To detect and resolve a conflict, we compare the augmented data: if one is a subset of the other, we can safely discard the version with the smaller augmented data; otherwise, we have a genuine conflict for the row and need application-level logic to resolve it.

Your own High available Database

This is a continuation of the distributed system design series. In this entry we will discuss the design of a highly available distributed key-value database.

Before going into the design details, it is worth noting the features our system should support.

List of Features :

1. Amount of data to store: 100TB

2. CRUD (Create, Retrieve, Update, Delete) operations on data. With the update operation, the value for a particular key may grow in size in such a way that a sequence of keys that previously co-existed on one server no longer fits on a single machine.

3. Upper limit for a value: 1GB in size.

4. Query processing speed: 100K queries/second.

5. A highly available, low-latency, partition-tolerant key-value database.

Observations:

Let's say we have machines with 10TB hard disks available. We would need a minimum of 100TB / 10TB = 10 machines to store the data without replication. In real life we replicate the data across multiple servers for fault tolerance, and in that case we would need more than 10 machines.


Design Strategy:

To scale the system and handle the current and future load, one strategy is to shard the data, store it on different machines, and distribute the load among those machines. Our system will store denormalized data (key -> value), and all data for a row/key will live on the same machine. This helps us reduce latency.

Going back to feature number 5 above, high availability and low latency are our primary design goals.

Let's assume we somehow distribute our data (key-value) across various shards. Initially we will start with a master-slave architecture for each shard: each shard will have a master node and multiple slave nodes, where the slaves keep reading all new updates from the master and keep updating themselves. All WRITE requests are served by the master node, and the slaves handle READ requests. This can lead to inconsistent views of the latest data between the master and clients, but it ensures high availability for read requests.

Our architecture within a shard may look as follows:


However, what happens to data writes in case of master failure? WRITEs start failing, since in our design the master alone was responsible for taking write requests. We can promote one of the slaves to become the new master; however, the shard is unavailable for the duration of the failover from the dead master to the newly promoted slave. And since the slaves lag behind the master (in data updates), in case of the master's hardware failure we may lose some data. This is bad!!

Can we do better? Is there any way to handle this data loss and keep the system available all the time?
In my next entry, I will extend this design and solve these problems. So take care and stay tuned :). Will see you soon!!