001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Map;
028import java.util.Random;
029import java.util.Set;
030import java.util.TreeSet;
031import java.util.concurrent.TimeUnit;
032import java.util.stream.Collectors;
033
034import org.apache.commons.io.IOUtils;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.ChoreService;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.MetaTableAccessor;
042import org.apache.hadoop.hbase.NotServingRegionException;
043import org.apache.hadoop.hbase.ScheduledChore;
044import org.apache.hadoop.hbase.Stoppable;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.Admin;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.CompactionState;
049import org.apache.hadoop.hbase.client.Connection;
050import org.apache.hadoop.hbase.client.ConnectionFactory;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.client.RegionLocator;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.testclassification.LargeTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.util.PairOfSameType;
063import org.apache.hadoop.hbase.util.RetryCounter;
064import org.apache.hadoop.hbase.util.StoppableImplementation;
065import org.apache.hadoop.hbase.util.Threads;
066import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
067import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
068import org.junit.AfterClass;
069import org.junit.Assert;
070import org.junit.BeforeClass;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079@Category(LargeTests.class)
080public class TestEndToEndSplitTransaction {
081
082  @ClassRule
083  public static final HBaseClassTestRule CLASS_RULE =
084      HBaseClassTestRule.forClass(TestEndToEndSplitTransaction.class);
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndSplitTransaction.class);
087  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
088  private static final Configuration CONF = TEST_UTIL.getConfiguration();
089
090  @Rule
091  public TestName name = new TestName();
092
093  @BeforeClass
094  public static void beforeAllTests() throws Exception {
095    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
096    TEST_UTIL.startMiniCluster();
097  }
098
099  @AfterClass
100  public static void afterAllTests() throws Exception {
101    TEST_UTIL.shutdownMiniCluster();
102  }
103
104
105  /*
106   * This is the test for : HBASE-20940 This test will split the region and try to open an reference
107   * over store file. Once store file has any reference, it makes sure that region can't be split
108   * @throws Exception
109   */
110  @Test
111  public void testCanSplitJustAfterASplit() throws Exception {
112    LOG.info("Starting testCanSplitJustAfterASplit");
113    byte[] fam = Bytes.toBytes("cf_split");
114
115    TableName tableName = TableName.valueOf("CanSplitTable");
116    Table source = TEST_UTIL.getConnection().getTable(tableName);
117    Admin admin = TEST_UTIL.getAdmin();
118    Map<String, StoreFileReader> scanner = Maps.newHashMap();
119
120    try {
121      TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
122          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
123
124      admin.createTable(htd);
125      TEST_UTIL.loadTable(source, fam);
126      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
127      regions.get(0).forceSplit(null);
128      admin.split(tableName);
129
130      while (regions.size() <= 1) {
131        regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
132        regions.stream()
133            .forEach(r -> r.getStores().get(0).getStorefiles().stream()
134                .filter(
135                  s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
136                .forEach(sf -> {
137                  StoreFileReader reader = ((HStoreFile) sf).getReader();
138                  reader.getStoreFileScanner(true, false, false, 0, 0, false);
139                  scanner.put(r.getRegionInfo().getEncodedName(), reader);
140                  LOG.info("Got reference to file = " + sf.getPath() + ",for region = "
141                      + r.getRegionInfo().getEncodedName());
142                }));
143      }
144
145      Assert.assertTrue("Regions did not split properly", regions.size() > 1);
146      Assert.assertTrue("Could not get reference any of the store file", scanner.size() > 1);
147
148      RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS);
149      while (CompactionState.NONE != admin.getCompactionState(tableName) && retrier.shouldRetry()) {
150        retrier.sleepUntilNextRetry();
151      }
152
153      Assert.assertEquals("Compaction did not complete in 30 secs", CompactionState.NONE,
154        admin.getCompactionState(tableName));
155
156      regions.stream()
157          .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
158          .forEach(r -> Assert.assertTrue("Contains an open file reference which can be split",
159            !r.getStores().get(0).canSplit()));
160    } finally {
161      scanner.values().stream().forEach(s -> {
162        try {
163          s.close(true);
164        } catch (IOException ioe) {
165          LOG.error("Failed while closing store file", ioe);
166        }
167      });
168      scanner.clear();
169      if (source != null) {
170        source.close();
171      }
172      TEST_UTIL.deleteTableIfAny(tableName);
173    }
174  }
175
176  /**
177   * Tests that the client sees meta table changes as atomic during splits
178   */
179  @Test
180  public void testFromClientSideWhileSplitting() throws Throwable {
181    LOG.info("Starting testFromClientSideWhileSplitting");
182    final TableName tableName = TableName.valueOf(name.getMethodName());
183    final byte[] FAMILY = Bytes.toBytes("family");
184
185    //SplitTransaction will update the meta table by offlining the parent region, and adding info
186    //for daughters.
187    Table table = TEST_UTIL.createTable(tableName, FAMILY);
188
189    Stoppable stopper = new StoppableImplementation();
190    RegionSplitter regionSplitter = new RegionSplitter(table);
191    RegionChecker regionChecker = new RegionChecker(CONF, stopper, tableName);
192    final ChoreService choreService = new ChoreService("TEST_SERVER");
193
194    choreService.scheduleChore(regionChecker);
195    regionSplitter.start();
196
197    //wait until the splitter is finished
198    regionSplitter.join();
199    stopper.stop(null);
200
201    if (regionChecker.ex != null) {
202      throw new AssertionError("regionChecker", regionChecker.ex);
203    }
204
205    if (regionSplitter.ex != null) {
206      throw new AssertionError("regionSplitter", regionSplitter.ex);
207    }
208
209    //one final check
210    regionChecker.verify();
211  }
212
213  static class RegionSplitter extends Thread {
214    final Connection connection;
215    Throwable ex;
216    Table table;
217    TableName tableName;
218    byte[] family;
219    Admin admin;
220    HRegionServer rs;
221
222    RegionSplitter(Table table) throws IOException {
223      this.table = table;
224      this.tableName = table.getName();
225      this.family = table.getTableDescriptor().getFamiliesKeys().iterator().next();
226      admin = TEST_UTIL.getAdmin();
227      rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
228      connection = TEST_UTIL.getConnection();
229    }
230
231    @Override
232    public void run() {
233      try {
234        Random random = new Random();
235        for (int i = 0; i < 5; i++) {
236          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
237          if (regions.isEmpty()) {
238            continue;
239          }
240          int regionIndex = random.nextInt(regions.size());
241
242          // pick a random region and split it into two
243          RegionInfo region = Iterators.get(regions.iterator(), regionIndex);
244
245          // pick the mid split point
246          int start = 0, end = Integer.MAX_VALUE;
247          if (region.getStartKey().length > 0) {
248            start = Bytes.toInt(region.getStartKey());
249          }
250          if (region.getEndKey().length > 0) {
251            end = Bytes.toInt(region.getEndKey());
252          }
253          int mid = start + ((end - start) / 2);
254          byte[] splitPoint = Bytes.toBytes(mid);
255
256          // put some rows to the regions
257          addData(start);
258          addData(mid);
259
260          flushAndBlockUntilDone(admin, rs, region.getRegionName());
261          compactAndBlockUntilDone(admin, rs, region.getRegionName());
262
263          log("Initiating region split for:" + region.getRegionNameAsString());
264          try {
265            admin.splitRegion(region.getRegionName(), splitPoint);
266            // wait until the split is complete
267            blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);
268
269          } catch (NotServingRegionException ex) {
270            // ignore
271          }
272        }
273      } catch (Throwable ex) {
274        this.ex = ex;
275      }
276    }
277
278    void addData(int start) throws IOException {
279      List<Put> puts = new ArrayList<>();
280      for (int i=start; i< start + 100; i++) {
281        Put put = new Put(Bytes.toBytes(i));
282        put.addColumn(family, family, Bytes.toBytes(i));
283        puts.add(put);
284      }
285      table.put(puts);
286    }
287  }
288
289  /**
290   * Checks regions using MetaTableAccessor and HTable methods
291   */
292  static class RegionChecker extends ScheduledChore {
293    Connection connection;
294    Configuration conf;
295    TableName tableName;
296    Throwable ex;
297
298    RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
299      super("RegionChecker", stopper, 100);
300      this.conf = conf;
301      this.tableName = tableName;
302
303      this.connection = ConnectionFactory.createConnection(conf);
304    }
305
306    /** verify region boundaries obtained from MetaScanner */
307    void verifyRegionsUsingMetaTableAccessor() throws Exception {
308      List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
309      verifyTableRegions(regionList.stream()
310          .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
311      regionList = MetaTableAccessor.getAllRegions(connection, true);
312      verifyTableRegions(regionList.stream()
313          .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
314    }
315
316    /** verify region boundaries obtained from HTable.getStartEndKeys() */
317    void verifyRegionsUsingHTable() throws IOException {
318      Table table = null;
319      try {
320        //HTable.getStartEndKeys()
321        table = connection.getTable(tableName);
322
323        try(RegionLocator rl = connection.getRegionLocator(tableName)) {
324          Pair<byte[][], byte[][]> keys = rl.getStartEndKeys();
325          verifyStartEndKeys(keys);
326
327          Set<RegionInfo> regions = new TreeSet<>(RegionInfo.COMPARATOR);
328          for (HRegionLocation loc : rl.getAllRegionLocations()) {
329            regions.add(loc.getRegionInfo());
330          }
331          verifyTableRegions(regions);
332        }
333
334      } finally {
335        IOUtils.closeQuietly(table);
336      }
337    }
338
339    void verify() throws Exception {
340      verifyRegionsUsingMetaTableAccessor();
341      verifyRegionsUsingHTable();
342    }
343
344    void verifyTableRegions(Set<RegionInfo> regions) {
345      log("Verifying " + regions.size() + " regions: " + regions);
346
347      byte[][] startKeys = new byte[regions.size()][];
348      byte[][] endKeys = new byte[regions.size()][];
349
350      int i=0;
351      for (RegionInfo region : regions) {
352        startKeys[i] = region.getStartKey();
353        endKeys[i] = region.getEndKey();
354        i++;
355      }
356
357      Pair<byte[][], byte[][]> keys = new Pair<>(startKeys, endKeys);
358      verifyStartEndKeys(keys);
359    }
360
361    void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
362      byte[][] startKeys = keys.getFirst();
363      byte[][] endKeys = keys.getSecond();
364      assertEquals(startKeys.length, endKeys.length);
365      assertTrue("Found 0 regions for the table", startKeys.length > 0);
366
367      assertArrayEquals("Start key for the first region is not byte[0]",
368          HConstants.EMPTY_START_ROW, startKeys[0]);
369      byte[] prevEndKey = HConstants.EMPTY_START_ROW;
370
371      // ensure that we do not have any gaps
372      for (int i=0; i<startKeys.length; i++) {
373        assertArrayEquals(
374            "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
375                + " ,regionStartKey=" + Bytes.toStringBinary(startKeys[i]), prevEndKey,
376            startKeys[i]);
377        prevEndKey = endKeys[i];
378      }
379      assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
380          endKeys[endKeys.length - 1]);
381    }
382
383    @Override
384    protected void chore() {
385      try {
386        verify();
387      } catch (Throwable ex) {
388        this.ex = ex;
389        getStopper().stop("caught exception");
390      }
391    }
392  }
393
394  public static void log(String msg) {
395    LOG.info(msg);
396  }
397
398  /* some utility methods for split tests */
399
400  public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
401      throws IOException, InterruptedException {
402    log("flushing region: " + Bytes.toStringBinary(regionName));
403    admin.flushRegion(regionName);
404    log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
405    Threads.sleepWithoutInterrupt(500);
406    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
407      Threads.sleep(50);
408    }
409  }
410
411  public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
412      throws IOException, InterruptedException {
413    log("Compacting region: " + Bytes.toStringBinary(regionName));
414    admin.majorCompactRegion(regionName);
415    log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
416    Threads.sleepWithoutInterrupt(500);
417    outer: for (;;) {
418      for (Store store : rs.getOnlineRegion(regionName).getStores()) {
419        if (store.getStorefilesCount() > 1) {
420          Threads.sleep(50);
421          continue outer;
422        }
423      }
424      break;
425    }
426  }
427
428  /**
429   * Blocks until the region split is complete in hbase:meta and region server opens the daughters
430   */
431  public static void blockUntilRegionSplit(Configuration conf, long timeout,
432      final byte[] regionName, boolean waitForDaughters)
433      throws IOException, InterruptedException {
434    long start = System.currentTimeMillis();
435    log("blocking until region is split:" +  Bytes.toStringBinary(regionName));
436    RegionInfo daughterA = null, daughterB = null;
437    try (Connection conn = ConnectionFactory.createConnection(conf);
438        Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
439      Result result = null;
440      RegionInfo region = null;
441      while ((System.currentTimeMillis() - start) < timeout) {
442        result = metaTable.get(new Get(regionName));
443        if (result == null) {
444          break;
445        }
446
447        region = MetaTableAccessor.getRegionInfo(result);
448        if (region.isSplitParent()) {
449          log("found parent region: " + region.toString());
450          PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
451          daughterA = pair.getFirst();
452          daughterB = pair.getSecond();
453          break;
454        }
455        Threads.sleep(100);
456      }
457      if (daughterA == null || daughterB == null) {
458        throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB=" +
459          daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName=" +
460          Bytes.toString(regionName) + ", region=" + region);
461      }
462
463      //if we are here, this means the region split is complete or timed out
464      if (waitForDaughters) {
465        long rem = timeout - (System.currentTimeMillis() - start);
466        blockUntilRegionIsInMeta(conn, rem, daughterA);
467
468        rem = timeout - (System.currentTimeMillis() - start);
469        blockUntilRegionIsInMeta(conn, rem, daughterB);
470
471        rem = timeout - (System.currentTimeMillis() - start);
472        blockUntilRegionIsOpened(conf, rem, daughterA);
473
474        rem = timeout - (System.currentTimeMillis() - start);
475        blockUntilRegionIsOpened(conf, rem, daughterB);
476
477        // Compacting the new region to make sure references can be cleaned up
478        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
479          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
480        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
481          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());
482
483        removeCompactedFiles(conn, timeout, daughterA);
484        removeCompactedFiles(conn, timeout, daughterB);
485      }
486    }
487  }
488
489  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
490      throws IOException, InterruptedException {
491    log("remove compacted files for : " + hri.getRegionNameAsString());
492    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
493    regions.stream().forEach(r -> {
494      try {
495        r.getStores().get(0).closeAndArchiveCompactedFiles();
496      } catch (IOException ioe) {
497        LOG.error("failed in removing compacted file", ioe);
498      }
499    });
500  }
501
502  public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
503      throws IOException, InterruptedException {
504    log("blocking until region is in META: " + hri.getRegionNameAsString());
505    long start = System.currentTimeMillis();
506    while (System.currentTimeMillis() - start < timeout) {
507      HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
508      if (loc != null && !loc.getRegionInfo().isOffline()) {
509        log("found region in META: " + hri.getRegionNameAsString());
510        break;
511      }
512      Threads.sleep(100);
513    }
514  }
515
516  public static void blockUntilRegionIsOpened(Configuration conf, long timeout, RegionInfo hri)
517      throws IOException, InterruptedException {
518    log("blocking until region is opened for reading:" + hri.getRegionNameAsString());
519    long start = System.currentTimeMillis();
520    try (Connection conn = ConnectionFactory.createConnection(conf);
521        Table table = conn.getTable(hri.getTable())) {
522      byte[] row = hri.getStartKey();
523      // Check for null/empty row. If we find one, use a key that is likely to be in first region.
524      if (row == null || row.length <= 0) {
525        row = new byte[] { '0' };
526      }
527      Get get = new Get(row);
528      while (System.currentTimeMillis() - start < timeout) {
529        try {
530          table.get(get);
531          break;
532        } catch (IOException ex) {
533          // wait some more
534        }
535        Threads.sleep(100);
536      }
537    }
538  }
539}