Introduction:
Column-Oriented Databases
Column-oriented databases save their data grouped by columns. Subsequent column
values are stored contiguously on disk. This differs from the usual row-oriented
approach of traditional databases, which store entire rows contiguously.
Sharding:
The term sharding describes the logical separation of records into horizontal partitions.
The idea is to spread data across multiple storage files—or servers—as opposed to
having each stored contiguously.
Consistency Models:
Strict
The changes to the data are atomic and appear to take effect instantaneously. This
is the highest form of consistency.
Sequential
Every client sees all changes in the same order they were applied.
Causal
All changes that are causally related are observed in the same order by all clients.
Eventual
When no updates occur for a period of time, eventually all updates will propagate
through the system and all replicas will be consistent.
Weak
No guarantee is made that all updates will propagate and changes may appear out
of order to various clients.
Tables, Rows, Columns, and Cells:
One or more columns form a row that is addressed uniquely by a row key. A number of rows, in turn, form a table, and there can be many of them.
Each column may have multiple versions, with each
distinct value contained in a separate cell.
Rows are composed of columns, and those, in turn, are grouped into column families.
Create a simple table and add a few rows with some data:
hbase(main):002:0> create 'testtable', 'colfam1'
0 row(s) in 0.2930 seconds
hbase(main):003:0> list 'testtable'
TABLE
testtable
1 row(s) in 0.0520 seconds
hbase(main):004:0> put 'testtable', 'myrow-1', 'colfam1:q1', 'value-1'
0 row(s) in 0.1020 seconds
hbase(main):005:0> put 'testtable', 'myrow-2', 'colfam1:q2', 'value-2'
0 row(s) in 0.0410 seconds
hbase(main):006:0> put 'testtable', 'myrow-2', 'colfam1:q3', 'value-3'
0 row(s) in 0.0380 seconds
hbase(main):007:0> scan 'testtable'
ROW COLUMN+CELL
myrow-1 column=colfam1:q1, timestamp=1297345476469, value=value-1
myrow-2 column=colfam1:q2, timestamp=1297345495663, value=value-2
myrow-2 column=colfam1:q3, timestamp=1297345508999, value=value-3
2 row(s) in 0.1100 seconds
Get exactly one row back:
hbase(main):008:0> get 'testtable', 'myrow-1'
COLUMN CELL
colfam1:q1 timestamp=1297345476469, value=value-1
1 row(s) in 0.0480 seconds
Delete one specific cell and check that it is gone:
hbase(main):009:0> delete 'testtable', 'myrow-2', 'colfam1:q2'
0 row(s) in 0.0390 seconds
hbase(main):010:0> scan 'testtable'
ROW COLUMN+CELL
myrow-1 column=colfam1:q1, timestamp=1297345476469, value=value-1
myrow-2 column=colfam1:q3, timestamp=1297345508999, value=value-3
2 row(s) in 0.0620 seconds
Clean up by disabling and then dropping the test table:
hbase(main):011:0> disable 'testtable'
0 row(s) in 2.1250 seconds
hbase(main):012:0> drop 'testtable'
0 row(s) in 1.2780 seconds
The filesystem negotiating transparently where data is stored
hbase-env.sh
You set HBase environment variables in this file. Examples include options to pass to
the JVM when an HBase daemon starts, such as Java heap size and garbage collector
configurations. You also set options for HBase configuration, log directories, niceness,
SSH options, where to locate process pid files, and so on. Open the file at conf/hbase-env.sh
and peruse its content.
Web-based UI Introduction:
By default, the web-based UI is deployed on the master host at port 60010 (HBase region servers use 60030 by default). If the master is running on a host named master.foo.com on the default port, to
see the master’s home page you can point your browser at http://master.foo.com:60010.
Client API: The Basics:
All operations that mutate data are guaranteed to be atomic on a per-row basis. This
affects all other concurrent readers and writers of that same row. In other words, it does
not matter if another client or thread is reading from or writing to the same row: they
either read a consistent last mutation, or may have to wait before being able to apply
their change.
Single Puts
The very first method you may want to know about is one that lets you store data in HBase. Here is the call that lets you do that:
void put(Put put) throws IOException
You create the Put instance with one of these constructors:
Put(byte[] row)
Put(byte[] row, RowLock rowLock)
Put(byte[] row, long ts)
Put(byte[] row, long ts, RowLock rowLock)
Once data has been added to a Put, you can read it back from the instance with:
List<KeyValue> get(byte[] family, byte[] qualifier)
Map<byte[], List<KeyValue>> getFamilyMap()
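Simple Java program (pseudocode):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Open the table; it must already exist with the column family colfam1.
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "testtable");
// Build one Put for row "row1" holding two columns of family "colfam1".
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
Bytes.toBytes("val2"));
// Send the mutation to the region server.
table.put(put);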
Atomic compare-and-set:
A special type of check can be performed using the checkAndPut() call:
only update if another value is not already present. This is achieved by
setting the value parameter to null. In that case, the operation would
succeed when the specified column is nonexistent.
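The null-value check described above behaves much like a "put if absent". The following is a minimal in-memory sketch of that idea, not the HBase client API: it assumes a ConcurrentHashMap standing in for the cells of one row, and the class name CheckAndPutSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class CheckAndPutSketch {
    // Column name -> value, standing in for the cells of a single row.
    private final ConcurrentMap<String, String> cells = new ConcurrentHashMap<>();

    /**
     * Hypothetical analog of a compare-and-set: stores newValue only if the
     * current value equals expected. An expected value of null means
     * "the column must not exist yet".
     */
    boolean checkAndPut(String column, String expected, String newValue) {
        if (expected == null) {
            // Succeeds only when no value was present before.
            return cells.putIfAbsent(column, newValue) == null;
        }
        // Succeeds only when the stored value still equals expected.
        return cells.replace(column, expected, newValue);
    }

    String get(String column) {
        return cells.get(column);
    }

    public static void main(String[] args) {
        CheckAndPutSketch row = new CheckAndPutSketch();
        System.out.println(row.checkAndPut("colfam1:q1", null, "val1"));   // true: column was absent
        System.out.println(row.checkAndPut("colfam1:q1", null, "val2"));   // false: column already exists
        System.out.println(row.checkAndPut("colfam1:q1", "val1", "val3")); // true: expected value matched
        System.out.println(row.get("colfam1:q1"));                         // val3
    }
}
```

Both branches are single atomic map operations, which mirrors the point of the server-side call: the check and the write cannot be separated by another writer.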
Related retrieval methods:
Using exists() involves the same lookup semantics on the region servers,
including loading file blocks to check if a row or column actually
exists. You only avoid shipping the data over the network—but that is
very useful if you are checking very large columns, or do so very
frequently.
Sometimes it might be necessary to find a specific row, or the one just before the requested
row, when retrieving data. The following call can help you find a row using
these semantics:
Result getRowOrBefore(byte[] row, byte[] family) throws IOException
Be careful to specify an existing column family name when using the
getRowOrBefore() method, or you will get a Java NullPointerException
back from the server. This is caused by the server trying to access a
nonexistent storage file.
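The "this row, or the one just before it" lookup can be pictured with a sorted map. This is only an illustrative sketch using java.util.TreeMap, under the assumption that row keys are plain ASCII strings (so String ordering matches byte ordering); RowOrBeforeSketch and rowOrBefore are hypothetical names, not part of HBase.

```java
import java.util.Map;
import java.util.TreeMap;

final class RowOrBeforeSketch {
    /**
     * Returns the entry stored under rowKey, or the closest entry that sorts
     * before it, or null when nothing sorts at or before rowKey.
     */
    static Map.Entry<String, String> rowOrBefore(TreeMap<String, String> table, String rowKey) {
        return table.floorEntry(rowKey);
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        table.put("row-10", "a");
        table.put("row-20", "b");
        table.put("row-30", "c");

        System.out.println(rowOrBefore(table, "row-20").getKey()); // row-20 (exact match)
        System.out.println(rowOrBefore(table, "row-25").getKey()); // row-20 (closest preceding row)
        System.out.println(rowOrBefore(table, "row-05"));          // null (no row at or before the key)
    }
}
```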
Delete Method:
Atomic compare-and-delete:
Batch Operations:
All batch operations are executed before the results are checked: even
if you receive an error for one of the actions, all the other ones have been
applied. In a worst-case scenario, all actions might return faults, though.
On the other hand, the batch code is aware of transient errors, such as
the NotServingRegionException (indicating, for instance, that a region
has been moved), and is trying to apply the action multiple times. The
hbase.client.retries.number configuration property (by default set to
10) can be adjusted to increase, or reduce, the number of retries.
Row Locks:
The region servers provide a row lock feature ensuring that only a client
holding the matching lock can modify a row.
You should avoid using row locks whenever possible. Just as with
RDBMSes, you can end up in a situation where two clients create a
deadlock by waiting on a locked row, with the lock held by the other
client.
While the locks wait to time out, these two blocked clients are holding
on to a handler, which is a scarce resource. If this happens on a heavily
used row, many other clients will lock the remaining few handlers and
block access to the complete server for all other clients: the server will
not be able to serve any row of any region it hosts.
To reiterate: do not use row locks if you do not have to. And if you do,
use them sparingly!
The first call, lockRow(), takes a row key and returns an instance of RowLock, which you
can hand in to the constructors of Put or Delete subsequently. Once you no longer
require the lock, you must release it with the accompanying unlockRow() call.
The default timeout on locks is one minute, but can be configured
system-wide by adding the following property key to the hbase-site.xml
file and setting the value to a different, millisecond-based
timeout:
<property>
<name>hbase.regionserver.lease.period</name>
<value>120000</value>
</property>
Do Gets Require a Lock?:
The region servers do not
take out any locks during the get operation. They instead apply a multiversion concurrency
control-style mechanism ensuring that row-level read operations, such as get()
calls, never return half-written data, for example, what is written by another thread
or client.
Scans:
Scans are akin to cursors in database systems.
If no start row was specified, the scan will start at the beginning of the table.
It will also end its work when the current row key is equal to or greater than the optional
stop row. If no stop row was specified, the scan will run to the end of the table.
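The boundary rules above (start row included, stop row excluded, either one optional) can be sketched against a sorted map. This is an in-memory illustration only; ScanRangeSketch and its scan method are hypothetical names, and a null argument is used here to mean "not specified".

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class ScanRangeSketch {
    /**
     * Returns the rows in [startRow, stopRow): the start row is inclusive,
     * the stop row is exclusive. A null bound means that side is open.
     */
    static NavigableMap<String, String> scan(NavigableMap<String, String> table,
                                             String startRow, String stopRow) {
        if (startRow == null && stopRow == null) {
            return table;                                // whole table
        }
        if (startRow == null) {
            return table.headMap(stopRow, false);        // from the beginning
        }
        if (stopRow == null) {
            return table.tailMap(startRow, true);        // to the end
        }
        return table.subMap(startRow, true, stopRow, false);
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        for (int i = 1; i <= 5; i++) {
            table.put("row-" + i, "value-" + i);
        }
        System.out.println(scan(table, "row-2", "row-4").keySet()); // [row-2, row-3]
        System.out.println(scan(table, null, null).size());         // 5
    }
}
```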
If you only need subsets of the data, narrowing the scan’s scope is playing
into the strengths of HBase, since data is stored in column families
and omitting entire families from the scan results in those storage files
not being read at all. This is the power of column-oriented architecture
at its best.
Scan setTimeRange(long minStamp, long maxStamp) throws IOException
Scan setTimeStamp(long timestamp)
Scan setMaxVersions()
Scan setMaxVersions(int maxVersions)
The ResultScanner Class:
Scans do not ship all the matching rows in one RPC to the client, but instead do this
on a row basis. This obviously makes sense as rows could be very large and sending
thousands, and most likely more, of them in one call would use up too many resources,
and take a long time.
The ResultScanner converts the scan into a get-like operation, wrapping the Result
instance for each row into an iterator functionality. It has a few methods of its own:
Caching Versus Batching:
Each call to next() will be a separate RPC for each row, even when you use the
next(int nbRows) method, because it is nothing else but a client-side loop over
next() calls. Obviously, this is not very good for performance when dealing with small
cells. It makes sense to fetch more than one row per RPC where possible.
This is called scanner caching and is disabled by default.
You can enable it at two different levels: on the table level, to be effective for all scan
instances, or at the scan level, only affecting the current scan. You can set the table-wide
scanner caching using these HTable calls:
void setScannerCaching(int scannerCaching)
int getScannerCaching()
Setting the scanner caching higher will improve scanning
performance most of the time, but setting it too high can have adverse effects as well:
each call to next() will take longer as more data is fetched and needs to be transported
to the client, and once you exceed the maximum heap the client process has available
it may terminate with an OutOfMemoryError.
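To get a feel for the trade-off, the following toy model counts round trips for a given caching value. It is a simplification under stated assumptions: every fetch returns up to `caching` rows, the final empty fetch that signals the end of the scan is counted too, and the calls a real client makes to open and close a scanner are ignored. ScannerCachingSketch is a hypothetical class, not HBase code.

```java
import java.util.ArrayList;
import java.util.List;

final class ScannerCachingSketch {
    private final List<String> serverRows;
    private final int caching;
    private int position = 0;
    private int rpcCount = 0;

    ScannerCachingSketch(List<String> serverRows, int caching) {
        if (caching < 1) {
            throw new IllegalArgumentException("caching must be >= 1");
        }
        this.serverRows = serverRows;
        this.caching = caching;
    }

    /** One simulated RPC: returns up to `caching` rows, or an empty list when exhausted. */
    List<String> fetch() {
        rpcCount++;
        int end = Math.min(position + caching, serverRows.size());
        List<String> batch = new ArrayList<>(serverRows.subList(position, end));
        position = end;
        return batch;
    }

    /** Drains a scanner over totalRows rows and returns the number of simulated RPCs. */
    static int countRpcs(int totalRows, int caching) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < totalRows; i++) {
            rows.add("row-" + i);
        }
        ScannerCachingSketch scanner = new ScannerCachingSketch(rows, caching);
        while (!scanner.fetch().isEmpty()) {
            // A real client would hand the rows to the application here.
        }
        return scanner.rpcCount;
    }

    public static void main(String[] args) {
        System.out.println(countRpcs(10, 1));  // 11: ten single-row fetches plus the final empty one
        System.out.println(countRpcs(10, 5));  // 3
        System.out.println(countRpcs(10, 20)); // 2
    }
}
```

The larger batches are what cost memory and per-call latency on the client, which is the flip side described in the text.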
When the time taken to transfer the rows to the client, or to process the
data on the client, exceeds the configured scanner lease threshold, you
will end up receiving a lease expired error, in the form of a
ScannerTimeoutException being thrown.
Advanced Features:
Filters have access to the entire row they are applied to. This means that
they can decide the fate of a row based on any available information.
This includes the row key, column qualifiers, actual value of a column,
timestamps, and so on.
When referring to values, or comparisons, as we will discuss shortly, this
can be applied to any of these details. Specific filter implementations are
available that consider only one of those criteria each.
The filter hierarchy:
The lowest level in the filter hierarchy is the Filter interface, and the abstract FilterBase
class that implements an empty shell, or skeleton, that is used by the actual filter
classes to avoid having the same boilerplate code in each of them.
Comparison operators:
As CompareFilter-based filters add one more feature to the base FilterBase class,
namely the compare() operation, it has to have a user-supplied operator type that defines
how the result of the comparison is interpreted.
Comparators
The second type that you need to provide to CompareFilter-related classes is a comparator,
which is needed to compare various values and keys in different ways. They
are derived from WritableByteArrayComparable, which implements Writable, and
Comparable.
Comparison Filters:
The first type of supplied filter implementations are the comparison filters. They take
the comparison operator and comparator instance as described earlier. The constructor
of each of them has the same signature, inherited from CompareFilter:
CompareFilter(CompareOp valueCompareOp,
WritableByteArrayComparable valueComparator)
You need to supply this comparison operator and comparison class for the filters to do
their work. Next you will see the actual filters implementing a specific comparison.
RowFilter:
This filter gives you the ability to filter data based on row keys.
Pseudocode:
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-0"));
Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result res : scanner1) {
System.out.println(res);
}
scanner1.close();
Filter filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL,
new RegexStringComparator(".*-.5"));
scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result res : scanner2) {
System.out.println(res);
}
scanner2.close();
Filter filter3 = new RowFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator("-5"));
scan.setFilter(filter3);
ResultScanner scanner3 = table.getScanner(scan);
for (Result res : scanner3) {
System.out.println(res);
}
scanner3.close();
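One thing worth keeping in mind with the first scan above is that a binary comparison works byte by byte, not numerically, so a key such as row-100 sorts before row-22. The sketch below illustrates this with the JDK only (Arrays.compareUnsigned, available since Java 9); LexicographicOrderSketch and its methods are hypothetical helpers, and the keys are assumed to be UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class LexicographicOrderSketch {
    /** Compares two row keys byte by byte, treating each byte as unsigned. */
    static int compareKeys(String a, String b) {
        return Arrays.compareUnsigned(
                a.getBytes(StandardCharsets.UTF_8),
                b.getBytes(StandardCharsets.UTF_8));
    }

    /** Mirrors a LESS_OR_EQUAL check of rowKey against bound. */
    static boolean lessOrEqual(String rowKey, String bound) {
        return compareKeys(rowKey, bound) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(lessOrEqual("row-100", "row-22")); // true: '1' sorts before '2'
        System.out.println(lessOrEqual("row-22", "row-22"));  // true: equal keys
        System.out.println(lessOrEqual("row-3", "row-22"));   // false: '3' sorts after '2'
    }
}
```

Zero-padding numeric key parts (row-003, row-022, row-100) is a common way to make the byte order agree with the numeric order.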
FamilyFilter:
This filter works very similarly to the RowFilter, but applies the comparison to the column
families available in a row, as opposed to the row key. Using the available combinations
of operators and comparators you can filter what is included in the retrieved data
on a column family level.
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS,
new BinaryComparator(Bytes.toBytes("colfam3")));
Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println(result);
}
scanner.close();
Get get1 = new Get(Bytes.toBytes("row-5"));
get1.setFilter(filter1);
Result result1 = table.get(get1);
System.out.println("Result of get(): " + result1);
Filter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("colfam3")));
Get get2 = new Get(Bytes.toBytes("row-5"));
get2.addFamily(Bytes.toBytes("colfam1"));
get2.setFilter(filter2);
Result result2 = table.get(get2);
System.out.println("Result of get(): " + result2);
QualifierFilter:
The same logic is applied at the column qualifier level. This allows you to filter specific columns from the table.
Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("col-2")));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println(result);
}
scanner.close();
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
System.out.println("Result of get(): " + result);
ValueFilter:
This filter makes it possible to include only columns that have a specific value. Combined
with the RegexStringComparator, for example, this can filter using powerful expression
syntax.
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator(".4"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner.close();
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
DependentColumnFilter:
Here you have a more complex filter that does not simply filter out data based on
directly available information. Rather, it lets you specify a dependent column—or
reference column—that controls how other columns are filtered. It uses the timestamp
of the reference column and includes all other columns that have the same timestamp.
Here are the constructors provided:
DependentColumnFilter(byte[] family, byte[] qualifier)
DependentColumnFilter(byte[] family, byte[] qualifier,
boolean dropDependentColumn)
DependentColumnFilter(byte[] family, byte[] qualifier,
boolean dropDependentColumn, CompareOp valueCompareOp,
WritableByteArrayComparable valueComparator)
SingleColumnValueFilter
You can use this filter when you have exactly one column that decides if an entire row
should be returned or not. You need to first specify the column you want to track, and
then some value to check against.
You must include the column you want to filter by, in other words, the
reference column, into the families you query for, using addColumn(),
for example. If you fail to do so, the column is considered missing and
the result is either empty, or contains all rows, based on the getFilterIfMissing()
result.
By using setLatestVersionOnly(false) (the default is true) you can change the default
behavior of the filter, which is only to check the newest version of the reference
column.
Pseudocode:
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true);
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner.close();
Get get = new Get(Bytes.toBytes("row-6"));
get.setFilter(filter);
Result result = table.get(get);
System.out.println("Result of get: ");
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
SingleColumnValueExcludeFilter:
The SingleColumnValueFilter we just discussed is extended in this class to provide
slightly different semantics: the reference column, as handed into the constructor, is
omitted from the result. In other words, you have the same features, constructors, and
methods to control how this filter works. The only difference is that you will never get
the column you are checking against as part of the Result instance(s) on the client side.
PrefixFilter:
Given a prefix, specified when you instantiate the filter instance, all rows that match
this prefix are returned to the client. The constructor is:
PrefixFilter(byte[] prefix)
For example:
Filter filter = new PrefixFilter(Bytes.toBytes("row-1"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner.close();
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
PageFilter:
You paginate through rows by employing this filter. When you create the instance, you
specify a pageSize parameter, which controls how many rows per page should be
returned.
There is a fundamental issue with filtering on physically separate servers.
Filters run on different region servers in parallel and cannot retain
or communicate their current state across those boundaries. Thus, each
filter is required to scan at least up to pageSize rows before ending the
scan. This means a slight inefficiency is given for the PageFilter as more
rows are reported to the client than necessary. The final consolidation
on the client obviously has visibility into all results and can reduce what
is accessible through the API accordingly.
The client code would need to remember the last row that was returned, and then,
when another iteration is about to start, set the start row of the scan accordingly, while
retaining the same filter properties.
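// A zero byte appended to the last row key, so the next page starts just after that row.
final byte[] POSTFIX = new byte[] { 0x00 };
Filter filter = new PageFilter(15);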
int totalRows = 0;
byte[] lastRow = null;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if (lastRow != null) {
byte[] startRow = Bytes.add(lastRow, POSTFIX);
System.out.println("start row: " +
Bytes.toStringBinary(startRow));
scan.setStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while ((result = scanner.next()) != null) {
System.out.println(localRows++ + ": " + result);
totalRows++;
lastRow = result.getRow();
}
scanner.close();
if (localRows == 0) break;
}
System.out.println("total rows: " + totalRows);
KeyOnlyFilter:
Some applications need to access just the keys of each KeyValue, while omitting the
actual data. The KeyOnlyFilter provides this functionality by applying the filter’s ability
to modify the processed columns and cells, as they pass through. It does so by applying
the KeyValue.convertToKeyOnly(boolean) call that strips out the data part.
FirstKeyOnlyFilter
If you need to access the first column—as sorted implicitly by HBase—in each row,
this filter will provide this feature. Typically this is used by row counter type applications
that only need to check if a row exists. Recall that in column-oriented databases a row
really is composed of columns, and if there are none, the row ceases to exist.
InclusiveStopFilter:
The stop row of a scan is normally exclusive; this filter includes it. The example uses the filter to start at row-3 and stop at row-5 inclusively.
Filter filter = new InclusiveStopFilter(Bytes.toBytes("row-5"));
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("row-3"));
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println(result);
}
scanner.close();
TimestampsFilter:
When you need fine-grained control over what versions are included in the scan result,
this filter provides the means. You have to hand in a List of timestamps:
TimestampsFilter(List<Long> timestamps)
A version is a specific value
of a column at a unique point in time, denoted with a timestamp. When
the filter is asking for a list of timestamps, it will attempt to retrieve the
column versions with the matching timestamps.
List<Long> ts = new ArrayList<Long>();
ts.add(5L);
ts.add(10L);
ts.add(15L);
Filter filter = new TimestampsFilter(ts);
Scan scan1 = new Scan();
scan1.setFilter(filter);
ResultScanner scanner1 = table.getScanner(scan1);
for (Result result : scanner1) {
System.out.println(result);
}
scanner1.close();
Scan scan2 = new Scan();
scan2.setFilter(filter);
scan2.setTimeRange(8, 12);
ResultScanner scanner2 = table.getScanner(scan2);
for (Result result : scanner2) {
System.out.println(result);
}
scanner2.close();
ColumnCountGetFilter:
You can use this filter to only retrieve a specific maximum number of columns per row.
You can set the number using the constructor of the filter:
ColumnCountGetFilter(int n)
ColumnPaginationFilter:
Similar to the PageFilter, this one can be used to page through columns in a row. Its
constructor has two parameters:
ColumnPaginationFilter(int limit, int offset)
It skips all columns up to the number given as offset, and then includes limit columns.
Filter filter = new ColumnPaginationFilter(5, 15);
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println(result);
}
scanner.close();
This example starts each row at column offset 15 and prints five columns (limit = 5).
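The limit and offset semantics can be sketched on a plain list of column names. This is an illustration of the arithmetic only, not the filter's implementation; ColumnPaginationSketch and page are hypothetical names, and the columns are assumed to be already sorted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ColumnPaginationSketch {
    /** Skips the first `offset` columns, then returns at most `limit` columns. */
    static <T> List<T> page(List<T> columns, int limit, int offset) {
        if (limit < 0 || offset < 0) {
            throw new IllegalArgumentException("limit and offset must be non-negative");
        }
        if (offset >= columns.size()) {
            return Collections.emptyList();
        }
        // Use long arithmetic so offset + limit cannot overflow an int.
        int end = (int) Math.min((long) offset + limit, columns.size());
        return new ArrayList<>(columns.subList(offset, end));
    }

    public static void main(String[] args) {
        List<String> columns = new ArrayList<>();
        for (int i = 0; i < 30; i++) {
            columns.add(String.format("col-%02d", i));
        }
        System.out.println(page(columns, 5, 15)); // [col-15, col-16, col-17, col-18, col-19]
    }
}
```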
ColumnPrefixFilter:
Analogous to the PrefixFilter, which worked by filtering on row key prefixes, this filter
does the same for columns. You specify a prefix when creating the filter:
ColumnPrefixFilter(byte[] prefix)
All columns that have the given prefix are then included in the result.
RandomRowFilter:
Finally, there is a filter that shows what is also possible using the API: including random
rows into the result. The constructor is given a parameter named chance, which represents
a value between 0.0 and 1.0:
RandomRowFilter(float chance)
Internally, this class is using a Java Random.nextFloat() call to randomize the row inclusion,
and then compares the value with the chance given. Giving it a negative chance
value will make the filter exclude all rows, while a value larger than 1.0 will make it
include all rows.
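The behavior at the edges follows directly from comparing a random float in [0.0, 1.0) with the chance value. The sketch below is a stand-in written against java.util.Random; the exact comparison used inside the real filter is an assumption here, and RandomRowSketch is a hypothetical class.

```java
import java.util.Random;

final class RandomRowSketch {
    private final float chance;
    private final Random random;

    RandomRowSketch(float chance, Random random) {
        this.chance = chance;
        this.random = random;
    }

    /** Returns true if the current row should be included in the result. */
    boolean includeRow() {
        // nextFloat() is in [0.0, 1.0): a negative chance never matches,
        // and a chance above 1.0 always matches.
        return random.nextFloat() < chance;
    }

    /** Counts how many of `rows` simulated rows would be included. */
    static int countIncluded(float chance, int rows, long seed) {
        RandomRowSketch filter = new RandomRowSketch(chance, new Random(seed));
        int included = 0;
        for (int i = 0; i < rows; i++) {
            if (filter.includeRow()) {
                included++;
            }
        }
        return included;
    }

    public static void main(String[] args) {
        System.out.println(countIncluded(-1.0f, 1000, 42L)); // 0: nothing passes
        System.out.println(countIncluded(2.0f, 1000, 42L));  // 1000: everything passes
        System.out.println(countIncluded(0.5f, 1000, 42L));  // roughly half
    }
}
```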
Decorating Filters:
SkipFilter
This filter wraps a given filter and extends it to exclude an entire row, when the wrapped
filter hints for a KeyValue to be skipped. In other words, as soon as a filter indicates that
a column in a row is omitted, the entire row is omitted.
The wrapped filter must implement the filterKeyValue() method, or
the SkipFilter will not work as expected. This is because the SkipFilter
is only checking the results of that method to decide how to handle
the current row.
Filter filter1 = new ValueFilter(CompareFilter.CompareOp.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("val-0")));
Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner1.close();
Filter filter2 = new SkipFilter(filter1);
scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner2) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner2.close();
WhileMatchFilter:
This second decorating filter type works somewhat similarly to the previous one, but
aborts the entire scan once a piece of information is filtered. This works by checking
the wrapped filter and seeing if it skips a row by its key, or a column of a row because
of a KeyValue check.
Filter filter1 = new RowFilter(CompareFilter.CompareOp.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("row-05")));
Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner1.close();
Filter filter2 = new WhileMatchFilter(filter1);
scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner2) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner2.close();
Once you run the example code, you should get this output on the console:
Adding rows to table...
Results of scan #1:
KV: row-01/colfam1:col-00/0/Put/vlen=9, Value: val-01.00
KV: row-02/colfam1:col-00/0/Put/vlen=9, Value: val-02.00
KV: row-03/colfam1:col-00/0/Put/vlen=9, Value: val-03.00
KV: row-04/colfam1:col-00/0/Put/vlen=9, Value: val-04.00
KV: row-06/colfam1:col-00/0/Put/vlen=9, Value: val-06.00
KV: row-07/colfam1:col-00/0/Put/vlen=9, Value: val-07.00
KV: row-08/colfam1:col-00/0/Put/vlen=9, Value: val-08.00
KV: row-09/colfam1:col-00/0/Put/vlen=9, Value: val-09.00
KV: row-10/colfam1:col-00/0/Put/vlen=9, Value: val-10.00
Total KeyValue count for scan #1: 9
Results of scan #2:
KV: row-01/colfam1:col-00/0/Put/vlen=9, Value: val-01.00
KV: row-02/colfam1:col-00/0/Put/vlen=9, Value: val-02.00
KV: row-03/colfam1:col-00/0/Put/vlen=9, Value: val-03.00
KV: row-04/colfam1:col-00/0/Put/vlen=9, Value: val-04.00
Total KeyValue count for scan #2: 4
The first scan used just the RowFilter to skip one out of 10 rows; the rest is returned to
the client. Adding the WhileMatchFilter for the second scan shows its behavior to stop
the entire scan operation, once the wrapped filter omits a row or column. In the example
this is row-05, triggering the end of the scan.
Decorating filters implement the same Filter interface, just like any
other single-purpose filter. In doing so, they can be used as a drop-in
replacement for those filters, while combining their behavior with the
wrapped filter instance.
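The difference between a plain filter and one wrapped in WhileMatchFilter resembles the difference between filter and takeWhile on a Java stream (takeWhile requires Java 9 or later). The sketch below uses that analogy on ten row keys; it is not HBase code, and WhileMatchSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class WhileMatchSketch {
    /** Builds the row keys row-01 through row-10. */
    static List<String> rowKeys() {
        List<String> rows = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            rows.add(String.format("row-%02d", i));
        }
        return rows;
    }

    /** Drops the rows that fail the predicate and keeps scanning. */
    static List<String> plainFilter(List<String> rows, Predicate<String> keep) {
        return rows.stream().filter(keep).collect(Collectors.toList());
    }

    /** Stops the whole scan at the first row that fails the predicate. */
    static List<String> whileMatch(List<String> rows, Predicate<String> keep) {
        return rows.stream().takeWhile(keep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Predicate<String> notRow05 = key -> !key.equals("row-05");
        System.out.println(plainFilter(rowKeys(), notRow05).size()); // 9
        System.out.println(whileMatch(rowKeys(), notRow05).size());  // 4
    }
}
```

The counts of 9 and 4 line up with the two scans in the console output above.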
FilterList:
You may want to have more than one filter being applied to reduce the data returned to your client application. This is what the FilterList is for.
The FilterList class implements the same Filter interface, just like any
other single-purpose filter. In doing so, it can be used as a drop-in replacement
for those filters, while combining the effects of each included
instance.
List<Filter> filters = new ArrayList<Filter>();
Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("row-03")));
filters.add(filter1);
Filter filter2 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("row-06")));
filters.add(filter2);
Filter filter3 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new RegexStringComparator("col-0[03]"));
filters.add(filter3);
FilterList filterList1 = new FilterList(filters);
Scan scan = new Scan();
scan.setFilter(filterList1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner1.close();
FilterList filterList2 = new FilterList(
FilterList.Operator.MUST_PASS_ONE, filters);
scan.setFilter(filterList2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner2) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner2.close();
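The two operators behave like a logical AND and a logical OR over the contained filters. A minimal sketch with java.util.function.Predicate, assuming row keys are compared as strings, shows why the second scan above returns far more data than the first. FilterListSketch and combine are hypothetical names; only the operator names are borrowed from the FilterList class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class FilterListSketch {
    enum Operator { MUST_PASS_ALL, MUST_PASS_ONE }

    /** Combines the predicates so that all of them, or at least one, must accept an item. */
    static <T> Predicate<T> combine(Operator operator, List<Predicate<T>> filters) {
        return item -> {
            if (operator == Operator.MUST_PASS_ALL) {
                return filters.stream().allMatch(f -> f.test(item));
            }
            return filters.stream().anyMatch(f -> f.test(item));
        };
    }

    /** Counts how many of the keys row-01 through row-10 pass the combined filter. */
    static int countMatches(Operator operator, List<Predicate<String>> filters) {
        Predicate<String> combined = combine(operator, filters);
        int matches = 0;
        for (int i = 1; i <= 10; i++) {
            if (combined.test(String.format("row-%02d", i))) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Predicate<String> atLeastRow03 = key -> key.compareTo("row-03") >= 0;
        Predicate<String> atMostRow06 = key -> key.compareTo("row-06") <= 0;
        List<Predicate<String>> filters = new ArrayList<>(Arrays.asList(atLeastRow03, atMostRow06));

        System.out.println(countMatches(Operator.MUST_PASS_ALL, filters)); // 4: row-03 to row-06
        System.out.println(countMatches(Operator.MUST_PASS_ONE, filters)); // 10: every row passes one bound
    }
}
```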
Custom Filters:
Eventually, you may exhaust the list of supplied filter types and need to implement
your own. This can be done by either implementing the Filter interface, or extending
the provided FilterBase class. The latter provides default implementations for all
methods that are members of the interface.
filterRow() and Batch Mode:
A filter using filterRow() to filter out an entire row, or filterRow(List) to modify the
final list of included values, must also override the hasRowFilter() function to return
true.
The framework is using this flag to ensure that a given filter is compatible with the
selected scan parameters. In particular, these filter methods collide with the scanner’s
batch mode: when the scanner is using batches to ship partial rows to the client, the
previous methods are not called for every batch, but only at the actual end of the current
row.
Implementing a filter that lets certain rows pass:
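// Lets a row pass only if at least one of its cells holds the given value.
public class CustomFilter extends FilterBase {
private byte[] value = null;
// Assume the row is filtered out until a matching cell is seen.
private boolean filterRow = true;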
public CustomFilter() {
super();
}
public CustomFilter(byte[] value) {
this.value = value;
}
@Override
public void reset() {
this.filterRow = true;
}
@Override
public ReturnCode filterKeyValue(KeyValue kv) {
if (Bytes.compareTo(value, kv.getValue()) == 0) {
filterRow = false;
}
return ReturnCode.INCLUDE;
}
@Override
public boolean filterRow() {
return filterRow;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
Bytes.writeByteArray(dataOutput, this.value);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.value = Bytes.readByteArray(dataInput);
}
}
Counters
In addition to the functionality we already discussed, HBase offers another advanced
feature: counters. Many applications that collect statistics, such as clicks or views in
online advertising, used to collect the data in logfiles that would subsequently
be analyzed. Using counters offers the potential of switching to live accounting, forgoing
the delayed batch processing step completely.
Advantage of using Counters
While trying to increment a value stored in a table, we would have to lock the row, read the value, increment it, write it back to the table, and finally remove the look from the row, so that it can be used by other clients. This could cause a row to be locked for a long period and may possibly cause a clash between the clients tying to access the same row. Counters help us to overcome this problem as Increments are done under a single row lock, so write operations to a row are synchronized.
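The benefit of an atomic increment over a client-side read-modify-write cycle can be illustrated without HBase at all. The following is only an in-memory analogy built on JDK classes; CounterSketch, its methods, and the column name "daily:hits" are made up for illustration and are not part of the HBase API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// In-memory analogy only: maps a counter column to its current value.
class CounterSketch {
    private final ConcurrentHashMap<String, AtomicLong> cells = new ConcurrentHashMap<>();

    // Atomic increment: read, add, and write happen as one step, so
    // concurrent callers never overwrite each other's updates.
    long increment(String column, long amount) {
        return cells.computeIfAbsent(column, c -> new AtomicLong()).addAndGet(amount);
    }

    // A counter that was never incremented reads as zero.
    long get(String column) {
        AtomicLong value = cells.get(column);
        return value == null ? 0L : value.get();
    }

    // Starts `threads` workers that each increment the same counter
    // `perThread` times, then returns the final counter value.
    static long hammer(int threads, int perThread) {
        CounterSketch table = new CounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    table.increment("daily:hits", 1);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return table.get("daily:hits");
    }
}
```

Because every update goes through a single atomic step, no increments are lost however the worker threads interleave. A separate get followed by a put could not guarantee this without explicit locking.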
Single Counters
The first type of increment call is for single counters only: you need to specify the exact
column you want to use.
Multiple Counters
Another way to increment counters is provided by the increment() call of HTable. It
works similarly to the CRUD-type operations discussed earlier, using the following
method to do the increment:
Result increment(Increment increment) throws IOException
While you can guard the increment operation against other writers, you
currently cannot do this for readers. In fact, there is no atomicity guarantee
made for readers.
Since readers are not taking out locks on rows that are incremented, it
may happen that they have access to some counters—within one row—
that are already updated, and some that are not! This applies to scan
and get operations equally.
Coprocessors:
Using the client API, combined with specific selector mechanisms, such as filters, or
column family scoping, it is possible to limit what data is transferred to the client. It
would be good, though, to take this further and, for example, perform certain operations
directly on the server side while only returning a small result set. Think of this as
a small MapReduce framework that distributes work across the entire cluster.
The framework already provides classes, based on the coprocessor framework, which
you can use to extend from when implementing your own functionality. They fall into
two main groups: observer and endpoint. Here is a brief overview of their purpose:
Observer
This type of coprocessor is comparable to triggers: callback functions (also referred
to here as hooks) are executed when certain events occur. This includes user-generated,
but also server-internal, automated events.
The interfaces provided by the coprocessor framework are:
RegionObserver
You can handle data manipulation events with this kind of observer. They are
closely bound to the regions of a table.
MasterObserver
This can be used to react to administrative or DDL-type operations. These are
cluster-wide events.
WALObserver
This provides hooks into the write-ahead log processing.
Observers provide you with well-defined event callbacks, for every operation a
cluster server may handle.
Endpoint
Next to event handling there is also a need to add custom operations to a cluster.
User code can be deployed to the servers hosting the data to, for example, perform
server-local computations.
Endpoints are dynamic extensions to the RPC protocol, adding callable remote
procedures. Think of them as stored procedures, as known from RDBMSes. They
may be combined with observer implementations to directly interact with the
server-side state.
The Coprocessor Class:
SYSTEM Highest priority, defines coprocessors that are executed first
USER Defines all other coprocessors, which are executed subsequently
Loading from the configuration
You can configure globally which coprocessors are loaded when HBase starts. This is
done by adding one, or more, of the following to the hbase-site.xml configuration file:
<property>
<name>hbase.coprocessor.region.classes</name>
<value>coprocessor.RegionObserverExample, coprocessor.AnotherCoprocessor</value>
</property>
<property>
<name>hbase.coprocessor.master.classes</name>
<value>coprocessor.MasterObserverExample</value>
</property>
<property>
<name>hbase.coprocessor.wal.classes</name>
<value>coprocessor.WALObserverExample, bar.foo.MyWALObserver</value>
</property>
The order of the classes in each configuration property is important, as it defines the
execution order. All of these coprocessors are loaded with the system priority. You
should configure all globally active classes here so that they are executed first and have
a chance to take authoritative actions. Security coprocessors are loaded this way, for
example.
Loading from the table descriptor
The other option to define what coprocessors to load is the table descriptor. As this is
per table, the coprocessors defined here are only loaded for regions of that table—and
only by the region servers. In other words, you can only use this approach for region-related
coprocessors, not for master or WAL-related ones.
Since they are loaded in the context of a table, they are more targeted compared to the
configuration loaded ones, which apply to all tables.
You need to add their definition to the table descriptor using the HTableDescriptor.setValue()
method. The key must start with COPROCESSOR, and the value has to conform to
the following format:
<path-to-jar>|<classname>|<priority>
Here is an example that defines two coprocessors, one with system-level priority, the
other with user-level priority:
'COPROCESSOR$1' => \
'hdfs://localhost:8020/users/leon/test.jar|coprocessor.Test|SYSTEM'
'COPROCESSOR$2' => \
'/Users/laura/test2.jar|coprocessor.AnotherTest|USER'
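To make the value format concrete, here is a minimal sketch that splits such a string into its three parts. CoprocessorSpec is a hypothetical helper written for this note, not an HBase class, and it does not validate the priority.

```java
// Hypothetical helper, not part of HBase: splits a table-descriptor
// coprocessor value of the form <path-to-jar>|<classname>|<priority>.
class CoprocessorSpec {
    final String jarPath;
    final String className;
    final String priority;

    CoprocessorSpec(String jarPath, String className, String priority) {
        this.jarPath = jarPath;
        this.className = className;
        this.priority = priority;
    }

    static CoprocessorSpec parse(String value) {
        // The pipe is a regex metacharacter and must be escaped;
        // the -1 limit keeps empty trailing fields so they are detected.
        String[] parts = value.split("\\|", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "expected <path-to-jar>|<classname>|<priority> but got: " + value);
        }
        return new CoprocessorSpec(parts[0].trim(), parts[1].trim(), parts[2].trim());
    }
}
```

Parsing the first example value yields the HDFS jar path, the class name coprocessor.Test, and the priority SYSTEM.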
public class LoadWithTableDescriptorExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(fs.getUri() + Path.SEPARATOR + "test.jar");
        HTableDescriptor htd = new HTableDescriptor("testtable");
        htd.addFamily(new HColumnDescriptor("colfam1"));
        htd.setValue("COPROCESSOR$1", path.toString() +
            "|" + RegionObserverExample.class.getCanonicalName() +
            "|" + Coprocessor.Priority.USER);
        HBaseAdmin admin = new HBaseAdmin(conf);
        admin.createTable(htd);
        System.out.println(admin.getTableDescriptor(Bytes.toBytes("testtable")));
    }
}
The RegionObserver Class:
The first subclass of Coprocessor we will look into is the one used at the region level:
the RegionObserver class. You can learn from its name that it belongs to the group of
observer coprocessors: they have hooks that trigger when a specific region-level
operation occurs.
Handling region life-cycle events
Endpoints:
The earlier RegionObserver example used a well-known row key to add a computed
column during a get request. It seems that this could suffice to implement other functionality
as well—for example, aggregation functions that return the sum of all values
in a specific column.
Unfortunately, this does not work, as the row key defines which region is handling the
request, therefore only sending the computation request to a single server. What we
want, though, is a mechanism to send such a request to all regions, and therefore all
region servers, so that they can build the sum of the columns they have access to locally.
Once each region has returned its partial result, we can aggregate the total on the client
side much more easily.
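The idea of per-region partial results that the client then combines can be sketched in plain Java. This is a conceptual model only: each region is represented by a list of the values it holds locally, and EndpointSumSketch and the region names used with it are invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: each "region" is modeled as a list of local values.
class EndpointSumSketch {
    // What every region would compute locally on its own server.
    static long regionSum(List<Long> localValues) {
        long sum = 0;
        for (long value : localValues) {
            sum += value;
        }
        return sum;
    }

    // What the client does: collect one partial result per region,
    // then add the partial results together.
    static long clientTotal(Map<String, List<Long>> regions) {
        Map<String, Long> partials = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> region : regions.entrySet()) {
            partials.put(region.getKey(), regionSum(region.getValue()));
        }
        long total = 0;
        for (long partial : partials.values()) {
            total += partial;
        }
        return total;
    }
}
```

Only one number per region travels back to the client, instead of every cell value.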
The CoprocessorProtocol interface
In order to provide a custom RPC protocol to clients, a coprocessor implementation
defines an interface that extends CoprocessorProtocol. The interface can define any
methods that the coprocessor wishes to expose. Using this protocol, you can communicate
with the coprocessor instances via the following calls, provided by HTable:
<T extends CoprocessorProtocol> T coprocessorProxy(
Class<T> protocol, byte[] row)
<T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
Class<T> protocol, byte[] startKey, byte[] endKey,
Batch.Call<T,R> callable)
<T extends CoprocessorProtocol, R> void coprocessorExec(
Class<T> protocol, byte[] startKey, byte[] endKey,
Batch.Call<T,R> callable, Batch.Callback<R> callback)
Single region
This is done by calling coprocessorProxy() with a single row key. This returns a
dynamic proxy of the CoprocessorProtocol interface, which uses the region containing
the given row key—even if the row does not exist—as the RPC endpoint.
Range of regions
You can call coprocessorExec() with a start row key and an end row key. All regions in the table from the one containing the start row key to the one containing the
end row key (inclusive) will be used as the RPC endpoints.
The row keys passed as parameters to the HTable methods are not passed
to the CoprocessorProtocol implementations. They are only used to
identify the regions for endpoints of the remote calls.
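How a row key selects a region, or a key range selects several, can be modeled with a sorted map keyed by region start key. This is a simplified sketch, not HBase code: RegionLookupSketch and the region names are hypothetical, and it assumes the first region starts at the empty key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch: regions are keyed by their start row key.
// Assumption: the first region of a table starts at the empty key.
class RegionLookupSketch {
    private final TreeMap<String, String> regionsByStartKey = new TreeMap<>();

    void addRegion(String startKey, String regionName) {
        regionsByStartKey.put(startKey, regionName);
    }

    // Single region: the region with the greatest start key <= row.
    // The row itself does not have to exist.
    String regionFor(String row) {
        Map.Entry<String, String> entry = regionsByStartKey.floorEntry(row);
        if (entry == null) {
            throw new IllegalStateException("no region covers row " + row);
        }
        return entry.getValue();
    }

    // Range of regions: every region from the one containing startRow
    // to the one containing endRow, inclusive.
    List<String> regionsFor(String startRow, String endRow) {
        String first = regionsByStartKey.floorKey(startRow);
        String last = regionsByStartKey.floorKey(endRow);
        if (first == null || last == null) {
            throw new IllegalStateException("no region covers the given range");
        }
        return new ArrayList<>(regionsByStartKey.subMap(first, true, last, true).values());
    }
}
```

The keys only pick the regions. As the note above says, they are not handed on to the endpoint implementation.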
HTablePool:
Instead of creating an HTable instance for every request from your client application, it
makes much more sense to create one initially and subsequently reuse it.
The primary reason for doing so is that creating an HTable instance is a fairly expensive
operation that takes a few seconds to complete. In a highly contended environment
with thousands of requests per second, you would not be able to use this approach at
all—creating the HTable instance would be too slow. You need to create the instance
at startup and use it for the duration of your client’s life cycle.
There is an additional issue with the HTable being reused by multiple threads within
the same process.
The HTable class is not thread-safe, that is, the local write buffer is not
guarded against concurrent modifications. Even if you were to use
setAutoFlush(true) (which is the default currently; see “Client-side
write buffer” on page 86) this is not advisable. Instead, you should use
one instance of HTable for each thread you are running in your client
application.
The default constructor—the one without any parameters—creates a pool with the
configuration found in the classpath, while setting the maximum size to unlimited. This
equals calling the second constructor like so:
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
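The reuse idea behind a pool with a maximum size can be shown with a small generic sketch. It is not the HTablePool implementation: TablePoolSketch, its created counter, and its policy of silently dropping instances that are returned to a full pool are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Generic pool sketch (hypothetical, not HTablePool): hands out idle
// instances when available and only creates new ones when it has to.
class TablePoolSketch<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final int maxSize;
    private final Supplier<T> factory;
    private int created = 0;

    TablePoolSketch(int maxSize, Supplier<T> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
    }

    // Reuse an idle instance if there is one, otherwise pay the creation cost.
    synchronized T get() {
        T item = idle.pollFirst();
        if (item == null) {
            created++;
            item = factory.get();
        }
        return item;
    }

    // Return an instance; it is dropped if the pool already holds maxSize.
    synchronized void put(T item) {
        if (idle.size() < maxSize) {
            idle.addFirst(item);
        }
    }

    synchronized int createdCount() {
        return created;
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

Each caller holds an instance exclusively between get() and put(), so one instance is never used by two threads at once.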
Connection Handling:
Every instance of HTable requires a connection to the remote servers. This is internally
represented by the HConnection class, and more importantly managed process-wide by
the shared HConnectionManager class. From a user perspective, there is usually no
immediate need to deal with either of these two classes; instead, you simply create a
new Configuration instance, and use that with your client API calls.
Administrative Features:
Tables
The primary reason to have tables is to be able to control certain features that all columns in this table share.
The typical things you will want to define for a table are column families. The constructor of the table descriptor in Java looks like the following:
Although conceptually a table is a collection of rows with columns in HBase, physically
they are stored in separate partitions called regions.
Every region is served by
exactly one region server, which in turn serves the stored values directly to clients.
Table Properties:
The table descriptor offers getters and setters to set other options of the table.
Name
The constructor already had the parameter to specify the table name. The Java API
has additional methods to access the name or change it.
Column families
This is the most important part of defining a table. You need to specify the column
families you want to use with the table you are creating.
void addFamily(HColumnDescriptor family);
boolean hasFamily(byte[] c);
HColumnDescriptor[] getColumnFamilies();
HColumnDescriptor getFamily(byte[]column);
HColumnDescriptor removeFamily(byte[] column);
Maximum file size
This parameter is specifying the maximum size a region within the table can grow
to. The size is specified in bytes and is read and set using the following methods:
long getMaxFileSize();
void setMaxFileSize(long maxFileSize);
Maximum file size is actually a misnomer, as it really is about the maximum size of each store, that is, all the files belonging to each column family. If one single column family exceeds this maximum
size, the region is split. Since in practice, this involves multiple files, the better name would be maxStoreSize. By default, it is set to 256 MB.
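The per-store nature of the limit can be expressed as a small check. This is a sketch of the rule as described here, not HBase's split logic; SplitCheckSketch and its method are hypothetical, and sizes are in bytes.

```java
import java.util.Map;

// Sketch of the rule described above (not HBase code): a region splits
// when any single store, that is, one column family, exceeds the limit.
class SplitCheckSketch {
    // 256 MB, the default stated in the text.
    static final long DEFAULT_MAX_FILE_SIZE = 256L * 1024 * 1024;

    static boolean shouldSplit(Map<String, Long> storeSizesByFamily, long maxFileSize) {
        for (long size : storeSizesByFamily.values()) {
            if (size > maxFileSize) {
                return true;
            }
        }
        // The sum over all families is deliberately not considered.
        return false;
    }
}
```

Two families of 200 MB each therefore do not trigger a split, while a single family of 300 MB does.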
Read-only
By default, all tables are writable, but it may make sense to specify the read-only
option for specific tables. If the flag is set to true, you can only read from the table and not modify it at all. The flag is set and read by these methods:
Memstore flush size
We discussed the storage model earlier and identified how HBase uses an in-memory store to buffer values before writing them to disk as a new storage file in
an operation called flush. This parameter of the table controls when this is going
to happen and is specified in bytes. It is controlled by the following calls:
long getMemStoreFlushSize();
void setMemStoreFlushSize(long memstoreFlushSize);
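The buffering and flushing behavior can be modeled in a few lines. The following is a simplified in-memory sketch, not the HBase implementation: MemStoreFlushSketch is hypothetical, "store files" are just sorted maps, and the size of an entry is roughly estimated as the length of its key plus its value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified model (not HBase code): buffer writes in a sorted map and
// turn the buffer into a new immutable "store file" once it is large enough.
class MemStoreFlushSketch {
    private final long flushSize;
    private final TreeMap<String, String> memstore = new TreeMap<>();
    private long bufferedBytes = 0;
    final List<TreeMap<String, String>> storeFiles = new ArrayList<>();

    MemStoreFlushSketch(long flushSize) {
        this.flushSize = flushSize;
    }

    void put(String key, String value) {
        memstore.put(key, value);
        // Rough size estimate; real accounting is more involved.
        bufferedBytes += key.length() + value.length();
        if (bufferedBytes >= flushSize) {
            flush();
        }
    }

    // Writes the sorted buffer out as one new store file and empties it.
    void flush() {
        if (memstore.isEmpty()) {
            return;
        }
        storeFiles.add(new TreeMap<>(memstore));
        memstore.clear();
        bufferedBytes = 0;
    }

    int bufferedEntries() {
        return memstore.size();
    }
}
```

A larger flush size means fewer but bigger store files, at the cost of more memory held per region.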
Column Families:
Column families define shared features that apply to all columns that are created within
them.
The family name is added to the path and must comply
with filename standards. The advantage is that you can easily access families on the
filesystem level as you have the name in a human-readable format.
A column family cannot be renamed. The common approach to rename a family is to create a new family with the desired name and copy the data over, using the API.
Maximum versions:
Per family, you can specify how many versions of each value you want to keep.
Recall the predicate deletion mentioned earlier where the housekeeping of HBase
removes values that exceed the set maximum. Getting and setting the value is done
using the following API calls:
int getMaxVersions();
void setMaxVersions(int maxVersions);
The default value is 3, but you may reduce it to 1, for example, in case you know
for sure that you will never want to look at older values.
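The effect of a maximum version count can be sketched with a map sorted by descending timestamp. VersionedCellSketch is a hypothetical class for illustration. Unlike HBase, where excess versions are removed by housekeeping, this sketch trims eagerly on every put to keep the example short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Sketch of one cell that keeps at most maxVersions values, newest first.
// Assumption: trimming happens immediately on put, unlike in HBase.
class VersionedCellSketch {
    private final int maxVersions;
    // Reverse order: the first entry has the highest (newest) timestamp.
    private final TreeMap<Long, String> versions = new TreeMap<>(Collections.reverseOrder());

    VersionedCellSketch(int maxVersions) {
        this.maxVersions = maxVersions;
    }

    void put(long timestamp, String value) {
        versions.put(timestamp, value);
        while (versions.size() > maxVersions) {
            // With reverse ordering the last entry is the oldest version.
            versions.pollLastEntry();
        }
    }

    // All retained values, newest first.
    List<String> values() {
        return new ArrayList<>(versions.values());
    }
}
```

With a maximum of 3, writing four versions leaves only the three most recent ones readable.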
Compression
HBase has pluggable compression algorithm support that allows you to choose the best compression— or none—for the data stored in a particular column family.
Block size:
All stored files in HBase are divided into smaller blocks that are loaded during a get or scan operation, analogous to pages in RDBMSes. The default is set to 64 KB and can be adjusted with these methods:
synchronized int getBlocksize();
void setBlocksize(int s);
Block cache:
As HBase reads entire blocks of data for efficient I/O usage, it retains these blocks in an in-memory cache so that subsequent reads do not need any disk operation.
Time-to-live:
HBase supports predicate deletions on the number of versions kept for each value, but also on specific times. The time-to-live (or TTL) sets a threshold based on the timestamp of a value, and the internal housekeeping automatically checks whether a value exceeds its TTL. If that is the case, it is dropped during major compactions.
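The TTL check itself is a simple comparison. The sketch below assumes cell timestamps in milliseconds since the epoch and a TTL given in seconds; TtlSketch and the parameter names are hypothetical.

```java
// Sketch of the expiry test (not HBase code).
// Assumptions: timestamps are in milliseconds, the TTL is in seconds.
class TtlSketch {
    static boolean isExpired(long cellTimestampMs, long ttlSeconds, long nowMs) {
        long ageMs = nowMs - cellTimestampMs;
        return ageMs > ttlSeconds * 1000L;
    }
}
```

A cell written 61 seconds ago with a TTL of 60 seconds counts as expired. One written 59 seconds ago does not.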
In-memory:
We mentioned the block cache and how HBase is using it to keep entire blocks of
data in memory for efficient sequential access to values. The in-memory flag defaults
to false but can be modified with these methods:
Bloom filter:
An advanced feature available in HBase is Bloom filters, which allow you to improve
lookup times given you have a specific access pattern.
Since they add overhead in terms of storage and memory, they are turned off by default.
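For readers unfamiliar with the data structure, here is a minimal generic Bloom filter. It is a textbook-style sketch and not HBase's implementation: BloomFilterSketch, the double-hashing scheme based on String.hashCode(), and the parameter choices are assumptions.

```java
import java.util.BitSet;

// Minimal generic Bloom filter sketch (not HBase's implementation).
class BloomFilterSketch {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    BloomFilterSketch(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    // Derives the i-th bit position from two base hashes (double hashing).
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // forced odd so the step is never zero
        return Math.floorMod(h1 + i * h2, size);
    }

    void add(String key) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(position(key, i));
        }
    }

    // false means "definitely absent"; true means "possibly present".
    boolean mightContain(String key) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }
}
```

A negative answer lets a reader skip a file entirely, which is where the lookup-time improvement comes from. The bit array is the storage and memory overhead mentioned above.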
Replication scope:
Another more advanced feature coming with HBase is replication. It enables you
to have multiple clusters that ship local updates across the network so that they
are applied to the remote copies.
By default, the replication scope is set to 0, meaning replication is
disabled. You can change the scope with these functions:
HBaseAdmin:
Basic Operations:
Before you can use the administrative API, you will have to create an instance of the
HBaseAdmin class. The constructor is straightforward:
HBaseAdmin(Configuration conf)
throws MasterNotRunningException, ZooKeeperConnectionException
Table Operations:
Before you can do anything with HBase, you need to create tables. Here is the set of
functions to do so:
void createTable(HTableDescriptor desc)
void createTable(HTableDescriptor desc, byte[] startKey,
byte[] endKey, int numRegions)
void createTable(HTableDescriptor desc, byte[][] splitKeys)
void createTableAsync(HTableDescriptor desc, byte[][] splitKeys)
Schema Operations:
Besides using the modifyTable() call, there are dedicated methods provided by the
HBaseAdmin class to modify specific aspects of the current table schema. As usual, you
need to make sure the table to be modified is disabled first.
Cluster Operations:
The last group of operations the HBaseAdmin class exposes is related to cluster operations.
They allow you to check the status of the cluster, and perform tasks on tables and/or regions.
Available Clients:
REST
HBase ships with a powerful REST server, which supports the complete client and
administrative API. It also provides support for different message formats, offering
many choices for a client application to communicate with the server.
REST Java client:
The REST server also comes with a comprehensive Java client API. It is located in the
org.apache.hadoop.hbase.rest.client package. The central classes are RemoteHTable
and RemoteAdmin. Example 6-1 shows the use of the RemoteHTable class.
Example 6-1. Using the REST client classes
Cluster cluster = new Cluster();
cluster.add("localhost", 8080);
Client client = new Client(cluster);
RemoteHTable table = new RemoteHTable(client, "testtable");
Get get = new Get(Bytes.toBytes("row-30"));
get.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-3"));
Result result1 = table.get(get);
System.out.println("Get result1: " + result1);
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("row-10"));
scan.setStopRow(Bytes.toBytes("row-15"));
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"));
ResultScanner scanner = table.getScanner(scan);
for (Result result2 : scanner) {
System.out.println("Scan row[" + Bytes.toString(result2.getRow()) +
"]: " + result2);
}
Thrift:
Apache Thrift is written in C++, but provides schema compilers for many programming
languages, including Java, C++, Perl, PHP, Python, Ruby, and more. Once you have
compiled a schema, you can exchange messages transparently between systems implemented
in one or more of those languages.
HIVE:
You have the option to map any HBase column directly to a Hive column, or you can
map an entire column family to a Hive MAP type. This is useful when you do not know
the column qualifiers ahead of time: map the family and iterate over the columns from
within the Hive query instead.
MapReduce Integration:
public class ImportFromFile {
public static final String NAME = "ImportFromFile";
public enum Counters { LINES }
static class ImportMapper
extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
private byte[] family = null;
private byte[] qualifier = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
String column = context.getConfiguration().get("conf.column");
byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
family = colkey[0];
if (colkey.length > 1) {
qualifier = colkey[1];
}
}
@Override
public void map(LongWritable offset, Text line, Context context)
throws IOException {
try {
String lineString = line.toString();
byte[] rowkey = DigestUtils.md5(lineString);
Put put = new Put(rowkey);
put.add(family, qualifier, Bytes.toBytes(lineString));
context.write(new ImmutableBytesWritable(rowkey), put);
context.getCounter(Counters.LINES).increment(1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static CommandLine parseArgs(String[] args) throws ParseException {
Options options = new Options();
Option o = new Option("t", "table", true,
"table to import into (must exist)");
o.setArgName("table-name");
o.setRequired(true);
options.addOption(o);
o = new Option("c", "column", true,
"column to store row data into (must exist)");
o.setArgName("family:qualifier");
o.setRequired(true);
options.addOption(o);
o = new Option("i", "input", true,
"the directory or file to read from");
o.setArgName("path-in-HDFS");
o.setRequired(true);
options.addOption(o);
options.addOption("d", "debug", false, "switch on DEBUG log level");
CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
try {
cmd = parser.parse(options, args);
} catch (Exception e) {
System.err.println("ERROR: " + e.getMessage() + "\n");
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(NAME + " ", options, true);
System.exit(-1);
}
return cmd;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
CommandLine cmd = parseArgs(otherArgs);
String table = cmd.getOptionValue("t");
String input = cmd.getOptionValue("i");
String column = cmd.getOptionValue("c");
conf.set("conf.column", column);
Job job = new Job(conf, "Import from file " + input + " into table " + table);
job.setJarByClass(ImportFromFile.class);
job.setMapperClass(ImportMapper.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(input));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
MapReduce job that reads the imported data and analyzes it:
static class AnalyzeMapper extends TableMapper<Text, IntWritable> {
private JSONParser parser = new JSONParser();
private IntWritable ONE = new IntWritable(1);
@Override
public void map(ImmutableBytesWritable row, Result columns, Context context)
throws IOException {
context.getCounter(Counters.ROWS).increment(1);
String value = null;
try {
for (KeyValue kv : columns.list()) {
context.getCounter(Counters.COLS).increment(1);
value = Bytes.toStringBinary(kv.getValue());
JSONObject json = (JSONObject) parser.parse(value);
String author = (String) json.get("author");
context.write(new Text(author), ONE);
context.getCounter(Counters.VALID).increment(1);
}
} catch (Exception e) {
e.printStackTrace();
System.err.println("Row: " + Bytes.toStringBinary(row.get()) +
", JSON: " + value);
context.getCounter(Counters.ERROR).increment(1);
}
}
}
static class AnalyzeReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable one : values) count++;
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws Exception {
...
Scan scan = new Scan();
if (column != null) {
byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
if (colkey.length > 1) {
scan.addColumn(colkey[0], colkey[1]);
} else {
scan.addFamily(colkey[0]);
}
}
Job job = new Job(conf, "Analyze data in " + table);
job.setJarByClass(AnalyzeData.class);
TableMapReduceUtil.initTableMapperJob(table, scan, AnalyzeMapper.class,
Text.class, IntWritable.class, job);
job.setReducerClass(AnalyzeReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
FileOutputFormat.setOutputPath(job, new Path(output));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Architecture:
The figure shows that HBase handles basically two kinds of file types: one is used for
the write-ahead log and the other for the actual data storage. The files are primarily
handled by the HRegionServers.
The HRegionServer opens the region and creates a corresponding HRegion object. When
the HRegion is opened it sets up a Store instance for each HColumnFamily for every table
as defined by the user beforehand. Each Store instance can, in turn, have one or more
StoreFile instances, which are lightweight wrappers around the actual storage file
called HFile. A Store also has a MemStore, and the HRegionServer a shared HLog instance.
WAL:
The WAL is the lifeline that is needed when disaster strikes. Similar to a BIN log in MySQL it records all changes to the data.
Write Path:
The client issues an HTable.put(Put) request to the HRegionServer, which hands the details to the matching HRegion instance. The first step is to write the data to the write-ahead
log (the WAL), represented by the HLog class.
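The ordering of the write path, log first and in-memory store second, can be modeled as follows. This is a conceptual sketch with invented names (WritePathSketch, replay), not the HLog or HRegion code. It also shows why the log acts as a lifeline: the in-memory state can be rebuilt from it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Conceptual model of the write path (not HBase code).
class WritePathSketch {
    // Append-only log of {rowKey, value} records, in arrival order.
    final List<String[]> wal = new ArrayList<>();
    // Sorted in-memory buffer holding the latest value per row key.
    final TreeMap<String, String> memstore = new TreeMap<>();

    void put(String rowKey, String value) {
        wal.add(new String[] {rowKey, value}); // 1. record the change in the log
        memstore.put(rowKey, value);           // 2. then apply it in memory
    }

    // After a crash the memstore is lost; replaying the log in order
    // rebuilds the same state.
    static TreeMap<String, String> replay(List<String[]> wal) {
        TreeMap<String, String> rebuilt = new TreeMap<>();
        for (String[] record : wal) {
            rebuilt.put(record[0], record[1]);
        }
        return rebuilt;
    }
}
```

Because the log is written before the memstore, any change visible in memory is guaranteed to be recoverable from the log.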
Root-level files:
The first set of files are the write-ahead log files handled by the HLog instances, created
in a directory called .logs underneath the HBase root directory. The .logs directory
contains a subdirectory for each HRegionServer.
Table-level files:
Every table in HBase has its own directory, located under the HBase root directory in
the filesystem. Each table directory contains a top-level file named .tableinfo, which
stores the serialized HTableDescriptor (see “Tables” on page 207 for details) for the
table. This includes the table and column family schemas, and can be read
Region-level files:
Inside each table directory, there is a separate directory for every region comprising the
table. The names of these directories are the MD5 hash portion of a region name.
Region splits:
When a store file within a region grows larger than the configured hbase.hregion.max.filesize (or what is configured at the column family level using HColumnDescriptor), the region is split in two.
Compactions:
The store files are monitored by a background thread to keep them under control. The flushes of memstores slowly build up an increasing number of on-disk files. If there are enough of them, the compaction process will combine them to a few, larger files. This goes on until the largest of these files exceeds the configured maximum store file size
and triggers a region split.
Compactions come in two varieties: minor and major. Minor compactions are responsible
for rewriting the last few files into one larger one. The number of files is set with the hbase.hstore.compaction.min property (which was previously called hbase.hstore.compactionThreshold, and although deprecated is still supported). It is set to 3 by default, and needs to be at least 2. A number too large would delay minor compactions, but also would require more resources and take longer once the
compactions start.
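The threshold behavior can be sketched as a merge of sorted files that only runs once enough files exist. This is a strong simplification and not HBase's compaction algorithm: CompactionSketch is hypothetical, it merges all given files rather than selecting "the last few", and it keeps only the newest value per key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified compaction sketch (not HBase code): files are sorted maps,
// listed from oldest to newest.
class CompactionSketch {
    // Default of hbase.hstore.compaction.min as stated in the text.
    static final int COMPACTION_MIN = 3;

    static List<TreeMap<String, String>> maybeCompact(
            List<TreeMap<String, String>> files, int minFiles) {
        if (files.size() < minFiles) {
            return files; // not enough files yet, nothing to do
        }
        TreeMap<String, String> merged = new TreeMap<>();
        for (TreeMap<String, String> file : files) {
            merged.putAll(file); // newer files overwrite older entries
        }
        List<TreeMap<String, String>> result = new ArrayList<>();
        result.add(merged);
        return result;
    }
}
```

With the default of 3, two store files are left alone, while a third one triggers a rewrite into a single, larger file.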
HFile Format:
The actual storage files are implemented by the HFile class, which was specifically created to serve one purpose: store HBase’s data efficiently.
The class that implements the WAL is called HLog. When an HRegion is instantiated, the
single HLog instance that runs inside each region server is passed on as a parameter to
the constructor of HRegion. When a region receives an update operation, it can save the
data directly to the shared WAL instance.
Advanced Usage:
Key Design:
HBase has two fundamental key structures: the row key and the column key. Both can
be used to convey meaning, by either the data they store, or by exploiting their sorting
order.
Tall-Narrow Versus Flat-Wide Tables:
At this time, you may be asking yourself where and how you should store your data.
The two choices are tall-narrow and flat-wide. The former is a table with few columns
but many rows, while the latter has fewer rows but many columns.
Partial Key Scans:
Pagination:
The steps are the following:
1. Open a scanner at the start row.
2. Skip offset rows.
3. Read the next limit rows and return to the caller.
4. Close the scanner.
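The four steps above can be sketched against a sorted in-memory map that stands in for a table. PaginationSketch and its page() method are hypothetical, and the iterator plays the role of the scanner, so there is nothing to close in this model.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;

// Pagination sketch over a sorted map standing in for a table (not HBase code).
class PaginationSketch {
    static List<String> page(NavigableMap<String, String> table,
                             String startRow, int offset, int limit) {
        // 1. Open a "scanner" at the start row (inclusive).
        Iterator<String> scanner = table.tailMap(startRow, true).keySet().iterator();
        // 2. Skip offset rows.
        int skipped = 0;
        while (skipped < offset && scanner.hasNext()) {
            scanner.next();
            skipped++;
        }
        // 3. Read the next limit rows and return them to the caller.
        List<String> rows = new ArrayList<>();
        while (rows.size() < limit && scanner.hasNext()) {
            rows.add(scanner.next());
        }
        // 4. A plain iterator holds no resources, so there is nothing to close.
        return rows;
    }
}
```

Note that the skipped rows are still read. The cost of a page therefore grows with its offset, which is a limitation of this simple approach.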
Time Series Data:
When dealing with stream processing of events, the most common use case is time
series data. Such data could be coming from a sensor in a power grid, a stock exchange,
or a monitoring system for computer systems. Its salient feature is that its row key
represents the event time. This imposes a problem with the way HBase is arranging its
rows: they are all stored sorted in a distinct range, namely regions with specific start
and stop keys.
Time-Ordered Relations:
Advanced Schemas:
Transactional HBase:
Indexed HBase:
Transactions:
Transactional HBase
The Indexed Transactional HBase project comes with a set of extended classes that
replace the default client- and server-side classes, while adding support for transactions
across row and table boundaries. The region servers, and more precisely,
each region, keep a list of transactions, which are initiated with a beginTransaction()
call, and are finalized with the matching commit() call. Every read and write
operation then takes a transaction ID to guard the call against other transactions.
ZooKeeper
HBase requires a ZooKeeper ensemble to be present, acting as the seed, or bootstrap
mechanism, for cluster setup. There are templates, or recipes, available that
show how ZooKeeper can also be used as a transaction control backend. For example,
the Cages project offers an abstraction to implement locks across multiple
resources, and is scheduled to add a specialized transactions class, using
ZooKeeper as the distributed coordination system.
Bloom Filters:
Cluster Monitoring:
The Metrics Framework
Every HBase process, including the master and region servers, exposes a specific set of metrics. For example, one group of metrics is provided by the Java Virtual Machine (JVM) itself, giving insight into many interesting details of the current process, such as garbage collection statistics and memory usage.
Performance Tuning:
As the data is collected in the in-memory buffers, it needs to remain there until it has outgrown the configured minimum flush size, set with
hbase.hregion.memstore.flush.size.