/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Tests for region replicas, the secondary copies of a region that can serve
 * timeline-consistent reads. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link TestRegionServerNoMaster}.
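 * <p>
 * A client read against a secondary replica looks like this (a minimal sketch, mirroring
 * what {@code testGetOnTargetRegionReplica} below does):
 * <pre>
 * Get get = new Get(row);
 * get.setConsistency(Consistency.TIMELINE); // allow possibly-stale replica reads
 * get.setReplicaId(1);                      // pin the read to the first secondary
 * Result result = table.get(get);
 * </pre>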
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestRegionReplicas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionReplicas.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class);

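  // With a single region server, the primary region and the mocked secondary replica
  // are hosted on the same server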
  private static final int NB_SERVERS = 1;
  private static Table table;
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static HRegionInfo hriPrimary;
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;
  @BeforeClass
  public static void before() throws Exception {
    // Reduce the HDFS block size and prefetch size to trigger the file-link reopen
    // when a file is moved to the archive (e.g. by a compaction)
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
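    // Keep the memstore flush size large so that flushes only happen when the tests
    // trigger them explicitly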
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create the table, then get the single region of our new table.
    table = HTU.createTable(tableName, f);

    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
      hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
    }

    // Mock a secondary region info to open; replica id 1 marks it as the first
    // secondary replica of the primary region
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);

    // Stop the master (reassigning meta first) so that it cannot interfere with the
    // manual region open/close these tests perform
    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    table.close();
    HTU.shutdownMiniCluster();
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

  @Test
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // Load some data into the primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // Assert that we can read it back from the primary
      Assert.assertEquals(1000, HTU.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    Table meta = null;
    try {
      meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
        getRS().getServerName(), -1, 1, false);
    } finally {
      if (meta != null) {
        meta.close();
      }
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  @Test
  public void testRegionReplicaGets() throws Exception {
    try {
      // Load some data into the primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // Assert that we can read it back from the primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // Flush so that the region replica can see the data in store files
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // First try directly against the region
      region = getRS().getRegion(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  @Test
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // Load some data into the primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // Assert that we can read it back from the primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // Flush so that the region replica can see the data in store files
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // Try a Get directly against the region replica: TIMELINE consistency permits
      // reads from secondaries, and setReplicaId(1) pins the read to our secondary
      byte[] row = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
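      // The numeric-row loader stores the row key bytes as the cell value, which is
      // what the assertion below relies on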
      Result result = table.get(get);
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    Result result = region.get(get);
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

  // Build a Get RPC directly against the region server's RPC services, bypassing the
  // client-side replica logic
  private void assertGetRpc(HRegionInfo info, int value, boolean expect)
      throws IOException, ServiceException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

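  // Bounces the mini cluster (via afterClass/before) so that configuration changes
  // made by a test take effect on the region server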
  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }

  @Test
  public void testRefreshStoreFiles() throws Exception {
    // Enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // Restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // Load some data into the primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // Assert that we can read it back from the primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // Flush so that the region replica can pick up the new store file
      LOG.info("Flushing primary region");
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // Ensure that the refresher chore has run
      LOG.info("Sleeping for " + (4 * refreshPeriod));
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, false);

      // Load some more data into the primary, flushing after each batch
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // Ensure that the refresher chore has run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // Ensure that we see the 3 store files
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // Force a major compaction
      HTU.compact(table.getName(), true);

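      // Keep reading from the secondary while it picks up the compacted file; the gets
      // must keep succeeding throughout the transition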
      long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod;
      while (System.currentTimeMillis() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // The secondary now sees the compacted file in addition to the three old ones;
      // the count stays at 4 until the compacted-files cleaner chore archives them
      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());

    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  @Test
  public void testFlushAndCompactionsInPrimary() throws Exception {
    final long runtime = 30 * 1000;
    // Enable very frequent store file refreshing
    final int refreshPeriod = 100; // 100ms refresh is a lot
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // Restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // Load some data into the primary so that the reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // Ensure that the refresher chore has run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
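      // One exception slot per worker thread: [0] writer, [1] flusher/compactor, [2] reader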
      @SuppressWarnings("unchecked")
      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<>();
      }

      Runnable writer = new Runnable() {
        int key = startKey;
        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.addColumn(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) {
                key = startKey;
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      Runnable flusherCompactor = new Runnable() {
        Random random = new Random();
        @Override
        public void run() {
          try {
            while (running.get()) {
              // flush or compact
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      Runnable reader = new Runnable() {
        Random random = new Random();
        @Override
        public void run() {
          try {
            while (running.get()) {
              // Occasionally (1 in 10 iterations) close and reopen the secondary
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " " +
                    StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " " +
                    StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Exception ex) {
            LOG.warn("Failed getting the value in the region " + hriSecondary + " " +
              StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName());
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // Let the threads run for the configured time, then stop and join them
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);

      for (AtomicReference<Exception> exRef : exceptions) {
        Assert.assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
      try {
        closeRegion(HTU, getRS(), hriSecondary);
      } catch (ServiceException e) {
        LOG.info("Region {} may already have been closed by the reader thread", hriSecondary, e);
      }
    }
  }

  @Test
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // Disable the store file refresh chore; this test refreshes the store files by hand
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // Load some data into the primary, flushing after each batch
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

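      // The three flushed batches above should have produced three store files on the primary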
      HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName());
      Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // Refresh store files on the secondary
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // Force a major compaction
      LOG.info("Force major compaction on primary region " + hriPrimary);
      primaryRegion.compact(true);
      Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
      List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster()
          .getRegionServerThreads();
      HRegionServer hrs = null;
      for (RegionServerThread rs : regionServerThreads) {
        if (rs.getRegionServer()
            .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
          hrs = rs.getRegionServer();
          break;
        }
      }
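      // Run the compacted-files discharger by hand so that the store files compacted
      // away above are archived immediately instead of waiting for the chore to fire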
      CompactedHFilesDischarger cleaner =
          new CompactedHFilesDischarger(100, null, hrs, false);
      cleaner.chore();
      // Scan all the hfiles on the secondary. Because there have been no reads on the
      // secondary yet, asking the NN for the (now archived) file locations returns a
      // FileNotFoundException; the FileLink should be able to deal with it and still
      // give us all the results we expect.
      int keys = 0;
      int sum = 0;
      for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
        // Our file does not exist anymore; it was moved to the archive by the compaction above
        LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath())));
        Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));

        HFileScanner scanner = sf.getReader().getScanner(false, false);
        scanner.seekTo();
        do {
          keys++;

          Cell cell = scanner.getCell();
          sum += Integer.parseInt(Bytes.toString(cell.getRowArray(),
            cell.getRowOffset(), cell.getRowLength()));
        } while (scanner.next());
      }
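      // 3000 rows were written with keys 0..2999; summing the numeric row keys gives
      // 2999 * 3000 / 2 = 4498500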
      Assert.assertEquals(3000, keys);
      Assert.assertEquals(4498500, sum);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}