001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
020import static org.junit.Assert.assertNotNull;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.List;
026import java.util.Optional;
027import java.util.concurrent.CyclicBarrier;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
039import org.apache.hadoop.hbase.StartTestingClusterOption;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionReplicaUtil;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.monitoring.MonitoredTask;
048import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
049import org.apache.hadoop.hbase.regionserver.HRegion;
050import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.HStore;
053import org.apache.hadoop.hbase.regionserver.RSRpcServices;
054import org.apache.hadoop.hbase.regionserver.Region;
055import org.apache.hadoop.hbase.regionserver.RegionServerServices;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
060import org.apache.hadoop.hbase.wal.WAL;
061import org.junit.AfterClass;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.mockito.Mockito;
067
068import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
069import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
070import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
071import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
072
073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
076
077@Category({ RegionServerTests.class, LargeTests.class })
078public class TestRegionReplicationSinkCallbackAndFlushConcurrently {
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestRegionReplicationSinkCallbackAndFlushConcurrently.class);
082
083  private static final byte[] FAMILY = Bytes.toBytes("family_test");
084
085  private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
086
087  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
088  private static final int NB_SERVERS = 2;
089
090  private static TableName tableName = TableName.valueOf("testRegionReplicationSinkSuspend");
091  private static volatile boolean startTest = false;
092
093  @BeforeClass
094  public static void setUp() throws Exception {
095    Configuration conf = HTU.getConfiguration();
096    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
097    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
098    conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 15);
099    conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000);
100    conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
101    conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000);
102    conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
103    conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
104    HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class)
105      .numRegionServers(NB_SERVERS).build());
106
107  }
108
109  @AfterClass
110  public static void tearDown() throws Exception {
111    HTU.shutdownMiniCluster();
112  }
113
114  /**
115   * This test is for HBASE-26768,test the case that we have already clear the
116   * {@link RegionReplicationSink#failedReplicas} due to a flush all edit,which may in flusher
117   * thread,and then in the callback of replay, which may in Netty nioEventLoop,we add a replica to
118   * the {@link RegionReplicationSink#failedReplicas} because of a failure of replicating.
119   */
120  @Test
121  public void test() throws Exception {
122    final HRegionForTest[] regions = this.createTable();
123    final AtomicBoolean completedRef = new AtomicBoolean(false);
124    RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get();
125    assertTrue(regionReplicationSink != null);
126
127    RegionReplicationSink spiedRegionReplicationSink =
128      this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], completedRef);
129
130    String oldThreadName = Thread.currentThread().getName();
131    Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME);
132    try {
133      startTest = true;
134      /**
135       * Write First cell,replicating to secondary replica is error.
136       */
137      byte[] rowKey1 = Bytes.toBytes(1);
138
139      regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
140      regions[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY);
141
142      HTU.waitFor(120000, () -> completedRef.get());
143      assertTrue(spiedRegionReplicationSink.getFailedReplicas().isEmpty());
144    } finally {
145      startTest = false;
146      Thread.currentThread().setName(oldThreadName);
147    }
148  }
149
150  private RegionReplicationSink setUpSpiedRegionReplicationSink(
151    final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion,
152    final AtomicBoolean completedRef) {
153    final AtomicInteger onCompleteCounter = new AtomicInteger(0);
154    final AtomicInteger getStartFlushAllDescriptorCounter = new AtomicInteger(0);
155    RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink);
156
157    Mockito.doAnswer((invocationOnMock) -> {
158      if (!startTest) {
159        invocationOnMock.callRealMethod();
160        return null;
161      }
162      int count = onCompleteCounter.incrementAndGet();
163      if (count == 1) {
164        primaryRegion.cyclicBarrier.await();
165        invocationOnMock.callRealMethod();
166        completedRef.set(true);
167        return null;
168      }
169      invocationOnMock.callRealMethod();
170      return null;
171    }).when(spiedRegionReplicationSink).onComplete(Mockito.anyList(), Mockito.anyMap());
172
173    Mockito.doAnswer((invocationOnMock) -> {
174      if (!startTest) {
175        return invocationOnMock.callRealMethod();
176      }
177      if (
178        primaryRegion.prepareFlush
179          && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME)
180      ) {
181        int count = getStartFlushAllDescriptorCounter.incrementAndGet();
182        if (count == 1) {
183          // onComplete could execute
184          primaryRegion.cyclicBarrier.await();
185          return invocationOnMock.callRealMethod();
186        }
187      }
188      return invocationOnMock.callRealMethod();
189    }).when(spiedRegionReplicationSink).getStartFlushAllDescriptor(Mockito.any());
190
191    primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink);
192    return spiedRegionReplicationSink;
193  }
194
195  private HRegionForTest[] createTable() throws Exception {
196    TableDescriptor tableDescriptor =
197      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS)
198        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
199    HTU.getAdmin().createTable(tableDescriptor);
200    final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS];
201    for (int i = 0; i < NB_SERVERS; i++) {
202      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
203      List<HRegion> onlineRegions = rs.getRegions(tableName);
204      for (HRegion region : onlineRegions) {
205        int replicaId = region.getRegionInfo().getReplicaId();
206        assertTrue(regions[replicaId] == null);
207        regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region;
208      }
209    }
210    for (Region region : regions) {
211      assertNotNull(region);
212    }
213    return regions;
214  }
215
216  public static final class HRegionForTest extends HRegion {
217    static final String USER_THREAD_NAME = "TestReplicationHang";
218    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
219    volatile boolean prepareFlush = false;
220
221    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
222      TableDescriptor htd, RegionServerServices rsServices) {
223      super(fs, wal, confParam, htd, rsServices);
224    }
225
226    @SuppressWarnings("deprecation")
227    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
228      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
229      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
230    }
231
232    public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) {
233      this.regionReplicationSink = Optional.of(regionReplicationSink);
234    }
235
236    @Override
237    protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
238      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
239      FlushLifeCycleTracker tracker) throws IOException {
240      if (!startTest) {
241        return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status,
242          writeFlushWalMarker, tracker);
243      }
244
245      if (
246        this.getRegionInfo().getReplicaId() == 0
247          && Thread.currentThread().getName().equals(USER_THREAD_NAME)
248      ) {
249        this.prepareFlush = true;
250      }
251      try {
252        PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
253          status, writeFlushWalMarker, tracker);
254
255        return result;
256      } finally {
257        if (
258          this.getRegionInfo().getReplicaId() == 0
259            && Thread.currentThread().getName().equals(USER_THREAD_NAME)
260        ) {
261          this.prepareFlush = false;
262        }
263      }
264
265    }
266  }
267
268  public static final class ErrorReplayRSRpcServices extends RSRpcServices {
269    private static final AtomicInteger callCounter = new AtomicInteger(0);
270
271    public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
272      super(rs);
273    }
274
275    @Override
276    public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController,
277      ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
278
279      if (!startTest) {
280        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
281      }
282
283      List<WALEntry> entries = replicateWALEntryRequest.getEntryList();
284      if (CollectionUtils.isEmpty(entries)) {
285        return ReplicateWALEntryResponse.getDefaultInstance();
286      }
287      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
288
289      HRegion region;
290      try {
291        region = server.getRegionByEncodedName(regionName.toStringUtf8());
292      } catch (NotServingRegionException e) {
293        throw new ServiceException(e);
294      }
295
296      if (
297        !region.getRegionInfo().getTable().equals(tableName)
298          || region.getRegionInfo().getReplicaId() != 1
299      ) {
300        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
301      }
302
303      /**
304       * Simulate the first cell replicating error.
305       */
306      int count = callCounter.incrementAndGet();
307      if (count > 1) {
308        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
309      }
310      throw new ServiceException(new DoNotRetryIOException("Inject error!"));
311    }
312  }
313
314  public static final class RSForTest
315    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
316
317    public RSForTest(Configuration conf) throws IOException, InterruptedException {
318      super(conf);
319    }
320
321    @Override
322    protected RSRpcServices createRpcServices() throws IOException {
323      return new ErrorReplayRSRpcServices(this);
324    }
325  }
326}