Friday, February 13, 2015

A few notes on locking and shared memory access

I was reading the Disruptor paper and came across some mind-boggling performance numbers. Disruptor is a concurrency design pattern.
These are some general notes I collected from the paper and from Martin Thompson's blog posts. They are worth keeping in mind while implementing a concurrent program.

Cost of Locks:
Locks provide mutual exclusion and ensure that changes become visible in an orderly manner. Locks are expensive because they require arbitration when contended. The arbitration is achieved by a context switch to the operating system kernel, which suspends threads waiting on the lock until it is released.

Cost of CAS:
CAS is a special machine code instruction that allows a word in memory to be conditionally set as an atomic operation. The expected old value and the new value are provided as parameters to the instruction, and the instruction succeeds only if the current value in memory matches the expected old value. The CAS approach is significantly more efficient than locks because it does not require a context switch to the kernel for arbitration. However, CAS is not free: the processor must lock its instruction pipeline to ensure atomicity and employ a memory barrier to make the change visible to other threads.
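
As a rough illustration (my own sketch, not from the paper), Java exposes CAS through the java.util.concurrent.atomic classes. The counter below simply retries the CAS until it wins; there is no kernel arbitration involved.

import java.util.concurrent.atomic.AtomicLong;

public class CasCounter {
    private final AtomicLong value = new AtomicLong();

    // Increment by retrying CAS: read the current value, then try to swap in
    // current + 1. The swap succeeds only if no other thread changed the value
    // in between; otherwise we loop and try again.
    public long increment() {
        while (true) {
            long current = value.get();
            long next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}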

Cache Lines:
Modern CPUs are much faster than memory systems, so fetching data from main memory is comparatively very slow. To bridge this gap, multiple layers of hardware caches are used to hold recently accessed main memory data. When the CPU accesses data, it is fetched in blocks (cache lines) and stored in the cache for future accesses.
Memory is pre-fetched into the cache when the access pattern is predictable. When iterating over an array the stride is predictable, so memory will be pre-fetched in cache lines. For a linked list or a tree the nodes are scattered around the heap, so there is no predictable stride of access. The lack of a consistent access pattern constrains the system's ability to pre-fetch cache lines, resulting in main memory accesses which can be more than two orders of magnitude less efficient.
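
A quick way to see this in practice is a crude microbenchmark of my own (not from the paper; exact numbers vary a lot by machine and JVM): summing a primitive array, which the prefetcher handles well, versus walking a LinkedList whose nodes are scattered on the heap.

import java.util.LinkedList;
import java.util.List;

public class StrideDemo {
    public static void main(String[] args) {
        int n = 2_000_000;
        int[] array = new int[n];
        List<Integer> list = new LinkedList<>();
        for (int i = 0; i < n; i++) {
            array[i] = i;
            list.add(i);
        }

        // Sequential access with a fixed stride: friendly to the hardware prefetcher.
        long t0 = System.nanoTime();
        long arraySum = 0;
        for (int v : array) {
            arraySum += v;
        }
        long t1 = System.nanoTime();

        // Pointer chasing through scattered nodes: no predictable stride to prefetch.
        long listSum = 0;
        for (int v : list) {
            listSum += v;
        }
        long t2 = System.nanoTime();

        System.out.printf("array: %d ms, linked list: %d ms (sums %d / %d)%n",
                (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000, arraySum, listSum);
    }
}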

False Sharing: 
Even if two unrelated variables are accessed by different threads, sharing can still occur if they happen to sit next to each other in memory. Data is cached in units of whole cache lines, so when one of the variables is accessed, the line containing it, including the neighbouring variable, is pulled into that core's cache; a write to either variable then invalidates the line for the other thread, which has to re-fetch it even though the data it cares about has not changed.
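
A common mitigation, which the Disruptor itself uses, is to pad the hot field so that it cannot share a cache line with anything else. The sketch below is my own illustration (it assumes 64-byte cache lines, and some JVMs may strip unused fields, which is one reason Java 8 later added the @Contended annotation for this purpose):

public class PaddedCounter {
    // Padding on both sides pushes neighbouring variables onto different
    // cache lines, assuming 64-byte lines and that the JIT keeps the fields.
    @SuppressWarnings("unused")
    private long p1, p2, p3, p4, p5, p6, p7;
    private volatile long value;
    @SuppressWarnings("unused")
    private long q1, q2, q3, q4, q5, q6, q7;

    public void set(long v) { value = v; }
    public long get() { return value; }
}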

Busy Spin Wait Strategy:
Busy spinning may give better performance than using locks in many cases; a busy spin can also be implemented with a small sleep interval. As discussed above, locks carry the overhead of a context switch, and a context switch can cost the equivalent of spinning a few hundred or a few thousand times. So if a lock can be acquired by burning a few cycles spinning, that may well be more efficient overall. Also, for real-time applications it may not be acceptable to block and wait for the scheduler to come back to them at some far-away point in the future. This spinlock vs semaphore thread http://stackoverflow.com/questions/195853/spinlock-versus-semaphore is worth reading for the details.
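
A minimal spin lock built on CAS (my own sketch, not production code) looks like this; the caller burns CPU in the loop instead of being parked by the kernel, which only pays off when the lock is held very briefly:

import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    public void lock() {
        // Spin until we win the CAS. On Java 9+ a Thread.onSpinWait() hint
        // could be placed inside the loop.
        while (!locked.compareAndSet(false, true)) {
            // busy wait
        }
    }

    public void unlock() {
        locked.set(false);
    }
}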

Avoid Contention:
The most costly operation in any concurrent environment is contended write access. It is best to have a single thread do the writing and let multiple threads read.

Sunday, February 9, 2014

Async request processing in Java EE 7

There are now multiple features in Java EE that let an enterprise application process requests asynchronously.

1. Async request/response in Servlet 3.0

If servlet processing takes a lot of time, the servlet container may only be able to serve a limited number of clients, depending on the number of threads configured for the container. This negatively impacts server throughput.

This is not an optimal use of server resources, because the threads may not be doing CPU work all the time; they may be blocked on IO etc. The CPU is not fully utilized, yet we end up refusing connections to users. Servlet 3.0 provides the framework/APIs for processing requests asynchronously in a thread other than the servlet container thread. This way the actual processing is done in a separate, user-defined thread and the response is sent back to the caller through the async callback API provided by the servlet container.
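
A minimal sketch of how this looks with the Servlet 3.0 API (the servlet name, URL pattern and doSlowWork() method are my own hypothetical placeholders): the request is handed off via startAsync(), the container thread returns immediately, and the response is completed later from the worker thread.

import java.io.IOException;
import javax.servlet.AsyncContext;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns = "/slow", asyncSupported = true)
public class SlowServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        final AsyncContext ctx = req.startAsync();   // releases the container thread
        ctx.start(new Runnable() {                   // runs the work on another thread
            @Override
            public void run() {
                try {
                    String result = doSlowWork();    // e.g. blocking IO
                    ctx.getResponse().getWriter().write(result);
                } catch (IOException e) {
                    // log the error; we still complete the async cycle below
                } finally {
                    ctx.complete();                  // sends the response back to the client
                }
            }
        });
    }

    private String doSlowWork() {
        return "done";                               // placeholder for the real work
    }
}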

Please note that the client making the request can also be asynchronous, using the client library: https://jersey.java.net/documentation/latest/async.html#d0e8674.

The following URL http://www.javacodegeeks.com/2013/08/async-servlet-feature-of-servlet-3.html has a good explanation of the APIs with a sample application.

2. Async EJB 3.1


EJB 3.1 allows an EJB method to be called asynchronously if it is annotated with @Asynchronous. Such methods are invoked by the container in a separate thread, so the client is not blocked waiting for the response. There are currently two flavours supported:
* Fire-and-forget methods where no result is expected. The method has a void return type.
* Methods that return a result. The method returns a java.util.concurrent.Future object which can be polled to check the execution status and to retrieve the returned object.

The following URL shows some code snippets: http://satishgopal.wordpress.com/2011/04/24/ejb-3-1-asynchronous-methods/
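
For reference, here is a minimal sketch of both flavours (the bean and method names are my own hypothetical placeholders). The container proxies the call, and the AsyncResult returned by the second method is just a convenience wrapper that the container unwraps into the Future seen by the caller.

import java.util.concurrent.Future;
import javax.ejb.AsyncResult;
import javax.ejb.Asynchronous;
import javax.ejb.Stateless;

@Stateless
public class ReportBean {

    // Fire and forget: the caller returns immediately and the container
    // runs this method in its own thread.
    @Asynchronous
    public void audit(String event) {
        // record the event somewhere
    }

    // Result-bearing flavour: the caller gets a Future it can poll or block on.
    @Asynchronous
    public Future<String> generateReport(long id) {
        String report = "report-" + id;   // stand-in for the expensive work
        return new AsyncResult<String>(report);
    }
}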

3. Support for WebSocket

Java EE 7 supports WebSocket applications, which can be deployed in the standard way. The @ServerEndpoint annotation is specified at the class level to expose a class as a WebSocket endpoint, and the @OnMessage annotation marks the method that receives messages.

This is a sample picked from http://docs.oracle.com/javaee/7/tutorial/doc/websocket004.htm.

import java.io.IOException;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/echo")
public class EchoEndpoint {
   @OnMessage
   public void onMessage(Session session, String msg) {
      try {
         // Echo the incoming text frame straight back to the sender.
         session.getBasicRemote().sendText(msg);
      } catch (IOException e) {
         // ... error handling elided in the original sample
      }
   }
}


Some important points to be noted:

* By default, a separate instance of the endpoint class is created for each client connection.
* The client connection is maintained until it is closed. For the duration of the connection, the single instance that was created initially serves all of its requests.
* An application can choose to serve multiple client connections through the same instance by supplying a custom ServerEndpointConfig.Configurator. In that case, the application developer is responsible for handling concurrency.
* Only URL template parameters can be passed to the WebSocket endpoint. Unlike REST services, there is no notion of query parameters.
* Encoders and decoders can be defined to serialize/deserialize Java objects to/from the WebSocket stream (a rough sketch follows after this list).
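
As a rough illustration of that last point (the ChatMessage class and its wire format are my own hypothetical examples, not from the tutorial), a text encoder looks roughly like this. It would be registered on the endpoint via the encoders attribute of @ServerEndpoint.

import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

// Hypothetical domain object used only for this illustration.
class ChatMessage {
    private final String user;
    private final String text;

    ChatMessage(String user, String text) {
        this.user = user;
        this.text = text;
    }

    String getUser() { return user; }
    String getText() { return text; }
}

public class ChatMessageEncoder implements Encoder.Text<ChatMessage> {

    @Override
    public String encode(ChatMessage message) throws EncodeException {
        // Turn the object into the text frame that goes onto the wire.
        return message.getUser() + ":" + message.getText();
    }

    @Override
    public void init(EndpointConfig config) { }

    @Override
    public void destroy() { }
}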

http://docs.oracle.com/javaee/7/tutorial/doc/websocket.htm#GKJIQ5 provides a very good explanation.

4. JAX-RS 2.0

The support is similar to the Servlet async support. A resource method is marked as asynchronous by taking an AsyncResponse parameter annotated with @Suspended; the runtime injects the AsyncResponse and treats any method with such a parameter as asynchronous. The method call returns immediately, while the server keeps the connection open and waits for the AsyncResponse to be resumed.

This is a sample picked from https://jersey.java.net/documentation/latest/async.html

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/resource")
public class AsyncResource {
    @GET
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {

        // Hand the expensive work to another thread and return immediately;
        // the container keeps the client connection open until resume() is called.
        new Thread(new Runnable() {
            @Override
            public void run() {
                String result = veryExpensiveOperation();
                asyncResponse.resume(result);    // sends the response to the waiting client
            }

            private String veryExpensiveOperation() {
                // ... very expensive operation (elided in the original sample)
                return "result";                 // placeholder so the sketch compiles
            }
        }).start();
    }
}


https://jersey.java.net/documentation/latest/async.html has the details.




Sunday, April 28, 2013

Unique Id Generation in a distributed environment

I was looking at various ways to generate unique ids under different constraints and system requirements. I came across Snowflake, open-source software published by Twitter for generating unique ids, and also an article by Flickr. Together they give a good idea of how to solve this problem.

It is a common requirement to generate unique ids in a distributed environment. Data stored across different databases and identified by ids (as primary keys) should not conflict when it is queried and consolidated across those databases. For example, Twitter may want to read all the tweets published on 1st Jan; such a query retrieves data from multiple databases and returns a consolidated result in sorted order. Requirements like these drive Twitter to mark every tweet with a unique id so that tweets can be stored and queried efficiently.

Flickr uses sharding to store data in multiple databases. The primary key should be unique across the databases so that uniqueness is maintained even when data is moved between databases.

These are some of the constraints which have a bearing on the id generation design.
1. A unique id should not contain more than 64 bits.
2. It must be at least k-sortable. For example, tweets generated in a given time interval should be roughly sortable.
3. The id generation service could be running on multiple machines to avoid a single point of failure.


Unique id generation without the above constraints:

* A database can generate unique ids. For example, in MySQL the user can define an auto-increment column which is filled automatically when a row is inserted. Flickr uses this approach: they have dedicated MySQL instances (Flickr calls them ticket servers) with dedicated tables for generating the unique ids.

* Use UUID.randomUUID() to generate a unique id. The generated id is 128 bits and is not sortable.

* Use a combination of timestamp + UUID. This is k-sortable, but the length exceeds even 128 bits, far beyond the 64-bit limit (see the small sketch after this list).

* Use a single server to generate the ids by incrementing a counter. This becomes a single point of failure and also may not scale when the system requires thousands of unique ids per second.
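
For completeness, a trivial sketch of the two UUID-based approaches above (my own illustration):

import java.util.UUID;

public class SimpleIds {
    // 128-bit random id: unique for all practical purposes, but not sortable.
    public static String randomId() {
        return UUID.randomUUID().toString();
    }

    // Roughly time-sortable thanks to the millisecond prefix, but far longer than 64 bits.
    public static String timePrefixedId() {
        return System.currentTimeMillis() + "-" + UUID.randomUUID();
    }
}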


Unique id generation with the above constraints:
Snowflake takes a fairly generic approach. The software has been open-sourced by Twitter.

Snowflake runs in a distributed environment (on multiple machines) to generate unique ids which are 64 bits and sortable. It follows this logic:

unique id  = timestamp + nodeid + counter.

We can fix the number of bits used by each of the above fields. For example, the nodeid could take 4 bits and the counter 8 bits, leaving 52 bits for the timestamp.

The nodeid is an identifier that is unique across the different machines running Snowflake instances; it is assigned to each instance when it starts. It is required to avoid conflicts between the ids generated by multiple instances. The nodeid assignment needs to be done carefully so that no two instances share the same nodeid; Snowflake uses ZooKeeper for this purpose.

We need the counter because a server may receive multiple requests within a single timestamp. For example, the server may receive 100 tweets at exactly 12:00:00 PM. The counter is reset every time the timestamp moves to a new value.

With an 8-bit counter, a Snowflake instance can generate up to 1000 (milliseconds) * 2^8 = 256,000 unique ids per second.

Above all, we need to make sure that the system clocks of all the machines running Snowflake instances are kept in sync using the Network Time Protocol (NTP).
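
A minimal sketch of the scheme (my own illustration, using the example split above of a 52-bit timestamp, 4-bit nodeid and 8-bit counter; the real Snowflake uses a different bit split, a custom epoch, and more careful handling of clocks moving backwards):

public class SimpleIdGenerator {
    private static final int NODE_BITS = 4;
    private static final int COUNTER_BITS = 8;
    private static final long MAX_COUNTER = (1L << COUNTER_BITS) - 1;

    private final long nodeId;       // must be unique per instance, e.g. handed out via ZooKeeper
    private long lastTimestamp = -1L;
    private long counter = 0L;

    public SimpleIdGenerator(long nodeId) {
        this.nodeId = nodeId;
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now == lastTimestamp) {
            counter++;
            if (counter > MAX_COUNTER) {
                // Counter exhausted for this millisecond: busy-wait for the next one.
                while (now <= lastTimestamp) {
                    now = System.currentTimeMillis();
                }
                counter = 0;
            }
        } else {
            counter = 0;             // reset the counter whenever the timestamp changes
        }
        lastTimestamp = now;         // note: a clock moving backwards is not handled in this sketch
        // Pack timestamp | nodeid | counter into a single 64-bit long.
        return (now << (NODE_BITS + COUNTER_BITS)) | (nodeId << COUNTER_BITS) | counter;
    }
}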

128-bit k-sortable unique id generation
Boundary follows a similar approach to generate 128-bit unique ids, without depending on ZooKeeper for assigning the nodeids.
Here the timestamp uses 64 bits, 48 bits come from the MAC address of the machine hosting the instance, and 16 bits are used for the counter.
This can generate up to 1000 (milliseconds) * 2^16 unique ids per second per instance.

Thursday, November 29, 2012

How HBase Works

HBase is a distributed column-family database. It is designed to support random read/write access to large data sets. This Wikipedia link http://en.wikipedia.org/wiki/Column-oriented_DBMS gives a nice description of column-oriented DBMSs. Though that link describes column-oriented DBMSs, HBase is in fact a column-family-oriented DBMS. For more reading on the difference between the two, refer to this excellent blog post: http://dbmsmusings.blogspot.com/2010/03/distinguishing-two-major-types-of_29.html

HBase is built over HDFS as the underlying data store. Though it is possible to run HBase over other distributed file systems like Amazon S3, GFS etc., by default it runs over HDFS. HDFS provides highly distributed, fail-safe storage for HBase tables.

This is a simplified view of HBase from a user point of view.

User --> HBase Client ---> HBase Server --> HDFS

The user interacts with the HBase client, which connects to the HBase server and reads/writes the data. The HBase server in turn reads/writes the table data files from/to HDFS.
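
For context, here is a rough sketch of what the client side looks like (my own illustration using the classic HTable client API of that era; the table and column names are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();   // reads hbase-site.xml from the classpath
        HTable table = new HTable(conf, "users");           // hypothetical table name

        // Write one cell: row key, column family, qualifier, value.
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("alice"));
        table.put(put);

        // Read the cell back by row key.
        Get get = new Get(Bytes.toBytes("row1"));
        Result result = table.get(get);
        System.out.println(Bytes.toString(
                result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));

        table.close();
    }
}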


HBase Architecture

HBase follows the master-slave pattern. It has an HBase master and multiple slaves called Regionservers. HBase tables (except the ROOT table, which is a special table) are dynamically partitioned into row ranges called regions. A region is the unit of distribution and load balancing in HBase. Each table is divided into multiple regions which are then distributed across the Regionservers; each Regionserver hosts multiple regions. The master assigns regions to Regionservers and recovers regions in case of a Regionserver crash. Regionservers serve client read/write requests directly, so the master stays lightly loaded.

HFile: HBase uses HFile as the format for storing tables on HDFS. HFile stores entries sorted lexicographically by row key. It is a block-indexed file format for storing key-value pairs: the data is stored in a sequence of blocks and a separate index, kept at the end of the file, is used to locate the blocks. When a read request comes in, the index is searched for the block location and the data is then read from that block.

HLog: The Regionserver maintains an in-memory copy of recent table updates in the memcache (HBase's in-memory store, also called the MemStore), and this in-memory copy is flushed to disk periodically. Updates to an HBase table are also recorded in HLog files, which store redo records. During region recovery, these logs are replayed on top of the last committed HFile to reconstruct the in-memory image of the table; after reconstruction, the in-memory copy is flushed to disk so that the on-disk copy is up to date.

HBase writes the tables (in HFile format) and the log files to HDFS, so they remain highly available even if a region server crashes.

HBase in Action:

HBase has a ROOT table which stores the locations of the .META table regions. The ROOT table is not partitioned into regions, so as to minimize the number of lookups needed to serve a client request for a particular key. The .META table stores the regions of the user tables; a region name is made up of the table name and the region's start row. The .META table itself is treated like any other HBase table and is divided into regions. When a user request comes in, the .META table is searched for the region to which the query belongs. Searching for a region is easy: HBase tables are lexicographically ordered, so locating the region for a client request is just a matter of doing a binary search.

This is the flow for looking up a key in an HBase table.

Client --> ROOT --> .META Table Region --> Requested Table Region --> Requested Row


References:

MemCache: an in-memory key-value store. As it is not write-through, the in-memory data is lost when entries are evicted from a full cache or the memcache server crashes. http://en.wikipedia.org/wiki/Memcached

ZooKeeper: a distributed coordination service for distributed applications. HBase uses it for storing the metadata related to the ROOT table. This is a pretty good description of ZooKeeper: http://www.igvita.com/2010/04/30/distributed-coordination-with-zookeeper/

Tuesday, October 2, 2012

Pig Vs MapReduce

I tried a few examples on Hadoop MapReduce. After some initial hiccups, the Hadoop setup on my local box turned out to be pretty hassle-free. As Hadoop is written in Java, that definitely helped me; understanding the whole business of running map reduce jobs was a breeze.

Later I was reading about Pig Latin. It is a high-level scripting language for analyzing large data sets. Since it is built on top of MapReduce, out of curiosity I tried with Pig the same examples I had tried earlier with MapReduce. These are my observations:

  • Pig is good for modelling and prototyping purposes. You can do iterative development, as it is easy to change the script and run it again; there is no need to package/compile for every change.
  • Pig is definitely slower than hand-written MapReduce jobs.
  • There is not much documentation on how to optimize a Pig script. A user may end up writing the script in such a way that it creates lots of MapReduce jobs.
  • If you are a programmer, you will probably want the extra control over optimization that MapReduce gives you. I personally prefer the solution where I have more understanding of how things are working.
  • Pig may involve packaging/compiling code if you are using custom functions. In a real problem, a user may end up writing a lot of custom functions, which may end up making Pig development almost as complex as MapReduce.