Sunday, March 17, 2013

Getting started with HBase in Java - column family, column, key, value and timestamp

Apache HBase is a very interesting database inspired by Google Bigtable. Its main purpose is to look up and store multiple key-value pairs by a single key. Even though it requires a somewhat different way of thinking than relational databases do, for datasets that are not (too) relational but extremely large it is pretty much the only scalable approach.

Working with very large amounts of data in HBase is quite easy, as each key (keys are byte arrays too) is simply mapped to a single machine in the cluster, so a lookup by key is still very fast and very simple. If you want to look up by anything else, the best approach is to either rely on an external indexing service (like Apache Lucene) or roll your own.

Key to Value

HBase itself is just a simple storage that provides mapping from a
Key -> [Column Family A] [Column Family B] …
and from each column family
[Column Family A] -> [[Key K] [Value] [TS]], [[Key L] [Value] [TS]] …
The timestamp is a very important part of HBase: every write modifies individual key-values, so each timestamp shows how recently a single value was modified within a row. Each value can have multiple previous versions, so undoing a write operation is quite easy. The default is 3 previous versions and, depending on the use case, it can be set lower or higher. It’s important to note that this limit is not a time range but a number of modifications. If 3 modifications to the same key-value arrive within 1 second, we have only 1 second of ‘version history’.

API and the unfriendly byte arrays

Since in HBase every value (key, column family name, column name, value, and timestamp) is a byte array, working directly with the API can be rather cumbersome and non-intuitive. Even though the API is documented with JavaDoc, MSDN-style sample code is pretty much missing everywhere. As I’ve spent quite a long time figuring out the basic use cases of HBase, I’ve tried to assemble a simple Blog Database API to showcase the different API calls (it’s not production-ready code, but it nicely covers what HBase is capable of and how to use the raw API).


package blog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.NavigableMap;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Demo implementation of HBase-based blog storing and loading.
 *
 * <p>Blog posts live in the {@code blogs} table. Each row key is the author's
 * user id immediately followed by the creation time in milliseconds; inside the
 * {@code blog} column family the column qualifier is the post title and the
 * cell value is the post body.
 *
 * <p>This is sample code showing the raw HBase client API, not production code.
 *
 * @author Adam Horvath
 */
public class BlogAPI {
  private static final String BLOG_COLUMN_FAMILY = "blog";
  private static final String COMMENTS_COLUMN_FAMILY = "comments";
  private static final String USER_COLUMN_FAMILY = "user";
  private static final String BLOGS_TABLE = "blogs";
  private static final String USERS_TABLE = "users";

  // By default, it's localhost, don't worry.
  private static final Configuration conf = HBaseConfiguration.create();

  // Without pooling, the connection to a table will be reinitialized.
  // Creating a new connection to a table might take up to 5-10 seconds!
  private static final HTablePool pool = new HTablePool(conf, 10);

  // A sample userid. No user API is provided in this demo.
  private static final UUID userid =
      UUID.fromString("21d211f0-731c-11e2-bcfd-0800200c9a66");

  static {
    // If you don't have tables or column families, HBase will throw an
    // exception, so they need to be pre-created.
    try {
      initDatabase();
    } catch (IOException e) {
      // Best effort: keep the class loadable (the cluster may simply be down
      // right now), but report the failure instead of silently hiding it.
      System.err.println("BlogAPI: could not initialize HBase tables: " + e);
    }
  }

  /**
   * Creates the tables and their column families if they do not exist yet.
   *
   * @throws IOException if the HBase master cannot be reached or a table
   *     cannot be listed or created
   */
  public static void initDatabase() throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      createTableIfMissing(
          admin, BLOGS_TABLE, BLOG_COLUMN_FAMILY, COMMENTS_COLUMN_FAMILY);
      createTableIfMissing(admin, USERS_TABLE, USER_COLUMN_FAMILY);
    } finally {
      // Release the admin connection even when table creation fails.
      admin.close();
    }
  }

  /**
   * Creates {@code tableName} with the given column families unless a table
   * of that name is already listed by the cluster.
   */
  private static void createTableIfMissing(
      HBaseAdmin admin, String tableName, String... families)
      throws IOException {
    if (admin.listTables(tableName).length != 0) {
      return;
    }

    // Declaring the families up front avoids creating a family-less table
    // and the disable / alter / enable round trip needed to change an
    // active table afterwards.
    HTableDescriptor descriptor = new HTableDescriptor(tableName);
    for (String family : families) {
      descriptor.addFamily(new HColumnDescriptor(family));
    }
    admin.createTable(descriptor);
  }

  /**
   * Loads every blog post stored in the {@code blog} column family.
   *
   * @return one {@link Blog} per stored cell (title column) of every row;
   *     empty if nothing is stored, never {@code null}
   * @throws IOException if the table cannot be scanned
   */
  public Iterable<Blog> getBlogs() throws IOException {
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(BLOG_COLUMN_FAMILY));

    // For a range scan, set a start / stop row on the scan (or just a start
    // row), e.g. start "id11" and stop "id12".

    ArrayList<Blog> blogs = new ArrayList<>();

    HTableInterface table = pool.getTable(BLOGS_TABLE);
    try {
      ResultScanner resultScanner = table.getScanner(scan);
      try {
        // For each row
        for (Result result : resultScanner) {
          String rowId = Bytes.toString(result.getRow());
          // For each cell: the qualifier is the title, the value the body.
          for (KeyValue kv : result.raw()) {
            Blog b = new Blog();
            b.setTitle(Bytes.toString(kv.getQualifier()));
            b.setBody(Bytes.toString(kv.getValue()));
            b.setId(rowId);
            blogs.add(b);
          }
        }
      } finally {
        resultScanner.close();
      }
    } finally {
      table.close();
    }

    return blogs;
  }

  /**
   * Retrieves a single blog post: the first one stored for the given author.
   *
   * @param id
   *            User ID of the author.
   * @return the first blog post whose row key starts with {@code id}, or
   *     {@code null} if that author has no blog post
   * @throws IOException if the table cannot be scanned
   */
  public Blog getBlog(UUID id) throws IOException {
    String rowPrefix = id.toString();

    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes(rowPrefix));
    scan.addFamily(Bytes.toBytes(BLOG_COLUMN_FAMILY));
    // Don't pre-fetch more than 1 row.
    scan.setCaching(1);

    HTableInterface table = pool.getTable(BLOGS_TABLE);
    try {
      ResultScanner resultScanner = table.getScanner(scan);
      try {
        Result result = resultScanner.next();
        if (result == null || result.isEmpty()) {
          return null;
        }

        // The start row is only a lower bound: if this author has no rows,
        // the scan lands on the next author's row, which must not be
        // returned as this author's post.
        String rowId = Bytes.toString(result.getRow());
        if (!rowId.startsWith(rowPrefix)) {
          return null;
        }

        // If we want to access the column names as values, this is the
        // 'nicest' way.
        NavigableMap<byte[], byte[]> blogmap =
            result.getFamilyMap(Bytes.toBytes(BLOG_COLUMN_FAMILY));
        if (blogmap == null || blogmap.isEmpty()) {
          return null;
        }

        Blog blog = new Blog();
        // The tricky part is that the column key is the title, the column
        // value is the body.
        blog.setTitle(Bytes.toString(blogmap.firstEntry().getKey()));
        blog.setBody(Bytes.toString(blogmap.firstEntry().getValue()));
        blog.setId(rowId);
        return blog;
      } finally {
        resultScanner.close();
      }
    } finally {
      table.close();
    }
  }

  /**
   * Stores a single blog post. The row key is the sample user id immediately
   * followed by the current time in milliseconds (no separator).
   *
   * @param blog the post to store; its title becomes the column qualifier and
   *     its body the cell value
   * @throws IOException if the write fails
   */
  public void addBlog(Blog blog) throws IOException {
    Put b = new Put(
        Bytes.toBytes(userid.toString() + System.currentTimeMillis()));
    b.add(
        Bytes.toBytes(BLOG_COLUMN_FAMILY), // Family ('blog')
        Bytes.toBytes(blog.getTitle()), // Column (the title of the blog post)
        Bytes.toBytes(blog.getBody())); // Value (the body of the blog post).

    HTableInterface table = pool.getTable(BLOGS_TABLE);
    try {
      table.put(b);
    } finally {
      // Hand the table back to the pool even when the put fails.
      table.close();
    }
  }
}

2 comments:

  1. Shouldn't it be: blog.setBody(Bytes.toString(blogmap.firstEntry().getValue())); in method getBlog() ?

    ReplyDelete