001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.List;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.filter.Filter;
033import org.apache.hadoop.hbase.filter.TimestampsFilter;
034import org.apache.hadoop.hbase.testclassification.ClientTests;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.junit.After;
038import org.junit.AfterClass;
039import org.junit.Before;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Rule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.junit.rules.TestName;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Run tests related to {@link TimestampsFilter} using HBase client APIs.
051 * Sets up the HBase mini cluster once at start. Each creates a table
052 * named for the method and does its stuff against that.
053 */
054@Category({MediumTests.class, ClientTests.class})
055public class TestTimestampsFilter {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059      HBaseClassTestRule.forClass(TestTimestampsFilter.class);
060
061  private static final Logger LOG = LoggerFactory.getLogger(TestTimestampsFilter.class);
062  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
063
064  @Rule
065  public TestName name = new TestName();
066
067  /**
068   * @throws java.lang.Exception
069   */
070  @BeforeClass
071  public static void setUpBeforeClass() throws Exception {
072    TEST_UTIL.startMiniCluster();
073  }
074
075  /**
076   * @throws java.lang.Exception
077   */
078  @AfterClass
079  public static void tearDownAfterClass() throws Exception {
080    TEST_UTIL.shutdownMiniCluster();
081  }
082
083  /**
084   * @throws java.lang.Exception
085   */
086  @Before
087  public void setUp() throws Exception {
088    // Nothing to do.
089  }
090
091  /**
092   * @throws java.lang.Exception
093   */
094  @After
095  public void tearDown() throws Exception {
096    // Nothing to do.
097  }
098
099  /**
100   * Test from client side for TimestampsFilter.
101   *
102   * The TimestampsFilter provides the ability to request cells (KeyValues)
103   * whose timestamp/version is in the specified list of timestamps/version.
104   *
105   * @throws Exception
106   */
107  @Test
108  public void testTimestampsFilter() throws Exception {
109    final byte [] TABLE = Bytes.toBytes(name.getMethodName());
110    byte [] FAMILY = Bytes.toBytes("event_log");
111    byte [][] FAMILIES = new byte[][] { FAMILY };
112    Cell kvs[];
113
114    // create table; set versions to max...
115    Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
116
117    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
118      for (int colIdx = 0; colIdx < 5; colIdx++) {
119        // insert versions 201..300
120        putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
121        // insert versions 1..100
122        putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
123      }
124    }
125
126    // do some verification before flush
127    verifyInsertedValues(ht, FAMILY);
128
129    TEST_UTIL.flush();
130
131    // do some verification after flush
132    verifyInsertedValues(ht, FAMILY);
133
134    // Insert some more versions after flush. These should be in memstore.
135    // After this we should have data in both memstore & HFiles.
136    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
137      for (int colIdx = 0; colIdx < 5; colIdx++) {
138        putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
139        putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
140      }
141    }
142
143    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
144      for (int colIdx = 0; colIdx < 5; colIdx++) {
145        kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
146                           Arrays.asList(505L, 5L, 105L, 305L, 205L));
147        assertEquals(4, kvs.length);
148        checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
149        checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
150        checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
151        checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
152      }
153    }
154
155    // Request an empty list of versions using the Timestamps filter;
156    // Should return none.
157    kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<>());
158    assertEquals(0, kvs == null? 0: kvs.length);
159
160    //
161    // Test the filter using a Scan operation
162    // Scan rows 0..4. For each row, get all its columns, but only
163    // those versions of the columns with the specified timestamps.
164    Result[] results = scanNVersions(ht, FAMILY, 0, 4,
165                                     Arrays.asList(6L, 106L, 306L));
166    assertEquals("# of rows returned from scan", 5, results.length);
167    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
168      kvs = results[rowIdx].rawCells();
169      // each row should have 5 columns.
170      // And we have requested 3 versions for each.
171      assertEquals("Number of KeyValues in result for row:" + rowIdx,
172                   3*5, kvs.length);
173      for (int colIdx = 0; colIdx < 5; colIdx++) {
174        int offset = colIdx * 3;
175        checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
176        checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
177        checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
178      }
179    }
180    ht.close();
181  }
182
183  @Test
184  public void testMultiColumns() throws Exception {
185    final byte [] TABLE = Bytes.toBytes(name.getMethodName());
186    byte [] FAMILY = Bytes.toBytes("event_log");
187    byte [][] FAMILIES = new byte[][] { FAMILY };
188
189    // create table; set versions to max...
190    Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
191
192    Put p = new Put(Bytes.toBytes("row"));
193    p.addColumn(FAMILY, Bytes.toBytes("column0"), 3L, Bytes.toBytes("value0-3"));
194    p.addColumn(FAMILY, Bytes.toBytes("column1"), 3L, Bytes.toBytes("value1-3"));
195    p.addColumn(FAMILY, Bytes.toBytes("column2"), 1L, Bytes.toBytes("value2-1"));
196    p.addColumn(FAMILY, Bytes.toBytes("column2"), 2L, Bytes.toBytes("value2-2"));
197    p.addColumn(FAMILY, Bytes.toBytes("column2"), 3L, Bytes.toBytes("value2-3"));
198    p.addColumn(FAMILY, Bytes.toBytes("column3"), 2L, Bytes.toBytes("value3-2"));
199    p.addColumn(FAMILY, Bytes.toBytes("column4"), 1L, Bytes.toBytes("value4-1"));
200    p.addColumn(FAMILY, Bytes.toBytes("column4"), 2L, Bytes.toBytes("value4-2"));
201    p.addColumn(FAMILY, Bytes.toBytes("column4"), 3L, Bytes.toBytes("value4-3"));
202    ht.put(p);
203
204    ArrayList<Long> timestamps = new ArrayList<>();
205    timestamps.add(new Long(3));
206    TimestampsFilter filter = new TimestampsFilter(timestamps);
207
208    Get g = new Get(Bytes.toBytes("row"));
209    g.setFilter(filter);
210    g.setMaxVersions();
211    g.addColumn(FAMILY, Bytes.toBytes("column2"));
212    g.addColumn(FAMILY, Bytes.toBytes("column4"));
213
214    Result result = ht.get(g);
215    for (Cell kv : result.listCells()) {
216      System.out.println("found row " + Bytes.toString(CellUtil.cloneRow(kv)) +
217          ", column " + Bytes.toString(CellUtil.cloneQualifier(kv)) + ", value "
218          + Bytes.toString(CellUtil.cloneValue(kv)));
219    }
220
221    assertEquals(2, result.listCells().size());
222    assertTrue(CellUtil.matchingValue(result.listCells().get(0), Bytes.toBytes("value2-3")));
223    assertTrue(CellUtil.matchingValue(result.listCells().get(1), Bytes.toBytes("value4-3")));
224
225    ht.close();
226  }
227
228  /**
229   * Test TimestampsFilter in the presence of version deletes.
230   *
231   * @throws Exception
232   */
233  @Test
234  public void testWithVersionDeletes() throws Exception {
235
236    // first test from memstore (without flushing).
237    testWithVersionDeletes(false);
238
239    // run same test against HFiles (by forcing a flush).
240    testWithVersionDeletes(true);
241  }
242
243  private void testWithVersionDeletes(boolean flushTables) throws IOException {
244    final byte [] TABLE = Bytes.toBytes(name.getMethodName() + "_" +
245                                   (flushTables ? "flush" : "noflush"));
246    byte [] FAMILY = Bytes.toBytes("event_log");
247    byte [][] FAMILIES = new byte[][] { FAMILY };
248
249    // create table; set versions to max...
250    Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
251
252    // For row:0, col:0: insert versions 1 through 5.
253    putNVersions(ht, FAMILY, 0, 0, 1, 5);
254
255    // delete version 4.
256    deleteOneVersion(ht, FAMILY, 0, 0, 4);
257
258    if (flushTables) {
259      TEST_UTIL.flush();
260    }
261
262    // request a bunch of versions including the deleted version. We should
263    // only get back entries for the versions that exist.
264    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
265    assertEquals(3, kvs.length);
266    checkOneCell(kvs[0], FAMILY, 0, 0, 5);
267    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
268    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
269
270    ht.close();
271  }
272
273  private void verifyInsertedValues(Table ht, byte[] cf) throws IOException {
274    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
275      for (int colIdx = 0; colIdx < 5; colIdx++) {
276        // ask for versions that exist.
277        Cell[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
278                                      Arrays.asList(5L, 300L, 6L, 80L));
279        assertEquals(4, kvs.length);
280        checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
281        checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
282        checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
283        checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
284
285        // ask for versions that do not exist.
286        kvs = getNVersions(ht, cf, rowIdx, colIdx,
287                           Arrays.asList(101L, 102L));
288        assertEquals(0, kvs == null? 0: kvs.length);
289
290        // ask for some versions that exist and some that do not.
291        kvs = getNVersions(ht, cf, rowIdx, colIdx,
292                           Arrays.asList(1L, 300L, 105L, 70L, 115L));
293        assertEquals(3, kvs.length);
294        checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
295        checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
296        checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
297      }
298    }
299  }
300
301  /**
302   * Assert that the passed in KeyValue has expected contents for the
303   * specified row, column & timestamp.
304   */
305  private void checkOneCell(Cell kv, byte[] cf,
306                             int rowIdx, int colIdx, long ts) {
307
308    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
309
310    assertEquals("Row mismatch which checking: " + ctx,
311                 "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
312
313    assertEquals("ColumnFamily mismatch while checking: " + ctx,
314                 Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
315
316    assertEquals("Column qualifier mismatch while checking: " + ctx,
317                 "column:" + colIdx,
318                  Bytes.toString(CellUtil.cloneQualifier(kv)));
319
320    assertEquals("Timestamp mismatch while checking: " + ctx,
321                 ts, kv.getTimestamp());
322
323    assertEquals("Value mismatch while checking: " + ctx,
324                 "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
325  }
326
327  /**
328   * Uses the TimestampFilter on a Get to request a specified list of
329   * versions for the row/column specified by rowIdx & colIdx.
330   *
331   */
332  private  Cell[] getNVersions(Table ht, byte[] cf, int rowIdx,
333                                   int colIdx, List<Long> versions)
334    throws IOException {
335    byte row[] = Bytes.toBytes("row:" + rowIdx);
336    byte column[] = Bytes.toBytes("column:" + colIdx);
337    Filter filter = new TimestampsFilter(versions);
338    Get get = new Get(row);
339    get.addColumn(cf, column);
340    get.setFilter(filter);
341    get.setMaxVersions();
342    Result result = ht.get(get);
343
344    return result.rawCells();
345  }
346
347  /**
348   * Uses the TimestampFilter on a Scan to request a specified list of
349   * versions for the rows from startRowIdx to endRowIdx (both inclusive).
350   */
351  private Result[] scanNVersions(Table ht, byte[] cf, int startRowIdx,
352                                 int endRowIdx, List<Long> versions)
353    throws IOException {
354    byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
355    byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1); // exclusive
356    Filter filter = new TimestampsFilter(versions);
357    Scan scan = new Scan(startRow, endRow);
358    scan.setFilter(filter);
359    scan.setMaxVersions();
360    ResultScanner scanner = ht.getScanner(scan);
361    return scanner.next(endRowIdx - startRowIdx + 1);
362  }
363
364  /**
365   * Insert in specific row/column versions with timestamps
366   * versionStart..versionEnd.
367   */
368  private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx,
369                            long versionStart, long versionEnd)
370      throws IOException {
371    byte row[] = Bytes.toBytes("row:" + rowIdx);
372    byte column[] = Bytes.toBytes("column:" + colIdx);
373    Put put = new Put(row);
374    put.setDurability(Durability.SKIP_WAL);
375
376    for (long idx = versionStart; idx <= versionEnd; idx++) {
377      put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx));
378    }
379
380    ht.put(put);
381  }
382
383  /**
384   * For row/column specified by rowIdx/colIdx, delete the cell
385   * corresponding to the specified version.
386   */
387  private void deleteOneVersion(Table ht, byte[] cf, int rowIdx,
388                                int colIdx, long version)
389    throws IOException {
390    byte row[] = Bytes.toBytes("row:" + rowIdx);
391    byte column[] = Bytes.toBytes("column:" + colIdx);
392    Delete del = new Delete(row);
393    del.addColumn(cf, column, version);
394    ht.delete(del);
395  }
396
397}
398
399