/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.Collection;
030import java.util.Deque;
031import java.util.List;
032import java.util.Map;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.stream.IntStream;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.MetaTableAccessor;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableExistsException;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.Admin;
048import org.apache.hadoop.hbase.client.ClientServiceCallable;
049import org.apache.hadoop.hbase.client.ClusterConnection;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Connection;
052import org.apache.hadoop.hbase.client.ConnectionFactory;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.client.RegionInfoBuilder;
055import org.apache.hadoop.hbase.client.RegionLocator;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.ResultScanner;
058import org.apache.hadoop.hbase.client.Scan;
059import org.apache.hadoop.hbase.client.Table;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
062import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
063import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
064import org.apache.hadoop.hbase.log.HBaseMarkers;
065import org.apache.hadoop.hbase.regionserver.HRegionServer;
066import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
067import org.apache.hadoop.hbase.testclassification.LargeTests;
068import org.apache.hadoop.hbase.testclassification.MiscTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.CommonFSUtils;
071import org.apache.hadoop.hbase.util.Pair;
072import org.junit.AfterClass;
073import org.junit.BeforeClass;
074import org.junit.ClassRule;
075import org.junit.Rule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.junit.rules.TestName;
079import org.mockito.Mockito;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
084import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
085import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
086
087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
090
091/**
092 * Test cases for the atomic load error handling of the bulk load functionality.
093 */
094@Category({ MiscTests.class, LargeTests.class })
095public class TestLoadIncrementalHFilesSplitRecovery {
096
097  @ClassRule
098  public static final HBaseClassTestRule CLASS_RULE =
099      HBaseClassTestRule.forClass(TestLoadIncrementalHFilesSplitRecovery.class);
100
101  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
102
103  static HBaseTestingUtility util;
104  // used by secure subclass
105  static boolean useSecure = false;
106
107  final static int NUM_CFS = 10;
108  final static byte[] QUAL = Bytes.toBytes("qual");
109  final static int ROWCOUNT = 100;
110
111  private final static byte[][] families = new byte[NUM_CFS][];
112
113  @Rule
114  public TestName name = new TestName();
115
116  static {
117    for (int i = 0; i < NUM_CFS; i++) {
118      families[i] = Bytes.toBytes(family(i));
119    }
120  }
121
122  static byte[] rowkey(int i) {
123    return Bytes.toBytes(String.format("row_%08d", i));
124  }
125
126  static String family(int i) {
127    return String.format("family_%04d", i);
128  }
129
130  static byte[] value(int i) {
131    return Bytes.toBytes(String.format("%010d", i));
132  }
133
134  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
135    byte[] val = value(value);
136    for (int i = 0; i < NUM_CFS; i++) {
137      Path testIn = new Path(dir, family(i));
138
139      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
140        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
141    }
142  }
143
144  private TableDescriptor createTableDesc(TableName name, int cfs) {
145    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
146    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
147        .forEachOrdered(builder::setColumnFamily);
148    return builder.build();
149  }
150
151  /**
152   * Creates a table with given table name and specified number of column families if the table does
153   * not already exist.
154   */
155  private void setupTable(final Connection connection, TableName table, int cfs)
156      throws IOException {
157    try {
158      LOG.info("Creating table " + table);
159      try (Admin admin = connection.getAdmin()) {
160        admin.createTable(createTableDesc(table, cfs));
161      }
162    } catch (TableExistsException tee) {
163      LOG.info("Table " + table + " already exists");
164    }
165  }
166
167  /**
168   * Creates a table with given table name,specified number of column families<br>
169   * and splitkeys if the table does not already exist.
170   * @param table
171   * @param cfs
172   * @param SPLIT_KEYS
173   */
174  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
175      throws IOException {
176    try {
177      LOG.info("Creating table " + table);
178      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
179    } catch (TableExistsException tee) {
180      LOG.info("Table " + table + " already exists");
181    }
182  }
183
184  private Path buildBulkFiles(TableName table, int value) throws Exception {
185    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
186    Path bulk1 = new Path(dir, table.getNameAsString() + value);
187    FileSystem fs = util.getTestFileSystem();
188    buildHFiles(fs, bulk1, value);
189    return bulk1;
190  }
191
192  /**
193   * Populate table with known values.
194   */
195  private void populateTable(final Connection connection, TableName table, int value)
196      throws Exception {
197    // create HFiles for different column families
198    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
199    Path bulk1 = buildBulkFiles(table, value);
200    try (Table t = connection.getTable(table);
201        RegionLocator locator = connection.getRegionLocator(table);
202        Admin admin = connection.getAdmin()) {
203      lih.doBulkLoad(bulk1, admin, t, locator);
204    }
205  }
206
207  /**
208   * Split the known table in half. (this is hard coded for this test suite)
209   */
210  private void forceSplit(TableName table) {
211    try {
212      // need to call regions server to by synchronous but isn't visible.
213      HRegionServer hrs = util.getRSForFirstRegionInTable(table);
214
215      for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
216        if (hri.getTable().equals(table)) {
217          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
218          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
219        }
220      }
221
222      // verify that split completed.
223      int regions;
224      do {
225        regions = 0;
226        for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
227          if (hri.getTable().equals(table)) {
228            regions++;
229          }
230        }
231        if (regions != 2) {
232          LOG.info("Taking some time to complete split...");
233          Thread.sleep(250);
234        }
235      } while (regions != 2);
236    } catch (IOException e) {
237      e.printStackTrace();
238    } catch (InterruptedException e) {
239      e.printStackTrace();
240    }
241  }
242
243  @BeforeClass
244  public static void setupCluster() throws Exception {
245    util = new HBaseTestingUtility();
246    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
247    util.startMiniCluster(1);
248  }
249
250  @AfterClass
251  public static void teardownCluster() throws Exception {
252    util.shutdownMiniCluster();
253  }
254
255  /**
256   * Checks that all columns have the expected value and that there is the expected number of rows.
257   * @throws IOException
258   */
259  void assertExpectedTable(TableName table, int count, int value) throws IOException {
260    TableDescriptor htd = util.getAdmin().getDescriptor(table);
261    assertNotNull(htd);
262    try (Table t = util.getConnection().getTable(table);
263        ResultScanner sr = t.getScanner(new Scan())) {
264      int i = 0;
265      for (Result r; (r = sr.next()) != null;) {
266        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
267            .forEach(v -> assertArrayEquals(value(value), v));
268        i++;
269      }
270      assertEquals(count, i);
271    } catch (IOException e) {
272      fail("Failed due to exception");
273    }
274  }
275
276  /**
277   * Test that shows that exception thrown from the RS side will result in an exception on the
278   * LIHFile client.
279   */
280  @Test(expected = IOException.class)
281  public void testBulkLoadPhaseFailure() throws Exception {
282    final TableName table = TableName.valueOf(name.getMethodName());
283    final AtomicInteger attmptedCalls = new AtomicInteger();
284    final AtomicInteger failedCalls = new AtomicInteger();
285    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
286    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
287      setupTable(connection, table, 10);
288      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
289        @Override
290        protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection,
291            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
292            boolean copyFile) throws IOException {
293          int i = attmptedCalls.incrementAndGet();
294          if (i == 1) {
295            Connection errConn;
296            try {
297              errConn = getMockedConnection(util.getConfiguration());
298            } catch (Exception e) {
299              LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e);
300              throw new RuntimeException("mocking cruft, should never happen");
301            }
302            failedCalls.incrementAndGet();
303            return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, true);
304          }
305
306          return super.tryAtomicRegionLoad(connection, tableName, first, lqis, true);
307        }
308      };
309      try {
310        // create HFiles for different column families
311        Path dir = buildBulkFiles(table, 1);
312        try (Table t = connection.getTable(table);
313            RegionLocator locator = connection.getRegionLocator(table);
314            Admin admin = connection.getAdmin()) {
315          lih.doBulkLoad(dir, admin, t, locator);
316        }
317      } finally {
318        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
319          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
320      }
321      fail("doBulkLoad should have thrown an exception");
322    }
323  }
324
325  /**
326   * Test that shows that exception thrown from the RS side will result in the expected number of
327   * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
328   * ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set
329   */
330  @Test
331  public void testRetryOnIOException() throws Exception {
332    final TableName table = TableName.valueOf(name.getMethodName());
333    final AtomicInteger calls = new AtomicInteger(0);
334    final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
335    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
336    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
337    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
338      @Override
339      protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
340          TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
341        if (calls.get() < util.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
342          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
343          calls.getAndIncrement();
344          return new ClientServiceCallable<byte[]>(conn, tableName, first,
345              new RpcControllerFactory(util.getConfiguration()).newController(),
346              HConstants.PRIORITY_UNSET) {
347            @Override
348            public byte[] rpcCall() throws Exception {
349              throw new IOException("Error calling something on RegionServer");
350            }
351          };
352        } else {
353          return super.buildClientServiceCallable(conn, tableName, first, lqis, true);
354        }
355      }
356    };
357    setupTable(conn, table, 10);
358    Path dir = buildBulkFiles(table, 1);
359    lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
360    assertEquals(calls.get(), 2);
361    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
362  }
363
364  private ClusterConnection getMockedConnection(final Configuration conf)
365      throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
366    ClusterConnection c = Mockito.mock(ClusterConnection.class);
367    Mockito.when(c.getConfiguration()).thenReturn(conf);
368    Mockito.doNothing().when(c).close();
369    // Make it so we return a particular location when asked.
370    final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO,
371        ServerName.valueOf("example.org", 1234, 0));
372    Mockito.when(
373      c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
374        .thenReturn(loc);
375    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
376    ClientProtos.ClientService.BlockingInterface hri =
377        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
378    Mockito
379        .when(
380          hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
381        .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
382    Mockito.when(c.getClient(Mockito.any())).thenReturn(hri);
383    return c;
384  }
385
386  /**
387   * This test exercises the path where there is a split after initial validation but before the
388   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
389   * split just before the atomic region load.
390   */
391  @Test
392  public void testSplitWhileBulkLoadPhase() throws Exception {
393    final TableName table = TableName.valueOf(name.getMethodName());
394    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
395      setupTable(connection, table, 10);
396      populateTable(connection, table, 1);
397      assertExpectedTable(table, ROWCOUNT, 1);
398
399      // Now let's cause trouble. This will occur after checks and cause bulk
400      // files to fail when attempt to atomically import. This is recoverable.
401      final AtomicInteger attemptedCalls = new AtomicInteger();
402      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
403        @Override
404        protected void bulkLoadPhase(final Table htable, final Connection conn,
405            ExecutorService pool, Deque<LoadQueueItem> queue,
406            final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
407            Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
408          int i = attemptedCalls.incrementAndGet();
409          if (i == 1) {
410            // On first attempt force a split.
411            forceSplit(table);
412          }
413          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
414        }
415      };
416
417      // create HFiles for different column families
418      try (Table t = connection.getTable(table);
419          RegionLocator locator = connection.getRegionLocator(table);
420          Admin admin = connection.getAdmin()) {
421        Path bulk = buildBulkFiles(table, 2);
422        lih2.doBulkLoad(bulk, admin, t, locator);
423      }
424
425      // check that data was loaded
426      // The three expected attempts are 1) failure because need to split, 2)
427      // load of split top 3) load of split bottom
428      assertEquals(3, attemptedCalls.get());
429      assertExpectedTable(table, ROWCOUNT, 2);
430    }
431  }
432
433  /**
434   * This test splits a table and attempts to bulk load. The bulk import files should be split
435   * before atomically importing.
436   */
437  @Test
438  public void testGroupOrSplitPresplit() throws Exception {
439    final TableName table = TableName.valueOf(name.getMethodName());
440    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
441      setupTable(connection, table, 10);
442      populateTable(connection, table, 1);
443      assertExpectedTable(connection, table, ROWCOUNT, 1);
444      forceSplit(table);
445
446      final AtomicInteger countedLqis = new AtomicInteger();
447      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
448        @Override
449        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
450            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
451            final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
452          Pair<List<LoadQueueItem>, String> lqis =
453              super.groupOrSplit(regionGroups, item, htable, startEndKeys);
454          if (lqis != null && lqis.getFirst() != null) {
455            countedLqis.addAndGet(lqis.getFirst().size());
456          }
457          return lqis;
458        }
459      };
460
461      // create HFiles for different column families
462      Path bulk = buildBulkFiles(table, 2);
463      try (Table t = connection.getTable(table);
464          RegionLocator locator = connection.getRegionLocator(table);
465          Admin admin = connection.getAdmin()) {
466        lih.doBulkLoad(bulk, admin, t, locator);
467      }
468      assertExpectedTable(connection, table, ROWCOUNT, 2);
469      assertEquals(20, countedLqis.get());
470    }
471  }
472
473  @Test
474  public void testCorrectSplitPoint() throws Exception {
475    final TableName table = TableName.valueOf(name.getMethodName());
476    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
477        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
478        Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"),
479        Bytes.toBytes("row_00000070") };
480    setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);
481
482    final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
483    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
484
485      @Override
486      protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
487          Deque<LoadIncrementalHFiles.LoadQueueItem> queue,
488          Multimap<ByteBuffer, LoadIncrementalHFiles.LoadQueueItem> regionGroups, boolean copyFile,
489          Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
490        bulkloadRpcTimes.addAndGet(1);
491        super.bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
492      }
493    };
494
495    Path dir = buildBulkFiles(table, 1);
496    loader.bulkLoad(table, dir);
497    // before HBASE-25281 we need invoke bulkload rpc 8 times
498    assertEquals(4, bulkloadRpcTimes.get());
499  }
500
501  /**
502   * This test creates a table with many small regions. The bulk load files would be splitted
503   * multiple times before all of them can be loaded successfully.
504   */
505  @Test
506  public void testSplitTmpFileCleanUp() throws Exception {
507    final TableName table = TableName.valueOf(name.getMethodName());
508    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
509        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
510        Bytes.toBytes("row_00000050") };
511    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
512      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
513
514      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
515
516      // create HFiles
517      Path bulk = buildBulkFiles(table, 2);
518      try (Table t = connection.getTable(table);
519          RegionLocator locator = connection.getRegionLocator(table);
520          Admin admin = connection.getAdmin()) {
521        lih.doBulkLoad(bulk, admin, t, locator);
522      }
523      // family path
524      Path tmpPath = new Path(bulk, family(0));
525      // TMP_DIR under family path
526      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
527      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
528      // HFiles have been splitted, there is TMP_DIR
529      assertTrue(fs.exists(tmpPath));
530      // TMP_DIR should have been cleaned-up
531      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
532        CommonFSUtils.listStatus(fs, tmpPath));
533      assertExpectedTable(connection, table, ROWCOUNT, 2);
534    }
535  }
536
537  /**
538   * This simulates an remote exception which should cause LIHF to exit with an exception.
539   */
540  @Test(expected = IOException.class)
541  public void testGroupOrSplitFailure() throws Exception {
542    final TableName tableName = TableName.valueOf(name.getMethodName());
543    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
544      setupTable(connection, tableName, 10);
545
546      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
547        int i = 0;
548
549        @Override
550        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
551            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
552            final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
553          i++;
554
555          if (i == 5) {
556            throw new IOException("failure");
557          }
558          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
559        }
560      };
561
562      // create HFiles for different column families
563      Path dir = buildBulkFiles(tableName, 1);
564      try (Table t = connection.getTable(tableName);
565          RegionLocator locator = connection.getRegionLocator(tableName);
566          Admin admin = connection.getAdmin()) {
567        lih.doBulkLoad(dir, admin, t, locator);
568      }
569    }
570
571    fail("doBulkLoad should have thrown an exception");
572  }
573
574  @Test
575  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
576    final TableName tableName = TableName.valueOf(name.getMethodName());
577    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
578    // Share connection. We were failing to find the table with our new reverse scan because it
579    // looks for first region, not any region -- that is how it works now. The below removes first
580    // region in test. Was reliant on the Connection caching having first region.
581    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
582    Table table = connection.getTable(tableName);
583
584    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
585    Path dir = buildBulkFiles(tableName, 2);
586
587    final AtomicInteger countedLqis = new AtomicInteger();
588    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
589
590      @Override
591      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
592          Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
593          final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
594        Pair<List<LoadQueueItem>, String> lqis =
595            super.groupOrSplit(regionGroups, item, htable, startEndKeys);
596        if (lqis != null && lqis.getFirst() != null) {
597          countedLqis.addAndGet(lqis.getFirst().size());
598        }
599        return lqis;
600      }
601    };
602
603    // do bulkload when there is no region hole in hbase:meta.
604    try (Table t = connection.getTable(tableName);
605        RegionLocator locator = connection.getRegionLocator(tableName);
606        Admin admin = connection.getAdmin()) {
607      loader.doBulkLoad(dir, admin, t, locator);
608    } catch (Exception e) {
609      LOG.error("exeception=", e);
610    }
611    // check if all the data are loaded into the table.
612    this.assertExpectedTable(tableName, ROWCOUNT, 2);
613
614    dir = buildBulkFiles(tableName, 3);
615
616    // Mess it up by leaving a hole in the hbase:meta
617    List<RegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
618    for (RegionInfo regionInfo : regionInfos) {
619      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
620        MetaTableAccessor.deleteRegionInfo(connection, regionInfo);
621        break;
622      }
623    }
624
625    try (Table t = connection.getTable(tableName);
626        RegionLocator locator = connection.getRegionLocator(tableName);
627        Admin admin = connection.getAdmin()) {
628      loader.doBulkLoad(dir, admin, t, locator);
629    } catch (Exception e) {
630      LOG.error("exception=", e);
631      assertTrue("IOException expected", e instanceof IOException);
632    }
633
634    table.close();
635
636    // Make sure at least the one region that still exists can be found.
637    regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
638    assertTrue(regionInfos.size() >= 1);
639
640    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
641    connection.close();
642  }
643
644  /**
645   * Checks that all columns have the expected value and that there is the expected number of rows.
646   * @throws IOException
647   */
648  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
649      throws IOException {
650    TableDescriptor htd = util.getAdmin().getDescriptor(table);
651    assertNotNull(htd);
652    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
653      int i = 0;
654      for (Result r; (r = sr.next()) != null;) {
655        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
656            .forEach(v -> assertArrayEquals(value(value), v));
657        i++;
658      }
659      assertEquals(count, i);
660    } catch (IOException e) {
661      fail("Failed due to exception");
662    }
663  }
664}