/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests failover of secondary region replicas.
 */
@Category(LargeTests.class)
public class TestRegionReplicaFailover {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaFailover.class);

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  private static final int NB_SERVERS = 3;

  protected final byte[][] families =
    new byte[][] { HBaseTestingUtility.fam1, HBaseTestingUtility.fam2, HBaseTestingUtility.fam3 };
  protected final byte[] fam = HBaseTestingUtility.fam1;
  protected final byte[] qual1 = Bytes.toBytes("qual1");
  protected final byte[] value1 = Bytes.toBytes("value1");
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");

  @Rule
  public TestName name = new TestName();

  private HTableDescriptor htd;

  @Before
  public void before() throws Exception {
    Configuration conf = HTU.getConfiguration();
    // Up the handlers; this test needs more than usual.
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
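    // Enable async WAL replication to the secondary region replicas, and make a secondary ask the
    // primary for a flush when it opens so it starts serving from a consistent set of files.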
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);

    HTU.startMiniCluster(NB_SERVERS);
    htd = HTU
      .createTableDescriptor(name.getMethodName().substring(0, name.getMethodName().length() - 3));
    htd.setRegionReplication(3);
    HTU.getAdmin().createTable(htd);
  }

  @After
  public void after() throws Exception {
    HTU.deleteTableIfAny(htd.getTableName());
    HTU.shutdownMiniCluster();
  }

  /**
   * Tests that for a newly created table with region replicas and no data, the secondary region
   * replicas are available to read immediately.
   */
  @Test
  public void testSecondaryRegionWithEmptyRegion() throws IOException {
    // Create a new table with region replication, don't put any data. Test that the secondary
    // region replica is available to read.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {

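      // A TIMELINE-consistency get with an explicit replica id is served directly by that
      // secondary replica, so it returns (empty) results without waiting on the primary.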
      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      table.get(get); // this should not block
    }
  }

  /**
   * Tests that if there is some data in the primary region, reopening the region replicas
   * (disable/enable table, etc.) makes the region replicas readable.
   */
  @Test
  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
    // Create a new table with region replication and load some data,
    // then disable and enable the table again and verify the data from the secondary
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

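      // Disabling the table closes (and flushes) every region; re-enabling reopens all replicas
      // over the flushed files, so the secondary can serve the rows below.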
      HTU.getAdmin().disableTable(htd.getTableName());
      HTU.getAdmin().enableTable(htd.getTableName());

      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
    }
  }

  /**
   * Tests that killing the region server hosting the primary region replica while it has unflushed
   * data recovers, and the data remains readable from all replicas.
   */
  @Test
  public void testPrimaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      // WAL replication is async; wait until the replication catches up or we time out
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // No files have been flushed yet; the data is only in the memstores of the primary and the
      // secondaries. Kill the primary region replica now and ensure that, once it is reassigned,
      // the same data can still be read from the primary and from the secondaries.
      boolean aborted = false;
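      // Replica id 0 is always the primary replica; abort the region server currently hosting it.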
      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
          if (r.getRegionInfo().getReplicaId() == 0) {
            LOG.info("Aborting region server hosting primary region replica");
            rs.getRegionServer().abort("for test");
            aborted = true;
            break;
          }
        }
      }
      assertTrue(aborted);

      // WAL replication is async; wait until the replication catches up or we time out
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * WAL replication is async; keep verifying until the replication catches up or the timeout
   * expires.
   */
  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
    final int endRow, final int replicaId, final long timeout) throws Exception {
    try {
      HTU.waitFor(timeout, new Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          try {
            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
            return true;
          } catch (AssertionError ae) {
            return false;
          }
        }
      });
    } catch (Throwable t) {
      // ignore this, but redo the verify to get the actual exception
      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
    }
  }

  /**
   * Tests that killing the region server hosting a secondary region replica with unflushed data
   * recovers, and the replica becomes available to read again shortly.
   */
  @Test
  public void testSecondaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {
      HTU.loadNumericRows(table, fam, 0, 1000);

      // wait for some time to ensure that async WAL replication does its magic
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // No files have been flushed yet; the data is only in the memstores of the primary and the
      // secondaries. Kill the secondary region replica now, and ensure that when it comes back
      // up, we can still read the same data from it.
      boolean aborted = false;
      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
          if (r.getRegionInfo().getReplicaId() == 1) {
            LOG.info("Aborting region server hosting secondary region replica");
            rs.getRegionServer().abort("for test");
            aborted = true;
            break;
          }
        }
      }
      assertTrue(aborted);

      // It takes extra time for the replica region to become ready for reads: during region open
      // it asks the primary region for a flush, so that the replica can open the newly flushed
      // hfiles and avoid serving out-of-sync data.
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests the case where there are 3 region replicas and the primary is continuously accepting new
   * writes while one of the secondaries is killed. Verification is done for both of the secondary
   * replicas.
   */
  @Test
  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName()); Admin admin = connection.getAdmin()) {
      // load some base data first, then start a thread that keeps loading the primary
      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
      admin.flush(table.getName());
      HTU.loadNumericRows(table, fam, 1000, 2000);

      final AtomicReference<Throwable> ex = new AtomicReference<>(null);
      final AtomicBoolean done = new AtomicBoolean(false);
      final AtomicInteger key = new AtomicInteger(2000);

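      // Loader thread: keep appending 1000-row batches to the primary until told to stop; record
      // any failure in 'ex' so the main thread can fail the test.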
      Thread loader = new Thread() {
        @Override
        public void run() {
          while (!done.get()) {
            try {
              HTU.loadNumericRows(table, fam, key.get(), key.get() + 1000);
              key.addAndGet(1000);
            } catch (Throwable e) {
              ex.compareAndSet(null, e);
            }
          }
        }
      };
      loader.start();

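      // Aborter thread: find the region server hosting a secondary replica (replica id 1) and
      // abort it while the writes are still flowing.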
      Thread aborter = new Thread() {
        @Override
        public void run() {
          try {
            boolean aborted = false;
            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
              for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
                if (r.getRegionInfo().getReplicaId() == 1) {
                  LOG.info("Aborting region server hosting secondary region replica");
                  rs.getRegionServer().abort("for test");
                  aborted = true;
                }
              }
            }
            assertTrue(aborted);
          } catch (Throwable e) {
            ex.compareAndSet(null, e);
          }
        }
      };

      aborter.start();
      aborter.join();
      done.set(true);
      loader.join();

      assertNull(ex.get());

      assertTrue(key.get() > 1000); // assert that the test is working as designed
      LOG.info("Loaded up to key: " + key.get());
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests the case where we create a table with many regions and replicas. Opening the region
   * replicas should not block handlers on the region servers indefinitely.
   */
  @Test
  public void testLotsOfRegionReplicas() throws IOException {
    int numRegions = NB_SERVERS * 20;
    int regionReplication = 10;
    String tableName = htd.getTableName().getNameAsString() + "2";
    htd = HTU.createTableDescriptor(tableName);
    htd.setRegionReplication(regionReplication);

    // don't care about splits themselves too much
    byte[] startKey = Bytes.toBytes("aaa");
    byte[] endKey = Bytes.toBytes("zzz");
    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
    HTU.getAdmin().createTable(htd, startKey, endKey, numRegions);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {

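      // Probe every replica of every region with a timeline get; each call should return promptly
      // as regions come online rather than tying up an RPC handler on the region server.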
      for (int i = 1; i < splits.length; i++) {
        for (int j = 0; j < regionReplication; j++) {
          Get get = new Get(splits[i]);
          get.setConsistency(Consistency.TIMELINE);
          get.setReplicaId(j);
          table.get(get); // this should not block. Regions should be coming online
        }
      }
    }

    HTU.deleteTableIfAny(TableName.valueOf(tableName));
  }
}