001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
021import static org.junit.jupiter.api.Assertions.assertArrayEquals;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertNotNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellScanner;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.ClientMetaTableAccessor;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.MetaTableAccessor;
038import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionLocator;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.regionserver.HRegion;
050import org.apache.hadoop.hbase.regionserver.HRegionServer;
051import org.apache.hadoop.hbase.regionserver.Region;
052import org.apache.hadoop.hbase.regionserver.RegionScanner;
053import org.apache.hadoop.hbase.testclassification.LargeTests;
054import org.apache.hadoop.hbase.testclassification.ReplicationTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
057import org.junit.jupiter.api.AfterEach;
058import org.junit.jupiter.api.BeforeEach;
059import org.junit.jupiter.api.Tag;
060import org.junit.jupiter.api.Test;
061import org.junit.jupiter.api.TestInfo;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * Tests region replication for hbase:meta by setting up region replicas and verifying async wal
067 * replication replays the edits to the secondary region in various scenarios.
068 * @see TestRegionReplicaReplication
069 */
070@Tag(ReplicationTests.TAG)
071@Tag(LargeTests.TAG)
072public class TestMetaRegionReplicaReplication {
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
075  private static final int NB_SERVERS = 4;
076  private final HBaseTestingUtil HTU = new HBaseTestingUtil();
077  private int numOfMetaReplica = NB_SERVERS - 1;
078  private static byte[] VALUE = Bytes.toBytes("value");
079
080  private String testName;
081
082  @BeforeEach
083  public void before(TestInfo testInfo) throws Exception {
084    testName = testInfo.getTestMethod().get().getName();
085    Configuration conf = HTU.getConfiguration();
086    conf.setInt("zookeeper.recovery.retry", 1);
087    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
088    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
089    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
090    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
091    // Enable hbase:meta replication.
092    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
093    // Set hbase:meta replicas to be 3.
094    // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
095    HTU.startMiniCluster(NB_SERVERS);
096    // Enable hbase:meta replication.
097    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
098
099    HTU.waitFor(30000, () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
100        >= numOfMetaReplica);
101  }
102
103  @AfterEach
104  public void after() throws Exception {
105    HTU.shutdownMiniCluster();
106  }
107
108  /**
109   * Test meta region replica replication. Create some tables and see if replicas pick up the
110   * additions.
111   */
112  @Test
113  public void testHBaseMetaReplicates() throws Exception {
114    try (
115      Table table = HTU.createTable(TableName.valueOf(testName + "_0"), HConstants.CATALOG_FAMILY,
116        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
117      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
118    }
119    try (
120      Table table = HTU.createTable(TableName.valueOf(testName + "_1"), HConstants.CATALOG_FAMILY,
121        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
122      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
123      // Try delete.
124      HTU.deleteTableIfAny(table.getName());
125      verifyDeletedReplication(TableName.META_TABLE_NAME, numOfMetaReplica, table.getName());
126    }
127  }
128
129  @Test
130  public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
131    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
132      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
133      // load the data to the table
134      for (int i = 0; i < 5; i++) {
135        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
136        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
137        LOG.info("flushing table");
138        HTU.flush(TableName.META_TABLE_NAME);
139        LOG.info("compacting table");
140        if (i < 4) {
141          HTU.compact(TableName.META_TABLE_NAME, false);
142        }
143      }
144
145      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
146        HConstants.CATALOG_FAMILY);
147    }
148  }
149
150  @Test
151  public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
152    SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
153    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
154
155    HRegionServer hrsNoMetaReplica = null;
156    HRegionServer server = null;
157    Region metaReplica = null;
158    boolean hostingMeta;
159
160    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
161      server = cluster.getRegionServer(i);
162      hostingMeta = false;
163      if (server == hrs) {
164        continue;
165      }
166      for (Region region : server.getOnlineRegionsLocalContext()) {
167        if (region.getRegionInfo().isMetaRegion()) {
168          if (metaReplica == null) {
169            metaReplica = region;
170          }
171          hostingMeta = true;
172          break;
173        }
174      }
175      if (!hostingMeta) {
176        hrsNoMetaReplica = server;
177      }
178    }
179    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
180      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
181      // load the data to the table
182      for (int i = 0; i < 5; i++) {
183        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
184        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
185        if (i == 0) {
186          HTU.moveRegionAndWait(metaReplica.getRegionInfo(), hrsNoMetaReplica.getServerName());
187        }
188      }
189
190      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
191        HConstants.CATALOG_FAMILY);
192    }
193  }
194
195  protected void verifyReplication(TableName tableName, int regionReplication, final int startRow,
196    final int endRow, final byte[] family) throws Exception {
197    verifyReplication(tableName, regionReplication, startRow, endRow, family, true);
198  }
199
200  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
201    final int endRow, final byte[] family, final boolean present) throws Exception {
202    // find the regions
203    final Region[] regions = new Region[regionReplication];
204
205    for (int i = 0; i < NB_SERVERS; i++) {
206      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
207      List<HRegion> onlineRegions = rs.getRegions(tableName);
208      for (HRegion region : onlineRegions) {
209        regions[region.getRegionInfo().getReplicaId()] = region;
210      }
211    }
212
213    for (Region region : regions) {
214      assertNotNull(region);
215    }
216
217    for (int i = 1; i < regionReplication; i++) {
218      final Region region = regions[i];
219      // wait until all the data is replicated to all secondary regions
220      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
221        @Override
222        public boolean evaluate() throws Exception {
223          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
224          try {
225            HTU.verifyNumericRows(region, family, startRow, endRow, present);
226          } catch (Throwable ex) {
227            LOG.warn("Verification from secondary region is not complete yet", ex);
228            // still wait
229            return false;
230          }
231          return true;
232        }
233      });
234    }
235  }
236
237  /**
238   * Scan hbase:meta for <code>tableName</code> content.
239   */
240  private List<Result> getMetaCells(TableName tableName) throws IOException {
241    final List<Result> results = new ArrayList<>();
242    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
243      @Override
244      public boolean visit(Result r) throws IOException {
245        results.add(r);
246        return true;
247      }
248    };
249    MetaTableAccessor.scanMetaForTableRegions(HTU.getConnection(), visitor, tableName);
250    return results;
251  }
252
253  /** Returns All Regions for tableName including Replicas. */
254  private Region[] getAllRegions(TableName tableName, int replication) {
255    final Region[] regions = new Region[replication];
256    for (int i = 0; i < NB_SERVERS; i++) {
257      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
258      List<HRegion> onlineRegions = rs.getRegions(tableName);
259      for (HRegion region : onlineRegions) {
260        regions[region.getRegionInfo().getReplicaId()] = region;
261      }
262    }
263    for (Region region : regions) {
264      assertNotNull(region);
265    }
266    return regions;
267  }
268
269  /**
270   * Verify when a Table is deleted from primary, then there are no references in replicas (because
271   * they get the delete of the table rows too).
272   */
273  private void verifyDeletedReplication(TableName tableName, int regionReplication,
274    final TableName deletedTableName) {
275    final Region[] regions = getAllRegions(tableName, regionReplication);
276
277    // Start count at '1' so we skip default, primary replica and only look at secondaries.
278    for (int i = 1; i < regionReplication; i++) {
279      final Region region = regions[i];
280      // wait until all the data is replicated to all secondary regions
281      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
282        @Override
283        public boolean evaluate() throws Exception {
284          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
285          try (RegionScanner rs = region.getScanner(new Scan())) {
286            List<Cell> cells = new ArrayList<>();
287            while (rs.next(cells)) {
288              continue;
289            }
290            return doesNotContain(cells, deletedTableName);
291          } catch (Throwable ex) {
292            LOG.warn("Verification from secondary region is not complete yet", ex);
293            // still wait
294            return false;
295          }
296        }
297      });
298    }
299  }
300
301  /**
302   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
303   * HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
304   * <code>cells</code>.
305   */
306  private boolean doesNotContain(List<Cell> cells, TableName tableName) {
307    for (Cell cell : cells) {
308      String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
309      if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
310        return false;
311      }
312    }
313    return true;
314  }
315
316  /**
317   * Verify Replicas have results (exactly).
318   */
319  private void verifyReplication(TableName tableName, int regionReplication,
320    List<Result> contains) {
321    final Region[] regions = getAllRegions(tableName, regionReplication);
322
323    // Start count at '1' so we skip default, primary replica and only look at secondaries.
324    for (int i = 1; i < regionReplication; i++) {
325      final Region region = regions[i];
326      // wait until all the data is replicated to all secondary regions
327      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
328        @Override
329        public boolean evaluate() throws Exception {
330          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
331          try (RegionScanner rs = region.getScanner(new Scan())) {
332            List<Cell> cells = new ArrayList<>();
333            while (rs.next(cells)) {
334              continue;
335            }
336            return contains(contains, cells);
337          } catch (Throwable ex) {
338            LOG.warn("Verification from secondary region is not complete yet", ex);
339            // still wait
340            return false;
341          }
342        }
343      });
344    }
345  }
346
347  /**
348   * Presumes sorted Cells. Verify that <code>cells</code> has <code>contains</code> at least.
349   */
350  static boolean contains(List<Result> contains, List<Cell> cells) throws IOException {
351    CellScanner containsScanner = CellUtil.createCellScanner(contains);
352    CellScanner cellsScanner = CellUtil.createCellScanner(cells);
353    int matches = 0;
354    int count = 0;
355    while (containsScanner.advance()) {
356      while (cellsScanner.advance()) {
357        count++;
358        LOG.info("{} {}", containsScanner.current(), cellsScanner.current());
359        if (containsScanner.current().equals(cellsScanner.current())) {
360          matches++;
361          break;
362        }
363      }
364    }
365    return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
366  }
367
368  private void doNGets(final Table table, final byte[][] keys) throws Exception {
369    for (byte[] key : keys) {
370      Result r = table.get(new Get(key));
371      assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY));
372    }
373  }
374
375  private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
376    // There are read requests increase for primary meta replica.
377    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
378
379    // No change for replica regions
380    for (int i = 1; i < after.length; i++) {
381      assertEquals(before[i], after[i]);
382    }
383  }
384
385  private void primaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
386    // There are read requests increase for all meta replica regions,
387    for (int i = 0; i < after.length; i++) {
388      assertTrue(after[i] > before[i]);
389    }
390  }
391
392  private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) {
393    int i = 0;
394    for (Region r : metaRegions) {
395      LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
396      counters[i] = r.getReadRequestsCount();
397      i++;
398    }
399  }
400
401  @Test
402  public void testHBaseMetaReplicaGets() throws Exception {
403    TableName tn = TableName.valueOf(testName);
404    final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
405    long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
406    long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
407    long[] readReqsForMetaReplicasAfterGetAllLocations = new long[numOfMetaReplica];
408    long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
409    long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
410    long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
411    Region userRegion = null;
412    HRegionServer srcRs = null;
413    HRegionServer destRs = null;
414
415    try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY,
416      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
417      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
418      // load different values
419      HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE);
420      for (int i = 0; i < NB_SERVERS; i++) {
421        HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
422        List<HRegion> onlineRegions = rs.getRegions(tn);
423        if (onlineRegions.size() > 0) {
424          userRegion = onlineRegions.get(0);
425          srcRs = rs;
426          if (i > 0) {
427            destRs = HTU.getMiniHBaseCluster().getRegionServer(0);
428          } else {
429            destRs = HTU.getMiniHBaseCluster().getRegionServer(1);
430          }
431        }
432      }
433
434      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas);
435
436      Configuration c = new Configuration(HTU.getConfiguration());
437      c.setBoolean(HConstants.USE_META_REPLICAS, true);
438      c.set(LOCATOR_META_REPLICAS_MODE, "LoadBalance");
439      Connection connection = ConnectionFactory.createConnection(c);
440      Table tableForGet = connection.getTable(tn);
441      byte[][] getRows = new byte[HBaseTestingUtil.KEYS.length][];
442
443      int i = 0;
444      for (byte[] key : HBaseTestingUtil.KEYS) {
445        getRows[i] = key;
446        i++;
447      }
448      getRows[0] = Bytes.toBytes("aaa");
449      doNGets(tableForGet, getRows);
450
451      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet);
452
453      // There are more reads against all meta replica regions, including the primary region.
454      primaryIncreaseReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
455
456      RegionLocator locator = tableForGet.getRegionLocator();
457
458      for (int j = 0; j < numOfMetaReplica * 3; j++) {
459        locator.getAllRegionLocations();
460      }
461
462      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGetAllLocations);
463      primaryIncreaseReplicaIncrease(readReqsForMetaReplicasAfterGet,
464        readReqsForMetaReplicasAfterGetAllLocations);
465
466      // move one of regions so it meta cache may be invalid.
467      HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
468
469      doNGets(tableForGet, getRows);
470
471      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterMove);
472
473      // There are read requests increase for primary meta replica.
474      // For rest of meta replicas, there is no change as regionMove will tell the new location
475      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGetAllLocations,
476        readReqsForMetaReplicasAfterMove);
477      // Move region again.
478      HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
479
480      // Wait until moveRegion cache timeout.
481      while (destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) {
482        Thread.sleep(1000);
483      }
484
485      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterSecondMove);
486
487      // There are read requests increase for primary meta replica.
488      // For rest of meta replicas, there is no change.
489      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterMove,
490        readReqsForMetaReplicasAfterSecondMove);
491
492      doNGets(tableForGet, getRows);
493
494      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterThirdGet);
495
496      // Since it gets RegionNotServedException, it will go to primary for the next lookup.
497      // There are read requests increase for primary meta replica.
498      // For rest of meta replicas, there is no change.
499      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterSecondMove,
500        readReqsForMetaReplicasAfterThirdGet);
501    }
502  }
503}