001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertFalse; 024import static org.junit.jupiter.api.Assertions.assertNotEquals; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026import static org.junit.jupiter.api.Assertions.fail; 027 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.List; 031import java.util.Map; 032import java.util.concurrent.CompletableFuture; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.function.Supplier; 036import java.util.stream.Collectors; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.TableNotFoundException; 042import org.apache.hadoop.hbase.master.HMaster; 043import org.apache.hadoop.hbase.master.RegionState; 044import org.apache.hadoop.hbase.master.ServerManager; 045import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 046import org.apache.hadoop.hbase.master.assignment.RegionStates; 047import org.apache.hadoop.hbase.regionserver.HRegionServer; 048import org.apache.hadoop.hbase.regionserver.Region; 049import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 050import org.apache.hadoop.hbase.testclassification.ClientTests; 051import org.apache.hadoop.hbase.testclassification.LargeTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.JVMClusterUtil; 055import org.apache.hadoop.hbase.util.Threads; 056import org.junit.jupiter.api.AfterAll; 057import org.junit.jupiter.api.BeforeAll; 058import org.junit.jupiter.api.Tag; 059import org.junit.jupiter.api.TestTemplate; 060 061/** 062 * Class to test asynchronous region admin operations. 063 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our 064 * ten minute timeout so they were split. 065 */ 066@Tag(LargeTests.TAG) 067@Tag(ClientTests.TAG) 068@HBaseParameterizedTestTemplate(name = "{index}: policy = {0}") 069public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { 070 071 public TestAsyncRegionAdminApi(Supplier<AsyncAdmin> admin) { 072 super(admin); 073 } 074 075 @BeforeAll 076 public static void setUpBeforeClass() throws Exception { 077 TestAsyncAdminBase.setUpBeforeClass(); 078 } 079 080 @AfterAll 081 public static void tearDownAfterClass() throws Exception { 082 TestAsyncAdminBase.tearDownAfterClass(); 083 } 084 085 @TestTemplate 086 public void testAssignRegionAndUnassignRegion() throws Exception { 087 createTableWithDefaultConf(tableName); 088 089 // assign region. 090 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 091 AssignmentManager am = master.getAssignmentManager(); 092 RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0); 093 094 // assert region on server 095 RegionStates regionStates = am.getRegionStates(); 096 ServerName serverName = regionStates.getRegionServerOfRegion(hri); 097 TEST_UTIL.assertRegionOnServer(hri, serverName, 200); 098 assertTrue(regionStates.getRegionState(hri).isOpened()); 099 100 // Region is assigned now. Let's assign it again. 101 // Master should not abort, and region should stay assigned. 102 try { 103 admin.assign(hri.getRegionName()).get(); 104 fail("Should fail when assigning an already onlined region"); 105 } catch (ExecutionException e) { 106 // Expected 107 assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class)); 108 } 109 assertFalse(am.getRegionStates().getRegionStateNode(hri).isTransitionScheduled()); 110 assertTrue(regionStates.getRegionState(hri).isOpened()); 111 112 // unassign region 113 admin.unassign(hri.getRegionName(), true).get(); 114 assertFalse(am.getRegionStates().getRegionStateNode(hri).isTransitionScheduled()); 115 assertTrue(regionStates.getRegionState(hri).isClosed()); 116 } 117 118 RegionInfo createTableAndGetOneRegion(final TableName tableName) 119 throws IOException, InterruptedException, ExecutionException { 120 TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) 121 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 122 admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get(); 123 124 // wait till the table is assigned 125 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 126 long timeoutTime = EnvironmentEdgeManager.currentTime() + 3000; 127 while (true) { 128 List<RegionInfo> regions = 129 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); 130 if (regions.size() > 3) { 131 return regions.get(2); 132 } 133 long now = EnvironmentEdgeManager.currentTime(); 134 if (now > timeoutTime) { 135 fail("Could not find an online region"); 136 } 137 Thread.sleep(10); 138 } 139 } 140 141 @TestTemplate 142 public void testGetRegionByStateOfTable() throws Exception { 143 RegionInfo hri = createTableAndGetOneRegion(tableName); 144 145 RegionStates regionStates = 146 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 147 assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN).stream() 148 .anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 149 assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom")) 150 .get(RegionState.State.OPEN).stream() 151 .anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 152 } 153 154 @TestTemplate 155 public void testMoveRegion() throws Exception { 156 admin.balancerSwitch(false).join(); 157 158 RegionInfo hri = createTableAndGetOneRegion(tableName); 159 RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin(); 160 ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 161 162 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 163 ServerManager serverManager = master.getServerManager(); 164 ServerName destServerName = null; 165 List<JVMClusterUtil.RegionServerThread> regionServers = 166 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads(); 167 for (JVMClusterUtil.RegionServerThread regionServer : regionServers) { 168 HRegionServer destServer = regionServer.getRegionServer(); 169 destServerName = destServer.getServerName(); 170 if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) { 171 break; 172 } 173 } 174 175 assertTrue(destServerName != null && !destServerName.equals(serverName)); 176 admin.move(hri.getRegionName(), destServerName).get(); 177 178 long timeoutTime = EnvironmentEdgeManager.currentTime() + 30000; 179 while (true) { 180 ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 181 if (sn != null && sn.equals(destServerName)) { 182 break; 183 } 184 long now = EnvironmentEdgeManager.currentTime(); 185 if (now > timeoutTime) { 186 fail("Failed to move the region in time: " + hri); 187 } 188 Thread.sleep(100); 189 } 190 admin.balancerSwitch(true).join(); 191 } 192 193 @TestTemplate 194 public void testGetOnlineRegions() throws Exception { 195 createTableAndGetOneRegion(tableName); 196 AtomicInteger regionServerCount = new AtomicInteger(0); 197 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 198 .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> { 199 ServerName serverName = rs.getServerName(); 200 try { 201 assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size()); 202 } catch (Exception e) { 203 fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage()); 204 } 205 regionServerCount.incrementAndGet(); 206 }); 207 assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(), 208 regionServerCount.get()); 209 } 210 211 @TestTemplate 212 public void testFlushTableAndRegion() throws Exception { 213 RegionInfo hri = createTableAndGetOneRegion(tableName); 214 ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() 215 .getRegionStates().getRegionServerOfRegion(hri); 216 HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 217 .map(rsThread -> rsThread.getRegionServer()) 218 .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); 219 220 // write a put into the specific region 221 ASYNC_CONN.getTable(tableName) 222 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))).join(); 223 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 224 // flush region and wait flush operation finished. 225 LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName())); 226 admin.flushRegion(hri.getRegionName()).get(); 227 LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName())); 228 Threads.sleepWithoutInterrupt(500); 229 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 230 Threads.sleep(50); 231 } 232 // check the memstore. 233 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 234 235 // write another put into the specific region 236 ASYNC_CONN.getTable(tableName) 237 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))).join(); 238 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 239 admin.flush(tableName).get(); 240 Threads.sleepWithoutInterrupt(500); 241 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 242 Threads.sleep(50); 243 } 244 // check the memstore. 245 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 246 } 247 248 private void waitUntilMobCompactionFinished(TableName tableName) 249 throws ExecutionException, InterruptedException { 250 long finished = EnvironmentEdgeManager.currentTime() + 60000; 251 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 252 while (EnvironmentEdgeManager.currentTime() < finished) { 253 if (state == CompactionState.NONE) { 254 break; 255 } 256 Thread.sleep(10); 257 state = admin.getCompactionState(tableName, CompactType.MOB).get(); 258 } 259 assertEquals(CompactionState.NONE, state); 260 } 261 262 @TestTemplate 263 public void testCompactMob() throws Exception { 264 ColumnFamilyDescriptor columnDescriptor = ColumnFamilyDescriptorBuilder 265 .newBuilder(Bytes.toBytes("mob")).setMobEnabled(true).setMobThreshold(0).build(); 266 267 TableDescriptor tableDescriptor = 268 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(columnDescriptor).build(); 269 270 admin.createTable(tableDescriptor).get(); 271 272 byte[][] families = { Bytes.toBytes("mob") }; 273 loadData(tableName, families, 3000, 8); 274 275 admin.majorCompact(tableName).get(); 276 277 CompactionState state = admin.getCompactionState(tableName).get(); 278 assertNotEquals(CompactionState.NONE, state); 279 280 waitUntilMobCompactionFinished(tableName); 281 } 282 283 @TestTemplate 284 public void testCompactRegionServer() throws Exception { 285 byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; 286 createTableWithDefaultConf(tableName, null, families); 287 loadData(tableName, families, 3000, 8); 288 289 List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 290 .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList()); 291 List<Region> regions = new ArrayList<>(); 292 rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName))); 293 assertEquals(1, regions.size()); 294 int countBefore = countStoreFilesInFamilies(regions, families); 295 assertTrue(countBefore > 0); 296 297 // Minor compaction for all region servers. 298 for (HRegionServer rs : rsList) 299 admin.compactRegionServer(rs.getServerName()).get(); 300 Thread.sleep(5000); 301 int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families); 302 assertTrue(countAfterMinorCompaction < countBefore); 303 304 // Major compaction for all region servers. 305 for (HRegionServer rs : rsList) 306 admin.majorCompactRegionServer(rs.getServerName()).get(); 307 Thread.sleep(5000); 308 int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families); 309 assertEquals(3, countAfterMajorCompaction); 310 } 311 312 @TestTemplate 313 public void testCompactionSwitchStates() throws Exception { 314 // Create a table with regions 315 byte[] family = Bytes.toBytes("family"); 316 byte[][] families = 317 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; 318 createTableWithDefaultConf(tableName, null, families); 319 loadData(tableName, families, 3000, 8); 320 List<Region> regions = new ArrayList<>(); 321 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads() 322 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 323 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture = 324 admin.compactionSwitch(true, new ArrayList<>()); 325 Map<ServerName, Boolean> pairs = listCompletableFuture.get(); 326 for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) { 327 assertEquals(true, p.getValue(), 328 "Default compaction state, expected=enabled actual=disabled"); 329 } 330 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 = 331 admin.compactionSwitch(false, new ArrayList<>()); 332 Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get(); 333 for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) { 334 assertEquals(true, p.getValue(), "Last compaction state, expected=enabled actual=disabled"); 335 } 336 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 = 337 admin.compactionSwitch(true, new ArrayList<>()); 338 Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get(); 339 for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) { 340 assertEquals(false, p.getValue(), "Last compaction state, expected=disabled actual=enabled"); 341 } 342 ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); 343 List<String> serverNameList = new ArrayList<String>(); 344 serverNameList.add(serverName.getServerName()); 345 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 = 346 admin.compactionSwitch(false, serverNameList); 347 Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get(); 348 assertEquals(pairs3.entrySet().size(), 1); 349 for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) { 350 assertEquals(true, p.getValue(), "Last compaction state, expected=enabled actual=disabled"); 351 } 352 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 = 353 admin.compactionSwitch(true, serverNameList); 354 Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get(); 355 assertEquals(pairs4.entrySet().size(), 1); 356 for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) { 357 assertEquals(false, p.getValue(), "Last compaction state, expected=disabled actual=enabled"); 358 } 359 } 360 361 @TestTemplate 362 public void testCompact() throws Exception { 363 compactionTest(TableName.valueOf("testCompact1"), 15, CompactionState.MINOR, false); 364 compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, true); 365 366 // For major compaction, set up a higher hbase.hstore.compaction.min to avoid 367 // minor compactions. It is a hack to avoid random delays introduced by Admins's 368 // updateConfiguration() method. 369 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 370 Configuration conf = thread.getRegionServer().getConfiguration(); 371 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 25); 372 }); 373 374 compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, false); 375 compactionTest(TableName.valueOf("testCompact4"), 8, CompactionState.MAJOR, true); 376 377 // Restore to default 378 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 379 Configuration conf = thread.getRegionServer().getConfiguration(); 380 conf.unset(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY); 381 }); 382 } 383 384 private void compactionTest(final TableName tableName, final int flushes, 385 final CompactionState expectedState, boolean singleFamily) throws Exception { 386 // Create a table with regions 387 byte[] family = Bytes.toBytes("family"); 388 byte[][] families = 389 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; 390 createTableWithDefaultConf(tableName, null, families); 391 392 byte[][] singleFamilyArray = { family }; 393 394 // When singleFamily is true, only load data for the family being tested. This is to avoid 395 // the case that while major compaction is going on for the family, minor compaction could 396 // happen for other families at the same time (Two compaction threads long/short), thus 397 // pollute the compaction and store file numbers for the region. 398 if (singleFamily) { 399 loadData(tableName, singleFamilyArray, 3000, flushes); 400 } else { 401 loadData(tableName, families, 3000, flushes); 402 } 403 404 List<Region> regions = new ArrayList<>(); 405 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads() 406 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 407 assertEquals(1, regions.size()); 408 409 int countBefore = countStoreFilesInFamilies(regions, families); 410 int countBeforeSingleFamily = countStoreFilesInFamily(regions, family); 411 assertTrue(countBefore > 0); // there should be some data files 412 if (expectedState == CompactionState.MINOR) { 413 if (singleFamily) { 414 admin.compact(tableName, family).get(); 415 } else { 416 admin.compact(tableName).get(); 417 } 418 } else { 419 if (singleFamily) { 420 admin.majorCompact(tableName, family).get(); 421 } else { 422 admin.majorCompact(tableName).get(); 423 } 424 } 425 426 long curt = EnvironmentEdgeManager.currentTime(); 427 long waitTime = 10000; 428 long endt = curt + waitTime; 429 CompactionState state = admin.getCompactionState(tableName).get(); 430 while (state == CompactionState.NONE && curt < endt) { 431 Thread.sleep(1); 432 state = admin.getCompactionState(tableName).get(); 433 curt = EnvironmentEdgeManager.currentTime(); 434 } 435 // Now, should have the right compaction state, 436 // otherwise, the compaction should have already been done 437 if (expectedState != state) { 438 for (Region region : regions) { 439 state = CompactionState.valueOf(region.getCompactionState().toString()); 440 assertEquals(CompactionState.NONE, state); 441 } 442 } else { 443 // Wait until the compaction is done 444 state = admin.getCompactionState(tableName).get(); 445 while (state != CompactionState.NONE && curt < endt) { 446 Thread.sleep(10); 447 state = admin.getCompactionState(tableName).get(); 448 } 449 // Now, compaction should be done. 450 assertEquals(CompactionState.NONE, state); 451 } 452 453 int countAfter = countStoreFilesInFamilies(regions, families); 454 int countAfterSingleFamily = countStoreFilesInFamily(regions, family); 455 assertTrue(countAfter < countBefore); 456 if (!singleFamily) { 457 if (expectedState == CompactionState.MAJOR) { 458 assertEquals(families.length, countAfter); 459 } else { 460 assertTrue(families.length <= countAfter); 461 } 462 } else { 463 int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; 464 // assert only change was to single column family 465 assertEquals(singleFamDiff, (countBefore - countAfter)); 466 if (expectedState == CompactionState.MAJOR) { 467 assertEquals(1, countAfterSingleFamily); 468 } else { 469 assertTrue(1 <= countAfterSingleFamily, "" + countAfterSingleFamily); 470 } 471 } 472 } 473 474 @TestTemplate 475 public void testNonExistentTableCompaction() { 476 testNonExistentTableCompaction(CompactionState.MINOR); 477 testNonExistentTableCompaction(CompactionState.MAJOR); 478 } 479 480 private void testNonExistentTableCompaction(CompactionState compactionState) { 481 try { 482 if (compactionState == CompactionState.MINOR) { 483 admin.compact(TableName.valueOf("NonExistentTable")).get(); 484 } else { 485 admin.majorCompact(TableName.valueOf("NonExistentTable")).get(); 486 } 487 fail("Expected TableNotFoundException when table doesn't exist"); 488 } catch (Exception e) { 489 // expected. 490 assertTrue(e.getCause() instanceof TableNotFoundException); 491 } 492 } 493 494 private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) { 495 return countStoreFilesInFamilies(regions, new byte[][] { family }); 496 } 497 498 private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) { 499 int count = 0; 500 for (Region region : regions) { 501 count += region.getStoreFileList(families).size(); 502 } 503 return count; 504 } 505 506 static void loadData(final TableName tableName, final byte[][] families, final int rows) 507 throws IOException { 508 loadData(tableName, families, rows, 1); 509 } 510 511 static void loadData(final TableName tableName, final byte[][] families, final int rows, 512 final int flushes) throws IOException { 513 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 514 List<Put> puts = new ArrayList<>(rows); 515 byte[] qualifier = Bytes.toBytes("val"); 516 for (int i = 0; i < flushes; i++) { 517 for (int k = 0; k < rows; k++) { 518 byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i)); 519 Put p = new Put(row); 520 for (int j = 0; j < families.length; ++j) { 521 p.addColumn(families[j], qualifier, row); 522 } 523 puts.add(p); 524 } 525 table.putAll(puts).join(); 526 TEST_UTIL.flush(); 527 puts.clear(); 528 } 529 } 530}