001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
070
071/**
072 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
073 * See {@link TestRegionServerNoMaster}.
074 */
075@Tag(RegionServerTests.TAG)
076@Tag(LargeTests.TAG)
077public class TestRegionReplicas {
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class);
080
081  private static final int NB_SERVERS = 1;
082  private static Table table;
083  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");
084
085  private static RegionInfo hriPrimary;
086  private static RegionInfo hriSecondary;
087
088  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
089  private static final byte[] f = HConstants.CATALOG_FAMILY;
090
091  @BeforeAll
092  public static void before() throws Exception {
093    // Reduce the hdfs block size and prefetch to trigger the file-link reopen
094    // when the file is moved to archive (e.g. compaction)
095    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
096    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
097    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
098
099    HTU.startMiniCluster(NB_SERVERS);
100    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());
101
102    // Create table then get the single region for our new table.
103    table = HTU.createTable(tableName, f);
104
105    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
106      hriPrimary = locator.getRegionLocation(row, false).getRegion();
107    }
108
109    // mock a secondary region info to open
110    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
111
112    // No master
113    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
114  }
115
116  @AfterAll
117  public static void afterClass() throws Exception {
118    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
119    table.close();
120    HTU.shutdownMiniCluster();
121  }
122
123  private HRegionServer getRS() {
124    return HTU.getMiniHBaseCluster().getRegionServer(0);
125  }
126
127  @Test
128  public void testOpenRegionReplica() throws Exception {
129    openRegion(HTU, getRS(), hriSecondary);
130    try {
131      // load some data to primary
132      HTU.loadNumericRows(table, f, 0, 1000);
133
134      // assert that we can read back from primary
135      assertEquals(1000, HBaseTestingUtil.countRows(table));
136    } finally {
137      HTU.deleteNumericRows(table, f, 0, 1000);
138      closeRegion(HTU, getRS(), hriSecondary);
139    }
140  }
141
142  /** Tests that the meta location is saved for secondary regions */
143  @Test
144  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
145    openRegion(HTU, getRS(), hriSecondary);
146    Table meta = null;
147    try {
148      meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
149      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
150        getRS().getServerName(), -1, 1, false);
151    } finally {
152      if (meta != null) {
153        meta.close();
154      }
155      closeRegion(HTU, getRS(), hriSecondary);
156    }
157  }
158
159  @Test
160  public void testRegionReplicaGets() throws Exception {
161    try {
162      // load some data to primary
163      HTU.loadNumericRows(table, f, 0, 1000);
164      // assert that we can read back from primary
165      assertEquals(1000, HBaseTestingUtil.countRows(table));
166      // flush so that region replica can read
167      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
168      region.flush(true);
169
170      openRegion(HTU, getRS(), hriSecondary);
171
172      // first try directly against region
173      region = getRS().getRegion(hriSecondary.getEncodedName());
174      assertGet(region, 42, true);
175
176      assertGetRpc(hriSecondary, 42, true);
177    } finally {
178      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
179      closeRegion(HTU, getRS(), hriSecondary);
180    }
181  }
182
183  @Test
184  public void testGetOnTargetRegionReplica() throws Exception {
185    try {
186      // load some data to primary
187      HTU.loadNumericRows(table, f, 0, 1000);
188      // assert that we can read back from primary
189      assertEquals(1000, HBaseTestingUtil.countRows(table));
190      // flush so that region replica can read
191      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
192      region.flush(true);
193
194      openRegion(HTU, getRS(), hriSecondary);
195
196      // try directly Get against region replica
197      byte[] row = Bytes.toBytes(String.valueOf(42));
198      Get get = new Get(row);
199      get.setConsistency(Consistency.TIMELINE);
200      get.setReplicaId(1);
201      Result result = table.get(get);
202      assertArrayEquals(row, result.getValue(f, null));
203    } finally {
204      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
205      closeRegion(HTU, getRS(), hriSecondary);
206    }
207  }
208
209  private void assertGet(Region region, int value, boolean expect) throws IOException {
210    byte[] row = Bytes.toBytes(String.valueOf(value));
211    Get get = new Get(row);
212    Result result = region.get(get);
213    if (expect) {
214      assertArrayEquals(row, result.getValue(f, null));
215    } else {
216      result.isEmpty();
217    }
218  }
219
220  // build a mock rpc
221  private void assertGetRpc(RegionInfo info, int value, boolean expect)
222    throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
223    byte[] row = Bytes.toBytes(String.valueOf(value));
224    Get get = new Get(row);
225    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
226    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
227    Result result = ProtobufUtil.toResult(getResp.getResult());
228    if (expect) {
229      assertArrayEquals(row, result.getValue(f, null));
230    } else {
231      result.isEmpty();
232    }
233  }
234
235  private void restartRegionServer() throws Exception {
236    afterClass();
237    before();
238  }
239
240  @Test
241  public void testRefresStoreFiles() throws Exception {
242    // enable store file refreshing
243    final int refreshPeriod = 2000; // 2 sec
244    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
245    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
246      refreshPeriod);
247    // restart the region server so that it starts the refresher chore
248    restartRegionServer();
249
250    try {
251      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
252      openRegion(HTU, getRS(), hriSecondary);
253
254      // load some data to primary
255      LOG.info("Loading data to primary region");
256      HTU.loadNumericRows(table, f, 0, 1000);
257      // assert that we can read back from primary
258      assertEquals(1000, HBaseTestingUtil.countRows(table));
259      // flush so that region replica can read
260      LOG.info("Flushing primary region");
261      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
262      region.flush(true);
263
264      // ensure that chore is run
265      LOG.info("Sleeping for " + (4 * refreshPeriod));
266      Threads.sleep(4 * refreshPeriod);
267
268      LOG.info("Checking results from secondary region replica");
269      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
270      assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
271
272      assertGet(secondaryRegion, 42, true);
273      assertGetRpc(hriSecondary, 42, true);
274      assertGetRpc(hriSecondary, 1042, false);
275
276      // load some data to primary
277      HTU.loadNumericRows(table, f, 1000, 1100);
278      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
279      region.flush(true);
280
281      HTU.loadNumericRows(table, f, 2000, 2100);
282      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
283      region.flush(true);
284
285      // ensure that chore is run
286      Threads.sleep(4 * refreshPeriod);
287
288      assertGetRpc(hriSecondary, 42, true);
289      assertGetRpc(hriSecondary, 1042, true);
290      assertGetRpc(hriSecondary, 2042, true);
291
292      // ensure that we see the 3 store files
293      assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
294
295      // force compaction
296      HTU.compact(table.getName(), true);
297
298      long wakeUpTime = EnvironmentEdgeManager.currentTime() + 4 * refreshPeriod;
299      while (EnvironmentEdgeManager.currentTime() < wakeUpTime) {
300        assertGetRpc(hriSecondary, 42, true);
301        assertGetRpc(hriSecondary, 1042, true);
302        assertGetRpc(hriSecondary, 2042, true);
303        Threads.sleep(10);
304      }
305
306      // ensure that we see the compacted file only
307      // This will be 4 until the cleaner chore runs
308      assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());
309
310    } finally {
311      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
312      closeRegion(HTU, getRS(), hriSecondary);
313    }
314  }
315
316  @Test
317  public void testFlushAndCompactionsInPrimary() throws Exception {
318
319    long runtime = 30 * 1000;
320    // enable store file refreshing
321    final int refreshPeriod = 100; // 100ms refresh is a lot
322    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
323    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
324      refreshPeriod);
325    // restart the region server so that it starts the refresher chore
326    restartRegionServer();
327    final int startKey = 0, endKey = 1000;
328
329    try {
330      openRegion(HTU, getRS(), hriSecondary);
331
332      // load some data to primary so that reader won't fail
333      HTU.loadNumericRows(table, f, startKey, endKey);
334      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
335      // ensure that chore is run
336      Threads.sleep(2 * refreshPeriod);
337
338      final AtomicBoolean running = new AtomicBoolean(true);
339      @SuppressWarnings("unchecked")
340      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
341      for (int i = 0; i < exceptions.length; i++) {
342        exceptions[i] = new AtomicReference<>();
343      }
344
345      Runnable writer = new Runnable() {
346        int key = startKey;
347
348        @Override
349        public void run() {
350          try {
351            while (running.get()) {
352              byte[] data = Bytes.toBytes(String.valueOf(key));
353              Put put = new Put(data);
354              put.addColumn(f, null, data);
355              table.put(put);
356              key++;
357              if (key == endKey) {
358                key = startKey;
359              }
360            }
361          } catch (Exception ex) {
362            LOG.warn(ex.toString(), ex);
363            exceptions[0].compareAndSet(null, ex);
364          }
365        }
366      };
367
368      Runnable flusherCompactor = new Runnable() {
369        Random random = ThreadLocalRandom.current();
370
371        public void run() {
372          try {
373            while (running.get()) {
374              // flush or compact
375              if (random.nextBoolean()) {
376                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
377              } else {
378                HTU.compact(table.getName(), random.nextBoolean());
379              }
380            }
381          } catch (Exception ex) {
382            LOG.warn(ex.toString(), ex);
383            exceptions[1].compareAndSet(null, ex);
384          }
385        }
386      };
387
388      Runnable reader = new Runnable() {
389        @Override
390        public void run() {
391          try {
392            Random random = ThreadLocalRandom.current();
393            while (running.get()) {
394              // whether to do a close and open
395              if (random.nextInt(10) == 0) {
396                try {
397                  closeRegion(HTU, getRS(), hriSecondary);
398                } catch (Exception ex) {
399                  LOG.warn("Failed closing the region " + hriSecondary + " "
400                    + StringUtils.stringifyException(ex));
401                  exceptions[2].compareAndSet(null, ex);
402                }
403                try {
404                  openRegion(HTU, getRS(), hriSecondary);
405                } catch (Exception ex) {
406                  LOG.warn("Failed opening the region " + hriSecondary + " "
407                    + StringUtils.stringifyException(ex));
408                  exceptions[2].compareAndSet(null, ex);
409                }
410              }
411
412              int key = random.nextInt(endKey - startKey) + startKey;
413              assertGetRpc(hriSecondary, key, true);
414            }
415          } catch (Exception ex) {
416            LOG.warn("Failed getting the value in the region " + hriSecondary + " "
417              + StringUtils.stringifyException(ex));
418            exceptions[2].compareAndSet(null, ex);
419          }
420        }
421      };
422
423      LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName());
424      ExecutorService executor = Executors.newFixedThreadPool(3);
425      executor.submit(writer);
426      executor.submit(flusherCompactor);
427      executor.submit(reader);
428
429      // wait for threads
430      Threads.sleep(runtime);
431      running.set(false);
432      executor.shutdown();
433      executor.awaitTermination(30, TimeUnit.SECONDS);
434
435      for (AtomicReference<Exception> exRef : exceptions) {
436        assertNull(exRef.get());
437      }
438    } finally {
439      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
440      try {
441        closeRegion(HTU, getRS(), hriSecondary);
442      } catch (ServiceException e) {
443        LOG.info("Closing wrong region {}", hriSecondary, e);
444      }
445    }
446  }
447
448  @Test
449  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
450    // disable the store file refresh chore (we do this by hand)
451    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
452    restartRegionServer();
453
454    try {
455      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
456      openRegion(HTU, getRS(), hriSecondary);
457
458      // load some data to primary
459      LOG.info("Loading data to primary region");
460      for (int i = 0; i < 3; ++i) {
461        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
462        HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
463        region.flush(true);
464      }
465
466      HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName());
467      assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());
468
469      // Refresh store files on the secondary
470      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
471      secondaryRegion.getStore(f).refreshStoreFiles();
472      assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
473
474      // force compaction
475      LOG.info("Force Major compaction on primary region " + hriPrimary);
476      primaryRegion.compact(true);
477      assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
478      List<RegionServerThread> regionServerThreads =
479        HTU.getMiniHBaseCluster().getRegionServerThreads();
480      HRegionServer hrs = null;
481      for (RegionServerThread rs : regionServerThreads) {
482        if (
483          rs.getRegionServer().getOnlineRegion(primaryRegion.getRegionInfo().getRegionName())
484              != null
485        ) {
486          hrs = rs.getRegionServer();
487          break;
488        }
489      }
490      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
491      cleaner.chore();
492      // scan all the hfiles on the secondary.
493      // since there are no read on the secondary when we ask locations to
494      // the NN a FileNotFound exception will be returned and the FileLink
495      // should be able to deal with it giving us all the result we expect.
496      int keys = 0;
497      int sum = 0;
498      for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
499        // Our file does not exist anymore. was moved by the compaction above.
500        LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath())));
501        assertFalse(getRS().getFileSystem().exists(sf.getPath()));
502        sf.initReader();
503        try (StoreFileScanner scanner = sf.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
504          scanner.seek(KeyValue.LOWESTKEY);
505          for (Cell cell;;) {
506            cell = scanner.next();
507            if (cell == null) {
508              break;
509            }
510            keys++;
511            sum += Integer.parseInt(
512              Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
513          }
514        }
515      }
516      assertEquals(3000, keys);
517      assertEquals(4498500, sum);
518    } finally {
519      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
520      closeRegion(HTU, getRS(), hriSecondary);
521    }
522  }
523}