001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.List; 031import java.util.Optional; 032import java.util.Random; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.Coprocessor; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HColumnDescriptor; 046import 
org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.HTableDescriptor; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 051import org.apache.hadoop.hbase.coprocessor.ObserverContext; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 054import org.apache.hadoop.hbase.coprocessor.RegionObserver; 055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 056import org.apache.hadoop.hbase.ipc.ServerRpcController; 057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; 058import org.apache.hadoop.hbase.regionserver.HRegion; 059import org.apache.hadoop.hbase.regionserver.HRegionServer; 060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 061import org.apache.hadoop.hbase.regionserver.RegionScanner; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.LargeTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.Pair; 066import org.junit.After; 067import org.junit.AfterClass; 068import org.junit.Assert; 069import org.junit.Before; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 081 082@Category({LargeTests.class, ClientTests.class}) 083public class TestFromClientSide3 { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestFromClientSide3.class); 088 089 private static final Logger LOG = 
LoggerFactory.getLogger(TestFromClientSide3.class); 090 private final static HBaseTestingUtility TEST_UTIL 091 = new HBaseTestingUtility(); 092 private static final int WAITTABLE_MILLIS = 10000; 093 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 094 private static Random random = new Random(); 095 private static int SLAVES = 3; 096 private static final byte[] ROW = Bytes.toBytes("testRow"); 097 private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); 098 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 099 private static final byte[] VALUE = Bytes.toBytes("testValue"); 100 private static final byte[] COL_QUAL = Bytes.toBytes("f1"); 101 private static final byte[] VAL_BYTES = Bytes.toBytes("v1"); 102 private static final byte[] ROW_BYTES = Bytes.toBytes("r1"); 103 104 @Rule 105 public TestName name = new TestName(); 106 private TableName tableName; 107 108 /** 109 * @throws java.lang.Exception 110 */ 111 @BeforeClass 112 public static void setUpBeforeClass() throws Exception { 113 TEST_UTIL.startMiniCluster(SLAVES); 114 } 115 116 /** 117 * @throws java.lang.Exception 118 */ 119 @AfterClass 120 public static void tearDownAfterClass() throws Exception { 121 TEST_UTIL.shutdownMiniCluster(); 122 } 123 124 /** 125 * @throws java.lang.Exception 126 */ 127 @Before 128 public void setUp() throws Exception { 129 tableName = TableName.valueOf(name.getMethodName()); 130 } 131 132 /** 133 * @throws java.lang.Exception 134 */ 135 @After 136 public void tearDown() throws Exception { 137 for (HTableDescriptor htd: TEST_UTIL.getAdmin().listTables()) { 138 LOG.info("Tear down, remove table=" + htd.getTableName()); 139 TEST_UTIL.deleteTable(htd.getTableName()); 140 } 141 } 142 143 private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) 144 throws Exception { 145 Put put = new Put(row); 146 for (int i = 0; i < nPuts; i++) { 147 byte[] qualifier = Bytes.toBytes(random.nextInt()); 148 byte[] value = 
Bytes.toBytes(random.nextInt()); 149 put.addColumn(family, qualifier, value); 150 } 151 table.put(put); 152 } 153 154 private void performMultiplePutAndFlush(HBaseAdmin admin, Table table, 155 byte[] row, byte[] family, int nFlushes, int nPuts) 156 throws Exception { 157 158 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) { 159 // connection needed for poll-wait 160 HRegionLocation loc = locator.getRegionLocation(row, true); 161 AdminProtos.AdminService.BlockingInterface server = 162 ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName()); 163 byte[] regName = loc.getRegionInfo().getRegionName(); 164 165 for (int i = 0; i < nFlushes; i++) { 166 randomCFPuts(table, row, family, nPuts); 167 List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY); 168 int sfCount = sf.size(); 169 170 admin.flush(table.getName()); 171 } 172 } 173 } 174 175 private static List<Cell> toList(ResultScanner scanner) { 176 try { 177 List<Cell> cells = new ArrayList<>(); 178 for (Result r : scanner) { 179 cells.addAll(r.listCells()); 180 } 181 return cells; 182 } finally { 183 scanner.close(); 184 } 185 } 186 187 @Test 188 public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException { 189 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 190 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 191 byte[] row = Bytes.toBytes("SpecifiedRow"); 192 byte[] value0 = Bytes.toBytes("value_0"); 193 byte[] value1 = Bytes.toBytes("value_1"); 194 Put put = new Put(row); 195 put.addColumn(FAMILY, QUALIFIER, VALUE); 196 table.put(put); 197 Delete d = new Delete(row); 198 table.delete(d); 199 put = new Put(row); 200 put.addColumn(FAMILY, null, value0); 201 table.put(put); 202 put = new Put(row); 203 put.addColumn(FAMILY, null, value1); 204 table.put(put); 205 List<Cell> cells = toList(table.getScanner(new Scan())); 206 assertEquals(1, cells.size()); 207 
assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 208 209 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 210 assertEquals(1, cells.size()); 211 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 212 213 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 214 assertEquals(0, cells.size()); 215 216 TEST_UTIL.getAdmin().flush(tableName); 217 cells = toList(table.getScanner(new Scan())); 218 assertEquals(1, cells.size()); 219 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 220 221 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 222 assertEquals(1, cells.size()); 223 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 224 225 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 226 assertEquals(0, cells.size()); 227 } 228 } 229 230 @Test 231 public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException { 232 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 233 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 234 byte[] row = Bytes.toBytes("SpecifiedRow"); 235 byte[] qual0 = Bytes.toBytes("qual0"); 236 byte[] qual1 = Bytes.toBytes("qual1"); 237 Delete d = new Delete(row); 238 table.delete(d); 239 240 Put put = new Put(row); 241 put.addColumn(FAMILY, null, VALUE); 242 table.put(put); 243 244 put = new Put(row); 245 put.addColumn(FAMILY, qual1, qual1); 246 table.put(put); 247 248 put = new Put(row); 249 put.addColumn(FAMILY, qual0, qual0); 250 table.put(put); 251 252 Result r = table.get(new Get(row)); 253 assertEquals(3, r.size()); 254 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 255 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 256 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 257 258 TEST_UTIL.getAdmin().flush(tableName); 259 r 
= table.get(new Get(row)); 260 assertEquals(3, r.size()); 261 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 262 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 263 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 264 } 265 } 266 267 // override the config settings at the CF level and ensure priority 268 @Test 269 public void testAdvancedConfigOverride() throws Exception { 270 /* 271 * Overall idea: (1) create 3 store files and issue a compaction. config's 272 * compaction.min == 3, so should work. (2) Increase the compaction.min 273 * toggle in the HTD to 5 and modify table. If we use the HTD value instead 274 * of the default config value, adding 3 files and issuing a compaction 275 * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 276 * and modify table. The CF schema should override the Table schema and now 277 * cause a minor compaction. 278 */ 279 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3); 280 281 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 282 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 283 try (Admin admin = TEST_UTIL.getAdmin()) { 284 ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection(); 285 286 // Create 3 store files. 287 byte[] row = Bytes.toBytes(random.nextInt()); 288 performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100); 289 290 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 291 // Verify we have multiple store files. 
292 HRegionLocation loc = locator.getRegionLocation(row, true); 293 byte[] regionName = loc.getRegionInfo().getRegionName(); 294 AdminProtos.AdminService.BlockingInterface server = 295 connection.getAdmin(loc.getServerName()); 296 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1); 297 298 // Issue a compaction request 299 admin.compact(tableName); 300 301 // poll wait for the compactions to happen 302 for (int i = 0; i < 10 * 1000 / 40; ++i) { 303 // The number of store files after compaction should be lesser. 304 loc = locator.getRegionLocation(row, true); 305 if (!loc.getRegionInfo().isOffline()) { 306 regionName = loc.getRegionInfo().getRegionName(); 307 server = connection.getAdmin(loc.getServerName()); 308 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) { 309 break; 310 } 311 } 312 Thread.sleep(40); 313 } 314 // verify the compactions took place and that we didn't just time out 315 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1); 316 317 // change the compaction.min config option for this table to 5 318 LOG.info("hbase.hstore.compaction.min should now be 5"); 319 HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor()); 320 htd.setValue("hbase.hstore.compaction.min", String.valueOf(5)); 321 admin.modifyTable(tableName, htd); 322 Pair<Integer, Integer> st = admin.getAlterStatus(tableName); 323 while (null != st && st.getFirst() > 0) { 324 LOG.debug(st.getFirst() + " regions left to update"); 325 Thread.sleep(40); 326 st = admin.getAlterStatus(tableName); 327 } 328 LOG.info("alter status finished"); 329 330 // Create 3 more store files. 
331 performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10); 332 333 // Issue a compaction request 334 admin.compact(tableName); 335 336 // This time, the compaction request should not happen 337 Thread.sleep(10 * 1000); 338 loc = locator.getRegionLocation(row, true); 339 regionName = loc.getRegionInfo().getRegionName(); 340 server = connection.getAdmin(loc.getServerName()); 341 int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size(); 342 assertTrue(sfCount > 1); 343 344 // change an individual CF's config option to 2 & online schema update 345 LOG.info("hbase.hstore.compaction.min should now be 2"); 346 HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 347 hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2)); 348 htd.modifyFamily(hcd); 349 admin.modifyTable(tableName, htd); 350 st = admin.getAlterStatus(tableName); 351 while (null != st && st.getFirst() > 0) { 352 LOG.debug(st.getFirst() + " regions left to update"); 353 Thread.sleep(40); 354 st = admin.getAlterStatus(tableName); 355 } 356 LOG.info("alter status finished"); 357 358 // Issue a compaction request 359 admin.compact(tableName); 360 361 // poll wait for the compactions to happen 362 for (int i = 0; i < 10 * 1000 / 40; ++i) { 363 loc = locator.getRegionLocation(row, true); 364 regionName = loc.getRegionInfo().getRegionName(); 365 try { 366 server = connection.getAdmin(loc.getServerName()); 367 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) { 368 break; 369 } 370 } catch (Exception e) { 371 LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName)); 372 } 373 Thread.sleep(40); 374 } 375 376 // verify the compaction took place and that we didn't just time out 377 assertTrue(ProtobufUtil.getStoreFiles( 378 server, regionName, FAMILY).size() < sfCount); 379 380 // Finally, ensure that we can remove a custom config value after we made it 381 LOG.info("Removing CF config value"); 382 
LOG.info("hbase.hstore.compaction.min should now be 5"); 383 hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 384 hcd.setValue("hbase.hstore.compaction.min", null); 385 htd.modifyFamily(hcd); 386 admin.modifyTable(tableName, htd); 387 st = admin.getAlterStatus(tableName); 388 while (null != st && st.getFirst() > 0) { 389 LOG.debug(st.getFirst() + " regions left to update"); 390 Thread.sleep(40); 391 st = admin.getAlterStatus(tableName); 392 } 393 LOG.info("alter status finished"); 394 assertNull(table.getTableDescriptor().getFamily(FAMILY).getValue( 395 "hbase.hstore.compaction.min")); 396 } 397 } 398 } 399 } 400 401 @Test 402 public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException { 403 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 404 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 405 List actions = (List) new ArrayList(); 406 Object[] results = new Object[2]; 407 // create an empty Put 408 Put put1 = new Put(ROW); 409 actions.add(put1); 410 411 Put put2 = new Put(ANOTHERROW); 412 put2.addColumn(FAMILY, QUALIFIER, VALUE); 413 actions.add(put2); 414 415 table.batch(actions, results); 416 fail("Empty Put should have failed the batch call"); 417 } catch (IllegalArgumentException iae) { 418 } 419 } 420 421 // Test Table.batch with large amount of mutations against the same key. 422 // It used to trigger read lock's "Maximum lock count exceeded" Error. 
423 @Test 424 public void testHTableWithLargeBatch() throws IOException, InterruptedException { 425 int sixtyFourK = 64 * 1024; 426 List actions = new ArrayList(); 427 Object[] results = new Object[(sixtyFourK + 1) * 2]; 428 429 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 430 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 431 432 for (int i = 0; i < sixtyFourK + 1; i++) { 433 Put put1 = new Put(ROW); 434 put1.addColumn(FAMILY, QUALIFIER, VALUE); 435 actions.add(put1); 436 437 Put put2 = new Put(ANOTHERROW); 438 put2.addColumn(FAMILY, QUALIFIER, VALUE); 439 actions.add(put2); 440 } 441 442 table.batch(actions, results); 443 } 444 } 445 446 @Test 447 public void testBatchWithRowMutation() throws Exception { 448 LOG.info("Starting testBatchWithRowMutation"); 449 byte [][] QUALIFIERS = new byte [][] { 450 Bytes.toBytes("a"), Bytes.toBytes("b") 451 }; 452 453 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 454 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 455 456 RowMutations arm = RowMutations.of(Collections.singletonList( 457 new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); 458 Object[] batchResult = new Object[1]; 459 table.batch(Arrays.asList(arm), batchResult); 460 461 Get g = new Get(ROW); 462 Result r = table.get(g); 463 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); 464 465 arm = RowMutations.of(Arrays.asList( 466 new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), 467 new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); 468 table.batch(Arrays.asList(arm), batchResult); 469 r = table.get(g); 470 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); 471 assertNull(r.getValue(FAMILY, QUALIFIERS[0])); 472 473 // Test that we get the correct remote exception for RowMutations from batch() 474 try { 475 arm = RowMutations.of(Collections.singletonList( 476 new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, 
QUALIFIERS[0], VALUE))); 477 table.batch(Arrays.asList(arm), batchResult); 478 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); 479 } catch (RetriesExhaustedWithDetailsException e) { 480 String msg = e.getMessage(); 481 assertTrue(msg.contains("NoSuchColumnFamilyException")); 482 } 483 } 484 } 485 486 @Test 487 public void testHTableExistsMethodSingleRegionSingleGet() 488 throws IOException, InterruptedException { 489 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 490 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 491 492 // Test with a single region table. 493 Put put = new Put(ROW); 494 put.addColumn(FAMILY, QUALIFIER, VALUE); 495 496 Get get = new Get(ROW); 497 498 boolean exist = table.exists(get); 499 assertFalse(exist); 500 501 table.put(put); 502 503 exist = table.exists(get); 504 assertTrue(exist); 505 } 506 } 507 508 @Test 509 public void testHTableExistsMethodSingleRegionMultipleGets() 510 throws IOException, InterruptedException { 511 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 512 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 513 514 Put put = new Put(ROW); 515 put.addColumn(FAMILY, QUALIFIER, VALUE); 516 table.put(put); 517 518 List<Get> gets = new ArrayList<>(); 519 gets.add(new Get(ROW)); 520 gets.add(new Get(ANOTHERROW)); 521 522 boolean[] results = table.exists(gets); 523 assertTrue(results[0]); 524 assertFalse(results[1]); 525 } 526 } 527 528 @Test 529 public void testHTableExistsBeforeGet() throws IOException, InterruptedException { 530 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 531 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 532 533 Put put = new Put(ROW); 534 put.addColumn(FAMILY, QUALIFIER, VALUE); 535 table.put(put); 536 537 Get get = new Get(ROW); 538 539 boolean exist = table.exists(get); 540 assertEquals(true, exist); 541 542 Result result = table.get(get); 
543 assertEquals(false, result.isEmpty()); 544 assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); 545 } 546 } 547 548 @Test 549 public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException { 550 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 551 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 552 553 final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); 554 Put put = new Put(ROW); 555 put.addColumn(FAMILY, QUALIFIER, VALUE); 556 table.put(put); 557 put = new Put(ROW2); 558 put.addColumn(FAMILY, QUALIFIER, VALUE); 559 table.put(put); 560 561 Get get = new Get(ROW); 562 Get get2 = new Get(ROW2); 563 ArrayList<Get> getList = new ArrayList(2); 564 getList.add(get); 565 getList.add(get2); 566 567 boolean[] exists = table.existsAll(getList); 568 assertEquals(true, exists[0]); 569 assertEquals(true, exists[1]); 570 571 Result[] result = table.get(getList); 572 assertEquals(false, result[0].isEmpty()); 573 assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); 574 assertEquals(false, result[1].isEmpty()); 575 assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); 576 } 577 } 578 579 @Test 580 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { 581 try (Table table = TEST_UTIL.createTable( 582 tableName, new byte[][] { FAMILY }, 583 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 584 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 585 586 Put put = new Put(ROW); 587 put.addColumn(FAMILY, QUALIFIER, VALUE); 588 589 Get get = new Get(ROW); 590 591 boolean exist = table.exists(get); 592 assertFalse(exist); 593 594 table.put(put); 595 596 exist = table.exists(get); 597 assertTrue(exist); 598 } 599 } 600 601 @Test 602 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { 603 try (Table table = TEST_UTIL.createTable( 604 tableName, 605 new byte[][] { FAMILY }, 1, new byte[] { 0x00 
}, new byte[] { (byte) 0xff }, 255)) { 606 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 607 608 Put put = new Put(ROW); 609 put.addColumn(FAMILY, QUALIFIER, VALUE); 610 table.put(put); 611 612 List<Get> gets = new ArrayList<>(); 613 gets.add(new Get(ANOTHERROW)); 614 gets.add(new Get(Bytes.add(ROW, new byte[]{0x00}))); 615 gets.add(new Get(ROW)); 616 gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00}))); 617 618 LOG.info("Calling exists"); 619 boolean[] results = table.existsAll(gets); 620 assertFalse(results[0]); 621 assertFalse(results[1]); 622 assertTrue(results[2]); 623 assertFalse(results[3]); 624 625 // Test with the first region. 626 put = new Put(new byte[]{0x00}); 627 put.addColumn(FAMILY, QUALIFIER, VALUE); 628 table.put(put); 629 630 gets = new ArrayList<>(); 631 gets.add(new Get(new byte[]{0x00})); 632 gets.add(new Get(new byte[]{0x00, 0x00})); 633 results = table.existsAll(gets); 634 assertTrue(results[0]); 635 assertFalse(results[1]); 636 637 // Test with the last region 638 put = new Put(new byte[]{(byte) 0xff, (byte) 0xff}); 639 put.addColumn(FAMILY, QUALIFIER, VALUE); 640 table.put(put); 641 642 gets = new ArrayList<>(); 643 gets.add(new Get(new byte[]{(byte) 0xff})); 644 gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff})); 645 gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff})); 646 results = table.existsAll(gets); 647 assertFalse(results[0]); 648 assertTrue(results[1]); 649 assertFalse(results[2]); 650 } 651 } 652 653 @Test 654 public void testGetEmptyRow() throws Exception { 655 //Create a table and put in 1 row 656 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 657 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 658 659 Put put = new Put(ROW_BYTES); 660 put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); 661 table.put(put); 662 663 //Try getting the row with an empty row key 664 Result res = null; 665 try { 666 res = table.get(new Get(new byte[0])); 667 fail(); 668 } 
catch (IllegalArgumentException e) { 669 // Expected. 670 } 671 assertTrue(res == null); 672 res = table.get(new Get(Bytes.toBytes("r1-not-exist"))); 673 assertTrue(res.isEmpty() == true); 674 res = table.get(new Get(ROW_BYTES)); 675 assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); 676 } 677 } 678 679 @Test 680 public void testConnectionDefaultUsesCodec() throws Exception { 681 ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection(); 682 assertTrue(con.hasCellBlockSupport()); 683 } 684 685 @Test 686 public void testPutWithPreBatchMutate() throws Exception { 687 testPreBatchMutate(tableName, () -> { 688 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 689 Put put = new Put(ROW); 690 put.addColumn(FAMILY, QUALIFIER, VALUE); 691 t.put(put); 692 } catch (IOException ex) { 693 throw new RuntimeException(ex); 694 } 695 }); 696 } 697 698 @Test 699 public void testRowMutationsWithPreBatchMutate() throws Exception { 700 testPreBatchMutate(tableName, () -> { 701 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 702 RowMutations rm = new RowMutations(ROW, 1); 703 Put put = new Put(ROW); 704 put.addColumn(FAMILY, QUALIFIER, VALUE); 705 rm.add(put); 706 t.mutateRow(rm); 707 } catch (IOException ex) { 708 throw new RuntimeException(ex); 709 } 710 }); 711 } 712 713 private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception { 714 HTableDescriptor desc = new HTableDescriptor(tableName); 715 desc.addCoprocessor(WaitingForScanObserver.class.getName()); 716 desc.addFamily(new HColumnDescriptor(FAMILY)); 717 TEST_UTIL.getAdmin().createTable(desc); 718 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor 719 720 ExecutorService service = Executors.newFixedThreadPool(2); 721 service.execute(rn); 722 final List<Cell> cells = new ArrayList<>(); 723 service.execute(() -> { 724 try { 725 // waiting for update. 
726 TimeUnit.SECONDS.sleep(3); 727 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 728 Scan scan = new Scan(); 729 try (ResultScanner scanner = t.getScanner(scan)) { 730 for (Result r : scanner) { 731 cells.addAll(Arrays.asList(r.rawCells())); 732 } 733 } 734 } 735 } catch (IOException | InterruptedException ex) { 736 throw new RuntimeException(ex); 737 } 738 }); 739 service.shutdown(); 740 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 741 assertEquals("The write is blocking by RegionObserver#postBatchMutate" 742 + ", so the data is invisible to reader", 0, cells.size()); 743 TEST_UTIL.deleteTable(tableName); 744 } 745 746 @Test 747 public void testLockLeakWithDelta() throws Exception, Throwable { 748 HTableDescriptor desc = new HTableDescriptor(tableName); 749 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 750 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 751 desc.addFamily(new HColumnDescriptor(FAMILY)); 752 TEST_UTIL.getAdmin().createTable(desc); 753 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 754 755 // new a connection for lower retry number. 756 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 757 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 758 try (Connection con = ConnectionFactory.createConnection(copy)) { 759 HRegion region = (HRegion) find(tableName); 760 region.setTimeoutForWriteLock(10); 761 ExecutorService putService = Executors.newSingleThreadExecutor(); 762 putService.execute(() -> { 763 try (Table table = con.getTable(tableName)) { 764 Put put = new Put(ROW); 765 put.addColumn(FAMILY, QUALIFIER, VALUE); 766 // the put will be blocked by WaitingForMultiMutationsObserver. 
767 table.put(put); 768 } catch (IOException ex) { 769 throw new RuntimeException(ex); 770 } 771 }); 772 ExecutorService appendService = Executors.newSingleThreadExecutor(); 773 appendService.execute(() -> { 774 Append append = new Append(ROW); 775 append.addColumn(FAMILY, QUALIFIER, VALUE); 776 try (Table table = con.getTable(tableName)) { 777 table.append(append); 778 fail("The APPEND should fail because the target lock is blocked by previous put"); 779 } catch (Exception ex) { 780 } 781 }); 782 appendService.shutdown(); 783 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 784 WaitingForMultiMutationsObserver observer = 785 find(tableName, WaitingForMultiMutationsObserver.class); 786 observer.latch.countDown(); 787 putService.shutdown(); 788 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 789 try (Table table = con.getTable(tableName)) { 790 Result r = table.get(new Get(ROW)); 791 assertFalse(r.isEmpty()); 792 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); 793 } 794 } 795 HRegion region = (HRegion) find(tableName); 796 int readLockCount = region.getReadLockCount(); 797 LOG.info("readLockCount:" + readLockCount); 798 assertEquals(0, readLockCount); 799 } 800 801 @Test 802 public void testMultiRowMutations() throws Exception, Throwable { 803 HTableDescriptor desc = new HTableDescriptor(tableName); 804 desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); 805 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 806 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 807 desc.addFamily(new HColumnDescriptor(FAMILY)); 808 TEST_UTIL.getAdmin().createTable(desc); 809 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 810 811 // new a connection for lower retry number. 
812 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 813 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 814 try (Connection con = ConnectionFactory.createConnection(copy)) { 815 byte[] row = Bytes.toBytes("ROW-0"); 816 byte[] rowLocked= Bytes.toBytes("ROW-1"); 817 byte[] value0 = Bytes.toBytes("VALUE-0"); 818 byte[] value1 = Bytes.toBytes("VALUE-1"); 819 byte[] value2 = Bytes.toBytes("VALUE-2"); 820 assertNoLocks(tableName); 821 ExecutorService putService = Executors.newSingleThreadExecutor(); 822 putService.execute(() -> { 823 try (Table table = con.getTable(tableName)) { 824 Put put0 = new Put(rowLocked); 825 put0.addColumn(FAMILY, QUALIFIER, value0); 826 // the put will be blocked by WaitingForMultiMutationsObserver. 827 table.put(put0); 828 } catch (IOException ex) { 829 throw new RuntimeException(ex); 830 } 831 }); 832 ExecutorService cpService = Executors.newSingleThreadExecutor(); 833 cpService.execute(() -> { 834 boolean threw; 835 Put put1 = new Put(row); 836 Put put2 = new Put(rowLocked); 837 put1.addColumn(FAMILY, QUALIFIER, value1); 838 put2.addColumn(FAMILY, QUALIFIER, value2); 839 try (Table table = con.getTable(tableName)) { 840 MultiRowMutationProtos.MutateRowsRequest request 841 = MultiRowMutationProtos.MutateRowsRequest.newBuilder() 842 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 843 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto 844 .MutationType.PUT, put1)) 845 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 846 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto 847 .MutationType.PUT, put2)) 848 .build(); 849 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, 850 ROW, ROW, 851 (MultiRowMutationProtos.MultiRowMutationService exe) -> { 852 ServerRpcController controller = new ServerRpcController(); 853 CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse> 
854 rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); 855 exe.mutateRows(controller, request, rpcCallback); 856 return rpcCallback.get(); 857 }); 858 threw = false; 859 } catch (Throwable ex) { 860 threw = true; 861 } 862 if (!threw) { 863 // Can't call fail() earlier because the catch would eat it. 864 fail("This cp should fail because the target lock is blocked by previous put"); 865 } 866 }); 867 cpService.shutdown(); 868 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 869 WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class); 870 observer.latch.countDown(); 871 putService.shutdown(); 872 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 873 try (Table table = con.getTable(tableName)) { 874 Get g0 = new Get(row); 875 Get g1 = new Get(rowLocked); 876 Result r0 = table.get(g0); 877 Result r1 = table.get(g1); 878 assertTrue(r0.isEmpty()); 879 assertFalse(r1.isEmpty()); 880 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); 881 } 882 assertNoLocks(tableName); 883 } 884 } 885 886 /** 887 * A test case for issue HBASE-17482 888 * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped 889 * onto cells in the append thread, a countdown latch is used to ensure that happened 890 * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) 891 * make the seqid/mvcc acquirement in handler thread and stamping in append thread 892 * No countdown latch to assure cells in memstore are stamped with seqid/mvcc. 893 * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner 894 * with a smaller readpoint can see these data, which disobey the multi version 895 * concurrency control rules. 896 * This test case is to reproduce this scenario. 
897 * @throws IOException 898 */ 899 @Test 900 public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException { 901 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 902 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 903 //put two row first to init the scanner 904 Put put = new Put(Bytes.toBytes("0")); 905 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 906 table.put(put); 907 put = new Put(Bytes.toBytes("00")); 908 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 909 table.put(put); 910 Scan scan = new Scan(); 911 scan.setTimeRange(0, Long.MAX_VALUE); 912 scan.setCaching(1); 913 ResultScanner scanner = table.getScanner(scan); 914 int rowNum = scanner.next() != null ? 1 : 0; 915 //the started scanner shouldn't see the rows put below 916 for (int i = 1; i < 1000; i++) { 917 put = new Put(Bytes.toBytes(String.valueOf(i))); 918 put.setDurability(Durability.ASYNC_WAL); 919 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i)); 920 table.put(put); 921 } 922 for (Result result : scanner) { 923 rowNum++; 924 } 925 //scanner should only see two rows 926 assertEquals(2, rowNum); 927 scanner = table.getScanner(scan); 928 rowNum = 0; 929 for (Result result : scanner) { 930 rowNum++; 931 } 932 // the new scanner should see all rows 933 assertEquals(1001, rowNum); 934 } 935 } 936 937 @Test 938 public void testPutThenGetWithMultipleThreads() throws Exception { 939 final int THREAD_NUM = 20; 940 final int ROUND_NUM = 10; 941 for (int round = 0; round < ROUND_NUM; round++) { 942 ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); 943 final AtomicInteger successCnt = new AtomicInteger(0); 944 try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { 945 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 946 947 for (int i = 0; i < THREAD_NUM; i++) { 948 final int index = i; 949 Thread t = new Thread(new Runnable() { 950 951 @Override 952 public void run() { 953 final byte[] 
row = Bytes.toBytes("row-" + index); 954 final byte[] value = Bytes.toBytes("v" + index); 955 try { 956 Put put = new Put(row); 957 put.addColumn(FAMILY, QUALIFIER, value); 958 ht.put(put); 959 Get get = new Get(row); 960 Result result = ht.get(get); 961 byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); 962 if (Bytes.equals(value, returnedValue)) { 963 successCnt.getAndIncrement(); 964 } else { 965 LOG.error("Should be equal but not, original value: " + Bytes.toString(value) 966 + ", returned value: " 967 + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); 968 } 969 } catch (Throwable e) { 970 // do nothing 971 } 972 } 973 }); 974 threads.add(t); 975 } 976 for (Thread t : threads) { 977 t.start(); 978 } 979 for (Thread t : threads) { 980 t.join(); 981 } 982 assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); 983 } 984 TEST_UTIL.deleteTable(tableName); 985 } 986 } 987 988 private static void assertNoLocks(final TableName tableName) 989 throws IOException, InterruptedException { 990 HRegion region = (HRegion) find(tableName); 991 assertEquals(0, region.getLockedRows().size()); 992 } 993 private static HRegion find(final TableName tableName) 994 throws IOException, InterruptedException { 995 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); 996 List<HRegion> regions = rs.getRegions(tableName); 997 assertEquals(1, regions.size()); 998 return regions.get(0); 999 } 1000 1001 private static <T extends RegionObserver> T find(final TableName tableName, 1002 Class<T> clz) throws IOException, InterruptedException { 1003 HRegion region = find(tableName); 1004 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); 1005 assertTrue("The cp instance should be " + clz.getName() 1006 + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); 1007 return clz.cast(cp); 1008 } 1009 1010 public static class WaitingForMultiMutationsObserver 1011 implements RegionCoprocessor, 
RegionObserver {
    /** Batch mutations wait on this latch in {@link #postBatchMutate} until it reaches zero. */
    final CountDownLatch latch = new CountDownLatch(1);

    /** Exposes this instance as the region observer. */
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Blocks the calling thread until {@link #latch} has been counted down.
     * @throws IOException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    @Override
    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
        final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      try {
        latch.await();
      } catch (InterruptedException interrupted) {
        throw new IOException(interrupted);
      }
    }
  }

  /**
   * Observer that holds batch mutations in {@link #postBatchMutate} until a scanner has been
   * opened on the region; {@link #postScannerOpen} counts the latch down.
   */
  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Exposes this instance as the region observer. */
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Waits for the latch that {@link #postScannerOpen} releases.
     * @throws IOException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    @Override
    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
        final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      try {
        // parked here until a scanner has been opened
        latch.await();
      } catch (InterruptedException interrupted) {
        throw new IOException(interrupted);
      }
    }

    /** Counts the latch down and hands back the scanner it was given. */
    @Override
    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan, final RegionScanner s) throws IOException {
      latch.countDown();
      return s;
    }
  }

  /**
   * Builds a byte array of the requested length filled with random bytes.
   * @param size length of the array to create
   * @return a new array of {@code size} random bytes
   */
  static byte[] generateHugeValue(int size) {
    Random random = ThreadLocalRandom.current();
    byte[] bytes = new byte[size];
    int pos = 0;
    while (pos < bytes.length) {
      bytes[pos] = (byte) random.nextInt(256);
      pos++;
    }
    return bytes;
  }

  /**
   * Writes a single row holding a 3 MB cell, a 4 MB cell and three further columns of 1 KB cells
   * (two puts each), then scans that row with a 4 MB max result size and batch sizes of 3 and 2,
   * checking how the five visible cells are distributed over the returned {@link Result}s.
   */
  @Test
  public void testScanWithBatchSizeReturnIncompleteCells()
      throws IOException, InterruptedException {
    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
        .build();
    try (Table table = TEST_UTIL.createTable(hd, null)) {
      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);

      // Two huge cells first: qualifier 0 (3 MB) and qualifier 1 (4 MB).
      Put hugePut = new Put(ROW);
      hugePut.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
      table.put(hugePut);

      hugePut = new Put(ROW);
      hugePut.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
      table.put(hugePut);

      // Then qualifiers 2..4, each written twice with a 1 KB value.
      for (int qualifier = 2; qualifier < 5; qualifier++) {
        for (int version = 0; version < 2; version++) {
          Put smallPut = new Put(ROW);
          smallPut.addColumn(FAMILY, Bytes.toBytes(qualifier), generateHugeValue(1024));
          table.put(smallPut);
        }
      }

      Scan batchOfThree = new Scan();
      batchOfThree.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
          .setMaxResultSize(4 * 1024 * 1024);
      try (ResultScanner scanner = table.getScanner(batchOfThree)) {
        List<Result> results = new ArrayList<>();
        /*
         * Expected rpc behaviour: the first scan rpc returns a result with 2 cells, because
         * 3MB + 4MB > 4MB; the second scan rpc returns a result with 3 cells, because the batch
         * limit of 3 is reached. mayHaveMoreCellsInRow in the last result should be false, and
         * moreResultsInRegion would be false as well. The client then regroups all the cells
         * into two results: 2+3 -> 3+2.
         */
        for (Result r = scanner.next(); r != null; r = scanner.next()) {
          results.add(r);
        }

        assertEquals(5, results.stream().mapToInt(Result::size).sum());
        assertEquals(2, results.size());
        assertEquals(3, results.get(0).size());
        assertEquals(2, results.get(1).size());
      }

      Scan batchOfTwo = new Scan();
      batchOfTwo.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
          .setMaxResultSize(4 * 1024 * 1024);
      try (ResultScanner scanner = table.getScanner(batchOfTwo)) {
        List<Result> results = new ArrayList<>();
        for (Result r = scanner.next(); r != null; r = scanner.next()) {
          results.add(r);
        }
        assertEquals(5, results.stream().mapToInt(Result::size).sum());
        assertEquals(3, results.size());
        assertEquals(2, results.get(0).size());
        assertEquals(2, results.get(1).size());
        assertEquals(1, results.get(2).size());
      }
    }
  }
}