001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.Collection;
030import java.util.Deque;
031import java.util.List;
032import java.util.Map;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.stream.IntStream;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.MetaTableAccessor;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableExistsException;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.Admin;
048import org.apache.hadoop.hbase.client.ClientServiceCallable;
049import org.apache.hadoop.hbase.client.ClusterConnection;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Connection;
052import org.apache.hadoop.hbase.client.ConnectionFactory;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.client.RegionInfoBuilder;
055import org.apache.hadoop.hbase.client.RegionLocator;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.ResultScanner;
058import org.apache.hadoop.hbase.client.Scan;
059import org.apache.hadoop.hbase.client.Table;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
062import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
063import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
064import org.apache.hadoop.hbase.log.HBaseMarkers;
065import org.apache.hadoop.hbase.regionserver.HRegionServer;
066import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
067import org.apache.hadoop.hbase.testclassification.LargeTests;
068import org.apache.hadoop.hbase.testclassification.MiscTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.FSUtils;
071import org.apache.hadoop.hbase.util.Pair;
072import org.junit.AfterClass;
073import org.junit.BeforeClass;
074import org.junit.ClassRule;
075import org.junit.Rule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.junit.rules.TestName;
079import org.mockito.Mockito;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
084import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
085import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
086
087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
090
091/**
092 * Test cases for the atomic load error handling of the bulk load functionality.
093 */
094@Category({ MiscTests.class, LargeTests.class })
095public class TestLoadIncrementalHFilesSplitRecovery {
096
097  @ClassRule
098  public static final HBaseClassTestRule CLASS_RULE =
099      HBaseClassTestRule.forClass(TestLoadIncrementalHFilesSplitRecovery.class);
100
101  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
102
103  static HBaseTestingUtility util;
104  // used by secure subclass
105  static boolean useSecure = false;
106
107  final static int NUM_CFS = 10;
108  final static byte[] QUAL = Bytes.toBytes("qual");
109  final static int ROWCOUNT = 100;
110
111  private final static byte[][] families = new byte[NUM_CFS][];
112
113  @Rule
114  public TestName name = new TestName();
115
116  static {
117    for (int i = 0; i < NUM_CFS; i++) {
118      families[i] = Bytes.toBytes(family(i));
119    }
120  }
121
122  static byte[] rowkey(int i) {
123    return Bytes.toBytes(String.format("row_%08d", i));
124  }
125
126  static String family(int i) {
127    return String.format("family_%04d", i);
128  }
129
130  static byte[] value(int i) {
131    return Bytes.toBytes(String.format("%010d", i));
132  }
133
134  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
135    byte[] val = value(value);
136    for (int i = 0; i < NUM_CFS; i++) {
137      Path testIn = new Path(dir, family(i));
138
139      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
140        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
141    }
142  }
143
144  private TableDescriptor createTableDesc(TableName name, int cfs) {
145    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
146    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
147        .forEachOrdered(builder::setColumnFamily);
148    return builder.build();
149  }
150
151  /**
152   * Creates a table with given table name and specified number of column families if the table does
153   * not already exist.
154   */
155  private void setupTable(final Connection connection, TableName table, int cfs)
156      throws IOException {
157    try {
158      LOG.info("Creating table " + table);
159      try (Admin admin = connection.getAdmin()) {
160        admin.createTable(createTableDesc(table, cfs));
161      }
162    } catch (TableExistsException tee) {
163      LOG.info("Table " + table + " already exists");
164    }
165  }
166
167  /**
168   * Creates a table with given table name,specified number of column families<br>
169   * and splitkeys if the table does not already exist.
170   * @param table
171   * @param cfs
172   * @param SPLIT_KEYS
173   */
174  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
175      throws IOException {
176    try {
177      LOG.info("Creating table " + table);
178      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
179    } catch (TableExistsException tee) {
180      LOG.info("Table " + table + " already exists");
181    }
182  }
183
184  private Path buildBulkFiles(TableName table, int value) throws Exception {
185    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
186    Path bulk1 = new Path(dir, table.getNameAsString() + value);
187    FileSystem fs = util.getTestFileSystem();
188    buildHFiles(fs, bulk1, value);
189    return bulk1;
190  }
191
192  /**
193   * Populate table with known values.
194   */
195  private void populateTable(final Connection connection, TableName table, int value)
196      throws Exception {
197    // create HFiles for different column families
198    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
199    Path bulk1 = buildBulkFiles(table, value);
200    try (Table t = connection.getTable(table);
201        RegionLocator locator = connection.getRegionLocator(table);
202        Admin admin = connection.getAdmin()) {
203      lih.doBulkLoad(bulk1, admin, t, locator);
204    }
205  }
206
207  /**
208   * Split the known table in half. (this is hard coded for this test suite)
209   */
210  private void forceSplit(TableName table) {
211    try {
212      // need to call regions server to by synchronous but isn't visible.
213      HRegionServer hrs = util.getRSForFirstRegionInTable(table);
214
215      for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
216        if (hri.getTable().equals(table)) {
217          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
218          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
219        }
220      }
221
222      // verify that split completed.
223      int regions;
224      do {
225        regions = 0;
226        for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
227          if (hri.getTable().equals(table)) {
228            regions++;
229          }
230        }
231        if (regions != 2) {
232          LOG.info("Taking some time to complete split...");
233          Thread.sleep(250);
234        }
235      } while (regions != 2);
236    } catch (IOException e) {
237      e.printStackTrace();
238    } catch (InterruptedException e) {
239      e.printStackTrace();
240    }
241  }
242
243  @BeforeClass
244  public static void setupCluster() throws Exception {
245    util = new HBaseTestingUtility();
246    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
247    util.startMiniCluster(1);
248  }
249
250  @AfterClass
251  public static void teardownCluster() throws Exception {
252    util.shutdownMiniCluster();
253  }
254
255  /**
256   * Checks that all columns have the expected value and that there is the expected number of rows.
257   * @throws IOException
258   */
259  void assertExpectedTable(TableName table, int count, int value) throws IOException {
260    TableDescriptor htd = util.getAdmin().getDescriptor(table);
261    assertNotNull(htd);
262    try (Table t = util.getConnection().getTable(table);
263        ResultScanner sr = t.getScanner(new Scan())) {
264      int i = 0;
265      for (Result r; (r = sr.next()) != null;) {
266        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
267            .forEach(v -> assertArrayEquals(value(value), v));
268        i++;
269      }
270      assertEquals(count, i);
271    } catch (IOException e) {
272      fail("Failed due to exception");
273    }
274  }
275
276  /**
277   * Test that shows that exception thrown from the RS side will result in an exception on the
278   * LIHFile client.
279   */
280  @Test(expected = IOException.class)
281  public void testBulkLoadPhaseFailure() throws Exception {
282    final TableName table = TableName.valueOf(name.getMethodName());
283    final AtomicInteger attmptedCalls = new AtomicInteger();
284    final AtomicInteger failedCalls = new AtomicInteger();
285    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
286    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
287      setupTable(connection, table, 10);
288      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
289        @Override
290        protected List<LoadQueueItem> tryAtomicRegionLoad(
291            ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
292            Collection<LoadQueueItem> lqis) throws IOException {
293          int i = attmptedCalls.incrementAndGet();
294          if (i == 1) {
295            Connection errConn;
296            try {
297              errConn = getMockedConnection(util.getConfiguration());
298              serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
299            } catch (Exception e) {
300              LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e);
301              throw new RuntimeException("mocking cruft, should never happen");
302            }
303            failedCalls.incrementAndGet();
304            return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
305          }
306
307          return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
308        }
309      };
310      try {
311        // create HFiles for different column families
312        Path dir = buildBulkFiles(table, 1);
313        try (Table t = connection.getTable(table);
314            RegionLocator locator = connection.getRegionLocator(table);
315            Admin admin = connection.getAdmin()) {
316          lih.doBulkLoad(dir, admin, t, locator);
317        }
318      } finally {
319        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
320          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
321      }
322      fail("doBulkLoad should have thrown an exception");
323    }
324  }
325
326  /**
327   * Test that shows that exception thrown from the RS side will result in the expected number of
328   * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
329   * ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set
330   */
331  @Test
332  public void testRetryOnIOException() throws Exception {
333    final TableName table = TableName.valueOf(name.getMethodName());
334    final AtomicInteger calls = new AtomicInteger(0);
335    final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
336    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
337    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
338    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
339      @Override
340      protected List<LoadQueueItem> tryAtomicRegionLoad(
341          ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
342          Collection<LoadQueueItem> lqis) throws IOException {
343        if (calls.get() < util.getConfiguration().getInt(
344          HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
345          ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn,
346              tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(),
347              HConstants.PRIORITY_UNSET) {
348            @Override
349            public byte[] rpcCall() throws Exception {
350              throw new IOException("Error calling something on RegionServer");
351            }
352          };
353          calls.getAndIncrement();
354          return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
355        } else {
356          return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
357        }
358      }
359    };
360    setupTable(conn, table, 10);
361    Path dir = buildBulkFiles(table, 1);
362    lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
363    assertEquals(calls.get(), 2);
364    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
365  }
366
367  private ClusterConnection getMockedConnection(final Configuration conf)
368      throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
369    ClusterConnection c = Mockito.mock(ClusterConnection.class);
370    Mockito.when(c.getConfiguration()).thenReturn(conf);
371    Mockito.doNothing().when(c).close();
372    // Make it so we return a particular location when asked.
373    final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO,
374        ServerName.valueOf("example.org", 1234, 0));
375    Mockito.when(
376      c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
377        .thenReturn(loc);
378    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
379    ClientProtos.ClientService.BlockingInterface hri =
380        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
381    Mockito
382        .when(
383          hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
384        .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
385    Mockito.when(c.getClient(Mockito.any())).thenReturn(hri);
386    return c;
387  }
388
389  /**
390   * This test exercises the path where there is a split after initial validation but before the
391   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
392   * split just before the atomic region load.
393   */
394  @Test
395  public void testSplitWhileBulkLoadPhase() throws Exception {
396    final TableName table = TableName.valueOf(name.getMethodName());
397    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
398      setupTable(connection, table, 10);
399      populateTable(connection, table, 1);
400      assertExpectedTable(table, ROWCOUNT, 1);
401
402      // Now let's cause trouble. This will occur after checks and cause bulk
403      // files to fail when attempt to atomically import. This is recoverable.
404      final AtomicInteger attemptedCalls = new AtomicInteger();
405      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
406        @Override
407        protected void bulkLoadPhase(final Table htable, final Connection conn,
408            ExecutorService pool, Deque<LoadQueueItem> queue,
409            final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
410            Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
411          int i = attemptedCalls.incrementAndGet();
412          if (i == 1) {
413            // On first attempt force a split.
414            forceSplit(table);
415          }
416          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
417        }
418      };
419
420      // create HFiles for different column families
421      try (Table t = connection.getTable(table);
422          RegionLocator locator = connection.getRegionLocator(table);
423          Admin admin = connection.getAdmin()) {
424        Path bulk = buildBulkFiles(table, 2);
425        lih2.doBulkLoad(bulk, admin, t, locator);
426      }
427
428      // check that data was loaded
429      // The three expected attempts are 1) failure because need to split, 2)
430      // load of split top 3) load of split bottom
431      assertEquals(3, attemptedCalls.get());
432      assertExpectedTable(table, ROWCOUNT, 2);
433    }
434  }
435
436  /**
437   * This test splits a table and attempts to bulk load. The bulk import files should be split
438   * before atomically importing.
439   */
440  @Test
441  public void testGroupOrSplitPresplit() throws Exception {
442    final TableName table = TableName.valueOf(name.getMethodName());
443    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
444      setupTable(connection, table, 10);
445      populateTable(connection, table, 1);
446      assertExpectedTable(connection, table, ROWCOUNT, 1);
447      forceSplit(table);
448
449      final AtomicInteger countedLqis = new AtomicInteger();
450      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
451        @Override
452        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
453            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
454            final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
455          Pair<List<LoadQueueItem>, String> lqis =
456              super.groupOrSplit(regionGroups, item, htable, startEndKeys);
457          if (lqis != null && lqis.getFirst() != null) {
458            countedLqis.addAndGet(lqis.getFirst().size());
459          }
460          return lqis;
461        }
462      };
463
464      // create HFiles for different column families
465      Path bulk = buildBulkFiles(table, 2);
466      try (Table t = connection.getTable(table);
467          RegionLocator locator = connection.getRegionLocator(table);
468          Admin admin = connection.getAdmin()) {
469        lih.doBulkLoad(bulk, admin, t, locator);
470      }
471      assertExpectedTable(connection, table, ROWCOUNT, 2);
472      assertEquals(20, countedLqis.get());
473    }
474  }
475
476  /**
477   * This test creates a table with many small regions. The bulk load files would be splitted
478   * multiple times before all of them can be loaded successfully.
479   */
480  @Test
481  public void testSplitTmpFileCleanUp() throws Exception {
482    final TableName table = TableName.valueOf(name.getMethodName());
483    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
484        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
485        Bytes.toBytes("row_00000050") };
486    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
487      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
488
489      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
490
491      // create HFiles
492      Path bulk = buildBulkFiles(table, 2);
493      try (Table t = connection.getTable(table);
494          RegionLocator locator = connection.getRegionLocator(table);
495          Admin admin = connection.getAdmin()) {
496        lih.doBulkLoad(bulk, admin, t, locator);
497      }
498      // family path
499      Path tmpPath = new Path(bulk, family(0));
500      // TMP_DIR under family path
501      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
502      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
503      // HFiles have been splitted, there is TMP_DIR
504      assertTrue(fs.exists(tmpPath));
505      // TMP_DIR should have been cleaned-up
506      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
507        FSUtils.listStatus(fs, tmpPath));
508      assertExpectedTable(connection, table, ROWCOUNT, 2);
509    }
510  }
511
512  /**
513   * This simulates an remote exception which should cause LIHF to exit with an exception.
514   */
515  @Test(expected = IOException.class)
516  public void testGroupOrSplitFailure() throws Exception {
517    final TableName tableName = TableName.valueOf(name.getMethodName());
518    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
519      setupTable(connection, tableName, 10);
520
521      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
522        int i = 0;
523
524        @Override
525        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
526            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
527            final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
528          i++;
529
530          if (i == 5) {
531            throw new IOException("failure");
532          }
533          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
534        }
535      };
536
537      // create HFiles for different column families
538      Path dir = buildBulkFiles(tableName, 1);
539      try (Table t = connection.getTable(tableName);
540          RegionLocator locator = connection.getRegionLocator(tableName);
541          Admin admin = connection.getAdmin()) {
542        lih.doBulkLoad(dir, admin, t, locator);
543      }
544    }
545
546    fail("doBulkLoad should have thrown an exception");
547  }
548
549  @Test
550  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
551    final TableName tableName = TableName.valueOf(name.getMethodName());
552    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
553    // Share connection. We were failing to find the table with our new reverse scan because it
554    // looks for first region, not any region -- that is how it works now. The below removes first
555    // region in test. Was reliant on the Connection caching having first region.
556    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
557    Table table = connection.getTable(tableName);
558
559    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
560    Path dir = buildBulkFiles(tableName, 2);
561
562    final AtomicInteger countedLqis = new AtomicInteger();
563    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
564
565      @Override
566      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
567          Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
568          final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
569        Pair<List<LoadQueueItem>, String> lqis =
570            super.groupOrSplit(regionGroups, item, htable, startEndKeys);
571        if (lqis != null && lqis.getFirst() != null) {
572          countedLqis.addAndGet(lqis.getFirst().size());
573        }
574        return lqis;
575      }
576    };
577
578    // do bulkload when there is no region hole in hbase:meta.
579    try (Table t = connection.getTable(tableName);
580        RegionLocator locator = connection.getRegionLocator(tableName);
581        Admin admin = connection.getAdmin()) {
582      loader.doBulkLoad(dir, admin, t, locator);
583    } catch (Exception e) {
584      LOG.error("exeception=", e);
585    }
586    // check if all the data are loaded into the table.
587    this.assertExpectedTable(tableName, ROWCOUNT, 2);
588
589    dir = buildBulkFiles(tableName, 3);
590
591    // Mess it up by leaving a hole in the hbase:meta
592    List<RegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
593    for (RegionInfo regionInfo : regionInfos) {
594      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
595        MetaTableAccessor.deleteRegionInfo(connection, regionInfo);
596        break;
597      }
598    }
599
600    try (Table t = connection.getTable(tableName);
601        RegionLocator locator = connection.getRegionLocator(tableName);
602        Admin admin = connection.getAdmin()) {
603      loader.doBulkLoad(dir, admin, t, locator);
604    } catch (Exception e) {
605      LOG.error("exception=", e);
606      assertTrue("IOException expected", e instanceof IOException);
607    }
608
609    table.close();
610
611    // Make sure at least the one region that still exists can be found.
612    regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
613    assertTrue(regionInfos.size() >= 1);
614
615    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
616    connection.close();
617  }
618
619  /**
620   * Checks that all columns have the expected value and that there is the expected number of rows.
621   * @throws IOException
622   */
623  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
624      throws IOException {
625    TableDescriptor htd = util.getAdmin().getDescriptor(table);
626    assertNotNull(htd);
627    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
628      int i = 0;
629      for (Result r; (r = sr.next()) != null;) {
630        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
631            .forEach(v -> assertArrayEquals(value(value), v));
632        i++;
633      }
634      assertEquals(count, i);
635    } catch (IOException e) {
636      fail("Failed due to exception");
637    }
638  }
639}