/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

@Category(LargeTests.class)
public class TestEndToEndSplitTransaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEndToEndSplitTransaction.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndSplitTransaction.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Configuration CONF = TEST_UTIL.getConfiguration();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void beforeAllTests() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void afterAllTests() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
  /**
   * Test for HBASE-20940. This test splits a region and then opens a store file scanner over a
   * reference file in each daughter. While a store file still has an open reference, the region
   * must not be splittable.
   */
  @Test
  public void testCanSplitJustAfterASplit() throws Exception {
    LOG.info("Starting testCanSplitJustAfterASplit");
    byte[] fam = Bytes.toBytes("cf_split");

    CompactSplit compactSplit =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getCompactSplitThread();
    TableName tableName = TableName.valueOf("CanSplitTable");
    Table source = TEST_UTIL.getConnection().getTable(tableName);
    Admin admin = TEST_UTIL.getAdmin();
    // Compactions are disabled below (via CompactSplit) so that the daughters are not compacted
    // right after the split, which would rewrite their reference files before we can open them.
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
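    // One open store file reader per daughter region, keyed by encoded region name, so that the
    // reference files stay referenced while we check splittability.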
    Map<String, StoreFileReader> scanner = Maps.newHashMap();
    try {
      admin.createTable(htd);
      TEST_UTIL.loadTable(source, fam);
      compactSplit.setCompactionsEnabled(false);
      admin.split(tableName);
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getHBaseCluster().getRegions(tableName).size() == 2);

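      // Open a store file scanner on one reference file in each daughter region and keep the
      // reader open; an open reference is what should block further splits.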
      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
      regions.stream()
        .forEach(r -> r.getStores().get(0).getStorefiles().stream()
          .filter(s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
          .forEach(sf -> {
            StoreFileReader reader = ((HStoreFile) sf).getReader();
            reader.getStoreFileScanner(true, false, false, 0, 0, false);
            scanner.put(r.getRegionInfo().getEncodedName(), reader);
            LOG.info("Got reference to file = " + sf.getPath() + ", for region = " +
              r.getRegionInfo().getEncodedName());
          }));
      assertTrue("Regions did not split properly", regions.size() > 1);
      assertTrue("Could not get a store file reference from both daughter regions",
        scanner.size() > 1);
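      // Re-enable compactions and major compact the daughters; the readers opened above are still
      // open, so the reference files cannot be cleaned up yet.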
      compactSplit.setCompactionsEnabled(true);
      for (HRegion region : regions) {
        region.compact(true);
      }

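      // A store that still carries a reference file must not report itself as splittable.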
      regions.stream()
        .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
        .forEach(r -> assertFalse("Store with an open reference file should not be splittable",
          r.getStores().get(0).canSplit()));
    } finally {
      scanner.values().forEach(s -> {
        try {
          s.close(true);
        } catch (IOException ioe) {
          LOG.error("Failed while closing store file", ioe);
        }
      });
      scanner.clear();
      Closeables.close(source, true);
      if (!compactSplit.isCompactionsEnabled()) {
        compactSplit.setCompactionsEnabled(true);
      }
      TEST_UTIL.deleteTableIfAny(tableName);
    }
  }

  /**
   * Tests that the client sees meta table changes as atomic during splits
   */
  @Test
  public void testFromClientSideWhileSplitting() throws Throwable {
    LOG.info("Starting testFromClientSideWhileSplitting");
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final byte[] FAMILY = Bytes.toBytes("family");

    // SplitTransaction will update the meta table by offlining the parent region, and adding info
    // for daughters.
    Table table = TEST_UTIL.createTable(tableName, FAMILY);

    Stoppable stopper = new StoppableImplementation();
    RegionSplitter regionSplitter = new RegionSplitter(table);
    RegionChecker regionChecker = new RegionChecker(CONF, stopper, tableName);
    final ChoreService choreService = new ChoreService("TEST_SERVER");

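    // Run the checker as a periodic chore while the splitter thread repeatedly splits regions.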
    choreService.scheduleChore(regionChecker);
    regionSplitter.start();

    // wait until the splitter is finished
    regionSplitter.join();
    stopper.stop(null);

    if (regionChecker.ex != null) {
      throw new AssertionError("regionChecker", regionChecker.ex);
    }

    if (regionSplitter.ex != null) {
      throw new AssertionError("regionSplitter", regionSplitter.ex);
    }

    // one final check
    regionChecker.verify();
  }

  static class RegionSplitter extends Thread {
    final Connection connection;
    Throwable ex;
    Table table;
    TableName tableName;
    byte[] family;
    Admin admin;
    HRegionServer rs;

    RegionSplitter(Table table) throws IOException {
      this.table = table;
      this.tableName = table.getName();
      this.family = table.getDescriptor().getColumnFamilies()[0].getName();
      admin = TEST_UTIL.getAdmin();
      rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
      connection = TEST_UTIL.getConnection();
    }

    @Override
    public void run() {
      try {
        Random random = new Random();
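        // Pick a random region five times; load rows around its start and mid keys, flush and
        // compact it, then split it at the midpoint of its key range.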
        for (int i = 0; i < 5; i++) {
          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
          if (regions.isEmpty()) {
            continue;
          }
          int regionIndex = random.nextInt(regions.size());

          // pick a random region and split it into two
          RegionInfo region = Iterators.get(regions.iterator(), regionIndex);

          // pick the mid split point
          int start = 0, end = Integer.MAX_VALUE;
          if (region.getStartKey().length > 0) {
            start = Bytes.toInt(region.getStartKey());
          }
          if (region.getEndKey().length > 0) {
            end = Bytes.toInt(region.getEndKey());
          }
          int mid = start + ((end - start) / 2);
          byte[] splitPoint = Bytes.toBytes(mid);

          // put some rows into the regions
          addData(start);
          addData(mid);

          flushAndBlockUntilDone(admin, rs, region.getRegionName());
          compactAndBlockUntilDone(admin, rs, region.getRegionName());

          log("Initiating region split for: " + region.getRegionNameAsString());
          try {
            admin.splitRegionAsync(region.getRegionName(), splitPoint).get();
            // wait until the split is complete
            blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);
          } catch (NotServingRegionException ex) {
            // ignore
          }
        }
      } catch (Throwable ex) {
        this.ex = ex;
      }
    }

    void addData(int start) throws IOException {
      List<Put> puts = new ArrayList<>();
      for (int i = start; i < start + 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.addColumn(family, family, Bytes.toBytes(i));
        puts.add(put);
      }
      table.put(puts);
    }
  }

  /**
   * Checks regions using MetaTableAccessor and RegionLocator methods
   */
  static class RegionChecker extends ScheduledChore {
    Connection connection;
    Configuration conf;
    TableName tableName;
    Throwable ex;

    RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
      super("RegionChecker", stopper, 100);
      this.conf = conf;
      this.tableName = tableName;

      this.connection = ConnectionFactory.createConnection(conf);
    }

    /** verify region boundaries obtained from MetaTableAccessor */
    void verifyRegionsUsingMetaTableAccessor() throws Exception {
      List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
      regionList = MetaTableAccessor.getAllRegions(connection, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
    }

    /** verify region boundaries obtained from RegionLocator.getStartEndKeys() */
    void verifyRegionsUsingHTable() throws IOException {
      try (RegionLocator rl = connection.getRegionLocator(tableName)) {
        Pair<byte[][], byte[][]> keys = rl.getStartEndKeys();
        verifyStartEndKeys(keys);

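        // Also verify the regions reported by getAllRegionLocations() the same way.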
        Set<RegionInfo> regions = new TreeSet<>(RegionInfo.COMPARATOR);
        for (HRegionLocation loc : rl.getAllRegionLocations()) {
          regions.add(loc.getRegion());
        }
        verifyTableRegions(regions);
      }
    }

    void verify() throws Exception {
      verifyRegionsUsingMetaTableAccessor();
      verifyRegionsUsingHTable();
    }

    void verifyTableRegions(Set<RegionInfo> regions) {
      log("Verifying " + regions.size() + " regions: " + regions);

      byte[][] startKeys = new byte[regions.size()][];
      byte[][] endKeys = new byte[regions.size()][];

      int i = 0;
      for (RegionInfo region : regions) {
        startKeys[i] = region.getStartKey();
        endKeys[i] = region.getEndKey();
        i++;
      }

      Pair<byte[][], byte[][]> keys = new Pair<>(startKeys, endKeys);
      verifyStartEndKeys(keys);
    }

    void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
      byte[][] startKeys = keys.getFirst();
      byte[][] endKeys = keys.getSecond();
      assertEquals(startKeys.length, endKeys.length);
      assertTrue("Found 0 regions for the table", startKeys.length > 0);

      assertArrayEquals("Start key for the first region is not byte[0]", HConstants.EMPTY_START_ROW,
        startKeys[0]);
      byte[] prevEndKey = HConstants.EMPTY_START_ROW;

      // ensure that we do not have any gaps
      for (int i = 0; i < startKeys.length; i++) {
        assertArrayEquals(
          "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey) +
            ", regionStartKey=" + Bytes.toStringBinary(startKeys[i]),
          prevEndKey, startKeys[i]);
        prevEndKey = endKeys[i];
      }
      assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
        endKeys[endKeys.length - 1]);
    }

    @Override
    protected void chore() {
      try {
        verify();
      } catch (Throwable ex) {
        this.ex = ex;
        getStopper().stop("caught exception");
      }
    }
  }

  public static void log(String msg) {
    LOG.info(msg);
  }

  /* some utility methods for split tests */

  public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
      throws IOException, InterruptedException {
    log("flushing region: " + Bytes.toStringBinary(regionName));
    admin.flushRegion(regionName);
    log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
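    // Give the flush a moment to start, then wait until the memstore has been drained.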
    Threads.sleepWithoutInterrupt(500);
    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
      Threads.sleep(50);
    }
  }

  public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
      throws IOException, InterruptedException {
    log("Compacting region: " + Bytes.toStringBinary(regionName));
    // Wait until the region is online before compacting; otherwise the request fails with
    // NoServerForRegionException.
    try {
      TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return rs.getServerName().equals(MetaTableAccessor.
            getRegionLocation(admin.getConnection(), regionName).getServerName());
        }
      });
    } catch (Exception e) {
      throw new IOException(e);
    }
    admin.majorCompactRegion(regionName);
    log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
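    // The compaction is considered complete once every store is down to a single store file.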
    outer: for (;;) {
      for (Store store : rs.getOnlineRegion(regionName).getStores()) {
        if (store.getStorefilesCount() > 1) {
          Threads.sleep(50);
          continue outer;
        }
      }
      break;
    }
  }

  /**
   * Blocks until the region split is complete in hbase:meta and the region server has opened the
   * daughter regions.
   */
  public static void blockUntilRegionSplit(Configuration conf, long timeout,
      final byte[] regionName, boolean waitForDaughters) throws IOException, InterruptedException {
    long start = EnvironmentEdgeManager.currentTime();
    log("blocking until region is split: " + Bytes.toStringBinary(regionName));
    RegionInfo daughterA = null, daughterB = null;
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
      Result result = null;
      RegionInfo region = null;
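      // Poll hbase:meta until the parent is marked as a split parent and both daughters are
      // recorded, or until the timeout expires.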
      while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
        result = metaTable.get(new Get(regionName));
        if (result == null || result.isEmpty()) {
          break;
        }

        region = CatalogFamilyFormat.getRegionInfo(result);
        if (region.isSplitParent()) {
          log("found parent region: " + region.toString());
          PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
          daughterA = pair.getFirst();
          daughterB = pair.getSecond();
          break;
        }
        Threads.sleep(100);
      }
      if (daughterA == null || daughterB == null) {
        throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB=" +
          daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName=" +
          Bytes.toString(regionName) + ", region=" + region);
      }

      // if we are here, this means the region split is complete or timed out
      if (waitForDaughters) {
        long rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterA);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterB);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsOpened(conf, rem, daughterA);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsOpened(conf, rem, daughterB);

        // Compact the daughter regions to make sure their reference files can be cleaned up
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());

        removeCompactedFiles(conn, timeout, daughterA);
        removeCompactedFiles(conn, timeout, daughterB);
      }
    }
  }

  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("Removing compacted files for: " + hri.getRegionNameAsString());
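    // Close and archive the compacted files of the first store of every region of the table.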
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
    regions.stream().forEach(r -> {
      try {
        r.getStores().get(0).closeAndArchiveCompactedFiles();
      } catch (IOException ioe) {
        LOG.error("Failed to remove compacted files", ioe);
      }
    });
  }

  public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("blocking until region is in META: " + hri.getRegionNameAsString());
    long start = EnvironmentEdgeManager.currentTime();
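    // Poll hbase:meta until the region has a location and is no longer marked offline.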
    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
      HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
      if (loc != null && !loc.getRegion().isOffline()) {
        log("found region in META: " + hri.getRegionNameAsString());
        break;
      }
      Threads.sleep(100);
    }
  }

  public static void blockUntilRegionIsOpened(Configuration conf, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("blocking until region is opened for reading: " + hri.getRegionNameAsString());
    long start = EnvironmentEdgeManager.currentTime();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(hri.getTable())) {
      byte[] row = hri.getStartKey();
      // Check for null/empty row. If we find one, use a key that is likely to be in first region.
      if (row == null || row.length <= 0) {
        row = new byte[] { '0' };
      }
      Get get = new Get(row);
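      // Retry the Get until the region serves the read or the timeout expires.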
      while (EnvironmentEdgeManager.currentTime() - start < timeout) {
        try {
          table.get(get);
          break;
        } catch (IOException ex) {
          // wait some more
        }
        Threads.sleep(100);
      }
    }
  }
}