001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.*;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.testclassification.ClientTests;
032import org.apache.hadoop.hbase.testclassification.LargeTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.junit.After;
035import org.junit.AfterClass;
036import org.junit.Before;
037import org.junit.BeforeClass;
038import org.junit.ClassRule;
039import org.junit.Rule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.junit.rules.TestName;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Run tests related to {@link org.apache.hadoop.hbase.filter.TimestampsFilter} using HBase client
048 * APIs. Sets up the HBase mini cluster once at start. Each creates a table named for the method and
049 * does its stuff against that.
050 */
051@Category({ LargeTests.class, ClientTests.class })
052public class TestMultipleTimestamps {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestMultipleTimestamps.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestMultipleTimestamps.class);
059  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
060
061  @Rule
062  public TestName name = new TestName();
063
064  /**
065   * @throws java.lang.Exception
066   */
067  @BeforeClass
068  public static void setUpBeforeClass() throws Exception {
069    TEST_UTIL.startMiniCluster();
070  }
071
072  /**
073   * @throws java.lang.Exception
074   */
075  @AfterClass
076  public static void tearDownAfterClass() throws Exception {
077    TEST_UTIL.shutdownMiniCluster();
078  }
079
080  /**
081   * @throws java.lang.Exception
082   */
083  @Before
084  public void setUp() throws Exception {
085    // Nothing to do.
086  }
087
088  /**
089   * @throws java.lang.Exception
090   */
091  @After
092  public void tearDown() throws Exception {
093    // Nothing to do.
094  }
095
096  @Test
097  public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
098    final TableName tableName = TableName.valueOf(name.getMethodName());
099    byte[] FAMILY = Bytes.toBytes("event_log");
100    byte[][] FAMILIES = new byte[][] { FAMILY };
101
102    // create table; set versions to max...
103    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
104
105    Integer[] putRows = new Integer[] { 1, 3, 5, 7 };
106    Integer[] putColumns = new Integer[] { 1, 3, 5 };
107    Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L };
108
109    Integer[] scanRows = new Integer[] { 3, 5 };
110    Integer[] scanColumns = new Integer[] { 3 };
111    Long[] scanTimestamps = new Long[] { 3L, 4L };
112    int scanMaxVersions = 2;
113
114    put(ht, FAMILY, putRows, putColumns, putTimestamps);
115
116    TEST_UTIL.flush(tableName);
117
118    ResultScanner scanner =
119      scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
120
121    Cell[] kvs;
122
123    kvs = scanner.next().rawCells();
124    assertEquals(2, kvs.length);
125    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
126    checkOneCell(kvs[1], FAMILY, 3, 3, 3);
127    kvs = scanner.next().rawCells();
128    assertEquals(2, kvs.length);
129    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
130    checkOneCell(kvs[1], FAMILY, 5, 3, 3);
131
132    ht.close();
133  }
134
135  @Test
136  public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
137    LOG.info(name.getMethodName());
138    final TableName tableName = TableName.valueOf(name.getMethodName());
139    byte[] FAMILY = Bytes.toBytes("event_log");
140    byte[][] FAMILIES = new byte[][] { FAMILY };
141
142    // create table; set versions to max...
143    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
144
145    Integer[] putRows = new Integer[] { 1, 3, 5, 7 };
146    Integer[] putColumns = new Integer[] { 1, 3, 5 };
147    Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L };
148
149    Integer[] scanRows = new Integer[] { 3, 5 };
150    Integer[] scanColumns = new Integer[] { 3, 4 };
151    Long[] scanTimestamps = new Long[] { 3L };
152    int scanMaxVersions = 2;
153
154    put(ht, FAMILY, putRows, putColumns, putTimestamps);
155
156    TEST_UTIL.flush(tableName);
157
158    ResultScanner scanner =
159      scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
160
161    Cell[] kvs;
162
163    kvs = scanner.next().rawCells();
164    assertEquals(1, kvs.length);
165    checkOneCell(kvs[0], FAMILY, 3, 3, 3);
166    kvs = scanner.next().rawCells();
167    assertEquals(1, kvs.length);
168    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
169
170    ht.close();
171  }
172
173  @Test
174  public void testReseeksWithMultipleColumnMultipleTimestamp() throws IOException {
175    LOG.info(name.getMethodName());
176
177    final TableName tableName = TableName.valueOf(name.getMethodName());
178    byte[] FAMILY = Bytes.toBytes("event_log");
179    byte[][] FAMILIES = new byte[][] { FAMILY };
180
181    // create table; set versions to max...
182    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
183
184    Integer[] putRows = new Integer[] { 1, 3, 5, 7 };
185    Integer[] putColumns = new Integer[] { 1, 3, 5 };
186    Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L };
187
188    Integer[] scanRows = new Integer[] { 5, 7 };
189    Integer[] scanColumns = new Integer[] { 3, 4, 5 };
190    Long[] scanTimestamps = new Long[] { 2L, 3L };
191    int scanMaxVersions = 2;
192
193    put(ht, FAMILY, putRows, putColumns, putTimestamps);
194
195    TEST_UTIL.flush(tableName);
196    Scan scan = new Scan();
197    scan.readVersions(10);
198    ResultScanner scanner = ht.getScanner(scan);
199    while (true) {
200      Result r = scanner.next();
201      if (r == null) break;
202      LOG.info("r=" + r);
203    }
204    scanner = scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
205
206    Cell[] kvs;
207
208    // This looks like wrong answer. Should be 2. Even then we are returning wrong result,
209    // timestamps that are 3 whereas should be 2 since min is inclusive.
210    kvs = scanner.next().rawCells();
211    assertEquals(4, kvs.length);
212    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
213    checkOneCell(kvs[1], FAMILY, 5, 3, 2);
214    checkOneCell(kvs[2], FAMILY, 5, 5, 3);
215    checkOneCell(kvs[3], FAMILY, 5, 5, 2);
216    kvs = scanner.next().rawCells();
217    assertEquals(4, kvs.length);
218    checkOneCell(kvs[0], FAMILY, 7, 3, 3);
219    checkOneCell(kvs[1], FAMILY, 7, 3, 2);
220    checkOneCell(kvs[2], FAMILY, 7, 5, 3);
221    checkOneCell(kvs[3], FAMILY, 7, 5, 2);
222
223    ht.close();
224  }
225
226  @Test
227  public void testReseeksWithMultipleFiles() throws IOException {
228    LOG.info(name.getMethodName());
229    final TableName tableName = TableName.valueOf(name.getMethodName());
230    byte[] FAMILY = Bytes.toBytes("event_log");
231    byte[][] FAMILIES = new byte[][] { FAMILY };
232
233    // create table; set versions to max...
234    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
235
236    Integer[] putRows1 = new Integer[] { 1, 2, 3 };
237    Integer[] putColumns1 = new Integer[] { 2, 5, 6 };
238    Long[] putTimestamps1 = new Long[] { 1L, 2L, 5L };
239
240    Integer[] putRows2 = new Integer[] { 6, 7 };
241    Integer[] putColumns2 = new Integer[] { 3, 6 };
242    Long[] putTimestamps2 = new Long[] { 4L, 5L };
243
244    Integer[] putRows3 = new Integer[] { 2, 3, 5 };
245    Integer[] putColumns3 = new Integer[] { 1, 2, 3 };
246    Long[] putTimestamps3 = new Long[] { 4L, 8L };
247
248    Integer[] scanRows = new Integer[] { 3, 5, 7 };
249    Integer[] scanColumns = new Integer[] { 3, 4, 5 };
250    Long[] scanTimestamps = new Long[] { 2L, 4L };
251    int scanMaxVersions = 5;
252
253    put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
254    TEST_UTIL.flush(tableName);
255    put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
256    TEST_UTIL.flush(tableName);
257    put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
258
259    ResultScanner scanner =
260      scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
261
262    Cell[] kvs;
263
264    kvs = scanner.next().rawCells();
265    assertEquals(2, kvs.length);
266    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
267    checkOneCell(kvs[1], FAMILY, 3, 5, 2);
268
269    kvs = scanner.next().rawCells();
270    assertEquals(1, kvs.length);
271    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
272
273    kvs = scanner.next().rawCells();
274    assertEquals(1, kvs.length);
275    checkOneCell(kvs[0], FAMILY, 6, 3, 4);
276
277    kvs = scanner.next().rawCells();
278    assertEquals(1, kvs.length);
279    checkOneCell(kvs[0], FAMILY, 7, 3, 4);
280
281    ht.close();
282  }
283
284  @Test
285  public void testWithVersionDeletes() throws Exception {
286
287    // first test from memstore (without flushing).
288    testWithVersionDeletes(false);
289
290    // run same test against HFiles (by forcing a flush).
291    testWithVersionDeletes(true);
292  }
293
294  public void testWithVersionDeletes(boolean flushTables) throws IOException {
295    LOG.info(name.getMethodName() + "_" + (flushTables ? "flush" : "noflush"));
296    final TableName tableName =
297      TableName.valueOf(name.getMethodName() + "_" + (flushTables ? "flush" : "noflush"));
298    byte[] FAMILY = Bytes.toBytes("event_log");
299    byte[][] FAMILIES = new byte[][] { FAMILY };
300
301    // create table; set versions to max...
302    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
303
304    // For row:0, col:0: insert versions 1 through 5.
305    putNVersions(ht, FAMILY, 0, 0, 1, 5);
306
307    if (flushTables) {
308      TEST_UTIL.flush(tableName);
309    }
310
311    // delete version 4.
312    deleteOneVersion(ht, FAMILY, 0, 0, 4);
313
314    // request a bunch of versions including the deleted version. We should
315    // only get back entries for the versions that exist.
316    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
317    assertEquals(3, kvs.length);
318    checkOneCell(kvs[0], FAMILY, 0, 0, 5);
319    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
320    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
321
322    ht.close();
323  }
324
325  @Test
326  public void testWithMultipleVersionDeletes() throws IOException {
327    LOG.info(name.getMethodName());
328
329    final TableName tableName = TableName.valueOf(name.getMethodName());
330    byte[] FAMILY = Bytes.toBytes("event_log");
331    byte[][] FAMILIES = new byte[][] { FAMILY };
332
333    // create table; set versions to max...
334    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
335
336    // For row:0, col:0: insert versions 1 through 5.
337    putNVersions(ht, FAMILY, 0, 0, 1, 5);
338
339    TEST_UTIL.flush(tableName);
340
341    // delete all versions before 4.
342    deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
343
344    // request a bunch of versions including the deleted version. We should
345    // only get back entries for the versions that exist.
346    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
347    assertEquals(0, kvs.length);
348
349    ht.close();
350  }
351
352  @Test
353  public void testWithColumnDeletes() throws IOException {
354    final TableName tableName = TableName.valueOf(name.getMethodName());
355    byte[] FAMILY = Bytes.toBytes("event_log");
356    byte[][] FAMILIES = new byte[][] { FAMILY };
357
358    // create table; set versions to max...
359    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
360
361    // For row:0, col:0: insert versions 1 through 5.
362    putNVersions(ht, FAMILY, 0, 0, 1, 5);
363
364    TEST_UTIL.flush(tableName);
365
366    // delete all versions before 4.
367    deleteColumn(ht, FAMILY, 0, 0);
368
369    // request a bunch of versions including the deleted version. We should
370    // only get back entries for the versions that exist.
371    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
372    assertEquals(0, kvs.length);
373
374    ht.close();
375  }
376
377  @Test
378  public void testWithFamilyDeletes() throws IOException {
379    final TableName tableName = TableName.valueOf(name.getMethodName());
380    byte[] FAMILY = Bytes.toBytes("event_log");
381    byte[][] FAMILIES = new byte[][] { FAMILY };
382
383    // create table; set versions to max...
384    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
385
386    // For row:0, col:0: insert versions 1 through 5.
387    putNVersions(ht, FAMILY, 0, 0, 1, 5);
388
389    TEST_UTIL.flush(tableName);
390
391    // delete all versions before 4.
392    deleteFamily(ht, FAMILY, 0);
393
394    // request a bunch of versions including the deleted version. We should
395    // only get back entries for the versions that exist.
396    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
397    assertEquals(0, kvs.length);
398
399    ht.close();
400  }
401
402  /**
403   * Assert that the passed in KeyValue has expected contents for the specified row, column &
404   * timestamp.
405   */
406  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
407
408    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
409
410    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
411      Bytes.toString(CellUtil.cloneRow(kv)));
412
413    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
414      Bytes.toString(CellUtil.cloneFamily(kv)));
415
416    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
417      Bytes.toString(CellUtil.cloneQualifier(kv)));
418
419    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
420
421    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
422      Bytes.toString(CellUtil.cloneValue(kv)));
423  }
424
425  /**
426   * Uses the TimestampFilter on a Get to request a specified list of versions for the row/column
427   * specified by rowIdx & colIdx.
428   */
429  private Cell[] getNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, List<Long> versions)
430    throws IOException {
431    byte row[] = Bytes.toBytes("row:" + rowIdx);
432    byte column[] = Bytes.toBytes("column:" + colIdx);
433    Get get = new Get(row);
434    get.addColumn(cf, column);
435    get.readAllVersions();
436    get.setTimeRange(Collections.min(versions), Collections.max(versions) + 1);
437    Result result = ht.get(get);
438
439    return result.rawCells();
440  }
441
442  private ResultScanner scan(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes,
443    Long[] versions, int maxVersions) throws IOException {
444    byte startRow[] = Bytes.toBytes("row:" + Collections.min(Arrays.asList(rowIndexes)));
445    byte endRow[] = Bytes.toBytes("row:" + Collections.max(Arrays.asList(rowIndexes)) + 1);
446    Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow);
447    for (Integer colIdx : columnIndexes) {
448      byte column[] = Bytes.toBytes("column:" + colIdx);
449      scan.addColumn(cf, column);
450    }
451    scan.readVersions(maxVersions);
452    scan.setTimeRange(Collections.min(Arrays.asList(versions)),
453      Collections.max(Arrays.asList(versions)) + 1);
454    ResultScanner scanner = ht.getScanner(scan);
455    return scanner;
456  }
457
458  private void put(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes,
459    Long[] versions) throws IOException {
460    for (int rowIdx : rowIndexes) {
461      byte row[] = Bytes.toBytes("row:" + rowIdx);
462      Put put = new Put(row);
463      put.setDurability(Durability.SKIP_WAL);
464      for (int colIdx : columnIndexes) {
465        byte column[] = Bytes.toBytes("column:" + colIdx);
466        for (long version : versions) {
467          put.addColumn(cf, column, version, Bytes.toBytes("value-version-" + version));
468        }
469      }
470      ht.put(put);
471    }
472  }
473
474  /**
475   * Insert in specific row/column versions with timestamps versionStart..versionEnd.
476   */
477  private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, long versionStart,
478    long versionEnd) throws IOException {
479    byte row[] = Bytes.toBytes("row:" + rowIdx);
480    byte column[] = Bytes.toBytes("column:" + colIdx);
481    Put put = new Put(row);
482    put.setDurability(Durability.SKIP_WAL);
483
484    for (long idx = versionStart; idx <= versionEnd; idx++) {
485      put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx));
486    }
487
488    ht.put(put);
489  }
490
491  /**
492   * For row/column specified by rowIdx/colIdx, delete the cell corresponding to the specified
493   * version.
494   */
495  private void deleteOneVersion(Table ht, byte[] cf, int rowIdx, int colIdx, long version)
496    throws IOException {
497    byte row[] = Bytes.toBytes("row:" + rowIdx);
498    byte column[] = Bytes.toBytes("column:" + colIdx);
499    Delete del = new Delete(row);
500    del.addColumn(cf, column, version);
501    ht.delete(del);
502  }
503
504  /**
505   * For row/column specified by rowIdx/colIdx, delete all cells preceeding the specified version.
506   */
507  private void deleteAllVersionsBefore(Table ht, byte[] cf, int rowIdx, int colIdx, long version)
508    throws IOException {
509    byte row[] = Bytes.toBytes("row:" + rowIdx);
510    byte column[] = Bytes.toBytes("column:" + colIdx);
511    Delete del = new Delete(row);
512    del.addColumns(cf, column, version);
513    ht.delete(del);
514  }
515
516  private void deleteColumn(Table ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
517    byte row[] = Bytes.toBytes("row:" + rowIdx);
518    byte column[] = Bytes.toBytes("column:" + colIdx);
519    Delete del = new Delete(row);
520    del.addColumns(cf, column);
521    ht.delete(del);
522  }
523
524  private void deleteFamily(Table ht, byte[] cf, int rowIdx) throws IOException {
525    byte row[] = Bytes.toBytes("row:" + rowIdx);
526    Delete del = new Delete(row);
527    del.addFamily(cf);
528    ht.delete(del);
529  }
530
531}