001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellUtil; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.testclassification.ClientTests; 031import org.apache.hadoop.hbase.testclassification.LargeTests; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.junit.jupiter.api.AfterAll; 034import org.junit.jupiter.api.BeforeAll; 035import org.junit.jupiter.api.BeforeEach; 036import org.junit.jupiter.api.Tag; 037import org.junit.jupiter.api.Test; 038import org.junit.jupiter.api.TestInfo; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * Run tests related to {@link org.apache.hadoop.hbase.filter.TimestampsFilter} using HBase client 044 * APIs. Sets up the HBase mini cluster once at start. Each creates a table named for the method and 045 * does its stuff against that. 046 */ 047@Tag(LargeTests.TAG) 048@Tag(ClientTests.TAG) 049public class TestMultipleTimestamps { 050 051 private static final Logger LOG = LoggerFactory.getLogger(TestMultipleTimestamps.class); 052 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 053 054 private String methodName; 055 056 @BeforeEach 057 public void setUp(TestInfo testInfo) { 058 methodName = testInfo.getTestMethod().get().getName(); 059 } 060 061 @BeforeAll 062 public static void setUpBeforeClass() throws Exception { 063 TEST_UTIL.startMiniCluster(); 064 } 065 066 @AfterAll 067 public static void tearDownAfterClass() throws Exception { 068 TEST_UTIL.shutdownMiniCluster(); 069 } 070 071 @Test 072 public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException { 073 final TableName tableName = TableName.valueOf(methodName); 074 byte[] FAMILY = Bytes.toBytes("event_log"); 075 byte[][] FAMILIES = new byte[][] { FAMILY }; 076 077 // create table; set versions to max... 078 try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) { 079 Integer[] putRows = new Integer[] { 1, 3, 5, 7 }; 080 Integer[] putColumns = new Integer[] { 1, 3, 5 }; 081 Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L }; 082 083 Integer[] scanRows = new Integer[] { 3, 5 }; 084 Integer[] scanColumns = new Integer[] { 3 }; 085 Long[] scanTimestamps = new Long[] { 3L, 4L }; 086 int scanMaxVersions = 2; 087 088 put(ht, FAMILY, putRows, putColumns, putTimestamps); 089 090 TEST_UTIL.flush(tableName); 091 092 ResultScanner scanner = 093 scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions); 094 095 Cell[] kvs; 096 097 kvs = scanner.next().rawCells(); 098 assertEquals(2, kvs.length); 099 checkOneCell(kvs[0], FAMILY, 3, 3, 4); 100 checkOneCell(kvs[1], FAMILY, 3, 3, 3); 101 kvs = scanner.next().rawCells(); 102 assertEquals(2, kvs.length); 103 checkOneCell(kvs[0], FAMILY, 5, 3, 4); 104 checkOneCell(kvs[1], FAMILY, 5, 3, 3); 105 } 106 } 107 108 @Test 109 public void testReseeksWithMultipleColumnOneTimestamp() throws IOException { 110 LOG.info(methodName); 111 final TableName tableName = TableName.valueOf(methodName); 112 byte[] FAMILY = Bytes.toBytes("event_log"); 113 byte[][] FAMILIES = new byte[][] { FAMILY }; 114 115 // create table; set versions to max... 116 try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) { 117 Integer[] putRows = new Integer[] { 1, 3, 5, 7 }; 118 Integer[] putColumns = new Integer[] { 1, 3, 5 }; 119 Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L }; 120 121 Integer[] scanRows = new Integer[] { 3, 5 }; 122 Integer[] scanColumns = new Integer[] { 3, 4 }; 123 Long[] scanTimestamps = new Long[] { 3L }; 124 int scanMaxVersions = 2; 125 126 put(ht, FAMILY, putRows, putColumns, putTimestamps); 127 128 TEST_UTIL.flush(tableName); 129 130 ResultScanner scanner = 131 scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions); 132 133 Cell[] kvs; 134 135 kvs = scanner.next().rawCells(); 136 assertEquals(1, kvs.length); 137 checkOneCell(kvs[0], FAMILY, 3, 3, 3); 138 kvs = scanner.next().rawCells(); 139 assertEquals(1, kvs.length); 140 checkOneCell(kvs[0], FAMILY, 5, 3, 3); 141 } 142 } 143 144 @Test 145 public void testReseeksWithMultipleColumnMultipleTimestamp() throws IOException { 146 LOG.info(methodName); 147 148 final TableName tableName = TableName.valueOf(methodName); 149 byte[] FAMILY = Bytes.toBytes("event_log"); 150 byte[][] FAMILIES = new byte[][] { FAMILY }; 151 152 // create table; set versions to max... 153 try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) { 154 Integer[] putRows = new Integer[] { 1, 3, 5, 7 }; 155 Integer[] putColumns = new Integer[] { 1, 3, 5 }; 156 Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L }; 157 158 Integer[] scanRows = new Integer[] { 5, 7 }; 159 Integer[] scanColumns = new Integer[] { 3, 4, 5 }; 160 Long[] scanTimestamps = new Long[] { 2L, 3L }; 161 int scanMaxVersions = 2; 162 163 put(ht, FAMILY, putRows, putColumns, putTimestamps); 164 165 TEST_UTIL.flush(tableName); 166 Scan scan = new Scan(); 167 scan.readVersions(10); 168 try (ResultScanner scanner = ht.getScanner(scan)) { 169 while (true) { 170 Result r = scanner.next(); 171 if (r == null) { 172 break; 173 } 174 LOG.info("r=" + r); 175 } 176 } 177 try (ResultScanner scanner = 178 scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions)) { 179 Cell[] kvs; 180 181 // This looks like wrong answer. Should be 2. Even then we are returning wrong result, 182 // timestamps that are 3 whereas should be 2 since min is inclusive. 183 kvs = scanner.next().rawCells(); 184 assertEquals(4, kvs.length); 185 checkOneCell(kvs[0], FAMILY, 5, 3, 3); 186 checkOneCell(kvs[1], FAMILY, 5, 3, 2); 187 checkOneCell(kvs[2], FAMILY, 5, 5, 3); 188 checkOneCell(kvs[3], FAMILY, 5, 5, 2); 189 kvs = scanner.next().rawCells(); 190 assertEquals(4, kvs.length); 191 checkOneCell(kvs[0], FAMILY, 7, 3, 3); 192 checkOneCell(kvs[1], FAMILY, 7, 3, 2); 193 checkOneCell(kvs[2], FAMILY, 7, 5, 3); 194 checkOneCell(kvs[3], FAMILY, 7, 5, 2); 195 } 196 } 197 } 198 199 @Test 200 public void testReseeksWithMultipleFiles() throws IOException { 201 LOG.info(methodName); 202 final TableName tableName = TableName.valueOf(methodName); 203 byte[] FAMILY = Bytes.toBytes("event_log"); 204 byte[][] FAMILIES = new byte[][] { FAMILY }; 205 206 // create table; set versions to max... 207 try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) { 208 209 Integer[] putRows1 = new Integer[] { 1, 2, 3 }; 210 Integer[] putColumns1 = new Integer[] { 2, 5, 6 }; 211 Long[] putTimestamps1 = new Long[] { 1L, 2L, 5L }; 212 213 Integer[] putRows2 = new Integer[] { 6, 7 }; 214 Integer[] putColumns2 = new Integer[] { 3, 6 }; 215 Long[] putTimestamps2 = new Long[] { 4L, 5L }; 216 217 Integer[] putRows3 = new Integer[] { 2, 3, 5 }; 218 Integer[] putColumns3 = new Integer[] { 1, 2, 3 }; 219 Long[] putTimestamps3 = new Long[] { 4L, 8L }; 220 221 Integer[] scanRows = new Integer[] { 3, 5, 7 }; 222 Integer[] scanColumns = new Integer[] { 3, 4, 5 }; 223 Long[] scanTimestamps = new Long[] { 2L, 4L }; 224 int scanMaxVersions = 5; 225 226 put(ht, FAMILY, putRows1, putColumns1, putTimestamps1); 227 TEST_UTIL.flush(tableName); 228 put(ht, FAMILY, putRows2, putColumns2, putTimestamps2); 229 TEST_UTIL.flush(tableName); 230 put(ht, FAMILY, putRows3, putColumns3, putTimestamps3); 231 232 ResultScanner scanner = 233 scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions); 234 235 Cell[] kvs; 236 237 kvs = scanner.next().rawCells(); 238 assertEquals(2, kvs.length); 239 checkOneCell(kvs[0], FAMILY, 3, 3, 4); 240 checkOneCell(kvs[1], FAMILY, 3, 5, 2); 241 242 kvs = scanner.next().rawCells(); 243 assertEquals(1, kvs.length); 244 checkOneCell(kvs[0], FAMILY, 5, 3, 4); 245 246 kvs = scanner.next().rawCells(); 247 assertEquals(1, kvs.length); 248 checkOneCell(kvs[0], FAMILY, 6, 3, 4); 249 250 kvs = scanner.next().rawCells(); 251 assertEquals(1, kvs.length); 252 checkOneCell(kvs[0], FAMILY, 7, 3, 4); 253 } 254 } 255 256 @Test 257 public void testWithVersionDeletes() throws Exception { 258 // first test from memstore (without flushing). 259 testWithVersionDeletes(false); 260 261 // run same test against HFiles (by forcing a flush). 262 testWithVersionDeletes(true); 263 } 264 265 public void testWithVersionDeletes(boolean flushTables) throws IOException { 266 LOG.info(methodName + "_" + (flushTables ? "flush" : "noflush")); 267 final TableName tableName = 268 TableName.valueOf(methodName + "_" + (flushTables ? "flush" : "noflush")); 269 byte[] FAMILY = Bytes.toBytes("event_log"); 270 byte[][] FAMILIES = new byte[][] { FAMILY }; 271 272 // create table; set versions to max... 273 try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) { 274 // For row:0, col:0: insert versions 1 through 5. 275 putNVersions(ht, FAMILY, 0, 0, 1, 5); 276 277 if (flushTables) { 278 TEST_UTIL.flush(tableName); 279 } 280 281 // delete version 4. 282 deleteOneVersion(ht, FAMILY, 0, 0, 4); 283 284 // request a bunch of versions including the deleted version. We should 285 // only get back entries for the versions that exist. 286 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L)); 287 assertEquals(3, kvs.length); 288 checkOneCell(kvs[0], FAMILY, 0, 0, 5); 289 checkOneCell(kvs[1], FAMILY, 0, 0, 3); 290 checkOneCell(kvs[2], FAMILY, 0, 0, 2); 291 } 292 } 293 294 @Test 295 public void testWithMultipleVersionDeletes() throws IOException { 296 LOG.info(methodName); 297 298 final TableName tableName = TableName.valueOf(methodName); 299 byte[] FAMILY = Bytes.toBytes("event_log"); 300 byte[][] FAMILIES = new byte[][] { FAMILY }; 301 302 // create table; set versions to max... 303 try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) { 304 // For row:0, col:0: insert versions 1 through 5. 305 putNVersions(ht, FAMILY, 0, 0, 1, 5); 306 307 TEST_UTIL.flush(tableName); 308 309 // delete all versions before 4. 310 deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4); 311 312 // request a bunch of versions including the deleted version. We should 313 // only get back entries for the versions that exist. 314 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); 315 assertEquals(0, kvs.length); 316 } 317 } 318 319 @Test 320 public void testWithColumnDeletes() throws IOException { 321 final TableName tableName = TableName.valueOf(methodName); 322 byte[] FAMILY = Bytes.toBytes("event_log"); 323 byte[][] FAMILIES = new byte[][] { FAMILY }; 324 325 // create table; set versions to max... 326 try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) { 327 // For row:0, col:0: insert versions 1 through 5. 328 putNVersions(ht, FAMILY, 0, 0, 1, 5); 329 330 TEST_UTIL.flush(tableName); 331 332 // delete all versions before 4. 333 deleteColumn(ht, FAMILY, 0, 0); 334 335 // request a bunch of versions including the deleted version. We should 336 // only get back entries for the versions that exist. 337 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); 338 assertEquals(0, kvs.length); 339 } 340 } 341 342 @Test 343 public void testWithFamilyDeletes() throws IOException { 344 final TableName tableName = TableName.valueOf(methodName); 345 byte[] FAMILY = Bytes.toBytes("event_log"); 346 byte[][] FAMILIES = new byte[][] { FAMILY }; 347 348 // create table; set versions to max... 349 try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) { 350 // For row:0, col:0: insert versions 1 through 5. 351 putNVersions(ht, FAMILY, 0, 0, 1, 5); 352 353 TEST_UTIL.flush(tableName); 354 355 // delete all versions before 4. 356 deleteFamily(ht, FAMILY, 0); 357 358 // request a bunch of versions including the deleted version. We should 359 // only get back entries for the versions that exist. 360 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); 361 assertEquals(0, kvs.length); 362 } 363 } 364 365 /** 366 * Assert that the passed in KeyValue has expected contents for the specified row, column & 367 * timestamp. 368 */ 369 private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) { 370 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; 371 372 assertEquals("row:" + rowIdx, Bytes.toString(CellUtil.cloneRow(kv)), 373 "Row mismatch which checking: " + ctx); 374 375 assertEquals(Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)), 376 "ColumnFamily mismatch while checking: " + ctx); 377 378 assertEquals("column:" + colIdx, Bytes.toString(CellUtil.cloneQualifier(kv)), 379 "Column qualifier mismatch while checking: " + ctx); 380 381 assertEquals(ts, kv.getTimestamp(), "Timestamp mismatch while checking: " + ctx); 382 383 assertEquals("value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)), 384 "Value mismatch while checking: " + ctx); 385 } 386 387 /** 388 * Uses the TimestampFilter on a Get to request a specified list of versions for the row/column 389 * specified by rowIdx & colIdx. 390 */ 391 private Cell[] getNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, List<Long> versions) 392 throws IOException { 393 byte row[] = Bytes.toBytes("row:" + rowIdx); 394 byte column[] = Bytes.toBytes("column:" + colIdx); 395 Get get = new Get(row); 396 get.addColumn(cf, column); 397 get.readAllVersions(); 398 get.setTimeRange(Collections.min(versions), Collections.max(versions) + 1); 399 Result result = ht.get(get); 400 401 return result.rawCells(); 402 } 403 404 private ResultScanner scan(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes, 405 Long[] versions, int maxVersions) throws IOException { 406 byte startRow[] = Bytes.toBytes("row:" + Collections.min(Arrays.asList(rowIndexes))); 407 byte endRow[] = Bytes.toBytes("row:" + Collections.max(Arrays.asList(rowIndexes)) + 1); 408 Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow); 409 for (Integer colIdx : columnIndexes) { 410 byte column[] = Bytes.toBytes("column:" + colIdx); 411 scan.addColumn(cf, column); 412 } 413 scan.readVersions(maxVersions); 414 scan.setTimeRange(Collections.min(Arrays.asList(versions)), 415 Collections.max(Arrays.asList(versions)) + 1); 416 ResultScanner scanner = ht.getScanner(scan); 417 return scanner; 418 } 419 420 private void put(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes, 421 Long[] versions) throws IOException { 422 for (int rowIdx : rowIndexes) { 423 byte row[] = Bytes.toBytes("row:" + rowIdx); 424 Put put = new Put(row); 425 put.setDurability(Durability.SKIP_WAL); 426 for (int colIdx : columnIndexes) { 427 byte column[] = Bytes.toBytes("column:" + colIdx); 428 for (long version : versions) { 429 put.addColumn(cf, column, version, Bytes.toBytes("value-version-" + version)); 430 } 431 } 432 ht.put(put); 433 } 434 } 435 436 /** 437 * Insert in specific row/column versions with timestamps versionStart..versionEnd. 438 */ 439 private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, long versionStart, 440 long versionEnd) throws IOException { 441 byte row[] = Bytes.toBytes("row:" + rowIdx); 442 byte column[] = Bytes.toBytes("column:" + colIdx); 443 Put put = new Put(row); 444 put.setDurability(Durability.SKIP_WAL); 445 446 for (long idx = versionStart; idx <= versionEnd; idx++) { 447 put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx)); 448 } 449 450 ht.put(put); 451 } 452 453 /** 454 * For row/column specified by rowIdx/colIdx, delete the cell corresponding to the specified 455 * version. 456 */ 457 private void deleteOneVersion(Table ht, byte[] cf, int rowIdx, int colIdx, long version) 458 throws IOException { 459 byte row[] = Bytes.toBytes("row:" + rowIdx); 460 byte column[] = Bytes.toBytes("column:" + colIdx); 461 Delete del = new Delete(row); 462 del.addColumn(cf, column, version); 463 ht.delete(del); 464 } 465 466 /** 467 * For row/column specified by rowIdx/colIdx, delete all cells preceeding the specified version. 468 */ 469 private void deleteAllVersionsBefore(Table ht, byte[] cf, int rowIdx, int colIdx, long version) 470 throws IOException { 471 byte row[] = Bytes.toBytes("row:" + rowIdx); 472 byte column[] = Bytes.toBytes("column:" + colIdx); 473 Delete del = new Delete(row); 474 del.addColumns(cf, column, version); 475 ht.delete(del); 476 } 477 478 private void deleteColumn(Table ht, byte[] cf, int rowIdx, int colIdx) throws IOException { 479 byte row[] = Bytes.toBytes("row:" + rowIdx); 480 byte column[] = Bytes.toBytes("column:" + colIdx); 481 Delete del = new Delete(row); 482 del.addColumns(cf, column); 483 ht.delete(del); 484 } 485 486 private void deleteFamily(Table ht, byte[] cf, int rowIdx) throws IOException { 487 byte row[] = Bytes.toBytes("row:" + rowIdx); 488 Delete del = new Delete(row); 489 del.addFamily(cf); 490 ht.delete(del); 491 } 492 493}