/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
 * See {@link TestRegionServerNoMaster}.
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestRegionReplicas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionReplicas.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class);

  private static final int NB_SERVERS = 1;
  private static Table table;
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static RegionInfo hriPrimary;
  private static RegionInfo hriSecondary;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  @BeforeClass
  public static void before() throws Exception {
    // Reduce the hdfs block size and prefetch size to trigger the file-link reopen
    // when the file is moved to the archive (e.g. after a compaction).
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create the table, then get the single region for it.
    table = HTU.createTable(tableName, f);

    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
      hriPrimary = locator.getRegionLocation(row, false).getRegion();
    }

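    // RegionReplicaUtil.getRegionInfoForReplica derives a RegionInfo for the given replica id
    // from the primary's RegionInfo: same table, start/end keys, and region id, but with
    // replicaId set to 1 (replica id 0 is the primary itself). The secondary therefore reads
    // the very same store files as the primary.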
    // mock a secondary region info to open
    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);

    // Run without a master: region open/close in these tests is driven directly against the
    // region server.
    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    table.close();
    HTU.shutdownMiniCluster();
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

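  /**
   * Opens the secondary replica (via TestRegionServerNoMaster's helper, which issues the
   * open-region request straight to the region server) and checks that the primary still serves
   * reads while the secondary is online.
   */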
  @Test
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // assert that we can read back from primary
      Assert.assertEquals(1000, HBaseTestingUtil.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try (Table meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME)) {
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
        getRS().getServerName(), -1, 1, false);
    } finally {
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

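  /**
   * Verifies that, once the primary has flushed, the secondary replica serves the same data both
   * through direct Region access and through the region server's RPC layer.
   */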
  @Test
  public void testRegionReplicaGets() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // first try directly against region
      region = getRS().getRegion(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

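  /**
   * Verifies a client-level Get pinned to the secondary: TIMELINE consistency plus an explicit
   * replica id routes the read to replica 1 instead of the primary.
   */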
  @Test
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

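      // TIMELINE consistency tells the client that possibly-stale reads from a secondary are
      // acceptable; setReplicaId(1) then forces this particular Get to the first secondary.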
      // try a Get directly against the region replica
      byte[] row = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      Result result = table.get(get);
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

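  /**
   * Reads the numeric row for {@code value} straight from the given Region and asserts that it is
   * present ({@code expect} true) or absent.
   */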
  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    Result result = region.get(get);
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

  // Issue a Get through the region server's RPC services, building the GetRequest by hand so the
  // read is routed by region name (and therefore by replica) rather than through the client.
  private void assertGetRpc(RegionInfo info, int value, boolean expect)
    throws IOException, ServiceException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

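  // Tears the mini cluster down and brings it back up so that configuration changes made by a
  // test (e.g. the store file refresh period) take effect on the region server.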
  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }

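  /**
   * Verifies that the StorefileRefresherChore makes new flushes and compactions on the primary
   * visible to the secondary without any replication of the writes themselves.
   */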
  @Test
  public void testRefreshStoreFiles() throws Exception {
    // enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      LOG.info("Flushing primary region");
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      LOG.info("Sleeping for " + (4 * refreshPeriod));
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, false);

      // load some data to primary
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // ensure that we see the 3 store files
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      HTU.compact(table.getName(), true);

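      // Keep reading from the secondary while the refresher chore picks up the compaction; reads
      // must keep succeeding throughout the store file switch.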
      long wakeUpTime = EnvironmentEdgeManager.currentTime() + 4 * refreshPeriod;
      while (EnvironmentEdgeManager.currentTime() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // The secondary still lists the three compacted-away files plus the newly compacted one,
      // i.e. 4 in total, until the compacted-files cleaner chore runs.
      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());

    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 1000, 1100);
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 2000, 2100);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

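  /**
   * Stress test: a writer keeps mutating the primary, a second thread keeps flushing/compacting
   * it, and a reader keeps hitting the secondary (occasionally bouncing it closed and open). No
   * thread may observe an exception during the 30-second run.
   */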
  @Test
  public void testFlushAndCompactionsInPrimary() throws Exception {
    long runtime = 30 * 1000;
    // enable store file refreshing
    final int refreshPeriod = 100; // a very aggressive 100ms refresh period
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary so that reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // ensure that chore is run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
      @SuppressWarnings("unchecked")
      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<>();
      }

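      // Three concurrent actors, each recording its first failure in its own slot of
      // `exceptions`: [0] the writer, [1] the flusher/compactor, [2] the secondary reader.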
      Runnable writer = new Runnable() {
        int key = startKey;

        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.addColumn(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) {
                key = startKey;
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      Runnable flusherCompactor = new Runnable() {
        @Override
        public void run() {
          try {
            // ThreadLocalRandom must be obtained on the thread that uses it, so fetch it inside
            // run() rather than in a field initializer on the constructing thread.
            Random random = ThreadLocalRandom.current();
            while (running.get()) {
              // flush or compact
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      Runnable reader = new Runnable() {
        @Override
        public void run() {
          try {
            Random random = ThreadLocalRandom.current();
            while (running.get()) {
              // occasionally close and reopen the secondary
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " "
                    + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " "
                    + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Exception ex) {
            LOG.warn("Failed getting the value in the region " + hriSecondary + " "
              + StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName());
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // let the actors run for the configured time, then stop them and wait for shutdown
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);

      for (AtomicReference<Exception> exRef : exceptions) {
        Assert.assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
      try {
        closeRegion(HTU, getRS(), hriSecondary);
      } catch (ServiceException e) {
        LOG.info("Failed closing the region {}", hriSecondary, e);
      }
    }
  }

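  /**
   * Verifies that the secondary keeps serving reads from store files the primary has compacted
   * away and archived: the FileLink layer must transparently follow each file into the archive.
   */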
  @Test
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // disable the store file refresh chore (we will refresh by hand)
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary: three batches, each flushed into its own store file
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

      HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName());
      Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // Refresh store files on the secondary
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force a major compaction on the primary
      LOG.info("Force major compaction on primary region " + hriPrimary);
      primaryRegion.compact(true);
      Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
      List<RegionServerThread> regionServerThreads =
        HTU.getMiniHBaseCluster().getRegionServerThreads();
      HRegionServer hrs = null;
      for (RegionServerThread rs : regionServerThreads) {
        if (
          rs.getRegionServer().getOnlineRegion(primaryRegion.getRegionInfo().getRegionName())
              != null
        ) {
          hrs = rs.getRegionServer();
          break;
        }
      }
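      // Running the compacted-files discharger by hand removes the compacted-away store files
      // from the primary's store and moves them to the archive, which is precisely the
      // file-moved-under-us situation the secondary's FileLinks must survive.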
      Assert.assertNotNull("No region server hosts the primary region", hrs);
      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
      cleaner.chore();
      // Scan all the hfiles on the secondary. Since there have been no reads on the secondary,
      // asking the NameNode for block locations of the original paths will yield
      // FileNotFoundException; the FileLink must handle it and still return every result.
      int keys = 0;
      int sum = 0;
      for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
        // Our file does not exist anymore: it was moved to the archive by the compaction above.
        boolean stillExists = getRS().getFileSystem().exists(sf.getPath());
        LOG.debug("Store file {} exists={}", sf.getPath(), stillExists);
        Assert.assertFalse(stillExists);

        HFileScanner scanner = sf.getReader().getScanner(false, false);
        scanner.seekTo();
        do {
          keys++;

          Cell cell = scanner.getCell();
          sum += Integer
            .parseInt(Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
        } while (scanner.next());
      }
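      // Three batches of 1000 numeric rows give keys 0..2999:
      // count = 3000, sum = 2999 * 3000 / 2 = 4498500.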
      Assert.assertEquals(3000, keys);
      Assert.assertEquals(4498500, sum);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 3000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}