/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

@Category(LargeTests.class)
public class TestEndToEndSplitTransaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEndToEndSplitTransaction.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndSplitTransaction.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final Configuration CONF = TEST_UTIL.getConfiguration();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void beforeAllTests() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void afterAllTests() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test for HBASE-20940. This test splits a region and then opens a reader on one of the
   * resulting reference store files. While a store file still has an open reference, the test
   * verifies that the region cannot be split.
   */
  @Test
  public void testCanSplitJustAfterASplit() throws Exception {
    LOG.info("Starting testCanSplitJustAfterASplit");
    byte[] fam = Bytes.toBytes("cf_split");

    CompactSplit compactSplit =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getCompactSplitThread();
    TableName tableName = TableName.valueOf("CanSplitTable");
    Table source = TEST_UTIL.getConnection().getTable(tableName);
    Admin admin = TEST_UTIL.getAdmin();
    // compactions are disabled below (setCompactionsEnabled) so the daughters keep their
    // reference files instead of compacting them away just after splitting.
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
    Map<String, StoreFileReader> scanner = Maps.newHashMap();
    try {
      admin.createTable(htd);
      TEST_UTIL.loadTable(source, fam);
      compactSplit.setCompactionsEnabled(false);
      TEST_UTIL.getHBaseCluster().getRegions(tableName).get(0).forceSplit(null);
      admin.split(tableName);
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getHBaseCluster().getRegions(tableName).size() == 2);

      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
      regions.stream()
        .forEach(r -> r.getStores().get(0).getStorefiles().stream()
          .filter(s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
          .forEach(sf -> {
            StoreFileReader reader = ((HStoreFile) sf).getReader();
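            // opening a scanner keeps the reference store file in use, so even after a
            // compaction the store still has a reference and the region should not be splittable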
            reader.getStoreFileScanner(true, false, false, 0, 0, false);
            scanner.put(r.getRegionInfo().getEncodedName(), reader);
            LOG.info("Got reference to file = " + sf.getPath() + ", for region = " +
              r.getRegionInfo().getEncodedName());
          }));
      assertTrue("Regions did not split properly", regions.size() > 1);
      assertTrue("Could not get a reference to any of the store files", scanner.size() > 1);
      compactSplit.setCompactionsEnabled(true);
      for (HRegion region : regions) {
        region.compact(true);
      }

      regions.stream()
        .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
        .forEach(r -> assertFalse("Store has an open file reference and should not be splittable",
          r.getStores().get(0).canSplit()));
    } finally {
      scanner.values().forEach(s -> {
        try {
          s.close(true);
        } catch (IOException ioe) {
          LOG.error("Failed while closing store file", ioe);
        }
      });
      scanner.clear();
      Closeables.close(source, true);
      if (!compactSplit.isCompactionsEnabled()) {
        compactSplit.setCompactionsEnabled(true);
      }
      TEST_UTIL.deleteTableIfAny(tableName);
    }
  }

  /**
   * Tests that the client sees meta table changes as atomic during splits
   */
  @Test
  public void testFromClientSideWhileSplitting() throws Throwable {
    LOG.info("Starting testFromClientSideWhileSplitting");
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final byte[] FAMILY = Bytes.toBytes("family");

    // SplitTransaction will update the meta table by offlining the parent region, and adding info
    // for daughters.
    Table table = TEST_UTIL.createTable(tableName, FAMILY);

    Stoppable stopper = new StoppableImplementation();
    RegionSplitter regionSplitter = new RegionSplitter(table);
    RegionChecker regionChecker = new RegionChecker(CONF, stopper, tableName);
    final ChoreService choreService = new ChoreService("TEST_SERVER");

    choreService.scheduleChore(regionChecker);
    regionSplitter.start();

    // wait until the splitter is finished
    regionSplitter.join();
    stopper.stop(null);

    if (regionChecker.ex != null) {
      throw new AssertionError("regionChecker", regionChecker.ex);
    }

    if (regionSplitter.ex != null) {
      throw new AssertionError("regionSplitter", regionSplitter.ex);
    }

    // one final check
    regionChecker.verify();
  }

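  /**
   * Picks a random region of the table, loads some rows, flushes and compacts the region, and
   * then splits it at its midpoint; this is repeated five times while {@link RegionChecker} keeps
   * verifying the table's region boundaries. Any failure is recorded in {@code ex}.
   */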
  static class RegionSplitter extends Thread {
    final Connection connection;
    Throwable ex;
    Table table;
    TableName tableName;
    byte[] family;
    Admin admin;
    HRegionServer rs;

    RegionSplitter(Table table) throws IOException {
      this.table = table;
      this.tableName = table.getName();
      this.family = table.getDescriptor().getColumnFamilies()[0].getName();
      admin = TEST_UTIL.getAdmin();
      rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
      connection = TEST_UTIL.getConnection();
    }

    @Override
    public void run() {
      try {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
          if (regions.isEmpty()) {
            continue;
          }
          int regionIndex = random.nextInt(regions.size());

          // pick a random region and split it into two
          RegionInfo region = Iterators.get(regions.iterator(), regionIndex);

          // pick the mid split point
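          // (row keys are the 4-byte integers written by addData, so non-empty region boundaries
          // decode directly to ints)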
          int start = 0, end = Integer.MAX_VALUE;
          if (region.getStartKey().length > 0) {
            start = Bytes.toInt(region.getStartKey());
          }
          if (region.getEndKey().length > 0) {
            end = Bytes.toInt(region.getEndKey());
          }
          int mid = start + ((end - start) / 2);
          byte[] splitPoint = Bytes.toBytes(mid);

          // put some rows into the regions
          addData(start);
          addData(mid);

          flushAndBlockUntilDone(admin, rs, region.getRegionName());
          compactAndBlockUntilDone(admin, rs, region.getRegionName());

          log("Initiating region split for:" + region.getRegionNameAsString());
          try {
            admin.splitRegion(region.getRegionName(), splitPoint);
            // wait until the split is complete
            blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);

          } catch (NotServingRegionException ex) {
            // ignore
          }
        }
      } catch (Throwable ex) {
        this.ex = ex;
      }
    }

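    /** Puts 100 rows with consecutive 4-byte integer row keys, starting at {@code start}. */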
    void addData(int start) throws IOException {
      List<Put> puts = new ArrayList<>();
      for (int i = start; i < start + 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.addColumn(family, family, Bytes.toBytes(i));
        puts.add(put);
      }
      table.put(puts);
    }
  }

  /**
   * Checks table region boundaries using both MetaTableAccessor and the client RegionLocator
   */
  static class RegionChecker extends ScheduledChore {
    Connection connection;
    Configuration conf;
    TableName tableName;
    Throwable ex;

    RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
      super("RegionChecker", stopper, 100);
      this.conf = conf;
      this.tableName = tableName;

      this.connection = ConnectionFactory.createConnection(conf);
    }

    /** verify region boundaries obtained from MetaTableAccessor */
    void verifyRegionsUsingMetaTableAccessor() throws Exception {
      List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
      regionList = MetaTableAccessor.getAllRegions(connection, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
    }

    /** verify region boundaries obtained from RegionLocator#getStartEndKeys() */
    void verifyRegionsUsingHTable() throws IOException {
      Table table = null;
      try {
        // RegionLocator.getStartEndKeys()
        table = connection.getTable(tableName);

        try (RegionLocator rl = connection.getRegionLocator(tableName)) {
          Pair<byte[][], byte[][]> keys = rl.getStartEndKeys();
          verifyStartEndKeys(keys);

          Set<RegionInfo> regions = new TreeSet<>(RegionInfo.COMPARATOR);
          for (HRegionLocation loc : rl.getAllRegionLocations()) {
            regions.add(loc.getRegion());
          }
          verifyTableRegions(regions);
        }

      } finally {
        IOUtils.closeQuietly(table);
      }
    }

    void verify() throws Exception {
      verifyRegionsUsingMetaTableAccessor();
      verifyRegionsUsingHTable();
    }

    void verifyTableRegions(Set<RegionInfo> regions) {
      log("Verifying " + regions.size() + " regions: " + regions);

      byte[][] startKeys = new byte[regions.size()][];
      byte[][] endKeys = new byte[regions.size()][];

      int i = 0;
      for (RegionInfo region : regions) {
        startKeys[i] = region.getStartKey();
        endKeys[i] = region.getEndKey();
        i++;
      }

      Pair<byte[][], byte[][]> keys = new Pair<>(startKeys, endKeys);
      verifyStartEndKeys(keys);
    }

    void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
      byte[][] startKeys = keys.getFirst();
      byte[][] endKeys = keys.getSecond();
      assertEquals(startKeys.length, endKeys.length);
      assertTrue("Found 0 regions for the table", startKeys.length > 0);

      assertArrayEquals("Start key for the first region is not byte[0]", HConstants.EMPTY_START_ROW,
        startKeys[0]);
      byte[] prevEndKey = HConstants.EMPTY_START_ROW;

      // ensure that we do not have any gaps
      for (int i = 0; i < startKeys.length; i++) {
        assertArrayEquals(
          "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey) +
            ", regionStartKey=" + Bytes.toStringBinary(startKeys[i]),
          prevEndKey, startKeys[i]);
        prevEndKey = endKeys[i];
      }
      assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
        endKeys[endKeys.length - 1]);
    }

    @Override
    protected void chore() {
      try {
        verify();
      } catch (Throwable ex) {
        this.ex = ex;
        getStopper().stop("caught exception");
      }
    }
  }

  public static void log(String msg) {
    LOG.info(msg);
  }

  /* some utility methods for split tests */

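  /** Flushes the given region and blocks until its memstore has been fully drained. */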
  public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
      throws IOException, InterruptedException {
    log("flushing region: " + Bytes.toStringBinary(regionName));
    admin.flushRegion(regionName);
    log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
      Threads.sleep(50);
    }
  }

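  /**
   * Waits for the region to be served by the given region server, issues a major compaction, and
   * blocks until every store of the region is down to a single store file.
   */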
  public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
      throws IOException, InterruptedException {
    log("Compacting region: " + Bytes.toStringBinary(regionName));
    // Wait till it is online before we compact, else it comes back with NoServerForRegionException
    try {
      TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
        @Override public boolean evaluate() throws Exception {
          return rs.getServerName().equals(MetaTableAccessor.
            getRegionLocation(admin.getConnection(), regionName).getServerName());
        }
      });
    } catch (Exception e) {
      throw new IOException(e);
    }
    admin.majorCompactRegion(regionName);
    log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
    outer: for (;;) {
      for (Store store : rs.getOnlineRegion(regionName).getStores()) {
        if (store.getStorefilesCount() > 1) {
          Threads.sleep(50);
          continue outer;
        }
      }
      break;
    }
  }

  /**
   * Blocks until the region split is complete in hbase:meta and the region server has opened the
   * daughter regions.
   */
  public static void blockUntilRegionSplit(Configuration conf, long timeout,
      final byte[] regionName, boolean waitForDaughters) throws IOException, InterruptedException {
    long start = System.currentTimeMillis();
    log("blocking until region is split:" + Bytes.toStringBinary(regionName));
    RegionInfo daughterA = null, daughterB = null;
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
      Result result = null;
      RegionInfo region = null;
      while ((System.currentTimeMillis() - start) < timeout) {
        result = metaTable.get(new Get(regionName));
        if (result == null) {
          break;
        }

        region = MetaTableAccessor.getRegionInfo(result);
        if (region.isSplitParent()) {
          log("found parent region: " + region.toString());
          PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
          daughterA = pair.getFirst();
          daughterB = pair.getSecond();
          break;
        }
        Threads.sleep(100);
      }
      if (daughterA == null || daughterB == null) {
        throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB=" +
          daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName=" +
          Bytes.toString(regionName) + ", region=" + region);
      }

      // if we are here, the parent has been marked as split in meta and both daughters are known
      if (waitForDaughters) {
        long rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterA);

        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterB);

        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsOpened(conf, rem, daughterA);

        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsOpened(conf, rem, daughterB);

        // Compact the daughter regions to make sure the reference files can be cleaned up
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());

        removeCompactedFiles(conn, timeout, daughterA);
        removeCompactedFiles(conn, timeout, daughterB);
      }
    }
  }

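  /**
   * Closes and archives the compacted store files for every region of the table that {@code hri}
   * belongs to; IO failures are logged and ignored.
   */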
  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("remove compacted files for : " + hri.getRegionNameAsString());
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
    regions.stream().forEach(r -> {
      try {
        r.getStores().get(0).closeAndArchiveCompactedFiles();
      } catch (IOException ioe) {
        LOG.error("failed in removing compacted file", ioe);
      }
    });
  }

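  /** Blocks until {@code hri} is online (not offline) in hbase:meta, or the timeout expires. */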
  public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("blocking until region is in META: " + hri.getRegionNameAsString());
    long start = System.currentTimeMillis();
    while (System.currentTimeMillis() - start < timeout) {
      HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
      if (loc != null && !loc.getRegion().isOffline()) {
        log("found region in META: " + hri.getRegionNameAsString());
        break;
      }
      Threads.sleep(100);
    }
  }

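  /**
   * Blocks until a Get against the region's start key (or a default key when the start key is
   * empty) succeeds, i.e. the region is open for reads, or the timeout expires.
   */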
  public static void blockUntilRegionIsOpened(Configuration conf, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("blocking until region is opened for reading:" + hri.getRegionNameAsString());
    long start = System.currentTimeMillis();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(hri.getTable())) {
      byte[] row = hri.getStartKey();
      // Check for null/empty row. If we find one, use a key that is likely to be in first region.
      if (row == null || row.length <= 0) {
        row = new byte[] { '0' };
      }
      Get get = new Get(row);
      while (System.currentTimeMillis() - start < timeout) {
        try {
          table.get(get);
          break;
        } catch (IOException ex) {
          // wait some more
        }
        Threads.sleep(100);
      }
    }
  }
}