001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellScanner;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.ClientMetaTableAccessor;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.MetaTableAccessor;
039import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.RegionLocator;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.Region;
053import org.apache.hadoop.hbase.regionserver.RegionScanner;
054import org.apache.hadoop.hbase.testclassification.LargeTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
057import org.junit.After;
058import org.junit.Before;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
068 * Tests region replication for hbase:meta by setting up region replicas and verifying async wal
069 * replication replays the edits to the secondary region in various scenarios.
070 * @see TestRegionReplicaReplication
071 */
072@Category({ LargeTests.class })
073public class TestMetaRegionReplicaReplication {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplication.class);
078  private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
079  private static final int NB_SERVERS = 4;
080  private final HBaseTestingUtil HTU = new HBaseTestingUtil();
081  private int numOfMetaReplica = NB_SERVERS - 1;
082  private static byte[] VALUE = Bytes.toBytes("value");
083
084  @Rule
085  public TestName name = new TestName();
086
087  @Before
088  public void before() throws Exception {
089    Configuration conf = HTU.getConfiguration();
090    conf.setInt("zookeeper.recovery.retry", 1);
091    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
092    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
093    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
094    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
095    // Enable hbase:meta replication.
096    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
097    // Set hbase:meta replicas to be 3.
098    // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
099    HTU.startMiniCluster(NB_SERVERS);
100    // Enable hbase:meta replication.
101    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
102
103    HTU.waitFor(30000, () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
104        >= numOfMetaReplica);
105  }
106
107  @After
108  public void after() throws Exception {
109    HTU.shutdownMiniCluster();
110  }
111
112  /**
113   * Test meta region replica replication. Create some tables and see if replicas pick up the
114   * additions.
115   */
116  @Test
117  public void testHBaseMetaReplicates() throws Exception {
118    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
119      HConstants.CATALOG_FAMILY,
120      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
121      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
122    }
123    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
124      HConstants.CATALOG_FAMILY,
125      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
126      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
127      // Try delete.
128      HTU.deleteTableIfAny(table.getName());
129      verifyDeletedReplication(TableName.META_TABLE_NAME, numOfMetaReplica, table.getName());
130    }
131  }
132
133  @Test
134  public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
135    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
136      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
137      // load the data to the table
138      for (int i = 0; i < 5; i++) {
139        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
140        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
141        LOG.info("flushing table");
142        HTU.flush(TableName.META_TABLE_NAME);
143        LOG.info("compacting table");
144        if (i < 4) {
145          HTU.compact(TableName.META_TABLE_NAME, false);
146        }
147      }
148
149      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
150        HConstants.CATALOG_FAMILY);
151    }
152  }
153
154  @Test
155  public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
156    SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
157    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
158
159    HRegionServer hrsNoMetaReplica = null;
160    HRegionServer server = null;
161    Region metaReplica = null;
162    boolean hostingMeta;
163
164    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
165      server = cluster.getRegionServer(i);
166      hostingMeta = false;
167      if (server == hrs) {
168        continue;
169      }
170      for (Region region : server.getOnlineRegionsLocalContext()) {
171        if (region.getRegionInfo().isMetaRegion()) {
172          if (metaReplica == null) {
173            metaReplica = region;
174          }
175          hostingMeta = true;
176          break;
177        }
178      }
179      if (!hostingMeta) {
180        hrsNoMetaReplica = server;
181      }
182    }
183    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
184      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
185      // load the data to the table
186      for (int i = 0; i < 5; i++) {
187        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
188        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
189        if (i == 0) {
190          HTU.moveRegionAndWait(metaReplica.getRegionInfo(), hrsNoMetaReplica.getServerName());
191        }
192      }
193
194      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
195        HConstants.CATALOG_FAMILY);
196    }
197  }
198
199  protected void verifyReplication(TableName tableName, int regionReplication, final int startRow,
200    final int endRow, final byte[] family) throws Exception {
201    verifyReplication(tableName, regionReplication, startRow, endRow, family, true);
202  }
203
204  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
205    final int endRow, final byte[] family, final boolean present) throws Exception {
206    // find the regions
207    final Region[] regions = new Region[regionReplication];
208
209    for (int i = 0; i < NB_SERVERS; i++) {
210      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
211      List<HRegion> onlineRegions = rs.getRegions(tableName);
212      for (HRegion region : onlineRegions) {
213        regions[region.getRegionInfo().getReplicaId()] = region;
214      }
215    }
216
217    for (Region region : regions) {
218      assertNotNull(region);
219    }
220
221    for (int i = 1; i < regionReplication; i++) {
222      final Region region = regions[i];
223      // wait until all the data is replicated to all secondary regions
224      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
225        @Override
226        public boolean evaluate() throws Exception {
227          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
228          try {
229            HTU.verifyNumericRows(region, family, startRow, endRow, present);
230          } catch (Throwable ex) {
231            LOG.warn("Verification from secondary region is not complete yet", ex);
232            // still wait
233            return false;
234          }
235          return true;
236        }
237      });
238    }
239  }
240
241  /**
242   * Scan hbase:meta for <code>tableName</code> content.
243   */
244  private List<Result> getMetaCells(TableName tableName) throws IOException {
245    final List<Result> results = new ArrayList<>();
246    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
247      @Override
248      public boolean visit(Result r) throws IOException {
249        results.add(r);
250        return true;
251      }
252    };
253    MetaTableAccessor.scanMetaForTableRegions(HTU.getConnection(), visitor, tableName);
254    return results;
255  }
256
257  /**
258   * @return All Regions for tableName including Replicas.
259   */
260  private Region[] getAllRegions(TableName tableName, int replication) {
261    final Region[] regions = new Region[replication];
262    for (int i = 0; i < NB_SERVERS; i++) {
263      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
264      List<HRegion> onlineRegions = rs.getRegions(tableName);
265      for (HRegion region : onlineRegions) {
266        regions[region.getRegionInfo().getReplicaId()] = region;
267      }
268    }
269    for (Region region : regions) {
270      assertNotNull(region);
271    }
272    return regions;
273  }
274
275  /**
276   * Verify when a Table is deleted from primary, then there are no references in replicas (because
277   * they get the delete of the table rows too).
278   */
279  private void verifyDeletedReplication(TableName tableName, int regionReplication,
280    final TableName deletedTableName) {
281    final Region[] regions = getAllRegions(tableName, regionReplication);
282
283    // Start count at '1' so we skip default, primary replica and only look at secondaries.
284    for (int i = 1; i < regionReplication; i++) {
285      final Region region = regions[i];
286      // wait until all the data is replicated to all secondary regions
287      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
288        @Override
289        public boolean evaluate() throws Exception {
290          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
291          try (RegionScanner rs = region.getScanner(new Scan())) {
292            List<Cell> cells = new ArrayList<>();
293            while (rs.next(cells)) {
294              continue;
295            }
296            return doesNotContain(cells, deletedTableName);
297          } catch (Throwable ex) {
298            LOG.warn("Verification from secondary region is not complete yet", ex);
299            // still wait
300            return false;
301          }
302        }
303      });
304    }
305  }
306
307  /**
308   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
309   * HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
310   * <code>cells</code>.
311   */
312  private boolean doesNotContain(List<Cell> cells, TableName tableName) {
313    for (Cell cell : cells) {
314      String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
315      if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
316        return false;
317      }
318    }
319    return true;
320  }
321
322  /**
323   * Verify Replicas have results (exactly).
324   */
325  private void verifyReplication(TableName tableName, int regionReplication,
326    List<Result> contains) {
327    final Region[] regions = getAllRegions(tableName, regionReplication);
328
329    // Start count at '1' so we skip default, primary replica and only look at secondaries.
330    for (int i = 1; i < regionReplication; i++) {
331      final Region region = regions[i];
332      // wait until all the data is replicated to all secondary regions
333      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
334        @Override
335        public boolean evaluate() throws Exception {
336          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
337          try (RegionScanner rs = region.getScanner(new Scan())) {
338            List<Cell> cells = new ArrayList<>();
339            while (rs.next(cells)) {
340              continue;
341            }
342            return contains(contains, cells);
343          } catch (Throwable ex) {
344            LOG.warn("Verification from secondary region is not complete yet", ex);
345            // still wait
346            return false;
347          }
348        }
349      });
350    }
351  }
352
353  /**
354   * Presumes sorted Cells. Verify that <code>cells</code> has <code>contains</code> at least.
355   */
356  static boolean contains(List<Result> contains, List<Cell> cells) throws IOException {
357    CellScanner containsScanner = CellUtil.createCellScanner(contains);
358    CellScanner cellsScanner = CellUtil.createCellScanner(cells);
359    int matches = 0;
360    int count = 0;
361    while (containsScanner.advance()) {
362      while (cellsScanner.advance()) {
363        count++;
364        LOG.info("{} {}", containsScanner.current(), cellsScanner.current());
365        if (containsScanner.current().equals(cellsScanner.current())) {
366          matches++;
367          break;
368        }
369      }
370    }
371    return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
372  }
373
374  private void doNGets(final Table table, final byte[][] keys) throws Exception {
375    for (byte[] key : keys) {
376      Result r = table.get(new Get(key));
377      assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY));
378    }
379  }
380
381  private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
382    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID], after[RegionInfo.DEFAULT_REPLICA_ID]);
383
384    for (int i = 1; i < after.length; i++) {
385      assertTrue(after[i] > before[i]);
386    }
387  }
388
389  private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
390    // There are read requests increase for primary meta replica.
391    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
392
393    // No change for replica regions
394    for (int i = 1; i < after.length; i++) {
395      assertEquals(before[i], after[i]);
396    }
397  }
398
399  private void primaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
400    // There are read requests increase for all meta replica regions,
401    for (int i = 0; i < after.length; i++) {
402      assertTrue(after[i] > before[i]);
403    }
404  }
405
406  private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) {
407    int i = 0;
408    for (Region r : metaRegions) {
409      LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
410      counters[i] = r.getReadRequestsCount();
411      i++;
412    }
413  }
414
415  @Test
416  public void testHBaseMetaReplicaGets() throws Exception {
417    TableName tn = TableName.valueOf(this.name.getMethodName());
418    final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
419    long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
420    long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
421    long[] readReqsForMetaReplicasAfterGetAllLocations = new long[numOfMetaReplica];
422    long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
423    long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
424    long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
425    Region userRegion = null;
426    HRegionServer srcRs = null;
427    HRegionServer destRs = null;
428
429    try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY,
430      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
431      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
432      // load different values
433      HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE);
434      for (int i = 0; i < NB_SERVERS; i++) {
435        HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
436        List<HRegion> onlineRegions = rs.getRegions(tn);
437        if (onlineRegions.size() > 0) {
438          userRegion = onlineRegions.get(0);
439          srcRs = rs;
440          if (i > 0) {
441            destRs = HTU.getMiniHBaseCluster().getRegionServer(0);
442          } else {
443            destRs = HTU.getMiniHBaseCluster().getRegionServer(1);
444          }
445        }
446      }
447
448      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas);
449
450      Configuration c = new Configuration(HTU.getConfiguration());
451      c.setBoolean(HConstants.USE_META_REPLICAS, true);
452      c.set(LOCATOR_META_REPLICAS_MODE, "LoadBalance");
453      Connection connection = ConnectionFactory.createConnection(c);
454      Table tableForGet = connection.getTable(tn);
455      byte[][] getRows = new byte[HBaseTestingUtil.KEYS.length][];
456
457      int i = 0;
458      for (byte[] key : HBaseTestingUtil.KEYS) {
459        getRows[i] = key;
460        i++;
461      }
462      getRows[0] = Bytes.toBytes("aaa");
463      doNGets(tableForGet, getRows);
464
465      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet);
466
467      // There are more reads against all meta replica regions, including the primary region.
468      primaryIncreaseReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
469
470      RegionLocator locator = tableForGet.getRegionLocator();
471
472      for (int j = 0; j < numOfMetaReplica * 3; j++) {
473        locator.getAllRegionLocations();
474      }
475
476      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGetAllLocations);
477      primaryIncreaseReplicaIncrease(readReqsForMetaReplicasAfterGet,
478        readReqsForMetaReplicasAfterGetAllLocations);
479
480      // move one of regions so it meta cache may be invalid.
481      HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
482
483      doNGets(tableForGet, getRows);
484
485      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterMove);
486
487      // There are read requests increase for primary meta replica.
488      // For rest of meta replicas, there is no change as regionMove will tell the new location
489      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGetAllLocations,
490        readReqsForMetaReplicasAfterMove);
491      // Move region again.
492      HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
493
494      // Wait until moveRegion cache timeout.
495      while (destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) {
496        Thread.sleep(1000);
497      }
498
499      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterSecondMove);
500
501      // There are read requests increase for primary meta replica.
502      // For rest of meta replicas, there is no change.
503      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterMove,
504        readReqsForMetaReplicasAfterSecondMove);
505
506      doNGets(tableForGet, getRows);
507
508      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterThirdGet);
509
510      // Since it gets RegionNotServedException, it will go to primary for the next lookup.
511      // There are read requests increase for primary meta replica.
512      // For rest of meta replicas, there is no change.
513      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterSecondMove,
514        readReqsForMetaReplicasAfterThirdGet);
515    }
516  }
517}