/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Run tests related to {@link org.apache.hadoop.hbase.filter.TimestampsFilter} using HBase
 * client APIs. Sets up the HBase mini cluster once at start. Each test creates a table
 * named for the method and does its stuff against that.
 * <p>
 * Note: the helpers in this class select versions with {@code setTimeRange} on the
 * {@link Scan} / {@link Get} (min inclusive, max + 1 as the upper bound) rather than by
 * installing a filter on the request.
 */
@Category({LargeTests.class, ClientTests.class})
public class TestMultipleTimestamps {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMultipleTimestamps.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMultipleTimestamps.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  /** The single column family used by every table created in this class. */
  private static final byte[] FAMILY = Bytes.toBytes("event_log");
  private static final byte[][] FAMILIES = new byte[][] { FAMILY };

  @Rule
  public TestName name = new TestName();

  /** Starts the shared mini cluster once for the whole class. */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster();
  }

  /** Shuts the shared mini cluster down after the last test. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    // Nothing to do.
  }

  @After
  public void tearDown() throws Exception {
    // Nothing to do.
  }

  /** One requested column, two requested timestamps: both versions come back, newest first. */
  @Test
  public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
    final TableName tableName = TableName.valueOf(name.getMethodName());

    Integer[] putRows = new Integer[] {1, 3, 5, 7};
    Integer[] putColumns = new Integer[] {1, 3, 5};
    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};

    Integer[] scanRows = new Integer[] {3, 5};
    Integer[] scanColumns = new Integer[] {3};
    Long[] scanTimestamps = new Long[] {3L, 4L};
    int scanMaxVersions = 2;

    // create table; set versions to max...
    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
      put(ht, FAMILY, putRows, putColumns, putTimestamps);

      TEST_UTIL.flush(tableName);

      try (ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
          scanTimestamps, scanMaxVersions)) {
        Cell[] kvs;

        kvs = nextRowCells(scanner);
        assertEquals(2, kvs.length);
        checkOneCell(kvs[0], FAMILY, 3, 3, 4);
        checkOneCell(kvs[1], FAMILY, 3, 3, 3);
        kvs = nextRowCells(scanner);
        assertEquals(2, kvs.length);
        checkOneCell(kvs[0], FAMILY, 5, 3, 4);
        checkOneCell(kvs[1], FAMILY, 5, 3, 3);
      }
    }
  }

  /**
   * Two requested columns (one of which was never written), one requested timestamp: only the
   * existing column's matching version comes back for each row.
   */
  @Test
  public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
    LOG.info(name.getMethodName());
    final TableName tableName = TableName.valueOf(name.getMethodName());

    Integer[] putRows = new Integer[] {1, 3, 5, 7};
    Integer[] putColumns = new Integer[] {1, 3, 5};
    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};

    Integer[] scanRows = new Integer[] {3, 5};
    Integer[] scanColumns = new Integer[] {3, 4};
    Long[] scanTimestamps = new Long[] {3L};
    int scanMaxVersions = 2;

    // create table; set versions to max...
    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
      put(ht, FAMILY, putRows, putColumns, putTimestamps);

      TEST_UTIL.flush(tableName);

      try (ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
          scanTimestamps, scanMaxVersions)) {
        Cell[] kvs;

        kvs = nextRowCells(scanner);
        assertEquals(1, kvs.length);
        checkOneCell(kvs[0], FAMILY, 3, 3, 3);
        kvs = nextRowCells(scanner);
        assertEquals(1, kvs.length);
        checkOneCell(kvs[0], FAMILY, 5, 3, 3);
      }
    }
  }

  /** Several requested columns and two requested timestamps across two rows. */
  @Test
  public void testReseeksWithMultipleColumnMultipleTimestamp() throws IOException {
    LOG.info(name.getMethodName());

    final TableName tableName = TableName.valueOf(name.getMethodName());

    Integer[] putRows = new Integer[] {1, 3, 5, 7};
    Integer[] putColumns = new Integer[] {1, 3, 5};
    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};

    Integer[] scanRows = new Integer[] {5, 7};
    Integer[] scanColumns = new Integer[] {3, 4, 5};
    Long[] scanTimestamps = new Long[] {2L, 3L};
    int scanMaxVersions = 2;

    // create table; set versions to max...
    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
      put(ht, FAMILY, putRows, putColumns, putTimestamps);

      TEST_UTIL.flush(tableName);

      // Diagnostic pass only: dump the whole table (up to 10 versions per cell) to the log.
      Scan dumpScan = new Scan();
      dumpScan.setMaxVersions(10);
      try (ResultScanner dumpScanner = ht.getScanner(dumpScan)) {
        for (Result r = dumpScanner.next(); r != null; r = dumpScanner.next()) {
          LOG.info("r={}", r);
        }
      }

      try (ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps,
          scanMaxVersions)) {
        Cell[] kvs;

        // This looks like wrong answer. Should be 2. Even then we are returning wrong result,
        // timestamps that are 3 whereas should be 2 since min is inclusive.
        kvs = nextRowCells(scanner);
        assertEquals(4, kvs.length);
        checkOneCell(kvs[0], FAMILY, 5, 3, 3);
        checkOneCell(kvs[1], FAMILY, 5, 3, 2);
        checkOneCell(kvs[2], FAMILY, 5, 5, 3);
        checkOneCell(kvs[3], FAMILY, 5, 5, 2);
        kvs = nextRowCells(scanner);
        assertEquals(4, kvs.length);
        checkOneCell(kvs[0], FAMILY, 7, 3, 3);
        checkOneCell(kvs[1], FAMILY, 7, 3, 2);
        checkOneCell(kvs[2], FAMILY, 7, 5, 3);
        checkOneCell(kvs[3], FAMILY, 7, 5, 2);
      }
    }
  }

  /**
   * Writes three batches with a flush after the first two only, so the scan has to merge
   * data that was flushed twice with data that was written after the last flush.
   */
  @Test
  public void testReseeksWithMultipleFiles() throws IOException {
    LOG.info(name.getMethodName());
    final TableName tableName = TableName.valueOf(name.getMethodName());

    Integer[] putRows1 = new Integer[] {1, 2, 3};
    Integer[] putColumns1 = new Integer[] {2, 5, 6};
    Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};

    Integer[] putRows2 = new Integer[] {6, 7};
    Integer[] putColumns2 = new Integer[] {3, 6};
    Long[] putTimestamps2 = new Long[] {4L, 5L};

    Integer[] putRows3 = new Integer[] {2, 3, 5};
    Integer[] putColumns3 = new Integer[] {1, 2, 3};
    Long[] putTimestamps3 = new Long[] {4L, 8L};

    Integer[] scanRows = new Integer[] {3, 5, 7};
    Integer[] scanColumns = new Integer[] {3, 4, 5};
    Long[] scanTimestamps = new Long[] {2L, 4L};
    int scanMaxVersions = 5;

    // create table; set versions to max...
    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
      put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
      TEST_UTIL.flush(tableName);
      put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
      TEST_UTIL.flush(tableName);
      // Deliberately not flushed.
      put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);

      try (ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
          scanTimestamps, scanMaxVersions)) {
        Cell[] kvs;

        kvs = nextRowCells(scanner);
        assertEquals(2, kvs.length);
        checkOneCell(kvs[0], FAMILY, 3, 3, 4);
        checkOneCell(kvs[1], FAMILY, 3, 5, 2);

        kvs = nextRowCells(scanner);
        assertEquals(1, kvs.length);
        checkOneCell(kvs[0], FAMILY, 5, 3, 4);

        // Row 6 is not listed in scanRows but lies inside the scanned [min, max] row range.
        kvs = nextRowCells(scanner);
        assertEquals(1, kvs.length);
        checkOneCell(kvs[0], FAMILY, 6, 3, 4);

        kvs = nextRowCells(scanner);
        assertEquals(1, kvs.length);
        checkOneCell(kvs[0], FAMILY, 7, 3, 4);
      }
    }
  }

  @Test
  public void testWithVersionDeletes() throws Exception {

    // first test from memstore (without flushing).
    testWithVersionDeletes(false);

    // run same test against HFiles (by forcing a flush).
    testWithVersionDeletes(true);
  }

  /**
   * Inserts versions 1..5 of a single cell, deletes version 4, then reads versions 2..5 and
   * expects 5, 3 and 2 back.
   *
   * @param flushTables whether to flush the table between the puts and the delete
   */
  public void testWithVersionDeletes(boolean flushTables) throws IOException {
    final String tableNameStr = name.getMethodName() + "_" + (flushTables ? "flush" : "noflush");
    LOG.info(tableNameStr);
    final TableName tableName = TableName.valueOf(tableNameStr);

    // create table; set versions to max...
    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
      // For row:0, col:0: insert versions 1 through 5.
      putNVersions(ht, FAMILY, 0, 0, 1, 5);

      if (flushTables) {
        TEST_UTIL.flush(tableName);
      }

      // delete version 4.
      deleteOneVersion(ht, FAMILY, 0, 0, 4);

      // request a bunch of versions including the deleted version. We should
      // only get back entries for the versions that exist.
      Cell[] kvs = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
      assertEquals(3, kvs.length);
      checkOneCell(kvs[0], FAMILY, 0, 0, 5);
      checkOneCell(kvs[1], FAMILY, 0, 0, 3);
      checkOneCell(kvs[2], FAMILY, 0, 0, 2);
    }
  }

  @Test
  public void testWithMultipleVersionDeletes() throws IOException {
    LOG.info(name.getMethodName());

    final TableName tableName = TableName.valueOf(name.getMethodName());

    // create table; set versions to max...
    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
      // For row:0, col:0: insert versions 1 through 5.
      putNVersions(ht, FAMILY, 0, 0, 1, 5);

      TEST_UTIL.flush(tableName);

      // delete all versions up to timestamp 4.
      deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);

      // request only versions covered by the delete; nothing should come back.
      Cell[] kvs = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
      assertEquals(0, kvs.length);
    }
  }

  @Test
  public void testWithColumnDeletes() throws IOException {
    final TableName tableName = TableName.valueOf(name.getMethodName());

    // create table; set versions to max...
    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
      // For row:0, col:0: insert versions 1 through 5.
      putNVersions(ht, FAMILY, 0, 0, 1, 5);

      TEST_UTIL.flush(tableName);

      // delete the whole column (no timestamp given).
      deleteColumn(ht, FAMILY, 0, 0);

      // request versions of the deleted column; nothing should come back.
      Cell[] kvs = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
      assertEquals(0, kvs.length);
    }
  }

  @Test
  public void testWithFamilyDeletes() throws IOException {
    final TableName tableName = TableName.valueOf(name.getMethodName());

    // create table; set versions to max...
    try (Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE)) {
      // For row:0, col:0: insert versions 1 through 5.
      putNVersions(ht, FAMILY, 0, 0, 1, 5);

      TEST_UTIL.flush(tableName);

      // delete the whole column family for the row.
      deleteFamily(ht, FAMILY, 0);

      // request versions from the deleted family; nothing should come back.
      Cell[] kvs = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
      assertEquals(0, kvs.length);
    }
  }

  /**
   * Fetches the next row from the scanner and returns its cells, failing with a descriptive
   * assertion error (instead of a NullPointerException) when the scanner is already exhausted.
   */
  private static Cell[] nextRowCells(ResultScanner scanner) throws IOException {
    Result result = scanner.next();
    if (result == null) {
      throw new AssertionError("Scanner returned fewer rows than expected");
    }
    return result.rawCells();
  }

  /**
   * Assert that the passed in KeyValue has expected contents for the
   * specified row, column & timestamp.
   */
  private static void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;

    assertEquals("Row mismatch which checking: " + ctx,
        "row:" + rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));

    assertEquals("ColumnFamily mismatch while checking: " + ctx,
        Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));

    assertEquals("Column qualifier mismatch while checking: " + ctx,
        "column:" + colIdx, Bytes.toString(CellUtil.cloneQualifier(kv)));

    assertEquals("Timestamp mismatch while checking: " + ctx,
        ts, kv.getTimestamp());

    assertEquals("Value mismatch while checking: " + ctx,
        "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
  }

  /**
   * Issues a Get for the row/column specified by rowIdx & colIdx, asking for all versions
   * inside the time range {@code [min(versions), max(versions) + 1)}.
   *
   * @param versions non-empty list of timestamps whose min and max bound the time range
   * @return the raw cells of the Get result
   */
  private static Cell[] getNVersions(Table ht, byte[] cf, int rowIdx, int colIdx,
      List<Long> versions) throws IOException {
    byte[] row = Bytes.toBytes("row:" + rowIdx);
    byte[] column = Bytes.toBytes("column:" + colIdx);
    Get get = new Get(row);
    get.addColumn(cf, column);
    get.setMaxVersions();
    get.setTimeRange(Collections.min(versions), Collections.max(versions) + 1);
    Result result = ht.get(get);

    return result.rawCells();
  }

  /**
   * Opens a scanner over the row range spanned by rowIndexes, restricted to the given columns,
   * to at most maxVersions versions per cell, and to the time range
   * {@code [min(versions), max(versions) + 1)}. The caller is responsible for closing the
   * returned scanner.
   */
  private static ResultScanner scan(Table ht, byte[] cf, Integer[] rowIndexes,
      Integer[] columnIndexes, Long[] versions, int maxVersions) throws IOException {
    List<Integer> rows = Arrays.asList(rowIndexes);
    List<Long> timestamps = Arrays.asList(versions);

    byte[] startRow = Bytes.toBytes("row:" + Collections.min(rows));
    // NOTE: the trailing "+ 1" is string concatenation, not arithmetic: the stop row is the
    // max row key with the character '1' appended (e.g. "row:71"), which sorts after the max
    // row key so that the max row itself is still returned by the scan.
    byte[] endRow = Bytes.toBytes("row:" + Collections.max(rows) + 1);

    Scan scan = new Scan(startRow, endRow);
    for (Integer colIdx : columnIndexes) {
      scan.addColumn(cf, Bytes.toBytes("column:" + colIdx));
    }
    scan.setMaxVersions(maxVersions);
    scan.setTimeRange(Collections.min(timestamps), Collections.max(timestamps) + 1);
    return ht.getScanner(scan);
  }

  /**
   * Writes one Put per row containing every (column, version) combination; each cell's value
   * is {@code "value-version-" + version}.
   */
  private static void put(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes,
      Long[] versions) throws IOException {
    for (int rowIdx : rowIndexes) {
      Put put = new Put(Bytes.toBytes("row:" + rowIdx));
      put.setDurability(Durability.SKIP_WAL);
      for (int colIdx : columnIndexes) {
        byte[] column = Bytes.toBytes("column:" + colIdx);
        for (long version : versions) {
          put.addColumn(cf, column, version, Bytes.toBytes("value-version-" + version));
        }
      }
      ht.put(put);
    }
  }

  /**
   * Insert in specific row/column versions with timestamps
   * versionStart..versionEnd (both inclusive).
   */
  private static void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx,
      long versionStart, long versionEnd) throws IOException {
    byte[] row = Bytes.toBytes("row:" + rowIdx);
    byte[] column = Bytes.toBytes("column:" + colIdx);
    Put put = new Put(row);
    put.setDurability(Durability.SKIP_WAL);

    for (long idx = versionStart; idx <= versionEnd; idx++) {
      put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx));
    }

    ht.put(put);
  }

  /**
   * For row/column specified by rowIdx/colIdx, delete the cell
   * corresponding to the specified version.
   */
  private static void deleteOneVersion(Table ht, byte[] cf, int rowIdx, int colIdx,
      long version) throws IOException {
    byte[] row = Bytes.toBytes("row:" + rowIdx);
    byte[] column = Bytes.toBytes("column:" + colIdx);
    Delete del = new Delete(row);
    del.addColumn(cf, column, version);
    ht.delete(del);
  }

  /**
   * For row/column specified by rowIdx/colIdx, issue a {@code Delete#addColumns} with the
   * specified version as its timestamp.
   * NOTE(review): per the HBase Delete API this removes versions with a timestamp less than
   * or equal to the given one, i.e. the bound is inclusive despite the method name.
   */
  private static void deleteAllVersionsBefore(Table ht, byte[] cf, int rowIdx, int colIdx,
      long version) throws IOException {
    byte[] row = Bytes.toBytes("row:" + rowIdx);
    byte[] column = Bytes.toBytes("column:" + colIdx);
    Delete del = new Delete(row);
    del.addColumns(cf, column, version);
    ht.delete(del);
  }

  /** For row/column specified by rowIdx/colIdx, delete the column without a timestamp bound. */
  private static void deleteColumn(Table ht, byte[] cf, int rowIdx, int colIdx)
      throws IOException {
    byte[] row = Bytes.toBytes("row:" + rowIdx);
    byte[] column = Bytes.toBytes("column:" + colIdx);
    Delete del = new Delete(row);
    del.addColumns(cf, column);
    ht.delete(del);
  }

  /** For the row specified by rowIdx, delete the given column family. */
  private static void deleteFamily(Table ht, byte[] cf, int rowIdx) throws IOException {
    byte[] row = Bytes.toBytes("row:" + rowIdx);
    Delete del = new Delete(row);
    del.addFamily(cf);
    ht.delete(del);
  }

}