001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029import java.util.Random;
030import java.util.Set;
031import java.util.TreeSet;
032import java.util.concurrent.ThreadLocalRandom;
033import java.util.stream.Collectors;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.CatalogFamilyFormat;
036import org.apache.hadoop.hbase.ChoreService;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.MetaTableAccessor;
041import org.apache.hadoop.hbase.NotServingRegionException;
042import org.apache.hadoop.hbase.ScheduledChore;
043import org.apache.hadoop.hbase.Stoppable;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.Waiter;
046import org.apache.hadoop.hbase.client.Admin;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionLocator;
054import org.apache.hadoop.hbase.client.Result;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.util.PairOfSameType;
063import org.apache.hadoop.hbase.util.StoppableImplementation;
064import org.apache.hadoop.hbase.util.Threads;
065import org.junit.jupiter.api.AfterAll;
066import org.junit.jupiter.api.BeforeAll;
067import org.junit.jupiter.api.BeforeEach;
068import org.junit.jupiter.api.Tag;
069import org.junit.jupiter.api.Test;
070import org.junit.jupiter.api.TestInfo;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
075import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
076import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
077
078@Tag(LargeTests.TAG)
079public class TestEndToEndSplitTransaction {
080
  private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndSplitTransaction.class);
  // Shared testing utility; its mini cluster is started in beforeAllTests and shut down in
  // afterAllTests.
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Configuration object obtained from TEST_UTIL.
  private static final Configuration CONF = TEST_UTIL.getConfiguration();
  // Name of the currently running test method, assigned by setTestName before each test.
  private String name;
085
086  @BeforeEach
087  public void setTestName(TestInfo testInfo) {
088    this.name = testInfo.getTestMethod().get().getName();
089  }
090
091  @BeforeAll
092  public static void beforeAllTests() throws Exception {
093    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
094    TEST_UTIL.startMiniCluster(1);
095  }
096
  /** Shuts down the mini cluster that was started in {@link #beforeAllTests()}. */
  @AfterAll
  public static void afterAllTests() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
101
102  /**
103   * This is the test for : HBASE-20940 This test will split the region and try to open an reference
104   * over store file. Once store file has any reference, it makes sure that region can't be split
105   */
106  @Test
107  public void testCanSplitJustAfterASplit() throws Exception {
108    LOG.info("Starting testCanSplitJustAfterASplit");
109    byte[] fam = Bytes.toBytes("cf_split");
110
111    CompactSplit compactSplit =
112      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getCompactSplitThread();
113    TableName tableName = TableName.valueOf("CanSplitTable");
114    Table source = TEST_UTIL.getConnection().getTable(tableName);
115    Admin admin = TEST_UTIL.getAdmin();
116    // set a large min compaction file count to avoid compaction just after splitting.
117    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
118      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
119    Map<String, StoreFileReader> scanner = Maps.newHashMap();
120    try {
121      admin.createTable(htd);
122      TEST_UTIL.loadTable(source, fam);
123      compactSplit.setCompactionsEnabled(false);
124      admin.split(tableName);
125      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getHBaseCluster().getRegions(tableName).size() == 2);
126
127      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
128      regions.stream()
129        .forEach(r -> r.getStores().get(0).getStorefiles().stream()
130          .filter(s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
131          .forEach(sf -> {
132            StoreFileReader reader = ((HStoreFile) sf).getReader();
133            reader.getStoreFileScanner(true, false, false, 0, 0, false);
134            scanner.put(r.getRegionInfo().getEncodedName(), reader);
135            LOG.info("Got reference to file = " + sf.getPath() + ",for region = "
136              + r.getRegionInfo().getEncodedName());
137          }));
138      assertTrue(regions.size() > 1, "Regions did not split properly");
139      assertTrue(scanner.size() > 1, "Could not get reference any of the store file");
140      compactSplit.setCompactionsEnabled(true);
141      for (HRegion region : regions) {
142        region.compact(true);
143      }
144
145      regions.stream()
146        .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
147        .forEach(r -> assertFalse(r.getStores().get(0).canSplit(),
148          "Contains an open file reference which can be split"));
149    } finally {
150      scanner.values().forEach(s -> {
151        try {
152          s.close(true);
153        } catch (IOException ioe) {
154          LOG.error("Failed while closing store file", ioe);
155        }
156      });
157      scanner.clear();
158      Closeables.close(source, true);
159      if (!compactSplit.isCompactionsEnabled()) {
160        compactSplit.setCompactionsEnabled(true);
161      }
162      TEST_UTIL.deleteTableIfAny(tableName);
163    }
164  }
165
166  /**
167   * Tests that the client sees meta table changes as atomic during splits
168   */
169  @Test
170  public void testFromClientSideWhileSplitting() throws Throwable {
171    LOG.info("Starting testFromClientSideWhileSplitting");
172    final TableName tableName = TableName.valueOf(name);
173    final byte[] FAMILY = Bytes.toBytes("family");
174
175    // SplitTransaction will update the meta table by offlining the parent region, and adding info
176    // for daughters.
177    Table table = TEST_UTIL.createTable(tableName, FAMILY);
178
179    Stoppable stopper = new StoppableImplementation();
180    RegionSplitter regionSplitter = new RegionSplitter(table);
181    RegionChecker regionChecker = new RegionChecker(CONF, stopper, tableName);
182    final ChoreService choreService = new ChoreService("TEST_SERVER");
183
184    choreService.scheduleChore(regionChecker);
185    regionSplitter.start();
186
187    // wait until the splitter is finished
188    regionSplitter.join();
189    stopper.stop(null);
190
191    if (regionChecker.ex != null) {
192      throw new AssertionError("regionChecker", regionChecker.ex);
193    }
194
195    if (regionSplitter.ex != null) {
196      throw new AssertionError("regionSplitter", regionSplitter.ex);
197    }
198
199    // one final check
200    regionChecker.verify();
201  }
202
203  static class RegionSplitter extends Thread {
204    final Connection connection;
205    Throwable ex;
206    Table table;
207    TableName tableName;
208    byte[] family;
209    Admin admin;
210    HRegionServer rs;
211
212    RegionSplitter(Table table) throws IOException {
213      this.table = table;
214      this.tableName = table.getName();
215      this.family = table.getDescriptor().getColumnFamilies()[0].getName();
216      admin = TEST_UTIL.getAdmin();
217      rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
218      connection = TEST_UTIL.getConnection();
219    }
220
221    @Override
222    public void run() {
223      try {
224        Random random = ThreadLocalRandom.current();
225        for (int i = 0; i < 5; i++) {
226          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
227          if (regions.isEmpty()) {
228            continue;
229          }
230          int regionIndex = random.nextInt(regions.size());
231
232          // pick a random region and split it into two
233          RegionInfo region = Iterators.get(regions.iterator(), regionIndex);
234
235          // pick the mid split point
236          int start = 0, end = Integer.MAX_VALUE;
237          if (region.getStartKey().length > 0) {
238            start = Bytes.toInt(region.getStartKey());
239          }
240          if (region.getEndKey().length > 0) {
241            end = Bytes.toInt(region.getEndKey());
242          }
243          int mid = start + ((end - start) / 2);
244          byte[] splitPoint = Bytes.toBytes(mid);
245
246          // put some rows to the regions
247          addData(start);
248          addData(mid);
249
250          flushAndBlockUntilDone(admin, rs, region.getRegionName());
251          compactAndBlockUntilDone(admin, rs, region.getRegionName());
252
253          log("Initiating region split for:" + region.getRegionNameAsString());
254          try {
255            admin.splitRegionAsync(region.getRegionName(), splitPoint).get();
256            // wait until the split is complete
257            blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);
258          } catch (NotServingRegionException ex) {
259            // ignore
260          }
261        }
262      } catch (Throwable ex) {
263        this.ex = ex;
264      }
265    }
266
267    void addData(int start) throws IOException {
268      List<Put> puts = new ArrayList<>();
269      for (int i = start; i < start + 100; i++) {
270        Put put = new Put(Bytes.toBytes(i));
271        put.addColumn(family, family, Bytes.toBytes(i));
272        puts.add(put);
273      }
274      table.put(puts);
275    }
276  }
277
278  /**
279   * Checks regions using MetaTableAccessor and HTable methods
280   */
281  static class RegionChecker extends ScheduledChore {
282    Connection connection;
283    Configuration conf;
284    TableName tableName;
285    Throwable ex;
286
287    RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
288      super("RegionChecker", stopper, 100);
289      this.conf = conf;
290      this.tableName = tableName;
291
292      this.connection = ConnectionFactory.createConnection(conf);
293    }
294
295    /** verify region boundaries obtained from MetaScanner */
296    void verifyRegionsUsingMetaTableAccessor() throws Exception {
297      List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
298      verifyTableRegions(regionList.stream()
299        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
300      regionList = MetaTableAccessor.getAllRegions(connection, true);
301      verifyTableRegions(regionList.stream()
302        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
303    }
304
305    /** verify region boundaries obtained from HTable.getStartEndKeys() */
306    void verifyRegionsUsingHTable() throws IOException {
307      try (RegionLocator rl = connection.getRegionLocator(tableName)) {
308        Pair<byte[][], byte[][]> keys = rl.getStartEndKeys();
309        verifyStartEndKeys(keys);
310
311        Set<RegionInfo> regions = new TreeSet<>(RegionInfo.COMPARATOR);
312        for (HRegionLocation loc : rl.getAllRegionLocations()) {
313          regions.add(loc.getRegion());
314        }
315        verifyTableRegions(regions);
316      }
317    }
318
319    void verify() throws Exception {
320      verifyRegionsUsingMetaTableAccessor();
321      verifyRegionsUsingHTable();
322    }
323
324    void verifyTableRegions(Set<RegionInfo> regions) {
325      log("Verifying " + regions.size() + " regions: " + regions);
326
327      byte[][] startKeys = new byte[regions.size()][];
328      byte[][] endKeys = new byte[regions.size()][];
329
330      int i = 0;
331      for (RegionInfo region : regions) {
332        startKeys[i] = region.getStartKey();
333        endKeys[i] = region.getEndKey();
334        i++;
335      }
336
337      Pair<byte[][], byte[][]> keys = new Pair<>(startKeys, endKeys);
338      verifyStartEndKeys(keys);
339    }
340
341    void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
342      byte[][] startKeys = keys.getFirst();
343      byte[][] endKeys = keys.getSecond();
344      assertEquals(startKeys.length, endKeys.length);
345      assertTrue(startKeys.length > 0, "Found 0 regions for the table");
346
347      assertArrayEquals(HConstants.EMPTY_START_ROW, startKeys[0],
348        "Start key for the first region is not byte[0]");
349      byte[] prevEndKey = HConstants.EMPTY_START_ROW;
350
351      // ensure that we do not have any gaps
352      for (int i = 0; i < startKeys.length; i++) {
353        assertArrayEquals(prevEndKey, startKeys[i],
354          "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
355            + " ,regionStartKey=" + Bytes.toStringBinary(startKeys[i]));
356        prevEndKey = endKeys[i];
357      }
358      assertArrayEquals(HConstants.EMPTY_END_ROW, endKeys[endKeys.length - 1],
359        "End key for the last region is not byte[0]");
360    }
361
362    @Override
363    protected void chore() {
364      try {
365        verify();
366      } catch (Throwable ex) {
367        this.ex = ex;
368        getStopper().stop("caught exception");
369      }
370    }
371  }
372
  /** Logs {@code msg} at INFO level through this class's logger. */
  public static void log(String msg) {
    LOG.info(msg);
  }
376
377  /* some utility methods for split tests */
378
379  public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
380    throws IOException, InterruptedException {
381    log("flushing region: " + Bytes.toStringBinary(regionName));
382    admin.flushRegion(regionName);
383    log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
384    Threads.sleepWithoutInterrupt(500);
385    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
386      Threads.sleep(50);
387    }
388  }
389
390  public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
391    throws IOException, InterruptedException {
392    log("Compacting region: " + Bytes.toStringBinary(regionName));
393    // Wait till its online before we do compact else it comes back with NoServerForRegionException
394    try {
395      TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
396        @Override
397        public boolean evaluate() throws Exception {
398          return rs.getServerName().equals(
399            MetaTableAccessor.getRegionLocation(admin.getConnection(), regionName).getServerName());
400        }
401      });
402    } catch (Exception e) {
403      throw new IOException(e);
404    }
405    admin.majorCompactRegion(regionName);
406    log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
407    Threads.sleepWithoutInterrupt(500);
408    outer: for (;;) {
409      for (Store store : rs.getOnlineRegion(regionName).getStores()) {
410        if (store.getStorefilesCount() > 1) {
411          Threads.sleep(50);
412          continue outer;
413        }
414      }
415      break;
416    }
417  }
418
419  /**
420   * Blocks until the region split is complete in hbase:meta and region server opens the daughters
421   */
422  public static void blockUntilRegionSplit(Configuration conf, long timeout,
423    final byte[] regionName, boolean waitForDaughters) throws IOException, InterruptedException {
424    long start = EnvironmentEdgeManager.currentTime();
425    log("blocking until region is split:" + Bytes.toStringBinary(regionName));
426    RegionInfo daughterA = null, daughterB = null;
427    try (Connection conn = ConnectionFactory.createConnection(conf);
428      Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
429      Result result = null;
430      RegionInfo region = null;
431      while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
432        result = metaTable.get(new Get(regionName));
433        if (result == null) {
434          break;
435        }
436
437        region = CatalogFamilyFormat.getRegionInfo(result);
438        if (region.isSplitParent()) {
439          log("found parent region: " + region.toString());
440          PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
441          daughterA = pair.getFirst();
442          daughterB = pair.getSecond();
443          break;
444        }
445        Threads.sleep(100);
446      }
447      if (daughterA == null || daughterB == null) {
448        throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB="
449          + daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName="
450          + Bytes.toString(regionName) + ", region=" + region);
451      }
452
453      // if we are here, this means the region split is complete or timed out
454      if (waitForDaughters) {
455        long rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
456        blockUntilRegionIsInMeta(conn, rem, daughterA);
457
458        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
459        blockUntilRegionIsInMeta(conn, rem, daughterB);
460
461        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
462        blockUntilRegionIsOpened(conf, rem, daughterA);
463
464        rem = timeout - (EnvironmentEdgeManager.currentTime() - start);
465        blockUntilRegionIsOpened(conf, rem, daughterB);
466
467        // Compacting the new region to make sure references can be cleaned up
468        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
469          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
470        compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
471          TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());
472
473        removeCompactedFiles(conn, timeout, daughterA);
474        removeCompactedFiles(conn, timeout, daughterB);
475      }
476    }
477  }
478
479  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
480    throws IOException, InterruptedException {
481    log("remove compacted files for : " + hri.getRegionNameAsString());
482    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
483    regions.stream().forEach(r -> {
484      try {
485        r.getStores().get(0).closeAndArchiveCompactedFiles();
486      } catch (IOException ioe) {
487        LOG.error("failed in removing compacted file", ioe);
488      }
489    });
490  }
491
492  public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
493    throws IOException, InterruptedException {
494    log("blocking until region is in META: " + hri.getRegionNameAsString());
495    long start = EnvironmentEdgeManager.currentTime();
496    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
497      HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
498      if (loc != null && !loc.getRegion().isOffline()) {
499        log("found region in META: " + hri.getRegionNameAsString());
500        break;
501      }
502      Threads.sleep(100);
503    }
504  }
505
506  public static void blockUntilRegionIsOpened(Configuration conf, long timeout, RegionInfo hri)
507    throws IOException, InterruptedException {
508    log("blocking until region is opened for reading:" + hri.getRegionNameAsString());
509    long start = EnvironmentEdgeManager.currentTime();
510    try (Connection conn = ConnectionFactory.createConnection(conf);
511      Table table = conn.getTable(hri.getTable())) {
512      byte[] row = hri.getStartKey();
513      // Check for null/empty row. If we find one, use a key that is likely to be in first region.
514      if (row == null || row.length <= 0) {
515        row = new byte[] { '0' };
516      }
517      Get get = new Get(row);
518      while (EnvironmentEdgeManager.currentTime() - start < timeout) {
519        try {
520          table.get(get);
521          break;
522        } catch (IOException ex) {
523          // wait some more
524        }
525        Threads.sleep(100);
526      }
527    }
528  }
529}