/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

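/**
 * End to end tests around region splits: splits regions while a client-side checker verifies that
 * hbase:meta stays consistent, and verifies that a region whose store holds an open reference
 * file cannot be split again.
 */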
@Category(LargeTests.class)
public class TestEndToEndSplitTransaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEndToEndSplitTransaction.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndSplitTransaction.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final Configuration CONF = TEST_UTIL.getConfiguration();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void beforeAllTests() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void afterAllTests() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test for HBASE-20940. This test splits a region and then opens a reader on a resulting
   * reference store file. While a store still has any reference, it makes sure that the region
   * cannot be split again.
   */
  @Test
  public void testCanSplitJustAfterASplit() throws Exception {
    LOG.info("Starting testCanSplitJustAfterASplit");
    byte[] fam = Bytes.toBytes("cf_split");

    CompactSplit compactSplit =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getCompactSplitThread();
    TableName tableName = TableName.valueOf("CanSplitTable");
    Table source = TEST_UTIL.getConnection().getTable(tableName);
    Admin admin = TEST_UTIL.getAdmin();
    // Compactions are disabled below to avoid a compaction just after splitting.
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
    Map<String, StoreFileReader> scanner = Maps.newHashMap();
    try {
      admin.createTable(htd);
      TEST_UTIL.loadTable(source, fam);
      compactSplit.setCompactionsEnabled(false);
      admin.split(tableName);
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getHBaseCluster().getRegions(tableName).size() == 2);

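      // Collect the daughter regions and open a scanner on the first reference store file of each
      // daughter, remembering the readers so the reference files stay in use while we compact.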
      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
      regions.stream()
        .forEach(r -> r.getStores().get(0).getStorefiles().stream()
          .filter(s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
          .forEach(sf -> {
            StoreFileReader reader = ((HStoreFile) sf).getReader();
            reader.getStoreFileScanner(true, false, false, 0, 0, false);
            scanner.put(r.getRegionInfo().getEncodedName(), reader);
            LOG.info("Got reference to file = " + sf.getPath() + ", for region = "
              + r.getRegionInfo().getEncodedName());
          }));
      assertTrue("Regions did not split properly", regions.size() > 1);
      assertTrue("Could not get a reference to any of the store files", scanner.size() > 1);
      compactSplit.setCompactionsEnabled(true);
      for (HRegion region : regions) {
        region.compact(true);
      }

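      // Even after the compaction above, the stores still hold the reference files whose readers
      // we keep open, so none of those regions should report that they can be split.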
      regions.stream()
        .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
        .forEach(r -> assertFalse("Store with an open reference file should not be splittable",
          r.getStores().get(0).canSplit()));
    } finally {
      scanner.values().forEach(s -> {
        try {
          s.close(true);
        } catch (IOException ioe) {
          LOG.error("Failed while closing store file", ioe);
        }
      });
      scanner.clear();
      Closeables.close(source, true);
      if (!compactSplit.isCompactionsEnabled()) {
        compactSplit.setCompactionsEnabled(true);
      }
      TEST_UTIL.deleteTableIfAny(tableName);
    }
  }

  /**
   * Tests that the client sees meta table changes as atomic during splits
   */
  @Test
  public void testFromClientSideWhileSplitting() throws Throwable {
    LOG.info("Starting testFromClientSideWhileSplitting");
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final byte[] FAMILY = Bytes.toBytes("family");

    // SplitTransaction will update the meta table by offlining the parent region, and adding info
    // for daughters.
    Table table = TEST_UTIL.createTable(tableName, FAMILY);

    Stoppable stopper = new StoppableImplementation();
    RegionSplitter regionSplitter = new RegionSplitter(table);
    RegionChecker regionChecker = new RegionChecker(CONF, stopper, tableName);
    final ChoreService choreService = new ChoreService("TEST_SERVER");

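    // Run the checker chore concurrently with the splitter thread; any failure either one hits is
    // recorded in its 'ex' field and rethrown below.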
    choreService.scheduleChore(regionChecker);
    regionSplitter.start();

    // wait until the splitter is finished
    regionSplitter.join();
    stopper.stop(null);

    if (regionChecker.ex != null) {
      throw new AssertionError("regionChecker", regionChecker.ex);
    }

    if (regionSplitter.ex != null) {
      throw new AssertionError("regionSplitter", regionSplitter.ex);
    }

    // one final check
    regionChecker.verify();
  }

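  /**
   * Thread that repeatedly picks a random region of the table, writes rows around the midpoint of
   * its key range, flushes and compacts the region, and then asks for a split at that midpoint.
   */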
  static class RegionSplitter extends Thread {
    final Connection connection;
    Throwable ex;
    Table table;
    TableName tableName;
    byte[] family;
    Admin admin;
    HRegionServer rs;

    RegionSplitter(Table table) throws IOException {
      this.table = table;
      this.tableName = table.getName();
      this.family = table.getDescriptor().getColumnFamilies()[0].getName();
      admin = TEST_UTIL.getAdmin();
      rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
      connection = TEST_UTIL.getConnection();
    }

    @Override
    public void run() {
      try {
        Random random = ThreadLocalRandom.current();
        for (int i = 0; i < 5; i++) {
          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
          if (regions.isEmpty()) {
            continue;
          }
          int regionIndex = random.nextInt(regions.size());

          // pick a random region and split it into two
          RegionInfo region = Iterators.get(regions.iterator(), regionIndex);

          // pick the mid split point
          int start = 0, end = Integer.MAX_VALUE;
          if (region.getStartKey().length > 0) {
            start = Bytes.toInt(region.getStartKey());
          }
          if (region.getEndKey().length > 0) {
            end = Bytes.toInt(region.getEndKey());
          }
          int mid = start + ((end - start) / 2);
          byte[] splitPoint = Bytes.toBytes(mid);

          // put some rows into the regions
          addData(start);
          addData(mid);

          flushAndBlockUntilDone(admin, rs, region.getRegionName());
          compactAndBlockUntilDone(admin, rs, region.getRegionName());

          log("Initiating region split for: " + region.getRegionNameAsString());
          try {
            admin.splitRegion(region.getRegionName(), splitPoint);
            // wait until the split is complete
            blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);

          } catch (NotServingRegionException ex) {
            // ignore
          }
        }
      } catch (Throwable ex) {
        this.ex = ex;
      }
    }

    void addData(int start) throws IOException {
      List<Put> puts = new ArrayList<>();
      for (int i = start; i < start + 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.addColumn(family, family, Bytes.toBytes(i));
        puts.add(put);
      }
      table.put(puts);
    }
  }

  /**
   * Checks regions using MetaTableAccessor and RegionLocator methods
   */
  static class RegionChecker extends ScheduledChore {
    Connection connection;
    Configuration conf;
    TableName tableName;
    Throwable ex;

    RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
      super("RegionChecker", stopper, 100);
      this.conf = conf;
      this.tableName = tableName;

      this.connection = ConnectionFactory.createConnection(conf);
    }

    /** verify region boundaries obtained from MetaTableAccessor */
    void verifyRegionsUsingMetaTableAccessor() throws Exception {
      List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
      regionList = MetaTableAccessor.getAllRegions(connection, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
    }

    /** verify region boundaries obtained from RegionLocator.getStartEndKeys() */
    void verifyRegionsUsingHTable() throws IOException {
      try (RegionLocator rl = connection.getRegionLocator(tableName)) {
        Pair<byte[][], byte[][]> keys = rl.getStartEndKeys();
        verifyStartEndKeys(keys);

        Set<RegionInfo> regions = new TreeSet<>(RegionInfo.COMPARATOR);
        for (HRegionLocation loc : rl.getAllRegionLocations()) {
          regions.add(loc.getRegion());
        }
        verifyTableRegions(regions);
      }
    }

    void verify() throws Exception {
      verifyRegionsUsingMetaTableAccessor();
      verifyRegionsUsingHTable();
    }

    void verifyTableRegions(Set<RegionInfo> regions) {
      log("Verifying " + regions.size() + " regions: " + regions);

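      // The set is ordered by RegionInfo.COMPARATOR, so the start/end keys collected below are in
      // region order and can be checked for gaps by verifyStartEndKeys().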
      byte[][] startKeys = new byte[regions.size()][];
      byte[][] endKeys = new byte[regions.size()][];

      int i = 0;
      for (RegionInfo region : regions) {
        startKeys[i] = region.getStartKey();
        endKeys[i] = region.getEndKey();
        i++;
      }

      Pair<byte[][], byte[][]> keys = new Pair<>(startKeys, endKeys);
      verifyStartEndKeys(keys);
    }

    void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
      byte[][] startKeys = keys.getFirst();
      byte[][] endKeys = keys.getSecond();
      assertEquals(startKeys.length, endKeys.length);
      assertTrue("Found 0 regions for the table", startKeys.length > 0);

      assertArrayEquals("Start key for the first region is not byte[0]", HConstants.EMPTY_START_ROW,
        startKeys[0]);
      byte[] prevEndKey = HConstants.EMPTY_START_ROW;

      // ensure that we do not have any gaps
      for (int i = 0; i < startKeys.length; i++) {
        assertArrayEquals(
          "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
            + ", regionStartKey=" + Bytes.toStringBinary(startKeys[i]),
          prevEndKey, startKeys[i]);
        prevEndKey = endKeys[i];
      }
      assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
        endKeys[endKeys.length - 1]);
    }

    @Override
    protected void chore() {
      try {
        verify();
      } catch (Throwable ex) {
        this.ex = ex;
        getStopper().stop("caught exception");
      }
    }
  }

  public static void log(String msg) {
    LOG.info(msg);
  }

  /* some utility methods for split tests */

  public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
    throws IOException, InterruptedException {
    log("flushing region: " + Bytes.toStringBinary(regionName));
    admin.flushRegion(regionName);
    log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
      Threads.sleep(50);
    }
  }

  public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
    throws IOException, InterruptedException {
    log("Compacting region: " + Bytes.toStringBinary(regionName));
    // Wait till it's online before we compact, else it comes back with NoServerForRegionException
    try {
      TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return rs.getServerName().equals(
            MetaTableAccessor.getRegionLocation(admin.getConnection(), regionName).getServerName());
        }
      });
    } catch (Exception e) {
      throw new IOException(e);
    }
    admin.majorCompactRegion(regionName);
    log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
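    // The major compaction is considered complete once every store of the region is down to a
    // single store file.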
    outer: for (;;) {
      for (Store store : rs.getOnlineRegion(regionName).getStores()) {
        if (store.getStorefilesCount() > 1) {
          Threads.sleep(50);
          continue outer;
        }
      }
      break;
    }
  }

  /**
   * Blocks until the region split is complete in hbase:meta and the region server has opened the
   * daughter regions.
   */
  public static void blockUntilRegionSplit(Configuration conf, long timeout,
    final byte[] regionName, boolean waitForDaughters) throws IOException, InterruptedException {
    long start = EnvironmentEdgeManager.currentTime();
    log("blocking until region is split: " + Bytes.toStringBinary(regionName));
    RegionInfo daughterA = null, daughterB = null;
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
      Result result = null;
      RegionInfo region = null;
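      // Poll the parent's row in hbase:meta until it is marked as a split parent and the daughter
      // regions are recorded in it.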
      while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
        result = metaTable.get(new Get(regionName));
        if (result == null) {
          break;
        }

        region = MetaTableAccessor.getRegionInfo(result);
        if (region.isSplitParent()) {
          log("found parent region: " + region.toString());
          PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
          daughterA = pair.getFirst();
          daughterB = pair.getSecond();
          break;
        }
        Threads.sleep(100);
      }
      if (daughterA == null || daughterB == null) {
        throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB="
          + daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName="
          + Bytes.toString(regionName) + ", region=" + region);
      }

      // if we get here, the parent is marked as a split parent and both daughters are known
      if (waitForDaughters) {
        long rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterA);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterB);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsOpened(conf, rem, daughterA);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsOpened(conf, rem, daughterB);

        // Compact the new regions to make sure the references can be cleaned up
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());

        removeCompactedFiles(conn, timeout, daughterA);
        removeCompactedFiles(conn, timeout, daughterB);
      }
    }
  }

  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
    throws IOException, InterruptedException {
    log("remove compacted files for: " + hri.getRegionNameAsString());
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
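    // Close and archive the compacted files of the first store of every online region of the
    // table; this also cleans up reference files left over from the split.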
    regions.stream().forEach(r -> {
      try {
        r.getStores().get(0).closeAndArchiveCompactedFiles();
      } catch (IOException ioe) {
        LOG.error("Failed to remove compacted files", ioe);
      }
    });
  }

  public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
    throws IOException, InterruptedException {
    log("blocking until region is in META: " + hri.getRegionNameAsString());
    long start = EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
      HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
      if (loc != null && !loc.getRegion().isOffline()) {
        log("found region in META: " + hri.getRegionNameAsString());
        break;
      }
      Threads.sleep(100);
    }
  }

  public static void blockUntilRegionIsOpened(Configuration conf, long timeout, RegionInfo hri)
    throws IOException, InterruptedException {
    log("blocking until region is opened for reading: " + hri.getRegionNameAsString());
    long start = EnvironmentEdgeManager.currentTime();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(hri.getTable())) {
      byte[] row = hri.getStartKey();
      // Check for null/empty row. If we find one, use a key likely to be in the first region.
      if (row == null || row.length <= 0) {
        row = new byte[] { '0' };
      }
      Get get = new Get(row);
      while (EnvironmentEdgeManager.currentTime() - start < timeout) {
        try {
          table.get(get);
          break;
        } catch (IOException ex) {
          // wait some more
        }
        Threads.sleep(100);
      }
    }
  }
}