001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertNotEquals;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026import static org.junit.jupiter.api.Assertions.fail;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.Map;
032import java.util.concurrent.CompletableFuture;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.function.Supplier;
036import java.util.stream.Collectors;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.TableNotFoundException;
042import org.apache.hadoop.hbase.master.HMaster;
043import org.apache.hadoop.hbase.master.RegionState;
044import org.apache.hadoop.hbase.master.ServerManager;
045import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
046import org.apache.hadoop.hbase.master.assignment.RegionStates;
047import org.apache.hadoop.hbase.regionserver.HRegionServer;
048import org.apache.hadoop.hbase.regionserver.Region;
049import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.LargeTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.util.JVMClusterUtil;
055import org.apache.hadoop.hbase.util.Threads;
056import org.junit.jupiter.api.AfterAll;
057import org.junit.jupiter.api.BeforeAll;
058import org.junit.jupiter.api.Tag;
059import org.junit.jupiter.api.TestTemplate;
060
061/**
062 * Class to test asynchronous region admin operations.
063 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our
064 *      ten minute timeout so they were split.
065 */
066@Tag(LargeTests.TAG)
067@Tag(ClientTests.TAG)
068@HBaseParameterizedTestTemplate(name = "{index}: policy = {0}")
069public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
070
071  public TestAsyncRegionAdminApi(Supplier<AsyncAdmin> admin) {
072    super(admin);
073  }
074
075  @BeforeAll
076  public static void setUpBeforeClass() throws Exception {
077    TestAsyncAdminBase.setUpBeforeClass();
078  }
079
080  @AfterAll
081  public static void tearDownAfterClass() throws Exception {
082    TestAsyncAdminBase.tearDownAfterClass();
083  }
084
085  @TestTemplate
086  public void testAssignRegionAndUnassignRegion() throws Exception {
087    createTableWithDefaultConf(tableName);
088
089    // assign region.
090    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
091    AssignmentManager am = master.getAssignmentManager();
092    RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
093
094    // assert region on server
095    RegionStates regionStates = am.getRegionStates();
096    ServerName serverName = regionStates.getRegionServerOfRegion(hri);
097    TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
098    assertTrue(regionStates.getRegionState(hri).isOpened());
099
100    // Region is assigned now. Let's assign it again.
101    // Master should not abort, and region should stay assigned.
102    try {
103      admin.assign(hri.getRegionName()).get();
104      fail("Should fail when assigning an already onlined region");
105    } catch (ExecutionException e) {
106      // Expected
107      assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class));
108    }
109    assertFalse(am.getRegionStates().getRegionStateNode(hri).isTransitionScheduled());
110    assertTrue(regionStates.getRegionState(hri).isOpened());
111
112    // unassign region
113    admin.unassign(hri.getRegionName(), true).get();
114    assertFalse(am.getRegionStates().getRegionStateNode(hri).isTransitionScheduled());
115    assertTrue(regionStates.getRegionState(hri).isClosed());
116  }
117
118  RegionInfo createTableAndGetOneRegion(final TableName tableName)
119    throws IOException, InterruptedException, ExecutionException {
120    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
121      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
122    admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
123
124    // wait till the table is assigned
125    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
126    long timeoutTime = EnvironmentEdgeManager.currentTime() + 3000;
127    while (true) {
128      List<RegionInfo> regions =
129        master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
130      if (regions.size() > 3) {
131        return regions.get(2);
132      }
133      long now = EnvironmentEdgeManager.currentTime();
134      if (now > timeoutTime) {
135        fail("Could not find an online region");
136      }
137      Thread.sleep(10);
138    }
139  }
140
141  @TestTemplate
142  public void testGetRegionByStateOfTable() throws Exception {
143    RegionInfo hri = createTableAndGetOneRegion(tableName);
144
145    RegionStates regionStates =
146      TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
147    assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN).stream()
148      .anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
149    assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
150      .get(RegionState.State.OPEN).stream()
151      .anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
152  }
153
154  @TestTemplate
155  public void testMoveRegion() throws Exception {
156    admin.balancerSwitch(false).join();
157
158    RegionInfo hri = createTableAndGetOneRegion(tableName);
159    RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
160    ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
161
162    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
163    ServerManager serverManager = master.getServerManager();
164    ServerName destServerName = null;
165    List<JVMClusterUtil.RegionServerThread> regionServers =
166      TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
167    for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
168      HRegionServer destServer = regionServer.getRegionServer();
169      destServerName = destServer.getServerName();
170      if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
171        break;
172      }
173    }
174
175    assertTrue(destServerName != null && !destServerName.equals(serverName));
176    admin.move(hri.getRegionName(), destServerName).get();
177
178    long timeoutTime = EnvironmentEdgeManager.currentTime() + 30000;
179    while (true) {
180      ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
181      if (sn != null && sn.equals(destServerName)) {
182        break;
183      }
184      long now = EnvironmentEdgeManager.currentTime();
185      if (now > timeoutTime) {
186        fail("Failed to move the region in time: " + hri);
187      }
188      Thread.sleep(100);
189    }
190    admin.balancerSwitch(true).join();
191  }
192
193  @TestTemplate
194  public void testGetOnlineRegions() throws Exception {
195    createTableAndGetOneRegion(tableName);
196    AtomicInteger regionServerCount = new AtomicInteger(0);
197    TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
198      .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> {
199        ServerName serverName = rs.getServerName();
200        try {
201          assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size());
202        } catch (Exception e) {
203          fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
204        }
205        regionServerCount.incrementAndGet();
206      });
207    assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(),
208      regionServerCount.get());
209  }
210
211  @TestTemplate
212  public void testFlushTableAndRegion() throws Exception {
213    RegionInfo hri = createTableAndGetOneRegion(tableName);
214    ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
215      .getRegionStates().getRegionServerOfRegion(hri);
216    HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
217      .map(rsThread -> rsThread.getRegionServer())
218      .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
219
220    // write a put into the specific region
221    ASYNC_CONN.getTable(tableName)
222      .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))).join();
223    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
224    // flush region and wait flush operation finished.
225    LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
226    admin.flushRegion(hri.getRegionName()).get();
227    LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
228    Threads.sleepWithoutInterrupt(500);
229    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
230      Threads.sleep(50);
231    }
232    // check the memstore.
233    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
234
235    // write another put into the specific region
236    ASYNC_CONN.getTable(tableName)
237      .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))).join();
238    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
239    admin.flush(tableName).get();
240    Threads.sleepWithoutInterrupt(500);
241    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
242      Threads.sleep(50);
243    }
244    // check the memstore.
245    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
246  }
247
248  private void waitUntilMobCompactionFinished(TableName tableName)
249    throws ExecutionException, InterruptedException {
250    long finished = EnvironmentEdgeManager.currentTime() + 60000;
251    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
252    while (EnvironmentEdgeManager.currentTime() < finished) {
253      if (state == CompactionState.NONE) {
254        break;
255      }
256      Thread.sleep(10);
257      state = admin.getCompactionState(tableName, CompactType.MOB).get();
258    }
259    assertEquals(CompactionState.NONE, state);
260  }
261
262  @TestTemplate
263  public void testCompactMob() throws Exception {
264    ColumnFamilyDescriptor columnDescriptor = ColumnFamilyDescriptorBuilder
265      .newBuilder(Bytes.toBytes("mob")).setMobEnabled(true).setMobThreshold(0).build();
266
267    TableDescriptor tableDescriptor =
268      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(columnDescriptor).build();
269
270    admin.createTable(tableDescriptor).get();
271
272    byte[][] families = { Bytes.toBytes("mob") };
273    loadData(tableName, families, 3000, 8);
274
275    admin.majorCompact(tableName).get();
276
277    CompactionState state = admin.getCompactionState(tableName).get();
278    assertNotEquals(CompactionState.NONE, state);
279
280    waitUntilMobCompactionFinished(tableName);
281  }
282
283  @TestTemplate
284  public void testCompactRegionServer() throws Exception {
285    byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
286    createTableWithDefaultConf(tableName, null, families);
287    loadData(tableName, families, 3000, 8);
288
289    List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
290      .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
291    List<Region> regions = new ArrayList<>();
292    rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName)));
293    assertEquals(1, regions.size());
294    int countBefore = countStoreFilesInFamilies(regions, families);
295    assertTrue(countBefore > 0);
296
297    // Minor compaction for all region servers.
298    for (HRegionServer rs : rsList)
299      admin.compactRegionServer(rs.getServerName()).get();
300    Thread.sleep(5000);
301    int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
302    assertTrue(countAfterMinorCompaction < countBefore);
303
304    // Major compaction for all region servers.
305    for (HRegionServer rs : rsList)
306      admin.majorCompactRegionServer(rs.getServerName()).get();
307    Thread.sleep(5000);
308    int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
309    assertEquals(3, countAfterMajorCompaction);
310  }
311
312  @TestTemplate
313  public void testCompactionSwitchStates() throws Exception {
314    // Create a table with regions
315    byte[] family = Bytes.toBytes("family");
316    byte[][] families =
317      { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
318    createTableWithDefaultConf(tableName, null, families);
319    loadData(tableName, families, 3000, 8);
320    List<Region> regions = new ArrayList<>();
321    TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()
322      .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
323    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture =
324      admin.compactionSwitch(true, new ArrayList<>());
325    Map<ServerName, Boolean> pairs = listCompletableFuture.get();
326    for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) {
327      assertEquals(true, p.getValue(),
328        "Default compaction state, expected=enabled actual=disabled");
329    }
330    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 =
331      admin.compactionSwitch(false, new ArrayList<>());
332    Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get();
333    for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) {
334      assertEquals(true, p.getValue(), "Last compaction state, expected=enabled actual=disabled");
335    }
336    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 =
337      admin.compactionSwitch(true, new ArrayList<>());
338    Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get();
339    for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) {
340      assertEquals(false, p.getValue(), "Last compaction state, expected=disabled actual=enabled");
341    }
342    ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
343    List<String> serverNameList = new ArrayList<String>();
344    serverNameList.add(serverName.getServerName());
345    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 =
346      admin.compactionSwitch(false, serverNameList);
347    Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get();
348    assertEquals(pairs3.entrySet().size(), 1);
349    for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) {
350      assertEquals(true, p.getValue(), "Last compaction state, expected=enabled actual=disabled");
351    }
352    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 =
353      admin.compactionSwitch(true, serverNameList);
354    Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get();
355    assertEquals(pairs4.entrySet().size(), 1);
356    for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) {
357      assertEquals(false, p.getValue(), "Last compaction state, expected=disabled actual=enabled");
358    }
359  }
360
361  @TestTemplate
362  public void testCompact() throws Exception {
363    compactionTest(TableName.valueOf("testCompact1"), 15, CompactionState.MINOR, false);
364    compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, true);
365
366    // For major compaction, set up a higher hbase.hstore.compaction.min to avoid
367    // minor compactions. It is a hack to avoid random delays introduced by Admins's
368    // updateConfiguration() method.
369    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
370      Configuration conf = thread.getRegionServer().getConfiguration();
371      conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 25);
372    });
373
374    compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, false);
375    compactionTest(TableName.valueOf("testCompact4"), 8, CompactionState.MAJOR, true);
376
377    // Restore to default
378    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
379      Configuration conf = thread.getRegionServer().getConfiguration();
380      conf.unset(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY);
381    });
382  }
383
384  private void compactionTest(final TableName tableName, final int flushes,
385    final CompactionState expectedState, boolean singleFamily) throws Exception {
386    // Create a table with regions
387    byte[] family = Bytes.toBytes("family");
388    byte[][] families =
389      { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
390    createTableWithDefaultConf(tableName, null, families);
391
392    byte[][] singleFamilyArray = { family };
393
394    // When singleFamily is true, only load data for the family being tested. This is to avoid
395    // the case that while major compaction is going on for the family, minor compaction could
396    // happen for other families at the same time (Two compaction threads long/short), thus
397    // pollute the compaction and store file numbers for the region.
398    if (singleFamily) {
399      loadData(tableName, singleFamilyArray, 3000, flushes);
400    } else {
401      loadData(tableName, families, 3000, flushes);
402    }
403
404    List<Region> regions = new ArrayList<>();
405    TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()
406      .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
407    assertEquals(1, regions.size());
408
409    int countBefore = countStoreFilesInFamilies(regions, families);
410    int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
411    assertTrue(countBefore > 0); // there should be some data files
412    if (expectedState == CompactionState.MINOR) {
413      if (singleFamily) {
414        admin.compact(tableName, family).get();
415      } else {
416        admin.compact(tableName).get();
417      }
418    } else {
419      if (singleFamily) {
420        admin.majorCompact(tableName, family).get();
421      } else {
422        admin.majorCompact(tableName).get();
423      }
424    }
425
426    long curt = EnvironmentEdgeManager.currentTime();
427    long waitTime = 10000;
428    long endt = curt + waitTime;
429    CompactionState state = admin.getCompactionState(tableName).get();
430    while (state == CompactionState.NONE && curt < endt) {
431      Thread.sleep(1);
432      state = admin.getCompactionState(tableName).get();
433      curt = EnvironmentEdgeManager.currentTime();
434    }
435    // Now, should have the right compaction state,
436    // otherwise, the compaction should have already been done
437    if (expectedState != state) {
438      for (Region region : regions) {
439        state = CompactionState.valueOf(region.getCompactionState().toString());
440        assertEquals(CompactionState.NONE, state);
441      }
442    } else {
443      // Wait until the compaction is done
444      state = admin.getCompactionState(tableName).get();
445      while (state != CompactionState.NONE && curt < endt) {
446        Thread.sleep(10);
447        state = admin.getCompactionState(tableName).get();
448      }
449      // Now, compaction should be done.
450      assertEquals(CompactionState.NONE, state);
451    }
452
453    int countAfter = countStoreFilesInFamilies(regions, families);
454    int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
455    assertTrue(countAfter < countBefore);
456    if (!singleFamily) {
457      if (expectedState == CompactionState.MAJOR) {
458        assertEquals(families.length, countAfter);
459      } else {
460        assertTrue(families.length <= countAfter);
461      }
462    } else {
463      int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
464      // assert only change was to single column family
465      assertEquals(singleFamDiff, (countBefore - countAfter));
466      if (expectedState == CompactionState.MAJOR) {
467        assertEquals(1, countAfterSingleFamily);
468      } else {
469        assertTrue(1 <= countAfterSingleFamily, "" + countAfterSingleFamily);
470      }
471    }
472  }
473
474  @TestTemplate
475  public void testNonExistentTableCompaction() {
476    testNonExistentTableCompaction(CompactionState.MINOR);
477    testNonExistentTableCompaction(CompactionState.MAJOR);
478  }
479
480  private void testNonExistentTableCompaction(CompactionState compactionState) {
481    try {
482      if (compactionState == CompactionState.MINOR) {
483        admin.compact(TableName.valueOf("NonExistentTable")).get();
484      } else {
485        admin.majorCompact(TableName.valueOf("NonExistentTable")).get();
486      }
487      fail("Expected TableNotFoundException when table doesn't exist");
488    } catch (Exception e) {
489      // expected.
490      assertTrue(e.getCause() instanceof TableNotFoundException);
491    }
492  }
493
494  private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
495    return countStoreFilesInFamilies(regions, new byte[][] { family });
496  }
497
498  private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
499    int count = 0;
500    for (Region region : regions) {
501      count += region.getStoreFileList(families).size();
502    }
503    return count;
504  }
505
506  static void loadData(final TableName tableName, final byte[][] families, final int rows)
507    throws IOException {
508    loadData(tableName, families, rows, 1);
509  }
510
511  static void loadData(final TableName tableName, final byte[][] families, final int rows,
512    final int flushes) throws IOException {
513    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
514    List<Put> puts = new ArrayList<>(rows);
515    byte[] qualifier = Bytes.toBytes("val");
516    for (int i = 0; i < flushes; i++) {
517      for (int k = 0; k < rows; k++) {
518        byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i));
519        Put p = new Put(row);
520        for (int j = 0; j < families.length; ++j) {
521          p.addColumn(families[j], qualifier, row);
522        }
523        puts.add(p);
524      }
525      table.putAll(puts).join();
526      TEST_UTIL.flush();
527      puts.clear();
528    }
529  }
530}