001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029import java.util.Random;
030import java.util.Set;
031import java.util.TreeSet;
032import java.util.stream.Collectors;
033import org.apache.commons.io.IOUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ChoreService;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.MetaTableAccessor;
041import org.apache.hadoop.hbase.NotServingRegionException;
042import org.apache.hadoop.hbase.ScheduledChore;
043import org.apache.hadoop.hbase.Stoppable;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.Waiter;
046import org.apache.hadoop.hbase.client.Admin;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionLocator;
054import org.apache.hadoop.hbase.client.Result;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.hbase.util.PairOfSameType;
062import org.apache.hadoop.hbase.util.StoppableImplementation;
063import org.apache.hadoop.hbase.util.Threads;
064import org.junit.AfterClass;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.junit.rules.TestName;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
075import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
076import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
077
078@Category(LargeTests.class)
079public class TestEndToEndSplitTransaction {
080
081  @ClassRule
082  public static final HBaseClassTestRule CLASS_RULE =
083    HBaseClassTestRule.forClass(TestEndToEndSplitTransaction.class);
084
085  private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndSplitTransaction.class);
086  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
087  private static final Configuration CONF = TEST_UTIL.getConfiguration();
088
089  @Rule
090  public TestName name = new TestName();
091
092  @BeforeClass
093  public static void beforeAllTests() throws Exception {
094    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
095    TEST_UTIL.startMiniCluster(1);
096  }
097
098  @AfterClass
099  public static void afterAllTests() throws Exception {
100    TEST_UTIL.shutdownMiniCluster();
101  }
102
103  /**
104   * This is the test for : HBASE-20940 This test will split the region and try to open an reference
105   * over store file. Once store file has any reference, it makes sure that region can't be split
106   */
107  @Test
108  public void testCanSplitJustAfterASplit() throws Exception {
109    LOG.info("Starting testCanSplitJustAfterASplit");
110    byte[] fam = Bytes.toBytes("cf_split");
111
112    CompactSplit compactSplit =
113      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getCompactSplitThread();
114    TableName tableName = TableName.valueOf("CanSplitTable");
115    Table source = TEST_UTIL.getConnection().getTable(tableName);
116    Admin admin = TEST_UTIL.getAdmin();
117    // set a large min compaction file count to avoid compaction just after splitting.
118    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
119      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
120    Map<String, StoreFileReader> scanner = Maps.newHashMap();
121    try {
122      admin.createTable(htd);
123      TEST_UTIL.loadTable(source, fam);
124      compactSplit.setCompactionsEnabled(false);
125      admin.split(tableName);
126      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getHBaseCluster().getRegions(tableName).size() == 2);
127
128      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
129      regions.stream()
130        .forEach(r -> r.getStores().get(0).getStorefiles().stream()
131          .filter(s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
132          .forEach(sf -> {
133            StoreFileReader reader = ((HStoreFile) sf).getReader();
134            reader.getStoreFileScanner(true, false, false, 0, 0, false);
135            scanner.put(r.getRegionInfo().getEncodedName(), reader);
136            LOG.info("Got reference to file = " + sf.getPath() + ",for region = " +
137              r.getRegionInfo().getEncodedName());
138          }));
139      assertTrue("Regions did not split properly", regions.size() > 1);
140      assertTrue("Could not get reference any of the store file", scanner.size() > 1);
141      compactSplit.setCompactionsEnabled(true);
142      for (HRegion region : regions) {
143        region.compact(true);
144      }
145
146      regions.stream()
147        .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
148        .forEach(r -> assertFalse("Contains an open file reference which can be split",
149          r.getStores().get(0).canSplit()));
150    } finally {
151      scanner.values().forEach(s -> {
152        try {
153          s.close(true);
154        } catch (IOException ioe) {
155          LOG.error("Failed while closing store file", ioe);
156        }
157      });
158      scanner.clear();
159      Closeables.close(source, true);
160      if (!compactSplit.isCompactionsEnabled()) {
161        compactSplit.setCompactionsEnabled(true);
162      }
163      TEST_UTIL.deleteTableIfAny(tableName);
164    }
165  }
166
167  /**
168   * Tests that the client sees meta table changes as atomic during splits
169   */
170  @Test
171  public void testFromClientSideWhileSplitting() throws Throwable {
172    LOG.info("Starting testFromClientSideWhileSplitting");
173    final TableName tableName = TableName.valueOf(name.getMethodName());
174    final byte[] FAMILY = Bytes.toBytes("family");
175
176    // SplitTransaction will update the meta table by offlining the parent region, and adding info
177    // for daughters.
178    Table table = TEST_UTIL.createTable(tableName, FAMILY);
179
180    Stoppable stopper = new StoppableImplementation();
181    RegionSplitter regionSplitter = new RegionSplitter(table);
182    RegionChecker regionChecker = new RegionChecker(CONF, stopper, tableName);
183    final ChoreService choreService = new ChoreService("TEST_SERVER");
184
185    choreService.scheduleChore(regionChecker);
186    regionSplitter.start();
187
188    // wait until the splitter is finished
189    regionSplitter.join();
190    stopper.stop(null);
191
192    if (regionChecker.ex != null) {
193      throw new AssertionError("regionChecker", regionChecker.ex);
194    }
195
196    if (regionSplitter.ex != null) {
197      throw new AssertionError("regionSplitter", regionSplitter.ex);
198    }
199
200    // one final check
201    regionChecker.verify();
202  }
203
204  static class RegionSplitter extends Thread {
205    final Connection connection;
206    Throwable ex;
207    Table table;
208    TableName tableName;
209    byte[] family;
210    Admin admin;
211    HRegionServer rs;
212
213    RegionSplitter(Table table) throws IOException {
214      this.table = table;
215      this.tableName = table.getName();
216      this.family = table.getDescriptor().getColumnFamilies()[0].getName();
217      admin = TEST_UTIL.getAdmin();
218      rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
219      connection = TEST_UTIL.getConnection();
220    }
221
222    @Override
223    public void run() {
224      try {
225        Random random = new Random();
226        for (int i = 0; i < 5; i++) {
227          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
228          if (regions.isEmpty()) {
229            continue;
230          }
231          int regionIndex = random.nextInt(regions.size());
232
233          // pick a random region and split it into two
234          RegionInfo region = Iterators.get(regions.iterator(), regionIndex);
235
236          // pick the mid split point
237          int start = 0, end = Integer.MAX_VALUE;
238          if (region.getStartKey().length > 0) {
239            start = Bytes.toInt(region.getStartKey());
240          }
241          if (region.getEndKey().length > 0) {
242            end = Bytes.toInt(region.getEndKey());
243          }
244          int mid = start + ((end - start) / 2);
245          byte[] splitPoint = Bytes.toBytes(mid);
246
247          // put some rows to the regions
248          addData(start);
249          addData(mid);
250
251          flushAndBlockUntilDone(admin, rs, region.getRegionName());
252          compactAndBlockUntilDone(admin, rs, region.getRegionName());
253
254          log("Initiating region split for:" + region.getRegionNameAsString());
255          try {
256            admin.splitRegion(region.getRegionName(), splitPoint);
257            // wait until the split is complete
258            blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);
259
260          } catch (NotServingRegionException ex) {
261            // ignore
262          }
263        }
264      } catch (Throwable ex) {
265        this.ex = ex;
266      }
267    }
268
269    void addData(int start) throws IOException {
270      List<Put> puts = new ArrayList<>();
271      for (int i = start; i < start + 100; i++) {
272        Put put = new Put(Bytes.toBytes(i));
273        put.addColumn(family, family, Bytes.toBytes(i));
274        puts.add(put);
275      }
276      table.put(puts);
277    }
278  }
279
280  /**
281   * Checks regions using MetaTableAccessor and HTable methods
282   */
283  static class RegionChecker extends ScheduledChore {
284    Connection connection;
285    Configuration conf;
286    TableName tableName;
287    Throwable ex;
288
289    RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
290      super("RegionChecker", stopper, 100);
291      this.conf = conf;
292      this.tableName = tableName;
293
294      this.connection = ConnectionFactory.createConnection(conf);
295    }
296
297    /** verify region boundaries obtained from MetaScanner */
298    void verifyRegionsUsingMetaTableAccessor() throws Exception {
299      List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
300      verifyTableRegions(regionList.stream()
301        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
302      regionList = MetaTableAccessor.getAllRegions(connection, true);
303      verifyTableRegions(regionList.stream()
304        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
305    }
306
307    /** verify region boundaries obtained from HTable.getStartEndKeys() */
308    void verifyRegionsUsingHTable() throws IOException {
309      Table table = null;
310      try {
311        // HTable.getStartEndKeys()
312        table = connection.getTable(tableName);
313
314        try (RegionLocator rl = connection.getRegionLocator(tableName)) {
315          Pair<byte[][], byte[][]> keys = rl.getStartEndKeys();
316          verifyStartEndKeys(keys);
317
318          Set<RegionInfo> regions = new TreeSet<>(RegionInfo.COMPARATOR);
319          for (HRegionLocation loc : rl.getAllRegionLocations()) {
320            regions.add(loc.getRegion());
321          }
322          verifyTableRegions(regions);
323        }
324
325      } finally {
326        IOUtils.closeQuietly(table);
327      }
328    }
329
330    void verify() throws Exception {
331      verifyRegionsUsingMetaTableAccessor();
332      verifyRegionsUsingHTable();
333    }
334
335    void verifyTableRegions(Set<RegionInfo> regions) {
336      log("Verifying " + regions.size() + " regions: " + regions);
337
338      byte[][] startKeys = new byte[regions.size()][];
339      byte[][] endKeys = new byte[regions.size()][];
340
341      int i = 0;
342      for (RegionInfo region : regions) {
343        startKeys[i] = region.getStartKey();
344        endKeys[i] = region.getEndKey();
345        i++;
346      }
347
348      Pair<byte[][], byte[][]> keys = new Pair<>(startKeys, endKeys);
349      verifyStartEndKeys(keys);
350    }
351
352    void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
353      byte[][] startKeys = keys.getFirst();
354      byte[][] endKeys = keys.getSecond();
355      assertEquals(startKeys.length, endKeys.length);
356      assertTrue("Found 0 regions for the table", startKeys.length > 0);
357
358      assertArrayEquals("Start key for the first region is not byte[0]", HConstants.EMPTY_START_ROW,
359        startKeys[0]);
360      byte[] prevEndKey = HConstants.EMPTY_START_ROW;
361
362      // ensure that we do not have any gaps
363      for (int i = 0; i < startKeys.length; i++) {
364        assertArrayEquals(
365          "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey) +
366            " ,regionStartKey=" + Bytes.toStringBinary(startKeys[i]),
367          prevEndKey, startKeys[i]);
368        prevEndKey = endKeys[i];
369      }
370      assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
371        endKeys[endKeys.length - 1]);
372    }
373
374    @Override
375    protected void chore() {
376      try {
377        verify();
378      } catch (Throwable ex) {
379        this.ex = ex;
380        getStopper().stop("caught exception");
381      }
382    }
383  }
384
385  public static void log(String msg) {
386    LOG.info(msg);
387  }
388
389  /* some utility methods for split tests */
390
391  public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
392      throws IOException, InterruptedException {
393    log("flushing region: " + Bytes.toStringBinary(regionName));
394    admin.flushRegion(regionName);
395    log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
396    Threads.sleepWithoutInterrupt(500);
397    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
398      Threads.sleep(50);
399    }
400  }
401
402  public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
403      throws IOException, InterruptedException {
404    log("Compacting region: " + Bytes.toStringBinary(regionName));
405    // Wait till its online before we do compact else it comes back with NoServerForRegionException
406    try {
407      TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
408        @Override public boolean evaluate() throws Exception {
409          return rs.getServerName().equals(MetaTableAccessor.
410            getRegionLocation(admin.getConnection(), regionName).getServerName());
411        }
412      });
413    } catch (Exception e) {
414      throw new IOException(e);
415    }
416    admin.majorCompactRegion(regionName);
417    log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
418    Threads.sleepWithoutInterrupt(500);
419    outer: for (;;) {
420      for (Store store : rs.getOnlineRegion(regionName).getStores()) {
421        if (store.getStorefilesCount() > 1) {
422          Threads.sleep(50);
423          continue outer;
424        }
425      }
426      break;
427    }
428  }
429
430  /**
431   * Blocks until the region split is complete in hbase:meta and region server opens the daughters
432   */
433  public static void blockUntilRegionSplit(Configuration conf, long timeout,
434      final byte[] regionName, boolean waitForDaughters) throws IOException, InterruptedException {
435    long start = System.currentTimeMillis();
436    log("blocking until region is split:" + Bytes.toStringBinary(regionName));
437    RegionInfo daughterA = null, daughterB = null;
438    try (Connection conn = ConnectionFactory.createConnection(conf);
439        Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
440      Result result = null;
441      RegionInfo region = null;
442      while ((System.currentTimeMillis() - start) < timeout) {
443        result = metaTable.get(new Get(regionName));
444        if (result == null) {
445          break;
446        }
447
448        region = MetaTableAccessor.getRegionInfo(result);
449        if (region.isSplitParent()) {
450          log("found parent region: " + region.toString());
451          PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
452          daughterA = pair.getFirst();
453          daughterB = pair.getSecond();
454          break;
455        }
456        Threads.sleep(100);
457      }
458      if (daughterA == null || daughterB == null) {
459        throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB=" +
460          daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName=" +
461          Bytes.toString(regionName) + ", region=" + region);
462      }
463
464      // if we are here, this means the region split is complete or timed out
465      if (waitForDaughters) {
466        long rem = timeout - (System.currentTimeMillis() - start);
467        blockUntilRegionIsInMeta(conn, rem, daughterA);
468
469        rem = timeout - (System.currentTimeMillis() - start);
470        blockUntilRegionIsInMeta(conn, rem, daughterB);
471
472        rem = timeout - (System.currentTimeMillis() - start);
473        blockUntilRegionIsOpened(conf, rem, daughterA);
474
475        rem = timeout - (System.currentTimeMillis() - start);
476        blockUntilRegionIsOpened(conf, rem, daughterB);
477
478        // Compacting the new region to make sure references can be cleaned up
479        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
480          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
481        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
482          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());
483
484        removeCompactedFiles(conn, timeout, daughterA);
485        removeCompactedFiles(conn, timeout, daughterB);
486      }
487    }
488  }
489
490  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
491      throws IOException, InterruptedException {
492    log("remove compacted files for : " + hri.getRegionNameAsString());
493    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
494    regions.stream().forEach(r -> {
495      try {
496        r.getStores().get(0).closeAndArchiveCompactedFiles();
497      } catch (IOException ioe) {
498        LOG.error("failed in removing compacted file", ioe);
499      }
500    });
501  }
502
503  public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
504      throws IOException, InterruptedException {
505    log("blocking until region is in META: " + hri.getRegionNameAsString());
506    long start = System.currentTimeMillis();
507    while (System.currentTimeMillis() - start < timeout) {
508      HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
509      if (loc != null && !loc.getRegion().isOffline()) {
510        log("found region in META: " + hri.getRegionNameAsString());
511        break;
512      }
513      Threads.sleep(100);
514    }
515  }
516
517  public static void blockUntilRegionIsOpened(Configuration conf, long timeout, RegionInfo hri)
518      throws IOException, InterruptedException {
519    log("blocking until region is opened for reading:" + hri.getRegionNameAsString());
520    long start = System.currentTimeMillis();
521    try (Connection conn = ConnectionFactory.createConnection(conf);
522        Table table = conn.getTable(hri.getTable())) {
523      byte[] row = hri.getStartKey();
524      // Check for null/empty row. If we find one, use a key that is likely to be in first region.
525      if (row == null || row.length <= 0) {
526        row = new byte[] { '0' };
527      }
528      Get get = new Get(row);
529      while (System.currentTimeMillis() - start < timeout) {
530        try {
531          table.get(get);
532          break;
533        } catch (IOException ex) {
534          // wait some more
535        }
536        Threads.sleep(100);
537      }
538    }
539  }
540}