001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.*;
021
022import java.io.IOException;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.HTableDescriptor;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.Waiter.Predicate;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Consistency;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
043import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
044import org.junit.After;
045import org.junit.Before;
046import org.junit.ClassRule;
047import org.junit.Rule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.rules.TestName;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * Tests failover of secondary region replicas.
056 */
057@Category(LargeTests.class)
058public class TestRegionReplicaFailover {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062      HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
063
064  private static final Logger LOG =
065      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
066
067  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
068
069  private static final int NB_SERVERS = 3;
070
071  protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
072      HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
073  protected final byte[] fam = HBaseTestingUtility.fam1;
074  protected final byte[] qual1 = Bytes.toBytes("qual1");
075  protected final byte[] value1 = Bytes.toBytes("value1");
076  protected final byte[] row = Bytes.toBytes("rowA");
077  protected final byte[] row2 = Bytes.toBytes("rowB");
078
079  @Rule public TestName name = new TestName();
080
081  private HTableDescriptor htd;
082
083  @Before
084  public void before() throws Exception {
085    Configuration conf = HTU.getConfiguration();
086   // Up the handlers; this test needs more than usual.
087    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
088    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
089    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
090    conf.setInt("replication.stats.thread.period.seconds", 5);
091    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
092
093    HTU.startMiniCluster(NB_SERVERS);
094    htd = HTU.createTableDescriptor(
095      name.getMethodName().substring(0, name.getMethodName().length()-3));
096    htd.setRegionReplication(3);
097    HTU.getAdmin().createTable(htd);
098  }
099
100  @After
101  public void after() throws Exception {
102    HTU.deleteTableIfAny(htd.getTableName());
103    HTU.shutdownMiniCluster();
104  }
105
106  /**
107   * Tests the case where a newly created table with region replicas and no data, the secondary
108   * region replicas are available to read immediately.
109   */
110  @Test
111  public void testSecondaryRegionWithEmptyRegion() throws IOException {
112    // Create a new table with region replication, don't put any data. Test that the secondary
113    // region replica is available to read.
114    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
115        Table table = connection.getTable(htd.getTableName())) {
116
117      Get get = new Get(row);
118      get.setConsistency(Consistency.TIMELINE);
119      get.setReplicaId(1);
120      table.get(get); // this should not block
121    }
122  }
123
124  /**
125   * Tests the case where if there is some data in the primary region, reopening the region replicas
126   * (enable/disable table, etc) makes the region replicas readable.
127   * @throws IOException
128   */
129  @Test
130  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
131    // Create a new table with region replication and load some data
132    // than disable and enable the table again and verify the data from secondary
133    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
134        Table table = connection.getTable(htd.getTableName())) {
135
136      HTU.loadNumericRows(table, fam, 0, 1000);
137
138      HTU.getAdmin().disableTable(htd.getTableName());
139      HTU.getAdmin().enableTable(htd.getTableName());
140
141      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
142    }
143  }
144
145  /**
146   * Tests the case where killing a primary region with unflushed data recovers
147   */
148  @Test
149  public void testPrimaryRegionKill() throws Exception {
150    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
151        Table table = connection.getTable(htd.getTableName())) {
152
153      HTU.loadNumericRows(table, fam, 0, 1000);
154
155      // wal replication is async, we have to wait until the replication catches up, or we timeout
156      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
157      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
158
159      // we should not have flushed files now, but data in memstores of primary and secondary
160      // kill the primary region replica now, and ensure that when it comes back up, we can still
161      // read from it the same data from primary and secondaries
162      boolean aborted = false;
163      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
164        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
165          if (r.getRegionInfo().getReplicaId() == 0) {
166            LOG.info("Aborting region server hosting primary region replica");
167            rs.getRegionServer().abort("for test");
168            aborted = true;
169            break;
170          }
171        }
172      }
173      assertTrue(aborted);
174
175      // wal replication is async, we have to wait until the replication catches up, or we timeout
176      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
177      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
178      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
179    }
180
181    // restart the region server
182    HTU.getMiniHBaseCluster().startRegionServer();
183  }
184
185  /** wal replication is async, we have to wait until the replication catches up, or we timeout
186   */
187  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
188      final int endRow, final int replicaId, final long timeout) throws Exception {
189    try {
190      HTU.waitFor(timeout, new Predicate<Exception>() {
191        @Override
192        public boolean evaluate() throws Exception {
193          try {
194            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
195            return true;
196          } catch (AssertionError ae) {
197            return false;
198          }
199        }
200      });
201    } catch (Throwable t) {
202      // ignore this, but redo the verify do get the actual exception
203      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
204    }
205  }
206
207  /**
208   * Tests the case where killing a secondary region with unflushed data recovers, and the replica
209   * becomes available to read again shortly.
210   */
211  @Test
212  public void testSecondaryRegionKill() throws Exception {
213    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
214        Table table = connection.getTable(htd.getTableName())) {
215      HTU.loadNumericRows(table, fam, 0, 1000);
216
217      // wait for some time to ensure that async wal replication does it's magic
218      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
219      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
220
221      // we should not have flushed files now, but data in memstores of primary and secondary
222      // kill the secondary region replica now, and ensure that when it comes back up, we can still
223      // read from it the same data
224      boolean aborted = false;
225      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
226        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
227          if (r.getRegionInfo().getReplicaId() == 1) {
228            LOG.info("Aborting region server hosting secondary region replica");
229            rs.getRegionServer().abort("for test");
230            aborted = true;
231            break;
232          }
233        }
234      }
235      assertTrue(aborted);
236
237      // It takes extra time for replica region is ready for read as during
238      // region open process, it needs to ask primary region to do a flush and replica region
239      // can open newly flushed hfiles to avoid data out-of-sync.
240      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
241      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
242    }
243
244    // restart the region server
245    HTU.getMiniHBaseCluster().startRegionServer();
246  }
247
248  /**
249   * Tests the case where there are 3 region replicas and the primary is continuously accepting
250   * new writes while one of the secondaries is killed. Verification is done for both of the
251   * secondary replicas.
252   */
253  @Test
254  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
255    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
256        Table table = connection.getTable(htd.getTableName());
257        Admin admin = connection.getAdmin()) {
258      // start a thread to do the loading of primary
259      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
260      admin.flush(table.getName());
261      HTU.loadNumericRows(table, fam, 1000, 2000);
262
263      final AtomicReference<Throwable> ex = new AtomicReference<>(null);
264      final AtomicBoolean done = new AtomicBoolean(false);
265      final AtomicInteger key = new AtomicInteger(2000);
266
267      Thread loader = new Thread() {
268        @Override
269        public void run() {
270          while (!done.get()) {
271            try {
272              HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
273              key.addAndGet(1000);
274            } catch (Throwable e) {
275              ex.compareAndSet(null, e);
276            }
277          }
278        }
279      };
280      loader.start();
281
282      Thread aborter = new Thread() {
283        @Override
284        public void run() {
285          try {
286            boolean aborted = false;
287            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
288              for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
289                if (r.getRegionInfo().getReplicaId() == 1) {
290                  LOG.info("Aborting region server hosting secondary region replica");
291                  rs.getRegionServer().abort("for test");
292                  aborted = true;
293                }
294              }
295            }
296            assertTrue(aborted);
297          } catch (Throwable e) {
298            ex.compareAndSet(null, e);
299          }
300        };
301      };
302
303      aborter.start();
304      aborter.join();
305      done.set(true);
306      loader.join();
307
308      assertNull(ex.get());
309
310      assertTrue(key.get() > 1000); // assert that the test is working as designed
311      LOG.info("Loaded up to key :" + key.get());
312      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
313      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
314      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
315    }
316
317    // restart the region server
318    HTU.getMiniHBaseCluster().startRegionServer();
319  }
320
321  /**
322   * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
323   * replicas should not block handlers on RS indefinitely.
324   */
325  @Test
326  public void testLotsOfRegionReplicas() throws IOException {
327    int numRegions = NB_SERVERS * 20;
328    int regionReplication = 10;
329    String tableName = htd.getTableName().getNameAsString() + "2";
330    htd = HTU.createTableDescriptor(tableName);
331    htd.setRegionReplication(regionReplication);
332
333    // dont care about splits themselves too much
334    byte[] startKey = Bytes.toBytes("aaa");
335    byte[] endKey = Bytes.toBytes("zzz");
336    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
337    HTU.getAdmin().createTable(htd, startKey, endKey, numRegions);
338
339    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
340        Table table = connection.getTable(htd.getTableName())) {
341
342      for (int i = 1; i < splits.length; i++) {
343        for (int j = 0; j < regionReplication; j++) {
344          Get get = new Get(splits[i]);
345          get.setConsistency(Consistency.TIMELINE);
346          get.setReplicaId(j);
347          table.get(get); // this should not block. Regions should be coming online
348        }
349      }
350    }
351
352    HTU.deleteTableIfAny(TableName.valueOf(tableName));
353  }
354}