001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.List; 031import java.util.Optional; 032import java.util.Random; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.Coprocessor; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HColumnDescriptor; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.HTableDescriptor; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 051import org.apache.hadoop.hbase.coprocessor.ObserverContext; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 054import org.apache.hadoop.hbase.coprocessor.RegionObserver; 055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 056import org.apache.hadoop.hbase.ipc.ServerRpcController; 057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; 058import org.apache.hadoop.hbase.regionserver.HRegion; 059import org.apache.hadoop.hbase.regionserver.HRegionServer; 060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 061import org.apache.hadoop.hbase.regionserver.RegionScanner; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.LargeTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.Pair; 066import org.junit.After; 067import org.junit.AfterClass; 068import org.junit.Assert; 069import org.junit.Before; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 081 082@Category({LargeTests.class, ClientTests.class}) 083public class TestFromClientSide3 { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestFromClientSide3.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class); 090 private final static HBaseTestingUtility TEST_UTIL 091 = new HBaseTestingUtility(); 092 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 093 private static Random random = new Random(); 094 private static int SLAVES = 3; 095 private static final byte[] ROW = Bytes.toBytes("testRow"); 096 private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); 097 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 098 private static final byte[] VALUE = Bytes.toBytes("testValue"); 099 private static final byte[] COL_QUAL = Bytes.toBytes("f1"); 100 private static final byte[] VAL_BYTES = Bytes.toBytes("v1"); 101 private static final byte[] ROW_BYTES = Bytes.toBytes("r1"); 102 103 @Rule 104 public TestName name = new TestName(); 105 106 /** 107 * @throws java.lang.Exception 108 */ 109 @BeforeClass 110 public static void setUpBeforeClass() throws Exception { 111 TEST_UTIL.startMiniCluster(SLAVES); 112 } 113 114 /** 115 * @throws java.lang.Exception 116 */ 117 @AfterClass 118 public static void tearDownAfterClass() throws Exception { 119 TEST_UTIL.shutdownMiniCluster(); 120 } 121 122 /** 123 * @throws java.lang.Exception 124 */ 125 @Before 126 public void setUp() throws Exception { 127 // Nothing to do. 128 } 129 130 /** 131 * @throws java.lang.Exception 132 */ 133 @After 134 public void tearDown() throws Exception { 135 for (HTableDescriptor htd: TEST_UTIL.getAdmin().listTables()) { 136 LOG.info("Tear down, remove table=" + htd.getTableName()); 137 TEST_UTIL.deleteTable(htd.getTableName()); 138 } 139 } 140 141 private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) 142 throws Exception { 143 Put put = new Put(row); 144 for (int i = 0; i < nPuts; i++) { 145 byte[] qualifier = Bytes.toBytes(random.nextInt()); 146 byte[] value = Bytes.toBytes(random.nextInt()); 147 put.addColumn(family, qualifier, value); 148 } 149 table.put(put); 150 } 151 152 private void performMultiplePutAndFlush(HBaseAdmin admin, Table table, 153 byte[] row, byte[] family, int nFlushes, int nPuts) 154 throws Exception { 155 156 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) { 157 // connection needed for poll-wait 158 HRegionLocation loc = locator.getRegionLocation(row, true); 159 AdminProtos.AdminService.BlockingInterface server = 160 ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName()); 161 byte[] regName = loc.getRegionInfo().getRegionName(); 162 163 for (int i = 0; i < nFlushes; i++) { 164 randomCFPuts(table, row, family, nPuts); 165 List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY); 166 int sfCount = sf.size(); 167 168 admin.flush(table.getName()); 169 } 170 } 171 } 172 173 private static List<Cell> toList(ResultScanner scanner) { 174 try { 175 List<Cell> cells = new ArrayList<>(); 176 for (Result r : scanner) { 177 cells.addAll(r.listCells()); 178 } 179 return cells; 180 } finally { 181 scanner.close(); 182 } 183 } 184 185 @Test 186 public void testScanAfterDeletingSpecifiedRow() throws IOException { 187 TableName tableName = TableName.valueOf(name.getMethodName()); 188 TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) 189 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 190 .build(); 191 TEST_UTIL.getAdmin().createTable(desc); 192 byte[] row = Bytes.toBytes("SpecifiedRow"); 193 byte[] value0 = Bytes.toBytes("value_0"); 194 byte[] value1 = Bytes.toBytes("value_1"); 195 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 196 Put put = new Put(row); 197 put.addColumn(FAMILY, QUALIFIER, VALUE); 198 t.put(put); 199 Delete d = new Delete(row); 200 t.delete(d); 201 put = new Put(row); 202 put.addColumn(FAMILY, null, value0); 203 t.put(put); 204 put = new Put(row); 205 put.addColumn(FAMILY, null, value1); 206 t.put(put); 207 List<Cell> cells = toList(t.getScanner(new Scan())); 208 assertEquals(1, cells.size()); 209 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 210 211 cells = toList(t.getScanner(new Scan().addFamily(FAMILY))); 212 assertEquals(1, cells.size()); 213 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 214 215 cells = toList(t.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 216 assertEquals(0, cells.size()); 217 218 TEST_UTIL.getAdmin().flush(tableName); 219 cells = toList(t.getScanner(new Scan())); 220 assertEquals(1, cells.size()); 221 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 222 223 cells = toList(t.getScanner(new Scan().addFamily(FAMILY))); 224 assertEquals(1, cells.size()); 225 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 226 227 cells = toList(t.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 228 assertEquals(0, cells.size()); 229 } 230 } 231 232 @Test 233 public void testScanAfterDeletingSpecifiedRowV2() throws IOException { 234 TableName tableName = TableName.valueOf(name.getMethodName()); 235 TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) 236 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 237 .build(); 238 TEST_UTIL.getAdmin().createTable(desc); 239 byte[] row = Bytes.toBytes("SpecifiedRow"); 240 byte[] qual0 = Bytes.toBytes("qual0"); 241 byte[] qual1 = Bytes.toBytes("qual1"); 242 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 243 Delete d = new Delete(row); 244 t.delete(d); 245 246 Put put = new Put(row); 247 put.addColumn(FAMILY, null, VALUE); 248 t.put(put); 249 250 put = new Put(row); 251 put.addColumn(FAMILY, qual1, qual1); 252 t.put(put); 253 254 put = new Put(row); 255 put.addColumn(FAMILY, qual0, qual0); 256 t.put(put); 257 258 Result r = t.get(new Get(row)); 259 assertEquals(3, r.size()); 260 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 261 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 262 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 263 264 TEST_UTIL.getAdmin().flush(tableName); 265 r = t.get(new Get(row)); 266 assertEquals(3, r.size()); 267 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 268 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 269 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 270 } 271 } 272 273 // override the config settings at the CF level and ensure priority 274 @Test 275 public void testAdvancedConfigOverride() throws Exception { 276 /* 277 * Overall idea: (1) create 3 store files and issue a compaction. config's 278 * compaction.min == 3, so should work. (2) Increase the compaction.min 279 * toggle in the HTD to 5 and modify table. If we use the HTD value instead 280 * of the default config value, adding 3 files and issuing a compaction 281 * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 282 * and modify table. The CF schema should override the Table schema and now 283 * cause a minor compaction. 284 */ 285 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3); 286 287 final TableName tableName = TableName.valueOf(name.getMethodName()); 288 Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10); 289 Admin admin = TEST_UTIL.getAdmin(); 290 ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection(); 291 292 // Create 3 store files. 293 byte[] row = Bytes.toBytes(random.nextInt()); 294 performMultiplePutAndFlush((HBaseAdmin) admin, hTable, row, FAMILY, 3, 100); 295 296 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 297 // Verify we have multiple store files. 298 HRegionLocation loc = locator.getRegionLocation(row, true); 299 byte[] regionName = loc.getRegionInfo().getRegionName(); 300 AdminProtos.AdminService.BlockingInterface server = connection.getAdmin(loc.getServerName()); 301 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1); 302 303 // Issue a compaction request 304 admin.compact(tableName); 305 306 // poll wait for the compactions to happen 307 for (int i = 0; i < 10 * 1000 / 40; ++i) { 308 // The number of store files after compaction should be lesser. 309 loc = locator.getRegionLocation(row, true); 310 if (!loc.getRegionInfo().isOffline()) { 311 regionName = loc.getRegionInfo().getRegionName(); 312 server = connection.getAdmin(loc.getServerName()); 313 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) { 314 break; 315 } 316 } 317 Thread.sleep(40); 318 } 319 // verify the compactions took place and that we didn't just time out 320 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1); 321 322 // change the compaction.min config option for this table to 5 323 LOG.info("hbase.hstore.compaction.min should now be 5"); 324 HTableDescriptor htd = new HTableDescriptor(hTable.getTableDescriptor()); 325 htd.setValue("hbase.hstore.compaction.min", String.valueOf(5)); 326 admin.modifyTable(tableName, htd); 327 Pair<Integer, Integer> st; 328 while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) { 329 LOG.debug(st.getFirst() + " regions left to update"); 330 Thread.sleep(40); 331 } 332 LOG.info("alter status finished"); 333 334 // Create 3 more store files. 335 performMultiplePutAndFlush((HBaseAdmin) admin, hTable, row, FAMILY, 3, 10); 336 337 // Issue a compaction request 338 admin.compact(tableName); 339 340 // This time, the compaction request should not happen 341 Thread.sleep(10 * 1000); 342 loc = locator.getRegionLocation(row, true); 343 regionName = loc.getRegionInfo().getRegionName(); 344 server = connection.getAdmin(loc.getServerName()); 345 int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size(); 346 assertTrue(sfCount > 1); 347 348 // change an individual CF's config option to 2 & online schema update 349 LOG.info("hbase.hstore.compaction.min should now be 2"); 350 HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 351 hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2)); 352 htd.modifyFamily(hcd); 353 admin.modifyTable(tableName, htd); 354 while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) { 355 LOG.debug(st.getFirst() + " regions left to update"); 356 Thread.sleep(40); 357 } 358 LOG.info("alter status finished"); 359 360 // Issue a compaction request 361 admin.compact(tableName); 362 363 // poll wait for the compactions to happen 364 for (int i = 0; i < 10 * 1000 / 40; ++i) { 365 loc = locator.getRegionLocation(row, true); 366 regionName = loc.getRegionInfo().getRegionName(); 367 try { 368 server = connection.getAdmin(loc.getServerName()); 369 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) { 370 break; 371 } 372 } catch (Exception e) { 373 LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName)); 374 } 375 Thread.sleep(40); 376 } 377 378 // verify the compaction took place and that we didn't just time out 379 assertTrue(ProtobufUtil.getStoreFiles( 380 server, regionName, FAMILY).size() < sfCount); 381 382 // Finally, ensure that we can remove a custom config value after we made it 383 LOG.info("Removing CF config value"); 384 LOG.info("hbase.hstore.compaction.min should now be 5"); 385 hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 386 hcd.setValue("hbase.hstore.compaction.min", null); 387 htd.modifyFamily(hcd); 388 admin.modifyTable(tableName, htd); 389 while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) { 390 LOG.debug(st.getFirst() + " regions left to update"); 391 Thread.sleep(40); 392 } 393 LOG.info("alter status finished"); 394 assertNull(hTable.getTableDescriptor().getFamily(FAMILY).getValue( 395 "hbase.hstore.compaction.min")); 396 } 397 } 398 399 @Test 400 public void testHTableBatchWithEmptyPut ()throws Exception { 401 Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), 402 new byte[][] { FAMILY }); 403 try { 404 List actions = (List) new ArrayList(); 405 Object[] results = new Object[2]; 406 // create an empty Put 407 Put put1 = new Put(ROW); 408 actions.add(put1); 409 410 Put put2 = new Put(ANOTHERROW); 411 put2.addColumn(FAMILY, QUALIFIER, VALUE); 412 actions.add(put2); 413 414 table.batch(actions, results); 415 fail("Empty Put should have failed the batch call"); 416 } catch (IllegalArgumentException iae) { 417 418 } finally { 419 table.close(); 420 } 421 } 422 423 // Test Table.batch with large amount of mutations against the same key. 424 // It used to trigger read lock's "Maximum lock count exceeded" Error. 425 @Test 426 public void testHTableWithLargeBatch() throws Exception { 427 Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), 428 new byte[][] { FAMILY }); 429 int sixtyFourK = 64 * 1024; 430 try { 431 List actions = new ArrayList(); 432 Object[] results = new Object[(sixtyFourK + 1) * 2]; 433 434 for (int i = 0; i < sixtyFourK + 1; i ++) { 435 Put put1 = new Put(ROW); 436 put1.addColumn(FAMILY, QUALIFIER, VALUE); 437 actions.add(put1); 438 439 Put put2 = new Put(ANOTHERROW); 440 put2.addColumn(FAMILY, QUALIFIER, VALUE); 441 actions.add(put2); 442 } 443 444 table.batch(actions, results); 445 } finally { 446 table.close(); 447 } 448 } 449 450 @Test 451 public void testBatchWithRowMutation() throws Exception { 452 LOG.info("Starting testBatchWithRowMutation"); 453 final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation"); 454 try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) { 455 byte [][] QUALIFIERS = new byte [][] { 456 Bytes.toBytes("a"), Bytes.toBytes("b") 457 }; 458 459 RowMutations arm = RowMutations.of(Collections.singletonList( 460 new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); 461 Object[] batchResult = new Object[1]; 462 t.batch(Arrays.asList(arm), batchResult); 463 464 Get g = new Get(ROW); 465 Result r = t.get(g); 466 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); 467 468 arm = RowMutations.of(Arrays.asList( 469 new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), 470 new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); 471 t.batch(Arrays.asList(arm), batchResult); 472 r = t.get(g); 473 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); 474 assertNull(r.getValue(FAMILY, QUALIFIERS[0])); 475 476 // Test that we get the correct remote exception for RowMutations from batch() 477 try { 478 arm = RowMutations.of(Collections.singletonList( 479 new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE))); 480 t.batch(Arrays.asList(arm), batchResult); 481 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); 482 } catch(RetriesExhaustedWithDetailsException e) { 483 String msg = e.getMessage(); 484 assertTrue(msg.contains("NoSuchColumnFamilyException")); 485 } 486 } 487 } 488 489 @Test 490 public void testHTableExistsMethodSingleRegionSingleGet() throws Exception { 491 // Test with a single region table. 492 Table table = TEST_UTIL.createTable( 493 TableName.valueOf(name.getMethodName()), 494 new byte[][] { FAMILY }); 495 496 Put put = new Put(ROW); 497 put.addColumn(FAMILY, QUALIFIER, VALUE); 498 499 Get get = new Get(ROW); 500 501 boolean exist = table.exists(get); 502 assertFalse(exist); 503 504 table.put(put); 505 506 exist = table.exists(get); 507 assertTrue(exist); 508 } 509 510 @Test 511 public void testHTableExistsMethodSingleRegionMultipleGets() throws Exception { 512 Table table = TEST_UTIL.createTable(TableName.valueOf( 513 name.getMethodName()), new byte[][] { FAMILY }); 514 515 Put put = new Put(ROW); 516 put.addColumn(FAMILY, QUALIFIER, VALUE); 517 table.put(put); 518 519 List<Get> gets = new ArrayList<>(); 520 gets.add(new Get(ROW)); 521 gets.add(new Get(ANOTHERROW)); 522 523 boolean[] results = table.exists(gets); 524 assertTrue(results[0]); 525 assertFalse(results[1]); 526 } 527 528 @Test 529 public void testHTableExistsBeforeGet() throws Exception { 530 Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), 531 new byte[][] { FAMILY }); 532 try { 533 Put put = new Put(ROW); 534 put.addColumn(FAMILY, QUALIFIER, VALUE); 535 table.put(put); 536 537 Get get = new Get(ROW); 538 539 boolean exist = table.exists(get); 540 assertEquals(true, exist); 541 542 Result result = table.get(get); 543 assertEquals(false, result.isEmpty()); 544 assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); 545 } finally { 546 table.close(); 547 } 548 } 549 550 @Test 551 public void testHTableExistsAllBeforeGet() throws Exception { 552 final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); 553 Table table = TEST_UTIL.createTable( 554 TableName.valueOf(name.getMethodName()), new byte[][] { FAMILY }); 555 try { 556 Put put = new Put(ROW); 557 put.addColumn(FAMILY, QUALIFIER, VALUE); 558 table.put(put); 559 put = new Put(ROW2); 560 put.addColumn(FAMILY, QUALIFIER, VALUE); 561 table.put(put); 562 563 Get get = new Get(ROW); 564 Get get2 = new Get(ROW2); 565 ArrayList<Get> getList = new ArrayList(2); 566 getList.add(get); 567 getList.add(get2); 568 569 boolean[] exists = table.existsAll(getList); 570 assertEquals(true, exists[0]); 571 assertEquals(true, exists[1]); 572 573 Result[] result = table.get(getList); 574 assertEquals(false, result[0].isEmpty()); 575 assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); 576 assertEquals(false, result[1].isEmpty()); 577 assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); 578 } finally { 579 table.close(); 580 } 581 } 582 583 @Test 584 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { 585 Table table = TEST_UTIL.createTable( 586 TableName.valueOf(name.getMethodName()), new byte[][] { FAMILY }, 587 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255); 588 Put put = new Put(ROW); 589 put.addColumn(FAMILY, QUALIFIER, VALUE); 590 591 Get get = new Get(ROW); 592 593 boolean exist = table.exists(get); 594 assertFalse(exist); 595 596 table.put(put); 597 598 exist = table.exists(get); 599 assertTrue(exist); 600 } 601 602 @Test 603 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { 604 Table table = TEST_UTIL.createTable( 605 TableName.valueOf(name.getMethodName()), 606 new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255); 607 Put put = new Put(ROW); 608 put.addColumn(FAMILY, QUALIFIER, VALUE); 609 table.put (put); 610 611 List<Get> gets = new ArrayList<>(); 612 gets.add(new Get(ANOTHERROW)); 613 gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 }))); 614 gets.add(new Get(ROW)); 615 gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 }))); 616 617 LOG.info("Calling exists"); 618 boolean[] results = table.existsAll(gets); 619 assertFalse(results[0]); 620 assertFalse(results[1]); 621 assertTrue(results[2]); 622 assertFalse(results[3]); 623 624 // Test with the first region. 625 put = new Put(new byte[] { 0x00 }); 626 put.addColumn(FAMILY, QUALIFIER, VALUE); 627 table.put(put); 628 629 gets = new ArrayList<>(); 630 gets.add(new Get(new byte[] { 0x00 })); 631 gets.add(new Get(new byte[] { 0x00, 0x00 })); 632 results = table.existsAll(gets); 633 assertTrue(results[0]); 634 assertFalse(results[1]); 635 636 // Test with the last region 637 put = new Put(new byte[] { (byte) 0xff, (byte) 0xff }); 638 put.addColumn(FAMILY, QUALIFIER, VALUE); 639 table.put(put); 640 641 gets = new ArrayList<>(); 642 gets.add(new Get(new byte[] { (byte) 0xff })); 643 gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff })); 644 gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff })); 645 results = table.existsAll(gets); 646 assertFalse(results[0]); 647 assertTrue(results[1]); 648 assertFalse(results[2]); 649 } 650 651 @Test 652 public void testGetEmptyRow() throws Exception { 653 //Create a table and put in 1 row 654 Admin admin = TEST_UTIL.getAdmin(); 655 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(name.getMethodName()))); 656 desc.addFamily(new HColumnDescriptor(FAMILY)); 657 admin.createTable(desc); 658 Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); 659 660 Put put = new Put(ROW_BYTES); 661 put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); 662 table.put(put); 663 664 //Try getting the row with an empty row key 665 Result res = null; 666 try { 667 res = table.get(new Get(new byte[0])); 668 fail(); 669 } catch (IllegalArgumentException e) { 670 // Expected. 671 } 672 assertTrue(res == null); 673 res = table.get(new Get(Bytes.toBytes("r1-not-exist"))); 674 assertTrue(res.isEmpty() == true); 675 res = table.get(new Get(ROW_BYTES)); 676 assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); 677 table.close(); 678 } 679 680 @Test 681 public void testConnectionDefaultUsesCodec() throws Exception { 682 ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection(); 683 assertTrue(con.hasCellBlockSupport()); 684 } 685 686 @Test 687 public void testPutWithPreBatchMutate() throws Exception { 688 final TableName tableName = TableName.valueOf(name.getMethodName()); 689 testPreBatchMutate(tableName, () -> { 690 try { 691 Table t = TEST_UTIL.getConnection().getTable(tableName); 692 Put put = new Put(ROW); 693 put.addColumn(FAMILY, QUALIFIER, VALUE); 694 t.put(put); 695 } catch (IOException ex) { 696 throw new RuntimeException(ex); 697 } 698 }); 699 } 700 701 @Test 702 public void testRowMutationsWithPreBatchMutate() throws Exception { 703 final TableName tableName = TableName.valueOf(name.getMethodName()); 704 testPreBatchMutate(tableName, () -> { 705 try { 706 RowMutations rm = new RowMutations(ROW, 1); 707 Table t = TEST_UTIL.getConnection().getTable(tableName); 708 Put put = new Put(ROW); 709 put.addColumn(FAMILY, QUALIFIER, VALUE); 710 rm.add(put); 711 t.mutateRow(rm); 712 } catch (IOException ex) { 713 throw new RuntimeException(ex); 714 } 715 }); 716 } 717 718 private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception { 719 HTableDescriptor desc = new HTableDescriptor(tableName); 720 desc.addCoprocessor(WaitingForScanObserver.class.getName()); 721 desc.addFamily(new HColumnDescriptor(FAMILY)); 722 TEST_UTIL.getAdmin().createTable(desc); 723 ExecutorService service = Executors.newFixedThreadPool(2); 724 service.execute(rn); 725 final List<Cell> cells = new ArrayList<>(); 726 service.execute(() -> { 727 try { 728 // waiting for update. 729 TimeUnit.SECONDS.sleep(3); 730 Table t = TEST_UTIL.getConnection().getTable(tableName); 731 Scan scan = new Scan(); 732 try (ResultScanner scanner = t.getScanner(scan)) { 733 for (Result r : scanner) { 734 cells.addAll(Arrays.asList(r.rawCells())); 735 } 736 } 737 } catch (IOException | InterruptedException ex) { 738 throw new RuntimeException(ex); 739 } 740 }); 741 service.shutdown(); 742 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 743 assertEquals("The write is blocking by RegionObserver#postBatchMutate" 744 + ", so the data is invisible to reader", 0, cells.size()); 745 TEST_UTIL.deleteTable(tableName); 746 } 747 748 @Test 749 public void testLockLeakWithDelta() throws Exception, Throwable { 750 final TableName tableName = TableName.valueOf(name.getMethodName()); 751 HTableDescriptor desc = new HTableDescriptor(tableName); 752 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 753 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 754 desc.addFamily(new HColumnDescriptor(FAMILY)); 755 TEST_UTIL.getAdmin().createTable(desc); 756 // new a connection for lower retry number. 757 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 758 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 759 try (Connection con = ConnectionFactory.createConnection(copy)) { 760 HRegion region = (HRegion) find(tableName); 761 region.setTimeoutForWriteLock(10); 762 ExecutorService putService = Executors.newSingleThreadExecutor(); 763 putService.execute(() -> { 764 try (Table table = con.getTable(tableName)) { 765 Put put = new Put(ROW); 766 put.addColumn(FAMILY, QUALIFIER, VALUE); 767 // the put will be blocked by WaitingForMultiMutationsObserver. 768 table.put(put); 769 } catch (IOException ex) { 770 throw new RuntimeException(ex); 771 } 772 }); 773 ExecutorService appendService = Executors.newSingleThreadExecutor(); 774 appendService.execute(() -> { 775 Append append = new Append(ROW); 776 append.addColumn(FAMILY, QUALIFIER, VALUE); 777 try (Table table = con.getTable(tableName)) { 778 table.append(append); 779 fail("The APPEND should fail because the target lock is blocked by previous put"); 780 } catch (Exception ex) { 781 } 782 }); 783 appendService.shutdown(); 784 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 785 WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class); 786 observer.latch.countDown(); 787 putService.shutdown(); 788 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 789 try (Table table = con.getTable(tableName)) { 790 Result r = table.get(new Get(ROW)); 791 assertFalse(r.isEmpty()); 792 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); 793 } 794 } 795 HRegion region = (HRegion) find(tableName); 796 int readLockCount = region.getReadLockCount(); 797 LOG.info("readLockCount:" + readLockCount); 798 assertEquals(0, readLockCount); 799 } 800 801 @Test 802 public void testMultiRowMutations() throws Exception, Throwable { 803 final TableName tableName = TableName.valueOf(name.getMethodName()); 804 HTableDescriptor desc = new HTableDescriptor(tableName); 805 desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); 806 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 807 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 808 desc.addFamily(new HColumnDescriptor(FAMILY)); 809 TEST_UTIL.getAdmin().createTable(desc); 810 // new a connection for lower retry number. 811 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 812 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 813 try (Connection con = ConnectionFactory.createConnection(copy)) { 814 byte[] row = Bytes.toBytes("ROW-0"); 815 byte[] rowLocked= Bytes.toBytes("ROW-1"); 816 byte[] value0 = Bytes.toBytes("VALUE-0"); 817 byte[] value1 = Bytes.toBytes("VALUE-1"); 818 byte[] value2 = Bytes.toBytes("VALUE-2"); 819 assertNoLocks(tableName); 820 ExecutorService putService = Executors.newSingleThreadExecutor(); 821 putService.execute(() -> { 822 try (Table table = con.getTable(tableName)) { 823 Put put0 = new Put(rowLocked); 824 put0.addColumn(FAMILY, QUALIFIER, value0); 825 // the put will be blocked by WaitingForMultiMutationsObserver. 826 table.put(put0); 827 } catch (IOException ex) { 828 throw new RuntimeException(ex); 829 } 830 }); 831 ExecutorService cpService = Executors.newSingleThreadExecutor(); 832 cpService.execute(() -> { 833 boolean threw; 834 Put put1 = new Put(row); 835 Put put2 = new Put(rowLocked); 836 put1.addColumn(FAMILY, QUALIFIER, value1); 837 put2.addColumn(FAMILY, QUALIFIER, value2); 838 try (Table table = con.getTable(tableName)) { 839 MultiRowMutationProtos.MutateRowsRequest request 840 = MultiRowMutationProtos.MutateRowsRequest.newBuilder() 841 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 842 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1)) 843 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 844 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2)) 845 .build(); 846 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, 847 ROW, ROW, 848 (MultiRowMutationProtos.MultiRowMutationService exe) -> { 849 ServerRpcController controller = new ServerRpcController(); 850 CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse> 851 rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); 852 exe.mutateRows(controller, request, rpcCallback); 853 return rpcCallback.get(); 854 }); 855 threw = false; 856 } catch (Throwable ex) { 857 threw = true; 858 } 859 if (!threw) { 860 // Can't call fail() earlier because the catch would eat it. 861 fail("This cp should fail because the target lock is blocked by previous put"); 862 } 863 }); 864 cpService.shutdown(); 865 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 866 WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class); 867 observer.latch.countDown(); 868 putService.shutdown(); 869 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 870 try (Table table = con.getTable(tableName)) { 871 Get g0 = new Get(row); 872 Get g1 = new Get(rowLocked); 873 Result r0 = table.get(g0); 874 Result r1 = table.get(g1); 875 assertTrue(r0.isEmpty()); 876 assertFalse(r1.isEmpty()); 877 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); 878 } 879 assertNoLocks(tableName); 880 } 881 } 882 883 /** 884 * A test case for issue HBASE-17482 885 * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped 886 * onto cells in the append thread, a countdown latch is used to ensure that happened 887 * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) 888 * make the seqid/mvcc acquirement in handler thread and stamping in append thread 889 * No countdown latch to assure cells in memstore are stamped with seqid/mvcc. 890 * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner 891 * with a smaller readpoint can see these data, which disobey the multi version 892 * concurrency control rules. 893 * This test case is to reproduce this scenario. 894 * @throws IOException 895 */ 896 @Test 897 public void testMVCCUsingMVCCPreAssign() throws IOException { 898 final TableName tableName = TableName.valueOf(name.getMethodName()); 899 HTableDescriptor htd = new HTableDescriptor(tableName); 900 HColumnDescriptor fam = new HColumnDescriptor(FAMILY); 901 htd.addFamily(fam); 902 Admin admin = TEST_UTIL.getAdmin(); 903 admin.createTable(htd); 904 Table table = admin.getConnection().getTable(TableName.valueOf(name.getMethodName())); 905 //put two row first to init the scanner 906 Put put = new Put(Bytes.toBytes("0")); 907 put.addColumn(FAMILY, Bytes.toBytes( ""), Bytes.toBytes("0")); 908 table.put(put); 909 put = new Put(Bytes.toBytes("00")); 910 put.addColumn(FAMILY, Bytes.toBytes( ""), Bytes.toBytes("0")); 911 table.put(put); 912 Scan scan = new Scan(); 913 scan.setTimeRange(0, Long.MAX_VALUE); 914 scan.setCaching(1); 915 ResultScanner scanner = table.getScanner(scan); 916 int rowNum = scanner.next() != null ? 1 : 0; 917 //the started scanner shouldn't see the rows put below 918 for(int i = 1; i < 1000; i++) { 919 put = new Put(Bytes.toBytes(String.valueOf(i))); 920 put.setDurability(Durability.ASYNC_WAL); 921 put.addColumn(FAMILY, Bytes.toBytes( ""), Bytes.toBytes(i)); 922 table.put(put); 923 } 924 for(Result result : scanner) { 925 rowNum++; 926 } 927 //scanner should only see two rows 928 assertEquals(2, rowNum); 929 scanner = table.getScanner(scan); 930 rowNum = 0; 931 for(Result result : scanner) { 932 rowNum++; 933 } 934 // the new scanner should see all rows 935 assertEquals(1001, rowNum); 936 937 938 } 939 940 @Test 941 public void testPutThenGetWithMultipleThreads() throws Exception { 942 final TableName tableName = TableName.valueOf(name.getMethodName()); 943 final int THREAD_NUM = 20; 944 final int ROUND_NUM = 10; 945 for (int round = 0; round < ROUND_NUM; round++) { 946 ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); 947 final AtomicInteger successCnt = new AtomicInteger(0); 948 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 949 for (int i = 0; i < THREAD_NUM; i++) { 950 final int index = i; 951 Thread t = new Thread(new Runnable() { 952 953 @Override 954 public void run() { 955 final byte[] row = Bytes.toBytes("row-" + index); 956 final byte[] value = Bytes.toBytes("v" + index); 957 try { 958 Put put = new Put(row); 959 put.addColumn(FAMILY, QUALIFIER, value); 960 ht.put(put); 961 Get get = new Get(row); 962 Result result = ht.get(get); 963 byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); 964 if (Bytes.equals(value, returnedValue)) { 965 successCnt.getAndIncrement(); 966 } else { 967 LOG.error("Should be equal but not, original value: " + Bytes.toString(value) 968 + ", returned value: " 969 + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); 970 } 971 } catch (Throwable e) { 972 // do nothing 973 } 974 } 975 }); 976 threads.add(t); 977 } 978 for (Thread t : threads) { 979 t.start(); 980 } 981 for (Thread t : threads) { 982 t.join(); 983 } 984 assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); 985 ht.close(); 986 TEST_UTIL.deleteTable(tableName); 987 } 988 } 989 990 private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException { 991 HRegion region = (HRegion) find(tableName); 992 assertEquals(0, region.getLockedRows().size()); 993 } 994 private static HRegion find(final TableName tableName) 995 throws IOException, InterruptedException { 996 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); 997 List<HRegion> regions = rs.getRegions(tableName); 998 assertEquals(1, regions.size()); 999 return regions.get(0); 1000 } 1001 1002 private static <T extends RegionObserver> T find(final TableName tableName, 1003 Class<T> clz) throws IOException, InterruptedException { 1004 HRegion region = find(tableName); 1005 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); 1006 assertTrue("The cp instance should be " + clz.getName() 1007 + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); 1008 return clz.cast(cp); 1009 } 1010 1011 public static class WaitingForMultiMutationsObserver 1012 implements RegionCoprocessor, RegionObserver { 1013 final CountDownLatch latch = new CountDownLatch(1); 1014 1015 @Override 1016 public Optional<RegionObserver> getRegionObserver() { 1017 return Optional.of(this); 1018 } 1019 1020 @Override 1021 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, 1022 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1023 try { 1024 latch.await(); 1025 } catch (InterruptedException ex) { 1026 throw new IOException(ex); 1027 } 1028 } 1029 } 1030 1031 public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver { 1032 private final CountDownLatch latch = new CountDownLatch(1); 1033 1034 @Override 1035 public Optional<RegionObserver> getRegionObserver() { 1036 return Optional.of(this); 1037 } 1038 1039 @Override 1040 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, 1041 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1042 try { 1043 // waiting for scanner 1044 latch.await(); 1045 } catch (InterruptedException ex) { 1046 throw new IOException(ex); 1047 } 1048 } 1049 1050 @Override 1051 public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 1052 final Scan scan, final RegionScanner s) throws IOException { 1053 latch.countDown(); 1054 return s; 1055 } 1056 } 1057 1058 static byte[] generateHugeValue(int size) { 1059 Random rand = ThreadLocalRandom.current(); 1060 byte[] value = new byte[size]; 1061 for (int i = 0; i < value.length; i++) { 1062 value[i] = (byte) rand.nextInt(256); 1063 } 1064 return value; 1065 } 1066 1067 @Test 1068 public void testScanWithBatchSizeReturnIncompleteCells() throws IOException { 1069 TableName tableName = TableName.valueOf(name.getMethodName()); 1070 TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) 1071 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) 1072 .build(); 1073 1074 Table table = TEST_UTIL.createTable(hd, null); 1075 1076 Put put = new Put(ROW); 1077 put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); 1078 table.put(put); 1079 1080 put = new Put(ROW); 1081 put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); 1082 table.put(put); 1083 1084 for (int i = 2; i < 5; i++) { 1085 for (int version = 0; version < 2; version++) { 1086 put = new Put(ROW); 1087 put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); 1088 table.put(put); 1089 } 1090 } 1091 1092 Scan scan = new Scan(); 1093 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3) 1094 .setMaxResultSize(4 * 1024 * 1024); 1095 Result result; 1096 try (ResultScanner scanner = table.getScanner(scan)) { 1097 List<Result> list = new ArrayList<>(); 1098 /* 1099 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second 1100 * scan rpc should return a result with 3 cells, because reach the batch limit = 3; The 1101 * mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the 1102 * moreResultsInRegion also would be false. Finally, the client should collect all the cells 1103 * into two result: 2+3 -> 3+2; 1104 */ 1105 while ((result = scanner.next()) != null) { 1106 list.add(result); 1107 } 1108 1109 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1110 Assert.assertEquals(2, list.size()); 1111 Assert.assertEquals(3, list.get(0).size()); 1112 Assert.assertEquals(2, list.get(1).size()); 1113 } 1114 1115 scan = new Scan(); 1116 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2) 1117 .setMaxResultSize(4 * 1024 * 1024); 1118 try (ResultScanner scanner = table.getScanner(scan)) { 1119 List<Result> list = new ArrayList<>(); 1120 while ((result = scanner.next()) != null) { 1121 list.add(result); 1122 } 1123 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1124 Assert.assertEquals(3, list.size()); 1125 Assert.assertEquals(2, list.get(0).size()); 1126 Assert.assertEquals(2, list.get(1).size()); 1127 Assert.assertEquals(1, list.get(2).size()); 1128 } 1129 } 1130}