001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.*;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.testclassification.ClientTests;
032import org.apache.hadoop.hbase.testclassification.LargeTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.junit.After;
035import org.junit.AfterClass;
036import org.junit.Before;
037import org.junit.BeforeClass;
038import org.junit.ClassRule;
039import org.junit.Rule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.junit.rules.TestName;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Run tests related to {@link org.apache.hadoop.hbase.filter.TimestampsFilter} using HBase client APIs.
048 * Sets up the HBase mini cluster once at start. Each creates a table
049 * named for the method and does its stuff against that.
050 */
051@Category({LargeTests.class, ClientTests.class})
052public class TestMultipleTimestamps {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestMultipleTimestamps.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestMultipleTimestamps.class);
059  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
060
061  @Rule
062  public TestName name = new TestName();
063
064  /**
065   * @throws java.lang.Exception
066   */
067  @BeforeClass
068  public static void setUpBeforeClass() throws Exception {
069    TEST_UTIL.startMiniCluster();
070  }
071
072  /**
073   * @throws java.lang.Exception
074   */
075  @AfterClass
076  public static void tearDownAfterClass() throws Exception {
077    TEST_UTIL.shutdownMiniCluster();
078  }
079
080  /**
081   * @throws java.lang.Exception
082   */
083  @Before
084  public void setUp() throws Exception {
085    // Nothing to do.
086  }
087
088  /**
089   * @throws java.lang.Exception
090   */
091  @After
092  public void tearDown() throws Exception {
093    // Nothing to do.
094  }
095
096  @Test
097  public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
098    final TableName tableName = TableName.valueOf(name.getMethodName());
099    byte [] FAMILY = Bytes.toBytes("event_log");
100    byte [][] FAMILIES = new byte[][] { FAMILY };
101
102    // create table; set versions to max...
103    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
104
105    Integer[] putRows = new Integer[] {1, 3, 5, 7};
106    Integer[] putColumns = new Integer[] { 1, 3, 5};
107    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
108
109    Integer[] scanRows = new Integer[] {3, 5};
110    Integer[] scanColumns = new Integer[] {3};
111    Long[] scanTimestamps = new Long[] {3L, 4L};
112    int scanMaxVersions = 2;
113
114    put(ht, FAMILY, putRows, putColumns, putTimestamps);
115
116    TEST_UTIL.flush(tableName);
117
118    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
119        scanTimestamps, scanMaxVersions);
120
121    Cell[] kvs;
122
123    kvs = scanner.next().rawCells();
124    assertEquals(2, kvs.length);
125    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
126    checkOneCell(kvs[1], FAMILY, 3, 3, 3);
127    kvs = scanner.next().rawCells();
128    assertEquals(2, kvs.length);
129    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
130    checkOneCell(kvs[1], FAMILY, 5, 3, 3);
131
132    ht.close();
133  }
134
135  @Test
136  public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
137    LOG.info(name.getMethodName());
138    final TableName tableName = TableName.valueOf(name.getMethodName());
139    byte [] FAMILY = Bytes.toBytes("event_log");
140    byte [][] FAMILIES = new byte[][] { FAMILY };
141
142    // create table; set versions to max...
143    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
144
145    Integer[] putRows = new Integer[] {1, 3, 5, 7};
146    Integer[] putColumns = new Integer[] { 1, 3, 5};
147    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
148
149    Integer[] scanRows = new Integer[] {3, 5};
150    Integer[] scanColumns = new Integer[] {3,4};
151    Long[] scanTimestamps = new Long[] {3L};
152    int scanMaxVersions = 2;
153
154    put(ht, FAMILY, putRows, putColumns, putTimestamps);
155
156    TEST_UTIL.flush(tableName);
157
158    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
159        scanTimestamps, scanMaxVersions);
160
161    Cell[] kvs;
162
163    kvs = scanner.next().rawCells();
164    assertEquals(1, kvs.length);
165    checkOneCell(kvs[0], FAMILY, 3, 3, 3);
166    kvs = scanner.next().rawCells();
167    assertEquals(1, kvs.length);
168    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
169
170    ht.close();
171  }
172
173  @Test
174  public void testReseeksWithMultipleColumnMultipleTimestamp() throws
175  IOException {
176    LOG.info(name.getMethodName());
177
178    final TableName tableName = TableName.valueOf(name.getMethodName());
179    byte [] FAMILY = Bytes.toBytes("event_log");
180    byte [][] FAMILIES = new byte[][] { FAMILY };
181
182    // create table; set versions to max...
183    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
184
185    Integer[] putRows = new Integer[] {1, 3, 5, 7};
186    Integer[] putColumns = new Integer[] { 1, 3, 5};
187    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
188
189    Integer[] scanRows = new Integer[] {5, 7};
190    Integer[] scanColumns = new Integer[] {3, 4, 5};
191    Long[] scanTimestamps = new Long[] { 2L, 3L};
192    int scanMaxVersions = 2;
193
194    put(ht, FAMILY, putRows, putColumns, putTimestamps);
195
196    TEST_UTIL.flush(tableName);
197    Scan scan = new Scan();
198    scan.readVersions(10);
199    ResultScanner scanner = ht.getScanner(scan);
200    while (true) {
201      Result r = scanner.next();
202      if (r == null) break;
203      LOG.info("r=" + r);
204    }
205    scanner = scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
206
207    Cell[] kvs;
208
209    // This looks like wrong answer.  Should be 2.  Even then we are returning wrong result,
210    // timestamps that are 3 whereas should be 2 since min is inclusive.
211    kvs = scanner.next().rawCells();
212    assertEquals(4, kvs.length);
213    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
214    checkOneCell(kvs[1], FAMILY, 5, 3, 2);
215    checkOneCell(kvs[2], FAMILY, 5, 5, 3);
216    checkOneCell(kvs[3], FAMILY, 5, 5, 2);
217    kvs = scanner.next().rawCells();
218    assertEquals(4, kvs.length);
219    checkOneCell(kvs[0], FAMILY, 7, 3, 3);
220    checkOneCell(kvs[1], FAMILY, 7, 3, 2);
221    checkOneCell(kvs[2], FAMILY, 7, 5, 3);
222    checkOneCell(kvs[3], FAMILY, 7, 5, 2);
223
224    ht.close();
225  }
226
227  @Test
228  public void testReseeksWithMultipleFiles() throws IOException {
229    LOG.info(name.getMethodName());
230    final TableName tableName = TableName.valueOf(name.getMethodName());
231    byte [] FAMILY = Bytes.toBytes("event_log");
232    byte [][] FAMILIES = new byte[][] { FAMILY };
233
234    // create table; set versions to max...
235    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
236
237    Integer[] putRows1 = new Integer[] {1, 2, 3};
238    Integer[] putColumns1 = new Integer[] { 2, 5, 6};
239    Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
240
241    Integer[] putRows2 = new Integer[] {6, 7};
242    Integer[] putColumns2 = new Integer[] {3, 6};
243    Long[] putTimestamps2 = new Long[] {4L, 5L};
244
245    Integer[] putRows3 = new Integer[] {2, 3, 5};
246    Integer[] putColumns3 = new Integer[] {1, 2, 3};
247    Long[] putTimestamps3 = new Long[] {4L,8L};
248
249
250    Integer[] scanRows = new Integer[] {3, 5, 7};
251    Integer[] scanColumns = new Integer[] {3, 4, 5};
252    Long[] scanTimestamps = new Long[] { 2L, 4L};
253    int scanMaxVersions = 5;
254
255    put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
256    TEST_UTIL.flush(tableName);
257    put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
258    TEST_UTIL.flush(tableName);
259    put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
260
261    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
262        scanTimestamps, scanMaxVersions);
263
264    Cell[] kvs;
265
266    kvs = scanner.next().rawCells();
267    assertEquals(2, kvs.length);
268    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
269    checkOneCell(kvs[1], FAMILY, 3, 5, 2);
270
271    kvs = scanner.next().rawCells();
272    assertEquals(1, kvs.length);
273    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
274
275    kvs = scanner.next().rawCells();
276    assertEquals(1, kvs.length);
277    checkOneCell(kvs[0], FAMILY, 6, 3, 4);
278
279    kvs = scanner.next().rawCells();
280    assertEquals(1, kvs.length);
281    checkOneCell(kvs[0], FAMILY, 7, 3, 4);
282
283    ht.close();
284  }
285
286  @Test
287  public void testWithVersionDeletes() throws Exception {
288
289    // first test from memstore (without flushing).
290    testWithVersionDeletes(false);
291
292    // run same test against HFiles (by forcing a flush).
293    testWithVersionDeletes(true);
294  }
295
296  public void testWithVersionDeletes(boolean flushTables) throws IOException {
297    LOG.info(name.getMethodName() + "_"+ (flushTables ? "flush" : "noflush"));
298    final TableName tableName = TableName.valueOf(name.getMethodName() + "_" + (flushTables ?
299            "flush" : "noflush"));
300    byte [] FAMILY = Bytes.toBytes("event_log");
301    byte [][] FAMILIES = new byte[][] { FAMILY };
302
303    // create table; set versions to max...
304    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
305
306    // For row:0, col:0: insert versions 1 through 5.
307    putNVersions(ht, FAMILY, 0, 0, 1, 5);
308
309    if (flushTables) {
310      TEST_UTIL.flush(tableName);
311    }
312
313    // delete version 4.
314    deleteOneVersion(ht, FAMILY, 0, 0, 4);
315
316    // request a bunch of versions including the deleted version. We should
317    // only get back entries for the versions that exist.
318    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0,
319        Arrays.asList(2L, 3L, 4L, 5L));
320    assertEquals(3, kvs.length);
321    checkOneCell(kvs[0], FAMILY, 0, 0, 5);
322    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
323    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
324
325    ht.close();
326  }
327
328  @Test
329  public void testWithMultipleVersionDeletes() throws IOException {
330    LOG.info(name.getMethodName());
331
332    final TableName tableName = TableName.valueOf(name.getMethodName());
333    byte [] FAMILY = Bytes.toBytes("event_log");
334    byte [][] FAMILIES = new byte[][] { FAMILY };
335
336    // create table; set versions to max...
337    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
338
339    // For row:0, col:0: insert versions 1 through 5.
340    putNVersions(ht, FAMILY, 0, 0, 1, 5);
341
342    TEST_UTIL.flush(tableName);
343
344    // delete all versions before 4.
345    deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
346
347    // request a bunch of versions including the deleted version. We should
348    // only get back entries for the versions that exist.
349    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
350    assertEquals(0, kvs.length);
351
352    ht.close();
353  }
354
355  @Test
356  public void testWithColumnDeletes() throws IOException {
357    final TableName tableName = TableName.valueOf(name.getMethodName());
358    byte [] FAMILY = Bytes.toBytes("event_log");
359    byte [][] FAMILIES = new byte[][] { FAMILY };
360
361    // create table; set versions to max...
362    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
363
364    // For row:0, col:0: insert versions 1 through 5.
365    putNVersions(ht, FAMILY, 0, 0, 1, 5);
366
367    TEST_UTIL.flush(tableName);
368
369    // delete all versions before 4.
370    deleteColumn(ht, FAMILY, 0, 0);
371
372    // request a bunch of versions including the deleted version. We should
373    // only get back entries for the versions that exist.
374    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
375    assertEquals(0, kvs.length);
376
377    ht.close();
378  }
379
380  @Test
381  public void testWithFamilyDeletes() throws IOException {
382    final TableName tableName = TableName.valueOf(name.getMethodName());
383    byte [] FAMILY = Bytes.toBytes("event_log");
384    byte [][] FAMILIES = new byte[][] { FAMILY };
385
386    // create table; set versions to max...
387    Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE);
388
389    // For row:0, col:0: insert versions 1 through 5.
390    putNVersions(ht, FAMILY, 0, 0, 1, 5);
391
392    TEST_UTIL.flush(tableName);
393
394    // delete all versions before 4.
395    deleteFamily(ht, FAMILY, 0);
396
397    // request a bunch of versions including the deleted version. We should
398    // only get back entries for the versions that exist.
399    Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
400    assertEquals(0, kvs.length);
401
402    ht.close();
403  }
404
405  /**
406   * Assert that the passed in KeyValue has expected contents for the
407   * specified row, column & timestamp.
408   */
409  private void checkOneCell(Cell kv, byte[] cf,
410      int rowIdx, int colIdx, long ts) {
411
412    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
413
414    assertEquals("Row mismatch which checking: " + ctx,
415        "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
416
417    assertEquals("ColumnFamily mismatch while checking: " + ctx,
418        Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
419
420    assertEquals("Column qualifier mismatch while checking: " + ctx,
421        "column:" + colIdx,
422        Bytes.toString(CellUtil.cloneQualifier(kv)));
423
424    assertEquals("Timestamp mismatch while checking: " + ctx,
425        ts, kv.getTimestamp());
426
427    assertEquals("Value mismatch while checking: " + ctx,
428        "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
429  }
430
431  /**
432   * Uses the TimestampFilter on a Get to request a specified list of
433   * versions for the row/column specified by rowIdx & colIdx.
434   *
435   */
436  private  Cell[] getNVersions(Table ht, byte[] cf, int rowIdx,
437      int colIdx, List<Long> versions)
438  throws IOException {
439    byte row[] = Bytes.toBytes("row:" + rowIdx);
440    byte column[] = Bytes.toBytes("column:" + colIdx);
441    Get get = new Get(row);
442    get.addColumn(cf, column);
443    get.readAllVersions();
444    get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
445    Result result = ht.get(get);
446
447    return result.rawCells();
448  }
449
450  private  ResultScanner scan(Table ht, byte[] cf,
451      Integer[] rowIndexes, Integer[] columnIndexes,
452      Long[] versions, int maxVersions)
453  throws IOException {
454    byte startRow[] = Bytes.toBytes("row:" +
455        Collections.min( Arrays.asList(rowIndexes)));
456    byte endRow[] = Bytes.toBytes("row:" +
457        Collections.max( Arrays.asList(rowIndexes))+1);
458    Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow);
459    for (Integer colIdx: columnIndexes) {
460      byte column[] = Bytes.toBytes("column:" + colIdx);
461      scan.addColumn(cf, column);
462    }
463    scan.readVersions(maxVersions);
464    scan.setTimeRange(Collections.min(Arrays.asList(versions)),
465        Collections.max(Arrays.asList(versions))+1);
466    ResultScanner scanner = ht.getScanner(scan);
467    return scanner;
468  }
469
470  private void put(Table ht, byte[] cf, Integer[] rowIndexes,
471      Integer[] columnIndexes, Long[] versions)
472  throws IOException {
473    for (int rowIdx: rowIndexes) {
474      byte row[] = Bytes.toBytes("row:" + rowIdx);
475      Put put = new Put(row);
476      put.setDurability(Durability.SKIP_WAL);
477      for(int colIdx: columnIndexes) {
478        byte column[] = Bytes.toBytes("column:" + colIdx);
479        for (long version: versions) {
480          put.addColumn(cf, column, version, Bytes.toBytes("value-version-" +
481                  version));
482        }
483      }
484      ht.put(put);
485    }
486  }
487
488  /**
489   * Insert in specific row/column versions with timestamps
490   * versionStart..versionEnd.
491   */
492  private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx,
493      long versionStart, long versionEnd)
494  throws IOException {
495    byte row[] = Bytes.toBytes("row:" + rowIdx);
496    byte column[] = Bytes.toBytes("column:" + colIdx);
497    Put put = new Put(row);
498    put.setDurability(Durability.SKIP_WAL);
499
500    for (long idx = versionStart; idx <= versionEnd; idx++) {
501      put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx));
502    }
503
504    ht.put(put);
505  }
506
507  /**
508   * For row/column specified by rowIdx/colIdx, delete the cell
509   * corresponding to the specified version.
510   */
511  private void deleteOneVersion(Table ht, byte[] cf, int rowIdx,
512      int colIdx, long version)
513  throws IOException {
514    byte row[] = Bytes.toBytes("row:" + rowIdx);
515    byte column[] = Bytes.toBytes("column:" + colIdx);
516    Delete del = new Delete(row);
517    del.addColumn(cf, column, version);
518    ht.delete(del);
519  }
520
521  /**
522   * For row/column specified by rowIdx/colIdx, delete all cells
523   * preceeding the specified version.
524   */
525  private void deleteAllVersionsBefore(Table ht, byte[] cf, int rowIdx,
526      int colIdx, long version)
527  throws IOException {
528    byte row[] = Bytes.toBytes("row:" + rowIdx);
529    byte column[] = Bytes.toBytes("column:" + colIdx);
530    Delete del = new Delete(row);
531    del.addColumns(cf, column, version);
532    ht.delete(del);
533  }
534
535  private void deleteColumn(Table ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
536    byte row[] = Bytes.toBytes("row:" + rowIdx);
537    byte column[] = Bytes.toBytes("column:" + colIdx);
538    Delete del = new Delete(row);
539    del.addColumns(cf, column);
540    ht.delete(del);
541  }
542
543  private void deleteFamily(Table ht, byte[] cf, int rowIdx) throws IOException {
544    byte row[] = Bytes.toBytes("row:" + rowIdx);
545    Delete del = new Delete(row);
546    del.addFamily(cf);
547    ht.delete(del);
548  }
549
550}
551
552