/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

@Category(LargeTests.class)
public class TestEndToEndSplitTransaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEndToEndSplitTransaction.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndSplitTransaction.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final Configuration CONF = TEST_UTIL.getConfiguration();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void beforeAllTests() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void afterAllTests() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test for HBASE-20940. Splits a region and then opens a scanner over one of the resulting
   * reference store files. As long as a store still contains a reference file, the region must
   * not be splittable again.
   */
  @Test
  public void testCanSplitJustAfterASplit() throws Exception {
    LOG.info("Starting testCanSplitJustAfterASplit");
    byte[] fam = Bytes.toBytes("cf_split");

    CompactSplit compactSplit =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getCompactSplitThread();
    TableName tableName = TableName.valueOf("CanSplitTable");
    Table source = TEST_UTIL.getConnection().getTable(tableName);
    Admin admin = TEST_UTIL.getAdmin();
    // Compactions are disabled below (via CompactSplit) so that a compaction right after the
    // split does not clean up the reference files we want to hold open.
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
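    // One open StoreFileReader per daughter region, keyed by encoded region name, so that the
    // reference store file stays in use for the rest of the test.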
    Map<String, StoreFileReader> scanner = Maps.newHashMap();
    try {
      admin.createTable(htd);
      TEST_UTIL.loadTable(source, fam);
      compactSplit.setCompactionsEnabled(false);
      TEST_UTIL.getHBaseCluster().getRegions(tableName).get(0).forceSplit(null);
      admin.split(tableName);
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getHBaseCluster().getRegions(tableName).size() == 2);

      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
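      // For each daughter region, open a scanner over its reference store file (if any) and keep
      // the reader so the reference file stays open while we later check that the stores cannot
      // be split.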
      regions.stream()
        .forEach(r -> r.getStores().get(0).getStorefiles().stream()
          .filter(s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
          .forEach(sf -> {
            StoreFileReader reader = ((HStoreFile) sf).getReader();
            reader.getStoreFileScanner(true, false, false, 0, 0, false);
            scanner.put(r.getRegionInfo().getEncodedName(), reader);
            LOG.info("Got reference to file = " + sf.getPath() + ", for region = " +
              r.getRegionInfo().getEncodedName());
          }));
      assertTrue("Regions did not split properly", regions.size() > 1);
      assertTrue("Could not get references to the store files", scanner.size() > 1);
      compactSplit.setCompactionsEnabled(true);
      for (HRegion region : regions) {
        region.compact(true);
      }

      regions.stream()
        .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
        .forEach(r -> assertFalse("Store with an open reference file should not be splittable",
          r.getStores().get(0).canSplit()));
    } finally {
      scanner.values().forEach(s -> {
        try {
          s.close(true);
        } catch (IOException ioe) {
          LOG.error("Failed while closing store file", ioe);
        }
      });
      scanner.clear();
      Closeables.close(source, true);
      if (!compactSplit.isCompactionsEnabled()) {
        compactSplit.setCompactionsEnabled(true);
      }
      TEST_UTIL.deleteTableIfAny(tableName);
    }
  }

  /**
   * Tests that the client sees meta table changes as atomic during splits
   */
  @Test
  public void testFromClientSideWhileSplitting() throws Throwable {
    LOG.info("Starting testFromClientSideWhileSplitting");
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final byte[] FAMILY = Bytes.toBytes("family");

    // SplitTransaction will update the meta table by offlining the parent region, and adding info
    // for daughters.
    Table table = TEST_UTIL.createTable(tableName, FAMILY);

    Stoppable stopper = new StoppableImplementation();
    RegionSplitter regionSplitter = new RegionSplitter(table);
    RegionChecker regionChecker = new RegionChecker(CONF, stopper, tableName);
    final ChoreService choreService = new ChoreService("TEST_SERVER");

    choreService.scheduleChore(regionChecker);
    regionSplitter.start();

    // wait until the splitter is finished
    regionSplitter.join();
    stopper.stop(null);

    if (regionChecker.ex != null) {
      throw new AssertionError("regionChecker", regionChecker.ex);
    }

    if (regionSplitter.ex != null) {
      throw new AssertionError("regionSplitter", regionSplitter.ex);
    }

    // one final check
    regionChecker.verify();
  }

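  /**
   * Thread that repeatedly picks a random region of the table, loads rows around its midpoint,
   * and splits it there, recording any failure in {@link #ex}.
   */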
  static class RegionSplitter extends Thread {
    final Connection connection;
    Throwable ex;
    Table table;
    TableName tableName;
    byte[] family;
    Admin admin;
    HRegionServer rs;

    RegionSplitter(Table table) throws IOException {
      this.table = table;
      this.tableName = table.getName();
      this.family = table.getDescriptor().getColumnFamilies()[0].getName();
      admin = TEST_UTIL.getAdmin();
      rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
      connection = TEST_UTIL.getConnection();
    }

    @Override
    public void run() {
      try {
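        // Five rounds of: pick a random region, write rows on both sides of its midpoint, flush
        // and major-compact it, then split it at the midpoint and wait for the split to finish.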
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
          if (regions.isEmpty()) {
            continue;
          }
          int regionIndex = random.nextInt(regions.size());

          // pick a random region and split it into two
          RegionInfo region = Iterators.get(regions.iterator(), regionIndex);

          // use the midpoint of the region's key range as the split point
          int start = 0, end = Integer.MAX_VALUE;
          if (region.getStartKey().length > 0) {
            start = Bytes.toInt(region.getStartKey());
          }
          if (region.getEndKey().length > 0) {
            end = Bytes.toInt(region.getEndKey());
          }
          int mid = start + ((end - start) / 2);
          byte[] splitPoint = Bytes.toBytes(mid);

          // put some rows into the regions
          addData(start);
          addData(mid);

          flushAndBlockUntilDone(admin, rs, region.getRegionName());
          compactAndBlockUntilDone(admin, rs, region.getRegionName());

          log("Initiating region split for: " + region.getRegionNameAsString());
          try {
            admin.splitRegion(region.getRegionName(), splitPoint);
            // wait until the split is complete
            blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);

          } catch (NotServingRegionException ex) {
            // ignore
          }
        }
      } catch (Throwable ex) {
        this.ex = ex;
      }
    }

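    /** Puts 100 rows with consecutive integer row keys starting at {@code start}. */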
    void addData(int start) throws IOException {
      List<Put> puts = new ArrayList<>();
      for (int i = start; i < start + 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.addColumn(family, family, Bytes.toBytes(i));
        puts.add(put);
      }
      table.put(puts);
    }
  }

  /**
   * Checks regions using MetaTableAccessor and HTable methods
   */
  static class RegionChecker extends ScheduledChore {
    Connection connection;
    Configuration conf;
    TableName tableName;
    Throwable ex;

    RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
      super("RegionChecker", stopper, 100);
      this.conf = conf;
      this.tableName = tableName;

      this.connection = ConnectionFactory.createConnection(conf);
    }

    /** verify region boundaries obtained from MetaTableAccessor */
    void verifyRegionsUsingMetaTableAccessor() throws Exception {
      List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
      regionList = MetaTableAccessor.getAllRegions(connection, true);
      verifyTableRegions(regionList.stream()
        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
    }

    /** verify region boundaries obtained from HTable.getStartEndKeys() */
    void verifyRegionsUsingHTable() throws IOException {
      Table table = null;
      try {
        // HTable.getStartEndKeys()
        table = connection.getTable(tableName);

        try (RegionLocator rl = connection.getRegionLocator(tableName)) {
          Pair<byte[][], byte[][]> keys = rl.getStartEndKeys();
          verifyStartEndKeys(keys);

          Set<RegionInfo> regions = new TreeSet<>(RegionInfo.COMPARATOR);
          for (HRegionLocation loc : rl.getAllRegionLocations()) {
            regions.add(loc.getRegion());
          }
          verifyTableRegions(regions);
        }

      } finally {
        IOUtils.closeQuietly(table);
      }
    }

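    /** Runs both verification paths: region boundaries from hbase:meta and from the client. */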
    void verify() throws Exception {
      verifyRegionsUsingMetaTableAccessor();
      verifyRegionsUsingHTable();
    }

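    /**
     * Extracts the start/end keys of the given regions (in iteration order) and verifies that
     * they form a contiguous keyspace.
     */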
    void verifyTableRegions(Set<RegionInfo> regions) {
      log("Verifying " + regions.size() + " regions: " + regions);

      byte[][] startKeys = new byte[regions.size()][];
      byte[][] endKeys = new byte[regions.size()][];

      int i = 0;
      for (RegionInfo region : regions) {
        startKeys[i] = region.getStartKey();
        endKeys[i] = region.getEndKey();
        i++;
      }

      Pair<byte[][], byte[][]> keys = new Pair<>(startKeys, endKeys);
      verifyStartEndKeys(keys);
    }

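    /**
     * Asserts that the start/end keys describe one contiguous keyspace: the first start key and
     * the last end key are empty, and each region starts exactly where the previous one ended.
     */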
    void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
      byte[][] startKeys = keys.getFirst();
      byte[][] endKeys = keys.getSecond();
      assertEquals(startKeys.length, endKeys.length);
      assertTrue("Found 0 regions for the table", startKeys.length > 0);

      assertArrayEquals("Start key for the first region is not byte[0]", HConstants.EMPTY_START_ROW,
        startKeys[0]);
      byte[] prevEndKey = HConstants.EMPTY_START_ROW;

      // ensure that we do not have any gaps
      for (int i = 0; i < startKeys.length; i++) {
        assertArrayEquals(
          "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey) +
            ", regionStartKey=" + Bytes.toStringBinary(startKeys[i]),
          prevEndKey, startKeys[i]);
        prevEndKey = endKeys[i];
      }
      assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
        endKeys[endKeys.length - 1]);
    }

    @Override
    protected void chore() {
      try {
        verify();
      } catch (Throwable ex) {
        this.ex = ex;
        getStopper().stop("caught exception");
      }
    }
  }

  public static void log(String msg) {
    LOG.info(msg);
  }

  /* some utility methods for split tests */

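  /** Flushes the given region and blocks until its memstore size drops to zero. */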
  public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
      throws IOException, InterruptedException {
    log("flushing region: " + Bytes.toStringBinary(regionName));
    admin.flushRegion(regionName);
    log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
      Threads.sleep(50);
    }
  }

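  /** Major-compacts the region and blocks until every store has at most one store file. */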
  public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
      throws IOException, InterruptedException {
    log("Compacting region: " + Bytes.toStringBinary(regionName));
    admin.majorCompactRegion(regionName);
    log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
    Threads.sleepWithoutInterrupt(500);
    outer: for (;;) {
      for (Store store : rs.getOnlineRegion(regionName).getStores()) {
        if (store.getStorefilesCount() > 1) {
          Threads.sleep(50);
          continue outer;
        }
      }
      break;
    }
  }

  /**
   * Blocks until the region split is complete in hbase:meta and the region server has opened the
   * daughter regions.
   */
  public static void blockUntilRegionSplit(Configuration conf, long timeout,
      final byte[] regionName, boolean waitForDaughters) throws IOException, InterruptedException {
    long start = System.currentTimeMillis();
    log("blocking until region is split: " + Bytes.toStringBinary(regionName));
    RegionInfo daughterA = null, daughterB = null;
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
      Result result = null;
      RegionInfo region = null;
      while ((System.currentTimeMillis() - start) < timeout) {
        result = metaTable.get(new Get(regionName));
        if (result == null || result.isEmpty()) {
          break;
        }

        region = MetaTableAccessor.getRegionInfo(result);
        if (region.isSplitParent()) {
          log("found parent region: " + region.toString());
          PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
          daughterA = pair.getFirst();
          daughterB = pair.getSecond();
          break;
        }
        Threads.sleep(100);
      }
      if (daughterA == null || daughterB == null) {
        throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB=" +
          daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName=" +
          Bytes.toStringBinary(regionName) + ", region=" + region);
      }

      // if we are here, the split is complete and both daughters are known in hbase:meta
      if (waitForDaughters) {
        long rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterA);

        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsInMeta(conn, rem, daughterB);

        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsOpened(conf, rem, daughterA);

        rem = timeout - (System.currentTimeMillis() - start);
        blockUntilRegionIsOpened(conf, rem, daughterB);

        // Compact the new daughter regions so that their reference files can be cleaned up
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());

        removeCompactedFiles(conn, timeout, daughterA);
        removeCompactedFiles(conn, timeout, daughterB);
      }
    }
  }

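  /**
   * Asks each region of the given region's table to close and archive the compacted files of its
   * first store.
   */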
  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("removing compacted files for: " + hri.getRegionNameAsString());
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
    regions.stream().forEach(r -> {
      try {
        r.getStores().get(0).closeAndArchiveCompactedFiles();
      } catch (IOException ioe) {
        LOG.error("Failed to remove compacted files", ioe);
      }
    });
  }

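  /** Blocks until hbase:meta has a non-offline location for the region, or the timeout elapses. */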
  public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("blocking until region is in META: " + hri.getRegionNameAsString());
    long start = System.currentTimeMillis();
    while (System.currentTimeMillis() - start < timeout) {
      HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
      if (loc != null && !loc.getRegion().isOffline()) {
        log("found region in META: " + hri.getRegionNameAsString());
        break;
      }
      Threads.sleep(100);
    }
  }

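  /**
   * Blocks until a Get against the region's start key succeeds, i.e. the region is open and
   * serving reads, or until the timeout elapses.
   */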
  public static void blockUntilRegionIsOpened(Configuration conf, long timeout, RegionInfo hri)
      throws IOException, InterruptedException {
    log("blocking until region is opened for reading: " + hri.getRegionNameAsString());
    long start = System.currentTimeMillis();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(hri.getTable())) {
      byte[] row = hri.getStartKey();
      // Check for a null/empty row. If we find one, use a key that is likely to be in the first
      // region.
      if (row == null || row.length <= 0) {
        row = new byte[] { '0' };
      }
      Get get = new Get(row);
      while (System.currentTimeMillis() - start < timeout) {
        try {
          table.get(get);
          break;
        } catch (IOException ex) {
          // wait some more
        }
        Threads.sleep(100);
      }
    }
  }
}