/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link TestRegionServerNoMaster}.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestRegionReplicas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionReplicas.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class);

  private static final int NB_SERVERS = 1;
  private static Table table;
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static HRegionInfo hriPrimary;
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  @BeforeClass
  public static void before() throws Exception {
    // Reduce the hdfs block size and prefetch to trigger the file-link reopen
    // when the file is moved to archive (e.g. compaction)
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
    // Keep the memstore flush size large so that regions flush only when the tests ask for it
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create table then get the single region for our new table.
    table = HTU.createTable(tableName, f);

    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
      hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
    }

    // Mock a secondary region info to open: replica id 1 makes it the first secondary
    // replica of the primary region's key range
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);

    // No master
    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    table.close();
    HTU.shutdownMiniCluster();
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

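  /** Tests that a secondary region replica can be opened, and that loads and reads against
   * the primary still work while it is open */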
  @Test
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    Table meta = null;
    try {
      meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
        getRS().getServerName(), -1, 1, false);
    } finally {
      if (meta != null) meta.close();
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

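  /** Tests reads from a secondary region replica, both directly against the Region and
   * through the region server's RPC interface */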
  @Test
  public void testRegionReplicaGets() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // first try directly against region
      region = getRS().getRegion(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      // then through the RPC interface
      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

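  /** Tests a client-level Get pinned to the secondary replica via Consistency.TIMELINE and
   * an explicit replica id */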
  @Test
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // issue a Get pinned to the secondary replica (replica id 1); TIMELINE consistency
      // allows the read to be served by a non-primary replica
      byte[] rowKey = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(rowKey);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      Result result = table.get(get);
      Assert.assertArrayEquals(rowKey, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

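  /** Asserts that a direct Get against the given region does (expect == true) or does not
   * (expect == false) return the numeric row for {@code value} */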
  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    Result result = region.get(get);
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

  // Issue the Get through the region server's RPC interface rather than directly against
  // the Region
  private void assertGetRpc(HRegionInfo info, int value, boolean expect)
      throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

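  /**
   * Shuts down and restarts the whole mini cluster (recreating the test table) so that
   * configuration changes made by a test take effect.
   */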
  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }

  @Test
  public void testRefreshStoreFiles() throws Exception {
    // enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    // raise the compaction threshold so that our flushes do not trigger a compaction on their own
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      LOG.info("Flushing primary region");
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      LOG.info("Sleeping for " + (4 * refreshPeriod));
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, false);

      // load some more data to primary and flush twice more
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // ensure that we see the 3 store files
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      HTU.compact(table.getName(), true);

      // keep reading from the secondary while the refresher chore picks up the compaction
      long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod;
      while (System.currentTimeMillis() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // the compacted file plus the three compacted-away files remain visible (4 in total)
      // until the compacted-files cleaner chore runs
      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1100);
      HTU.deleteNumericRows(table, f, 2000, 2100);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

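  /** Stress test: a writer, a flusher/compactor and a reader run concurrently for about 30
   * seconds; the reader hits the secondary replica (occasionally closing and reopening it)
   * and the test fails if any of the three threads sees an exception */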
  @Test
  public void testFlushAndCompactionsInPrimary() throws Exception {
    long runtime = 30 * 1000;
    // enable store file refreshing
    final int refreshPeriod = 100; // 100ms refresh is a lot
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary so that the reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // ensure that chore is run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
      // one exception slot per worker: writer, flusher/compactor, reader
      @SuppressWarnings("unchecked")
      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<>();
      }

      Runnable writer = new Runnable() {
        int key = startKey;
        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.addColumn(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) key = startKey;
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      Runnable flusherCompactor = new Runnable() {
        Random random = new Random();
        @Override
        public void run() {
          try {
            while (running.get()) {
              // flush or compact
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      Runnable reader = new Runnable() {
        Random random = new Random();
        @Override
        public void run() {
          try {
            while (running.get()) {
              // occasionally close and reopen the secondary replica
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " "
                      + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " "
                      + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Exception ex) {
            LOG.warn("Failed getting the value in the region " + hriSecondary + " "
                + StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer, flusher/compactor and reader threads");
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // let the threads run, then stop them and fail if any of them saw an exception
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      Assert.assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS));

      for (AtomicReference<Exception> exRef : exceptions) {
        Assert.assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, f, startKey, endKey);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

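  /** Tests that the secondary replica can still read its store files through FileLinks after
   * a major compaction on the primary moves them to the archive */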
  @Test
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // disable the store file refresh chore (we do this by hand)
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary: 3000 rows, flushed into three store files
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

      HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName());
      Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // refresh store files on the secondary by hand
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force major compaction on the primary, then discharge the compacted-away files
      LOG.info("Force major compaction on primary region " + hriPrimary);
      primaryRegion.compact(true);
      Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
      List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster()
          .getRegionServerThreads();
      HRegionServer hrs = null;
      for (RegionServerThread rs : regionServerThreads) {
        if (rs.getRegionServer()
            .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
          hrs = rs.getRegionServer();
          break;
        }
      }
      Assert.assertNotNull(hrs);
      CompactedHFilesDischarger cleaner =
          new CompactedHFilesDischarger(100, null, hrs, false);
      cleaner.chore();
      // Scan all the hfiles on the secondary. Since there were no reads on the secondary,
      // when we ask the NN for block locations we get a FileNotFoundException, and the
      // FileLink should be able to deal with it, giving us all the results we expect.
      int keys = 0;
      int sum = 0;
      for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
        // our file does not exist anymore: it was moved to the archive by the compaction above
        LOG.debug("Store file still at original path: "
            + getRS().getFileSystem().exists(sf.getPath()));
        Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));

        HFileScanner scanner = sf.getReader().getScanner(false, false);
        scanner.seekTo();
        do {
          keys++;

          Cell cell = scanner.getCell();
          sum += Integer.parseInt(Bytes.toString(cell.getRowArray(),
            cell.getRowOffset(), cell.getRowLength()));
        } while (scanner.next());
      }
      // 3000 rows with keys 0..2999; their sum is 2999 * 3000 / 2 = 4498500
      Assert.assertEquals(3000, keys);
      Assert.assertEquals(4498500, sum);
    } finally {
      HTU.deleteNumericRows(table, f, 0, 3000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}