/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;

/**
 * Test cases for the atomic load error handling of the bulk load functionality.
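 * <p>
 * For reference, the happy-path client call these tests wrap looks roughly like the following
 * sketch (the same pattern {@code populateTable} below uses; the names are illustrative):
 *
 * <pre>
 * LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
 * try (Table table = connection.getTable(tableName);
 *   RegionLocator locator = connection.getRegionLocator(tableName);
 *   Admin admin = connection.getAdmin()) {
 *   loader.doBulkLoad(hfileDir, admin, table, locator);
 * }
 * </pre>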
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestLoadIncrementalHFilesSplitRecovery {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLoadIncrementalHFilesSplitRecovery.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestLoadIncrementalHFilesSplitRecovery.class);

  static HBaseTestingUtility util;
  // used by secure subclass
  static boolean useSecure = false;

  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];

  @Rule
  public TestName name = new TestName();

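  // Pre-compute the family name byte arrays once; the helpers below index into this array.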
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }

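  /**
   * Writes one HFile per column family under {@code dir}, each holding {@code ROWCOUNT} rows with
   * the given value.
   */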
  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }

  private TableDescriptor createTableDesc(TableName name, int cfs) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
      .forEachOrdered(builder::setColumnFamily);
    return builder.build();
  }

  /**
   * Creates a table with the given name and the specified number of column families if the table
   * does not already exist.
   */
  private void setupTable(final Connection connection, TableName table, int cfs)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(createTableDesc(table, cfs));
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Creates a table with the given name, the specified number of column families and the given
   * split keys if the table does not already exist.
   */
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

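  // Stages a bulk-load input directory on the test filesystem: one subdirectory per column
  // family, each containing a single HFile.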
  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }

  /**
   * Populate table with known values.
   */
  private void populateTable(final Connection connection, TableName table, int value)
    throws Exception {
    // create HFiles for different column families
    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
    Path bulk1 = buildBulkFiles(table, value);
    try (Table t = connection.getTable(table);
      RegionLocator locator = connection.getRegionLocator(table);
      Admin admin = connection.getAdmin()) {
      lih.doBulkLoad(bulk1, admin, t, locator);
    }
  }

203
204  /**
205   * Split the known table in half. (this is hard coded for this test suite)
206   */
207  private void forceSplit(TableName table) {
208    try {
209      // need to call regions server to by synchronous but isn't visible.
210      HRegionServer hrs = util.getRSForFirstRegionInTable(table);
211
212      for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
213        if (hri.getTable().equals(table)) {
214          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
215          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
216        }
217      }
218
219      // verify that split completed.
220      int regions;
221      do {
222        regions = 0;
223        for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
224          if (hri.getTable().equals(table)) {
225            regions++;
226          }
227        }
228        if (regions != 2) {
229          LOG.info("Taking some time to complete split...");
230          Thread.sleep(250);
231        }
232      } while (regions != 2);
233    } catch (IOException e) {
234      e.printStackTrace();
235    } catch (InterruptedException e) {
236      e.printStackTrace();
237    }
238  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtility();
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = util.getConnection().getTable(table);
      ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e);
    }
  }


  /**
   * Tests that an exception thrown on the region-server side results in an exception on the
   * LoadIncrementalHFiles client.
   */
  @Test(expected = IOException.class)
  public void testBulkLoadPhaseFailure() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    final AtomicInteger attemptedCalls = new AtomicInteger();
    final AtomicInteger failedCalls = new AtomicInteger();
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
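      // Route the first atomic-load attempt through a mocked connection whose bulk-load RPC
      // always fails; subsequent attempts use the real connection.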
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection,
          TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile)
          throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            Connection errConn;
            try {
              errConn = getMockedConnection(util.getConfiguration());
            } catch (Exception e) {
              LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
            return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, true);
          }

          return super.tryAtomicRegionLoad(connection, tableName, first, lqis, true);
        }
      };
      try {
        // create HFiles for different column families
        Path dir = buildBulkFiles(table, 1);
        try (Table t = connection.getTable(table);
          RegionLocator locator = connection.getRegionLocator(table);
          Admin admin = connection.getAdmin()) {
          lih.doBulkLoad(dir, admin, t, locator);
        }
      } finally {
        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      }
      fail("doBulkLoad should have thrown an exception");
    }
  }

  /**
   * Tests that an exception thrown on the region-server side results in the number of retries set
   * by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
   * {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
   */
  @Test
  public void testRetryOnIOException() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    final AtomicInteger calls = new AtomicInteger(0);
    final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
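    // The first HBASE_CLIENT_RETRIES_NUMBER (here 2) callables throw IOException; with
    // RETRY_ON_IO_EXCEPTION set, the loader retries until a real callable succeeds.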
    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
      @Override
      protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
        TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
        if (
          calls.get() < util.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
        ) {
          calls.getAndIncrement();
          return new ClientServiceCallable<byte[]>(conn, tableName, first,
            new RpcControllerFactory(util.getConfiguration()).newController(),
            HConstants.PRIORITY_UNSET) {
            @Override
            public byte[] rpcCall() throws Exception {
              throw new IOException("Error calling something on RegionServer");
            }
          };
        } else {
          return super.buildClientServiceCallable(conn, tableName, first, lqis, true);
        }
      }
    };
    setupTable(conn, table, 10);
    Path dir = buildBulkFiles(table, 1);
    lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
    assertEquals(2, calls.get());
    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
  }

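  /**
   * Builds a mocked ClusterConnection whose bulkLoadHFile RPC always fails, so tests can exercise
   * client-side error handling without involving a real region server.
   */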
  private ClusterConnection getMockedConnection(final Configuration conf)
    throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
    ClusterConnection c = Mockito.mock(ClusterConnection.class);
    Mockito.when(c.getConfiguration()).thenReturn(conf);
    Mockito.doNothing().when(c).close();
    // Make it so we return a particular location when asked.
    final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO,
      ServerName.valueOf("example.org", 1234, 0));
    Mockito.when(
      c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
      .thenReturn(loc);
    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
    ClientProtos.ClientService.BlockingInterface hri =
      Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
    Mockito
      .when(hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
      .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
    Mockito.when(c.getClient(Mockito.any())).thenReturn(hri);
    return c;
  }

  /**
   * This test exercises the path where there is a split after initial validation but before the
   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
   * split just before the atomic region load.
   */
  @Test
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(table, ROWCOUNT, 1);

      // Now let's cause trouble. This will occur after checks and cause the bulk files to fail
      // when we attempt to atomically import them. This is recoverable.
      final AtomicInteger attemptedCalls = new AtomicInteger();
      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected void bulkLoadPhase(final Table htable, final Connection conn,
          ExecutorService pool, Deque<LoadQueueItem> queue,
          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
          Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // On first attempt force a split.
            forceSplit(table);
          }
          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
        }
      };

      // create HFiles for different column families
      try (Table t = connection.getTable(table);
        RegionLocator locator = connection.getRegionLocator(table);
        Admin admin = connection.getAdmin()) {
        Path bulk = buildBulkFiles(table, 2);
        lih2.doBulkLoad(bulk, admin, t, locator);
      }

      // check that data was loaded
      // The three expected attempts are 1) failure because a split is needed, 2) load of the
      // split top, and 3) load of the split bottom.
      assertEquals(3, attemptedCalls.get());
      assertExpectedTable(table, ROWCOUNT, 2);
    }
  }

  /**
   * This test splits a table and attempts to bulk load. The bulk import files should be split
   * before atomically importing.
   */
  @Test
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(connection, table, ROWCOUNT, 1);
      forceSplit(table);

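      // Every family HFile spans the full row range and so straddles the new split point; each
      // should be split in two during grouping, giving 10 families x 2 = 20 queue items.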
      final AtomicInteger countedLqis = new AtomicInteger();
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
          final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          Pair<List<LoadQueueItem>, String> lqis =
            super.groupOrSplit(regionGroups, item, htable, startEndKeys);
          if (lqis != null && lqis.getFirst() != null) {
            countedLqis.addAndGet(lqis.getFirst().size());
          }
          return lqis;
        }
      };

      // create HFiles for different column families
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table);
        RegionLocator locator = connection.getRegionLocator(table);
        Admin admin = connection.getAdmin()) {
        lih.doBulkLoad(bulk, admin, t, locator);
      }
      assertExpectedTable(connection, table, ROWCOUNT, 2);
      assertEquals(20, countedLqis.get());
    }
  }

  @Test
  public void testCorrectSplitPoint() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"), Bytes.toBytes("row_00000070") };
    setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);

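    // Seven split keys partition rows row_00000000..row_00000099 into eight regions, so each
    // family HFile has to be split repeatedly before everything loads.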
    final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
        Deque<LoadIncrementalHFiles.LoadQueueItem> queue,
        Multimap<ByteBuffer, LoadIncrementalHFiles.LoadQueueItem> regionGroups, boolean copyFile,
        Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        bulkloadRpcTimes.addAndGet(1);
        super.bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
      }
    };

    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
    // before HBASE-25281 we needed to invoke the bulkload rpc 8 times
    assertEquals(4, bulkloadRpcTimes.get());
  }

  /**
   * This test creates a table with many small regions. The bulk load files are split multiple
   * times before all of them can be loaded successfully.
   */
  @Test
  public void testSplitTmpFileCleanUp() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050") };
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());

      // create HFiles
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table);
        RegionLocator locator = connection.getRegionLocator(table);
        Admin admin = connection.getAdmin()) {
        lih.doBulkLoad(bulk, admin, t, locator);
      }
      // family path
      Path tmpPath = new Path(bulk, family(0));
      // TMP_DIR under family path
      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
      // HFiles have been split, so TMP_DIR exists
      assertTrue(fs.exists(tmpPath));
      // TMP_DIR should have been cleaned up
      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
        CommonFSUtils.listStatus(fs, tmpPath));
      assertExpectedTable(connection, table, ROWCOUNT, 2);
    }
  }

  /**
   * This simulates a remote exception which should cause LoadIncrementalHFiles to exit with an
   * exception.
   */
  @Test(expected = IOException.class)
  public void testGroupOrSplitFailure() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, tableName, 10);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        int i = 0;

        @Override
        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
          final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          i++;

          if (i == 5) {
            throw new IOException("failure");
          }
          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
        }
      };

      // create HFiles for different column families
      Path dir = buildBulkFiles(tableName, 1);
      try (Table t = connection.getTable(tableName);
        RegionLocator locator = connection.getRegionLocator(tableName);
        Admin admin = connection.getAdmin()) {
        lih.doBulkLoad(dir, admin, t, locator);
      }
    }

    fail("doBulkLoad should have thrown an exception");
  }

  @Test
  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
    // Share connection. We were failing to find the table with our new reverse scan because it
    // looks for the first region, not any region -- that is how it works now. The below removes
    // the first region in the test. Was reliant on the Connection caching having the first region.
    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
    Table table = connection.getTable(tableName);

    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
    Path dir = buildBulkFiles(tableName, 2);

    final AtomicInteger countedLqis = new AtomicInteger();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
        Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
        final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        Pair<List<LoadQueueItem>, String> lqis =
          super.groupOrSplit(regionGroups, item, htable, startEndKeys);
        if (lqis != null && lqis.getFirst() != null) {
          countedLqis.addAndGet(lqis.getFirst().size());
        }
        return lqis;
      }
    };

    // do bulkload when there is no region hole in hbase:meta.
    try (Table t = connection.getTable(tableName);
      RegionLocator locator = connection.getRegionLocator(tableName);
      Admin admin = connection.getAdmin()) {
      loader.doBulkLoad(dir, admin, t, locator);
    } catch (Exception e) {
      LOG.error("exception=", e);
    }
    // check that all the data was loaded into the table.
    this.assertExpectedTable(tableName, ROWCOUNT, 2);

    dir = buildBulkFiles(tableName, 3);

    // Mess it up by leaving a hole in hbase:meta
    List<RegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
    for (RegionInfo regionInfo : regionInfos) {
      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
        MetaTableAccessor.deleteRegionInfo(connection, regionInfo);
        break;
      }
    }

    try (Table t = connection.getTable(tableName);
      RegionLocator locator = connection.getRegionLocator(tableName);
      Admin admin = connection.getAdmin()) {
      loader.doBulkLoad(dir, admin, t, locator);
    } catch (Exception e) {
      LOG.error("exception=", e);
      assertTrue("IOException expected", e instanceof IOException);
    }

    table.close();

    // Make sure at least the one region that still exists can be found.
    regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
    assertTrue(regionInfos.size() >= 1);

    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
    connection.close();
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
    throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e);
    }
  }
}