/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

@Category(LargeTests.class)
public class TestEndToEndSplitTransaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEndToEndSplitTransaction.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndSplitTransaction.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Configuration CONF = TEST_UTIL.getConfiguration();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void beforeAllTests() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void afterAllTests() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test for HBASE-20940. Splits a region so that the daughter regions hold reference store
   * files, keeps one of those references open via a store file scanner, and verifies that a
   * region whose store still contains an open reference cannot be split again.
   */
  @Test
  public void testCanSplitJustAfterASplit() throws Exception {
    LOG.info("Starting testCanSplitJustAfterASplit");
    byte[] fam = Bytes.toBytes("cf_split");

    CompactSplit compactSplit =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getCompactSplitThread();
    TableName tableName = TableName.valueOf("CanSplitTable");
    Table source = TEST_UTIL.getConnection().getTable(tableName);
    Admin admin = TEST_UTIL.getAdmin();
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
    Map<String, StoreFileReader> scanner = Maps.newHashMap();
    try {
      admin.createTable(htd);
      TEST_UTIL.loadTable(source, fam);
      // disable compactions so the daughters keep their reference store files after the split
      compactSplit.setCompactionsEnabled(false);
      admin.split(tableName);
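      // wait until the split has produced two daughter regions for the table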
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getHBaseCluster().getRegions(tableName).size() == 2);

      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
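      // for each daughter, open a store file scanner on its first reference store file so the
      // reference stays in use, and remember the reader so it can be closed in the finally block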
      regions.stream()
        .forEach(r -> r.getStores().get(0).getStorefiles().stream()
          .filter(s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
          .forEach(sf -> {
            StoreFileReader reader = ((HStoreFile) sf).getReader();
            reader.getStoreFileScanner(true, false, false, 0, 0, false);
            scanner.put(r.getRegionInfo().getEncodedName(), reader);
            LOG.info("Got reference to file = " + sf.getPath() + ", for region = "
              + r.getRegionInfo().getEncodedName());
          }));
      assertTrue("Regions did not split properly", regions.size() > 1);
      assertTrue("Could not get a reference to any of the store files", scanner.size() > 1);
      compactSplit.setCompactionsEnabled(true);
      for (HRegion region : regions) {
        region.compact(true);
      }

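      // daughters whose reference files are still held open by a scanner must not be splittable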
      regions.stream()
        .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
        .forEach(r -> assertFalse("Store with an open reference file should not be splittable",
          r.getStores().get(0).canSplit()));
    } finally {
      scanner.values().forEach(s -> {
        try {
          s.close(true);
        } catch (IOException ioe) {
          LOG.error("Failed while closing store file", ioe);
        }
      });
      scanner.clear();
      Closeables.close(source, true);
      if (!compactSplit.isCompactionsEnabled()) {
        compactSplit.setCompactionsEnabled(true);
      }
      TEST_UTIL.deleteTableIfAny(tableName);
    }
  }

  /**
   * Tests that the client sees meta table changes as atomic during splits
   */
  @Test
  public void testFromClientSideWhileSplitting() throws Throwable {
    LOG.info("Starting testFromClientSideWhileSplitting");
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final byte[] FAMILY = Bytes.toBytes("family");

    // SplitTransaction will update the meta table by offlining the parent region, and adding info
    // for daughters.
    Table table = TEST_UTIL.createTable(tableName, FAMILY);

    Stoppable stopper = new StoppableImplementation();
    RegionSplitter regionSplitter = new RegionSplitter(table);
    RegionChecker regionChecker = new RegionChecker(CONF, stopper, tableName);
    final ChoreService choreService = new ChoreService("TEST_SERVER");

    choreService.scheduleChore(regionChecker);
    regionSplitter.start();

    // wait until the splitter is finished
    regionSplitter.join();
    stopper.stop(null);

    if (regionChecker.ex != null) {
      throw new AssertionError("regionChecker", regionChecker.ex);
    }

    if (regionSplitter.ex != null) {
      throw new AssertionError("regionSplitter", regionSplitter.ex);
    }

    // one final check
    regionChecker.verify();
  }

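  /**
   * Worker thread that repeatedly picks a random region of the table, loads some rows, flushes
   * and major compacts the region, and then splits it at the midpoint of its key range. Any
   * failure is recorded in {@link #ex} so that the main test thread can rethrow it.
   */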
  static class RegionSplitter extends Thread {
    final Connection connection;
    Throwable ex;
    Table table;
    TableName tableName;
    byte[] family;
    Admin admin;
    HRegionServer rs;

    RegionSplitter(Table table) throws IOException {
      this.table = table;
      this.tableName = table.getName();
      this.family = table.getDescriptor().getColumnFamilies()[0].getName();
      admin = TEST_UTIL.getAdmin();
      rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
      connection = TEST_UTIL.getConnection();
    }

    @Override
    public void run() {
      try {
        Random random = ThreadLocalRandom.current();
        for (int i = 0; i < 5; i++) {
          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
          if (regions.isEmpty()) {
            continue;
          }
          int regionIndex = random.nextInt(regions.size());

          // pick a random region and split it into two
          RegionInfo region = Iterators.get(regions.iterator(), regionIndex);

          // use the midpoint of the region's key range as the split point
          int start = 0, end = Integer.MAX_VALUE;
          if (region.getStartKey().length > 0) {
            start = Bytes.toInt(region.getStartKey());
          }
          if (region.getEndKey().length > 0) {
            end = Bytes.toInt(region.getEndKey());
          }
          int mid = start + ((end - start) / 2);
          byte[] splitPoint = Bytes.toBytes(mid);

          // put some rows into the regions
          addData(start);
          addData(mid);

          flushAndBlockUntilDone(admin, rs, region.getRegionName());
          compactAndBlockUntilDone(admin, rs, region.getRegionName());

          log("Initiating region split for: " + region.getRegionNameAsString());
          try {
            admin.splitRegionAsync(region.getRegionName(), splitPoint).get();
            // wait until the split is complete
            blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);
          } catch (NotServingRegionException ex) {
            // ignore
          }
        }
      } catch (Throwable ex) {
        this.ex = ex;
      }
    }

    void addData(int start) throws IOException {
      List<Put> puts = new ArrayList<>();
      for (int i = start; i < start + 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.addColumn(family, family, Bytes.toBytes(i));
        puts.add(put);
      }
      table.put(puts);
    }
  }

  /**
   * Checks region boundaries using MetaTableAccessor and the client's RegionLocator
   */
  static class RegionChecker extends ScheduledChore {
    Connection connection;
    Configuration conf;
    TableName tableName;
    Throwable ex;

    RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
      super("RegionChecker", stopper, 100);
      this.conf = conf;
      this.tableName = tableName;

      this.connection = ConnectionFactory.createConnection(conf);
    }

    /** verify region boundaries obtained from MetaTableAccessor */
    void verifyRegionsUsingMetaTableAccessor() throws Exception {
      List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
      regionList = MetaTableAccessor.getAllRegions(connection, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
    }

    /** verify region boundaries obtained from RegionLocator.getStartEndKeys() */
    void verifyRegionsUsingHTable() throws IOException {
      try (RegionLocator rl = connection.getRegionLocator(tableName)) {
        Pair<byte[][], byte[][]> keys = rl.getStartEndKeys();
        verifyStartEndKeys(keys);

        Set<RegionInfo> regions = new TreeSet<>(RegionInfo.COMPARATOR);
        for (HRegionLocation loc : rl.getAllRegionLocations()) {
          regions.add(loc.getRegion());
        }
        verifyTableRegions(regions);
      }
    }

    void verify() throws Exception {
      verifyRegionsUsingMetaTableAccessor();
      verifyRegionsUsingHTable();
    }

    void verifyTableRegions(Set<RegionInfo> regions) {
      log("Verifying " + regions.size() + " regions: " + regions);

      byte[][] startKeys = new byte[regions.size()][];
      byte[][] endKeys = new byte[regions.size()][];

      int i = 0;
      for (RegionInfo region : regions) {
        startKeys[i] = region.getStartKey();
        endKeys[i] = region.getEndKey();
        i++;
      }

      Pair<byte[][], byte[][]> keys = new Pair<>(startKeys, endKeys);
      verifyStartEndKeys(keys);
    }

    void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
      byte[][] startKeys = keys.getFirst();
      byte[][] endKeys = keys.getSecond();
      assertEquals(startKeys.length, endKeys.length);
      assertTrue("Found 0 regions for the table", startKeys.length > 0);

      assertArrayEquals("Start key for the first region is not byte[0]", HConstants.EMPTY_START_ROW,
        startKeys[0]);
      byte[] prevEndKey = HConstants.EMPTY_START_ROW;

      // ensure that we do not have any gaps
      for (int i = 0; i < startKeys.length; i++) {
        assertArrayEquals(
          "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
            + ", regionStartKey=" + Bytes.toStringBinary(startKeys[i]),
          prevEndKey, startKeys[i]);
        prevEndKey = endKeys[i];
      }
      assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
        endKeys[endKeys.length - 1]);
    }

    @Override
    protected void chore() {
      try {
        verify();
      } catch (Throwable ex) {
        this.ex = ex;
        getStopper().stop("caught exception");
      }
    }
  }

  public static void log(String msg) {
    LOG.info(msg);
  }

  /* some utility methods for split tests */

  public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
    throws IOException, InterruptedException {
    log("flushing region: " + Bytes.toStringBinary(regionName));
    admin.flushRegion(regionName);
    log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
      Threads.sleep(50);
    }
  }

  public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
    throws IOException, InterruptedException {
    log("Compacting region: " + Bytes.toStringBinary(regionName));
    // Wait till it's online before we compact, else the request comes back with
    // NoServerForRegionException
    try {
      TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return rs.getServerName().equals(
            MetaTableAccessor.getRegionLocation(admin.getConnection(), regionName).getServerName());
        }
      });
    } catch (Exception e) {
      throw new IOException(e);
    }
    admin.majorCompactRegion(regionName);
    log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
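    // poll until every store of the region is down to a single store file, i.e. the major
    // compaction has finished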
    outer: for (;;) {
      for (Store store : rs.getOnlineRegion(regionName).getStores()) {
        if (store.getStorefilesCount() > 1) {
          Threads.sleep(50);
          continue outer;
        }
      }
      break;
    }
  }

  /**
   * Blocks until the region split is complete in hbase:meta and the region server has opened the
   * daughter regions
   */
  public static void blockUntilRegionSplit(Configuration conf, long timeout,
    final byte[] regionName, boolean waitForDaughters) throws IOException, InterruptedException {
    long start = EnvironmentEdgeManager.currentTime();
    log("blocking until region is split: " + Bytes.toStringBinary(regionName));
    RegionInfo daughterA = null, daughterB = null;
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
      Result result = null;
      RegionInfo region = null;
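      // poll hbase:meta until the parent row is marked as a split parent, then read the two
      // daughter regions from the same row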
      while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
        result = metaTable.get(new Get(regionName));
        if (result == null || result.isEmpty()) {
          break;
        }

        region = CatalogFamilyFormat.getRegionInfo(result);
        if (region.isSplitParent()) {
          log("found parent region: " + region.toString());
          PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
          daughterA = pair.getFirst();
          daughterB = pair.getSecond();
          break;
        }
        Threads.sleep(100);
      }
      if (daughterA == null || daughterB == null) {
        throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB="
          + daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName="
          + Bytes.toString(regionName) + ", region=" + region);
      }

      // if we are here, the region split is complete; a timeout would have thrown above
      if (waitForDaughters) {
        long rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterA);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterB);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsOpened(conf, rem, daughterA);

        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
        blockUntilRegionIsOpened(conf, rem, daughterB);

        // Compact the new regions to make sure the references to the parent can be cleaned up
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());

        removeCompactedFiles(conn, timeout, daughterA);
        removeCompactedFiles(conn, timeout, daughterB);
      }
    }
  }

  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
    throws IOException, InterruptedException {
    log("remove compacted files for: " + hri.getRegionNameAsString());
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
    regions.stream().forEach(r -> {
      try {
        r.getStores().get(0).closeAndArchiveCompactedFiles();
      } catch (IOException ioe) {
        LOG.error("failed to remove compacted files", ioe);
      }
    });
  }

  public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
    throws IOException, InterruptedException {
    log("blocking until region is in META: " + hri.getRegionNameAsString());
    long start = EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
      HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
      if (loc != null && !loc.getRegion().isOffline()) {
        log("found region in META: " + hri.getRegionNameAsString());
        break;
      }
      Threads.sleep(100);
    }
  }

  public static void blockUntilRegionIsOpened(Configuration conf, long timeout, RegionInfo hri)
    throws IOException, InterruptedException {
    log("blocking until region is opened for reading: " + hri.getRegionNameAsString());
    long start = EnvironmentEdgeManager.currentTime();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(hri.getTable())) {
      byte[] row = hri.getStartKey();
      // Check for null/empty row. If we find one, use a key that is likely to be in the first
      // region.
      if (row == null || row.length <= 0) {
        row = new byte[] { '0' };
      }
      Get get = new Get(row);
      while (EnvironmentEdgeManager.currentTime() - start < timeout) {
        try {
          table.get(get);
          break;
        } catch (IOException ex) {
          // wait some more
        }
        Threads.sleep(100);
      }
    }
  }
}