001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.testclassification.ClientTests;
031import org.apache.hadoop.hbase.testclassification.LargeTests;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.junit.jupiter.api.AfterAll;
034import org.junit.jupiter.api.BeforeAll;
035import org.junit.jupiter.api.BeforeEach;
036import org.junit.jupiter.api.Tag;
037import org.junit.jupiter.api.Test;
038import org.junit.jupiter.api.TestInfo;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Run tests related to {@link org.apache.hadoop.hbase.filter.TimestampsFilter} using HBase client
044 * APIs. Sets up the HBase mini cluster once at start. Each creates a table named for the method and
045 * does its stuff against that.
046 */
047@Tag(LargeTests.TAG)
048@Tag(ClientTests.TAG)
049public class TestMultipleTimestamps {
050
051  private static final Logger LOG = LoggerFactory.getLogger(TestMultipleTimestamps.class);
052  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
053
054  private String methodName;
055
056  @BeforeEach
057  public void setUp(TestInfo testInfo) {
058    methodName = testInfo.getTestMethod().get().getName();
059  }
060
061  @BeforeAll
062  public static void setUpBeforeClass() throws Exception {
063    TEST_UTIL.startMiniCluster();
064  }
065
066  @AfterAll
067  public static void tearDownAfterClass() throws Exception {
068    TEST_UTIL.shutdownMiniCluster();
069  }
070
071  @Test
072  public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
073    final TableName tableName = TableName.valueOf(methodName);
074    byte[] FAMILY = Bytes.toBytes("event_log");
075    byte[][] FAMILIES = new byte[][] { FAMILY };
076
077    // create table; set versions to max...
078    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
079      Integer[] putRows = new Integer[] { 1, 3, 5, 7 };
080      Integer[] putColumns = new Integer[] { 1, 3, 5 };
081      Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L };
082
083      Integer[] scanRows = new Integer[] { 3, 5 };
084      Integer[] scanColumns = new Integer[] { 3 };
085      Long[] scanTimestamps = new Long[] { 3L, 4L };
086      int scanMaxVersions = 2;
087
088      put(ht, FAMILY, putRows, putColumns, putTimestamps);
089
090      TEST_UTIL.flush(tableName);
091
092      ResultScanner scanner =
093        scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
094
095      Cell[] kvs;
096
097      kvs = scanner.next().rawCells();
098      assertEquals(2, kvs.length);
099      checkOneCell(kvs[0], FAMILY, 3, 3, 4);
100      checkOneCell(kvs[1], FAMILY, 3, 3, 3);
101      kvs = scanner.next().rawCells();
102      assertEquals(2, kvs.length);
103      checkOneCell(kvs[0], FAMILY, 5, 3, 4);
104      checkOneCell(kvs[1], FAMILY, 5, 3, 3);
105    }
106  }
107
108  @Test
109  public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
110    LOG.info(methodName);
111    final TableName tableName = TableName.valueOf(methodName);
112    byte[] FAMILY = Bytes.toBytes("event_log");
113    byte[][] FAMILIES = new byte[][] { FAMILY };
114
115    // create table; set versions to max...
116    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
117      Integer[] putRows = new Integer[] { 1, 3, 5, 7 };
118      Integer[] putColumns = new Integer[] { 1, 3, 5 };
119      Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L };
120
121      Integer[] scanRows = new Integer[] { 3, 5 };
122      Integer[] scanColumns = new Integer[] { 3, 4 };
123      Long[] scanTimestamps = new Long[] { 3L };
124      int scanMaxVersions = 2;
125
126      put(ht, FAMILY, putRows, putColumns, putTimestamps);
127
128      TEST_UTIL.flush(tableName);
129
130      ResultScanner scanner =
131        scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
132
133      Cell[] kvs;
134
135      kvs = scanner.next().rawCells();
136      assertEquals(1, kvs.length);
137      checkOneCell(kvs[0], FAMILY, 3, 3, 3);
138      kvs = scanner.next().rawCells();
139      assertEquals(1, kvs.length);
140      checkOneCell(kvs[0], FAMILY, 5, 3, 3);
141    }
142  }
143
144  @Test
145  public void testReseeksWithMultipleColumnMultipleTimestamp() throws IOException {
146    LOG.info(methodName);
147
148    final TableName tableName = TableName.valueOf(methodName);
149    byte[] FAMILY = Bytes.toBytes("event_log");
150    byte[][] FAMILIES = new byte[][] { FAMILY };
151
152    // create table; set versions to max...
153    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
154      Integer[] putRows = new Integer[] { 1, 3, 5, 7 };
155      Integer[] putColumns = new Integer[] { 1, 3, 5 };
156      Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L };
157
158      Integer[] scanRows = new Integer[] { 5, 7 };
159      Integer[] scanColumns = new Integer[] { 3, 4, 5 };
160      Long[] scanTimestamps = new Long[] { 2L, 3L };
161      int scanMaxVersions = 2;
162
163      put(ht, FAMILY, putRows, putColumns, putTimestamps);
164
165      TEST_UTIL.flush(tableName);
166      Scan scan = new Scan();
167      scan.readVersions(10);
168      try (ResultScanner scanner = ht.getScanner(scan)) {
169        while (true) {
170          Result r = scanner.next();
171          if (r == null) {
172            break;
173          }
174          LOG.info("r=" + r);
175        }
176      }
177      try (ResultScanner scanner =
178        scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions)) {
179        Cell[] kvs;
180
181        // This looks like wrong answer. Should be 2. Even then we are returning wrong result,
182        // timestamps that are 3 whereas should be 2 since min is inclusive.
183        kvs = scanner.next().rawCells();
184        assertEquals(4, kvs.length);
185        checkOneCell(kvs[0], FAMILY, 5, 3, 3);
186        checkOneCell(kvs[1], FAMILY, 5, 3, 2);
187        checkOneCell(kvs[2], FAMILY, 5, 5, 3);
188        checkOneCell(kvs[3], FAMILY, 5, 5, 2);
189        kvs = scanner.next().rawCells();
190        assertEquals(4, kvs.length);
191        checkOneCell(kvs[0], FAMILY, 7, 3, 3);
192        checkOneCell(kvs[1], FAMILY, 7, 3, 2);
193        checkOneCell(kvs[2], FAMILY, 7, 5, 3);
194        checkOneCell(kvs[3], FAMILY, 7, 5, 2);
195      }
196    }
197  }
198
199  @Test
200  public void testReseeksWithMultipleFiles() throws IOException {
201    LOG.info(methodName);
202    final TableName tableName = TableName.valueOf(methodName);
203    byte[] FAMILY = Bytes.toBytes("event_log");
204    byte[][] FAMILIES = new byte[][] { FAMILY };
205
206    // create table; set versions to max...
207    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
208
209      Integer[] putRows1 = new Integer[] { 1, 2, 3 };
210      Integer[] putColumns1 = new Integer[] { 2, 5, 6 };
211      Long[] putTimestamps1 = new Long[] { 1L, 2L, 5L };
212
213      Integer[] putRows2 = new Integer[] { 6, 7 };
214      Integer[] putColumns2 = new Integer[] { 3, 6 };
215      Long[] putTimestamps2 = new Long[] { 4L, 5L };
216
217      Integer[] putRows3 = new Integer[] { 2, 3, 5 };
218      Integer[] putColumns3 = new Integer[] { 1, 2, 3 };
219      Long[] putTimestamps3 = new Long[] { 4L, 8L };
220
221      Integer[] scanRows = new Integer[] { 3, 5, 7 };
222      Integer[] scanColumns = new Integer[] { 3, 4, 5 };
223      Long[] scanTimestamps = new Long[] { 2L, 4L };
224      int scanMaxVersions = 5;
225
226      put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
227      TEST_UTIL.flush(tableName);
228      put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
229      TEST_UTIL.flush(tableName);
230      put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
231
232      ResultScanner scanner =
233        scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
234
235      Cell[] kvs;
236
237      kvs = scanner.next().rawCells();
238      assertEquals(2, kvs.length);
239      checkOneCell(kvs[0], FAMILY, 3, 3, 4);
240      checkOneCell(kvs[1], FAMILY, 3, 5, 2);
241
242      kvs = scanner.next().rawCells();
243      assertEquals(1, kvs.length);
244      checkOneCell(kvs[0], FAMILY, 5, 3, 4);
245
246      kvs = scanner.next().rawCells();
247      assertEquals(1, kvs.length);
248      checkOneCell(kvs[0], FAMILY, 6, 3, 4);
249
250      kvs = scanner.next().rawCells();
251      assertEquals(1, kvs.length);
252      checkOneCell(kvs[0], FAMILY, 7, 3, 4);
253    }
254  }
255
256  @Test
257  public void testWithVersionDeletes() throws Exception {
258    // first test from memstore (without flushing).
259    testWithVersionDeletes(false);
260
261    // run same test against HFiles (by forcing a flush).
262    testWithVersionDeletes(true);
263  }
264
265  public void testWithVersionDeletes(boolean flushTables) throws IOException {
266    LOG.info(methodName + "_" + (flushTables ? "flush" : "noflush"));
267    final TableName tableName =
268      TableName.valueOf(methodName + "_" + (flushTables ? "flush" : "noflush"));
269    byte[] FAMILY = Bytes.toBytes("event_log");
270    byte[][] FAMILIES = new byte[][] { FAMILY };
271
272    // create table; set versions to max...
273    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
274      // For row:0, col:0: insert versions 1 through 5.
275      putNVersions(ht, FAMILY, 0, 0, 1, 5);
276
277      if (flushTables) {
278        TEST_UTIL.flush(tableName);
279      }
280
281      // delete version 4.
282      deleteOneVersion(ht, FAMILY, 0, 0, 4);
283
284      // request a bunch of versions including the deleted version. We should
285      // only get back entries for the versions that exist.
286      Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
287      assertEquals(3, kvs.length);
288      checkOneCell(kvs[0], FAMILY, 0, 0, 5);
289      checkOneCell(kvs[1], FAMILY, 0, 0, 3);
290      checkOneCell(kvs[2], FAMILY, 0, 0, 2);
291    }
292  }
293
294  @Test
295  public void testWithMultipleVersionDeletes() throws IOException {
296    LOG.info(methodName);
297
298    final TableName tableName = TableName.valueOf(methodName);
299    byte[] FAMILY = Bytes.toBytes("event_log");
300    byte[][] FAMILIES = new byte[][] { FAMILY };
301
302    // create table; set versions to max...
303    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
304      // For row:0, col:0: insert versions 1 through 5.
305      putNVersions(ht, FAMILY, 0, 0, 1, 5);
306
307      TEST_UTIL.flush(tableName);
308
309      // delete all versions before 4.
310      deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
311
312      // request a bunch of versions including the deleted version. We should
313      // only get back entries for the versions that exist.
314      Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
315      assertEquals(0, kvs.length);
316    }
317  }
318
319  @Test
320  public void testWithColumnDeletes() throws IOException {
321    final TableName tableName = TableName.valueOf(methodName);
322    byte[] FAMILY = Bytes.toBytes("event_log");
323    byte[][] FAMILIES = new byte[][] { FAMILY };
324
325    // create table; set versions to max...
326    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
327      // For row:0, col:0: insert versions 1 through 5.
328      putNVersions(ht, FAMILY, 0, 0, 1, 5);
329
330      TEST_UTIL.flush(tableName);
331
332      // delete all versions before 4.
333      deleteColumn(ht, FAMILY, 0, 0);
334
335      // request a bunch of versions including the deleted version. We should
336      // only get back entries for the versions that exist.
337      Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
338      assertEquals(0, kvs.length);
339    }
340  }
341
342  @Test
343  public void testWithFamilyDeletes() throws IOException {
344    final TableName tableName = TableName.valueOf(methodName);
345    byte[] FAMILY = Bytes.toBytes("event_log");
346    byte[][] FAMILIES = new byte[][] { FAMILY };
347
348    // create table; set versions to max...
349    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
350      // For row:0, col:0: insert versions 1 through 5.
351      putNVersions(ht, FAMILY, 0, 0, 1, 5);
352
353      TEST_UTIL.flush(tableName);
354
355      // delete all versions before 4.
356      deleteFamily(ht, FAMILY, 0);
357
358      // request a bunch of versions including the deleted version. We should
359      // only get back entries for the versions that exist.
360      Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
361      assertEquals(0, kvs.length);
362    }
363  }
364
365  /**
366   * Assert that the passed in KeyValue has expected contents for the specified row, column &
367   * timestamp.
368   */
369  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
370    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
371
372    assertEquals("row:" + rowIdx, Bytes.toString(CellUtil.cloneRow(kv)),
373      "Row mismatch which checking: " + ctx);
374
375    assertEquals(Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)),
376      "ColumnFamily mismatch while checking: " + ctx);
377
378    assertEquals("column:" + colIdx, Bytes.toString(CellUtil.cloneQualifier(kv)),
379      "Column qualifier mismatch while checking: " + ctx);
380
381    assertEquals(ts, kv.getTimestamp(), "Timestamp mismatch while checking: " + ctx);
382
383    assertEquals("value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)),
384      "Value mismatch while checking: " + ctx);
385  }
386
387  /**
388   * Uses the TimestampFilter on a Get to request a specified list of versions for the row/column
389   * specified by rowIdx & colIdx.
390   */
391  private Cell[] getNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, List<Long> versions)
392    throws IOException {
393    byte row[] = Bytes.toBytes("row:" + rowIdx);
394    byte column[] = Bytes.toBytes("column:" + colIdx);
395    Get get = new Get(row);
396    get.addColumn(cf, column);
397    get.readAllVersions();
398    get.setTimeRange(Collections.min(versions), Collections.max(versions) + 1);
399    Result result = ht.get(get);
400
401    return result.rawCells();
402  }
403
404  private ResultScanner scan(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes,
405    Long[] versions, int maxVersions) throws IOException {
406    byte startRow[] = Bytes.toBytes("row:" + Collections.min(Arrays.asList(rowIndexes)));
407    byte endRow[] = Bytes.toBytes("row:" + Collections.max(Arrays.asList(rowIndexes)) + 1);
408    Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow);
409    for (Integer colIdx : columnIndexes) {
410      byte column[] = Bytes.toBytes("column:" + colIdx);
411      scan.addColumn(cf, column);
412    }
413    scan.readVersions(maxVersions);
414    scan.setTimeRange(Collections.min(Arrays.asList(versions)),
415      Collections.max(Arrays.asList(versions)) + 1);
416    ResultScanner scanner = ht.getScanner(scan);
417    return scanner;
418  }
419
420  private void put(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes,
421    Long[] versions) throws IOException {
422    for (int rowIdx : rowIndexes) {
423      byte row[] = Bytes.toBytes("row:" + rowIdx);
424      Put put = new Put(row);
425      put.setDurability(Durability.SKIP_WAL);
426      for (int colIdx : columnIndexes) {
427        byte column[] = Bytes.toBytes("column:" + colIdx);
428        for (long version : versions) {
429          put.addColumn(cf, column, version, Bytes.toBytes("value-version-" + version));
430        }
431      }
432      ht.put(put);
433    }
434  }
435
436  /**
437   * Insert in specific row/column versions with timestamps versionStart..versionEnd.
438   */
439  private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, long versionStart,
440    long versionEnd) throws IOException {
441    byte row[] = Bytes.toBytes("row:" + rowIdx);
442    byte column[] = Bytes.toBytes("column:" + colIdx);
443    Put put = new Put(row);
444    put.setDurability(Durability.SKIP_WAL);
445
446    for (long idx = versionStart; idx <= versionEnd; idx++) {
447      put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx));
448    }
449
450    ht.put(put);
451  }
452
453  /**
454   * For row/column specified by rowIdx/colIdx, delete the cell corresponding to the specified
455   * version.
456   */
457  private void deleteOneVersion(Table ht, byte[] cf, int rowIdx, int colIdx, long version)
458    throws IOException {
459    byte row[] = Bytes.toBytes("row:" + rowIdx);
460    byte column[] = Bytes.toBytes("column:" + colIdx);
461    Delete del = new Delete(row);
462    del.addColumn(cf, column, version);
463    ht.delete(del);
464  }
465
466  /**
467   * For row/column specified by rowIdx/colIdx, delete all cells preceeding the specified version.
468   */
469  private void deleteAllVersionsBefore(Table ht, byte[] cf, int rowIdx, int colIdx, long version)
470    throws IOException {
471    byte row[] = Bytes.toBytes("row:" + rowIdx);
472    byte column[] = Bytes.toBytes("column:" + colIdx);
473    Delete del = new Delete(row);
474    del.addColumns(cf, column, version);
475    ht.delete(del);
476  }
477
478  private void deleteColumn(Table ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
479    byte row[] = Bytes.toBytes("row:" + rowIdx);
480    byte column[] = Bytes.toBytes("column:" + colIdx);
481    Delete del = new Delete(row);
482    del.addColumns(cf, column);
483    ht.delete(del);
484  }
485
486  private void deleteFamily(Table ht, byte[] cf, int rowIdx) throws IOException {
487    byte row[] = Bytes.toBytes("row:" + rowIdx);
488    Delete del = new Delete(row);
489    del.addFamily(cf);
490    ht.delete(del);
491  }
492
493}