001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.List;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.filter.Filter;
033import org.apache.hadoop.hbase.filter.TimestampsFilter;
034import org.apache.hadoop.hbase.testclassification.ClientTests;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.junit.After;
038import org.junit.AfterClass;
039import org.junit.Before;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Rule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.junit.rules.TestName;
046
047/**
048 * Run tests related to {@link TimestampsFilter} using HBase client APIs. Sets up the HBase mini
049 * cluster once at start. Each creates a table named for the method and does its stuff against that.
050 */
051@Category({ MediumTests.class, ClientTests.class })
052public class TestTimestampsFilter {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestTimestampsFilter.class);
057
058  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
059
060  @Rule
061  public TestName name = new TestName();
062
063  /**
064   * @throws java.lang.Exception
065   */
066  @BeforeClass
067  public static void setUpBeforeClass() throws Exception {
068    TEST_UTIL.startMiniCluster();
069  }
070
071  /**
072   * @throws java.lang.Exception
073   */
074  @AfterClass
075  public static void tearDownAfterClass() throws Exception {
076    TEST_UTIL.shutdownMiniCluster();
077  }
078
079  /**
080   * @throws java.lang.Exception
081   */
082  @Before
083  public void setUp() throws Exception {
084    // Nothing to do.
085  }
086
087  /**
088   * @throws java.lang.Exception
089   */
090  @After
091  public void tearDown() throws Exception {
092    // Nothing to do.
093  }
094
095  /**
096   * Test from client side for TimestampsFilter. The TimestampsFilter provides the ability to
097   * request cells (KeyValues) whose timestamp/version is in the specified list of
098   * timestamps/version.
099   */
100  @Test
101  public void testTimestampsFilter() throws Exception {
102    final byte[] TABLE = Bytes.toBytes(name.getMethodName());
103    byte[] FAMILY = Bytes.toBytes("event_log");
104    byte[][] FAMILIES = new byte[][] { FAMILY };
105    Cell kvs[];
106
107    // create table; set versions to max...
108    Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
109
110    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
111      for (int colIdx = 0; colIdx < 5; colIdx++) {
112        // insert versions 201..300
113        putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
114        // insert versions 1..100
115        putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
116      }
117    }
118
119    // do some verification before flush
120    verifyInsertedValues(ht, FAMILY);
121
122    TEST_UTIL.flush();
123
124    // do some verification after flush
125    verifyInsertedValues(ht, FAMILY);
126
127    // Insert some more versions after flush. These should be in memstore.
128    // After this we should have data in both memstore & HFiles.
129    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
130      for (int colIdx = 0; colIdx < 5; colIdx++) {
131        putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
132        putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
133      }
134    }
135
136    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
137      for (int colIdx = 0; colIdx < 5; colIdx++) {
138        kvs = getNVersions(ht, FAMILY, rowIdx, colIdx, Arrays.asList(505L, 5L, 105L, 305L, 205L));
139        assertEquals(4, kvs.length);
140        checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
141        checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
142        checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
143        checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
144      }
145    }
146
147    // Request an empty list of versions using the Timestamps filter;
148    // Should return none.
149    kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<>());
150    assertEquals(0, kvs == null ? 0 : kvs.length);
151
152    //
153    // Test the filter using a Scan operation
154    // Scan rows 0..4. For each row, get all its columns, but only
155    // those versions of the columns with the specified timestamps.
156    Result[] results = scanNVersions(ht, FAMILY, 0, 4, Arrays.asList(6L, 106L, 306L));
157    assertEquals("# of rows returned from scan", 5, results.length);
158    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
159      kvs = results[rowIdx].rawCells();
160      // each row should have 5 columns.
161      // And we have requested 3 versions for each.
162      assertEquals("Number of KeyValues in result for row:" + rowIdx, 3 * 5, kvs.length);
163      for (int colIdx = 0; colIdx < 5; colIdx++) {
164        int offset = colIdx * 3;
165        checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
166        checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
167        checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
168      }
169    }
170    ht.close();
171  }
172
173  @Test
174  public void testMultiColumns() throws Exception {
175    final byte[] TABLE = Bytes.toBytes(name.getMethodName());
176    byte[] FAMILY = Bytes.toBytes("event_log");
177    byte[][] FAMILIES = new byte[][] { FAMILY };
178
179    // create table; set versions to max...
180    Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
181
182    Put p = new Put(Bytes.toBytes("row"));
183    p.addColumn(FAMILY, Bytes.toBytes("column0"), 3L, Bytes.toBytes("value0-3"));
184    p.addColumn(FAMILY, Bytes.toBytes("column1"), 3L, Bytes.toBytes("value1-3"));
185    p.addColumn(FAMILY, Bytes.toBytes("column2"), 1L, Bytes.toBytes("value2-1"));
186    p.addColumn(FAMILY, Bytes.toBytes("column2"), 2L, Bytes.toBytes("value2-2"));
187    p.addColumn(FAMILY, Bytes.toBytes("column2"), 3L, Bytes.toBytes("value2-3"));
188    p.addColumn(FAMILY, Bytes.toBytes("column3"), 2L, Bytes.toBytes("value3-2"));
189    p.addColumn(FAMILY, Bytes.toBytes("column4"), 1L, Bytes.toBytes("value4-1"));
190    p.addColumn(FAMILY, Bytes.toBytes("column4"), 2L, Bytes.toBytes("value4-2"));
191    p.addColumn(FAMILY, Bytes.toBytes("column4"), 3L, Bytes.toBytes("value4-3"));
192    ht.put(p);
193
194    ArrayList<Long> timestamps = new ArrayList<>();
195    timestamps.add(new Long(3));
196    TimestampsFilter filter = new TimestampsFilter(timestamps);
197
198    Get g = new Get(Bytes.toBytes("row"));
199    g.setFilter(filter);
200    g.readAllVersions();
201    g.addColumn(FAMILY, Bytes.toBytes("column2"));
202    g.addColumn(FAMILY, Bytes.toBytes("column4"));
203
204    Result result = ht.get(g);
205    for (Cell kv : result.listCells()) {
206      System.out.println("found row " + Bytes.toString(CellUtil.cloneRow(kv)) + ", column "
207        + Bytes.toString(CellUtil.cloneQualifier(kv)) + ", value "
208        + Bytes.toString(CellUtil.cloneValue(kv)));
209    }
210
211    assertEquals(2, result.listCells().size());
212    assertTrue(CellUtil.matchingValue(result.listCells().get(0), Bytes.toBytes("value2-3")));
213    assertTrue(CellUtil.matchingValue(result.listCells().get(1), Bytes.toBytes("value4-3")));
214
215    ht.close();
216  }
217
218  /**
219   * Test TimestampsFilter in the presence of version deletes.
220   */
221  @Test
222  public void testWithVersionDeletes() throws Exception {
223
224    // first test from memstore (without flushing).
225    testWithVersionDeletes(false);
226
227    // run same test against HFiles (by forcing a flush).
228    testWithVersionDeletes(true);
229  }
230
231  private void testWithVersionDeletes(boolean flushTables) throws IOException {
232    final byte[] TABLE =
233      Bytes.toBytes(name.getMethodName() + "_" + (flushTables ? "flush" : "noflush"));
234    byte[] FAMILY = Bytes.toBytes("event_log");
235    byte[][] FAMILIES = new byte[][] { FAMILY };
236
237    // create table; set versions to max...
238    Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
239
240    // For row:0, col:0: insert versions 1 through 5.
241    putNVersions(ht, FAMILY, 0, 0, 1, 5);
242
243    // delete version 4.
244    deleteOneVersion(ht, FAMILY, 0, 0, 4);
245
246    if (flushTables) {
247      TEST_UTIL.flush();
248    }
249
250    // request a bunch of versions including the deleted version. We should
251    // only get back entries for the versions that exist.
252    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
253    assertEquals(3, kvs.length);
254    checkOneCell(kvs[0], FAMILY, 0, 0, 5);
255    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
256    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
257
258    ht.close();
259  }
260
261  private void verifyInsertedValues(Table ht, byte[] cf) throws IOException {
262    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
263      for (int colIdx = 0; colIdx < 5; colIdx++) {
264        // ask for versions that exist.
265        Cell[] kvs = getNVersions(ht, cf, rowIdx, colIdx, Arrays.asList(5L, 300L, 6L, 80L));
266        assertEquals(4, kvs.length);
267        checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
268        checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
269        checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
270        checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
271
272        // ask for versions that do not exist.
273        kvs = getNVersions(ht, cf, rowIdx, colIdx, Arrays.asList(101L, 102L));
274        assertEquals(0, kvs == null ? 0 : kvs.length);
275
276        // ask for some versions that exist and some that do not.
277        kvs = getNVersions(ht, cf, rowIdx, colIdx, Arrays.asList(1L, 300L, 105L, 70L, 115L));
278        assertEquals(3, kvs.length);
279        checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
280        checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
281        checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
282      }
283    }
284  }
285
286  /**
287   * Assert that the passed in KeyValue has expected contents for the specified row, column &
288   * timestamp.
289   */
290  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
291
292    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
293
294    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
295      Bytes.toString(CellUtil.cloneRow(kv)));
296
297    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
298      Bytes.toString(CellUtil.cloneFamily(kv)));
299
300    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
301      Bytes.toString(CellUtil.cloneQualifier(kv)));
302
303    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
304
305    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
306      Bytes.toString(CellUtil.cloneValue(kv)));
307  }
308
309  /**
310   * Uses the TimestampFilter on a Get to request a specified list of versions for the row/column
311   * specified by rowIdx & colIdx.
312   */
313  private Cell[] getNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, List<Long> versions)
314    throws IOException {
315    byte row[] = Bytes.toBytes("row:" + rowIdx);
316    byte column[] = Bytes.toBytes("column:" + colIdx);
317    Filter filter = new TimestampsFilter(versions);
318    Get get = new Get(row);
319    get.addColumn(cf, column);
320    get.setFilter(filter);
321    get.readAllVersions();
322    Result result = ht.get(get);
323
324    return result.rawCells();
325  }
326
327  /**
328   * Uses the TimestampFilter on a Scan to request a specified list of versions for the rows from
329   * startRowIdx to endRowIdx (both inclusive).
330   */
331  private Result[] scanNVersions(Table ht, byte[] cf, int startRowIdx, int endRowIdx,
332    List<Long> versions) throws IOException {
333    byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
334    byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1); // exclusive
335    Filter filter = new TimestampsFilter(versions);
336    Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow);
337    scan.setFilter(filter);
338    scan.readAllVersions();
339    ResultScanner scanner = ht.getScanner(scan);
340    return scanner.next(endRowIdx - startRowIdx + 1);
341  }
342
343  /**
344   * Insert in specific row/column versions with timestamps versionStart..versionEnd.
345   */
346  private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, long versionStart,
347    long versionEnd) throws IOException {
348    byte row[] = Bytes.toBytes("row:" + rowIdx);
349    byte column[] = Bytes.toBytes("column:" + colIdx);
350    Put put = new Put(row);
351    put.setDurability(Durability.SKIP_WAL);
352
353    for (long idx = versionStart; idx <= versionEnd; idx++) {
354      put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx));
355    }
356
357    ht.put(put);
358  }
359
360  /**
361   * For row/column specified by rowIdx/colIdx, delete the cell corresponding to the specified
362   * version.
363   */
364  private void deleteOneVersion(Table ht, byte[] cf, int rowIdx, int colIdx, long version)
365    throws IOException {
366    byte row[] = Bytes.toBytes("row:" + rowIdx);
367    byte column[] = Bytes.toBytes("column:" + colIdx);
368    Delete del = new Delete(row);
369    del.addColumn(cf, column, version);
370    ht.delete(del);
371  }
372
373}