001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
020import static org.junit.jupiter.api.Assertions.assertNotNull;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.List;
026import java.util.Optional;
027import java.util.concurrent.CyclicBarrier;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.NotServingRegionException;
037import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
038import org.apache.hadoop.hbase.StartTestingClusterOption;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.RegionReplicaUtil;
044import org.apache.hadoop.hbase.client.TableDescriptor;
045import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
046import org.apache.hadoop.hbase.monitoring.MonitoredTask;
047import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
048import org.apache.hadoop.hbase.regionserver.HRegion;
049import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
050import org.apache.hadoop.hbase.regionserver.HRegionServer;
051import org.apache.hadoop.hbase.regionserver.HStore;
052import org.apache.hadoop.hbase.regionserver.RSRpcServices;
053import org.apache.hadoop.hbase.regionserver.Region;
054import org.apache.hadoop.hbase.regionserver.RegionServerServices;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.testclassification.RegionServerTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
059import org.apache.hadoop.hbase.wal.WAL;
060import org.junit.jupiter.api.AfterAll;
061import org.junit.jupiter.api.BeforeAll;
062import org.junit.jupiter.api.Tag;
063import org.junit.jupiter.api.Test;
064import org.mockito.Mockito;
065
066import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
067import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
068import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
069import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
070
071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
074
075@Tag(RegionServerTests.TAG)
076@Tag(LargeTests.TAG)
077public class TestRegionReplicationSinkCallbackAndFlushConcurrently {
078
079  private static final byte[] FAMILY = Bytes.toBytes("family_test");
080
081  private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
082
083  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
084  private static final int NB_SERVERS = 2;
085
086  private static TableName tableName = TableName.valueOf("testRegionReplicationSinkSuspend");
087  private static volatile boolean startTest = false;
088
089  @BeforeAll
090  public static void setUp() throws Exception {
091    Configuration conf = HTU.getConfiguration();
092    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
093    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
094    conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 15);
095    conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000);
096    conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
097    conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000);
098    conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
099    conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
100    HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class)
101      .numRegionServers(NB_SERVERS).build());
102
103  }
104
105  @AfterAll
106  public static void tearDown() throws Exception {
107    HTU.shutdownMiniCluster();
108  }
109
110  /**
111   * This test is for HBASE-26768,test the case that we have already clear the
112   * {@link RegionReplicationSink#failedReplicas} due to a flush all edit,which may in flusher
113   * thread,and then in the callback of replay, which may in Netty nioEventLoop,we add a replica to
114   * the {@link RegionReplicationSink#failedReplicas} because of a failure of replicating.
115   */
116  @Test
117  public void test() throws Exception {
118    final HRegionForTest[] regions = this.createTable();
119    final AtomicBoolean completedRef = new AtomicBoolean(false);
120    RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get();
121    assertTrue(regionReplicationSink != null);
122
123    RegionReplicationSink spiedRegionReplicationSink =
124      this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], completedRef);
125
126    String oldThreadName = Thread.currentThread().getName();
127    Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME);
128    try {
129      startTest = true;
130      /**
131       * Write First cell,replicating to secondary replica is error.
132       */
133      byte[] rowKey1 = Bytes.toBytes(1);
134
135      regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
136      regions[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY);
137
138      HTU.waitFor(120000, () -> completedRef.get());
139      assertTrue(spiedRegionReplicationSink.getFailedReplicas().isEmpty());
140    } finally {
141      startTest = false;
142      Thread.currentThread().setName(oldThreadName);
143    }
144  }
145
146  private RegionReplicationSink setUpSpiedRegionReplicationSink(
147    final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion,
148    final AtomicBoolean completedRef) {
149    final AtomicInteger onCompleteCounter = new AtomicInteger(0);
150    final AtomicInteger getStartFlushAllDescriptorCounter = new AtomicInteger(0);
151    RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink);
152
153    Mockito.doAnswer((invocationOnMock) -> {
154      if (!startTest) {
155        invocationOnMock.callRealMethod();
156        return null;
157      }
158      int count = onCompleteCounter.incrementAndGet();
159      if (count == 1) {
160        primaryRegion.cyclicBarrier.await();
161        invocationOnMock.callRealMethod();
162        completedRef.set(true);
163        return null;
164      }
165      invocationOnMock.callRealMethod();
166      return null;
167    }).when(spiedRegionReplicationSink).onComplete(Mockito.anyList(), Mockito.anyMap());
168
169    Mockito.doAnswer((invocationOnMock) -> {
170      if (!startTest) {
171        return invocationOnMock.callRealMethod();
172      }
173      if (
174        primaryRegion.prepareFlush
175          && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME)
176      ) {
177        int count = getStartFlushAllDescriptorCounter.incrementAndGet();
178        if (count == 1) {
179          // onComplete could execute
180          primaryRegion.cyclicBarrier.await();
181          return invocationOnMock.callRealMethod();
182        }
183      }
184      return invocationOnMock.callRealMethod();
185    }).when(spiedRegionReplicationSink).getStartFlushAllDescriptor(Mockito.any());
186
187    primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink);
188    return spiedRegionReplicationSink;
189  }
190
191  private HRegionForTest[] createTable() throws Exception {
192    TableDescriptor tableDescriptor =
193      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS)
194        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
195    HTU.getAdmin().createTable(tableDescriptor);
196    final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS];
197    for (int i = 0; i < NB_SERVERS; i++) {
198      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
199      List<HRegion> onlineRegions = rs.getRegions(tableName);
200      for (HRegion region : onlineRegions) {
201        int replicaId = region.getRegionInfo().getReplicaId();
202        assertTrue(regions[replicaId] == null);
203        regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region;
204      }
205    }
206    for (Region region : regions) {
207      assertNotNull(region);
208    }
209    return regions;
210  }
211
212  public static final class HRegionForTest extends HRegion {
213    static final String USER_THREAD_NAME = "TestReplicationHang";
214    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
215    volatile boolean prepareFlush = false;
216
217    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
218      TableDescriptor htd, RegionServerServices rsServices) {
219      super(fs, wal, confParam, htd, rsServices);
220    }
221
222    @SuppressWarnings("deprecation")
223    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
224      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
225      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
226    }
227
228    public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) {
229      this.regionReplicationSink = Optional.of(regionReplicationSink);
230    }
231
232    @Override
233    protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
234      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
235      FlushLifeCycleTracker tracker) throws IOException {
236      if (!startTest) {
237        return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status,
238          writeFlushWalMarker, tracker);
239      }
240
241      if (
242        this.getRegionInfo().getReplicaId() == 0
243          && Thread.currentThread().getName().equals(USER_THREAD_NAME)
244      ) {
245        this.prepareFlush = true;
246      }
247      try {
248        PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
249          status, writeFlushWalMarker, tracker);
250
251        return result;
252      } finally {
253        if (
254          this.getRegionInfo().getReplicaId() == 0
255            && Thread.currentThread().getName().equals(USER_THREAD_NAME)
256        ) {
257          this.prepareFlush = false;
258        }
259      }
260
261    }
262  }
263
264  public static final class ErrorReplayRSRpcServices extends RSRpcServices {
265    private static final AtomicInteger callCounter = new AtomicInteger(0);
266
267    public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
268      super(rs);
269    }
270
271    @Override
272    public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController,
273      ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
274
275      if (!startTest) {
276        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
277      }
278
279      List<WALEntry> entries = replicateWALEntryRequest.getEntryList();
280      if (CollectionUtils.isEmpty(entries)) {
281        return ReplicateWALEntryResponse.getDefaultInstance();
282      }
283      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
284
285      HRegion region;
286      try {
287        region = server.getRegionByEncodedName(regionName.toStringUtf8());
288      } catch (NotServingRegionException e) {
289        throw new ServiceException(e);
290      }
291
292      if (
293        !region.getRegionInfo().getTable().equals(tableName)
294          || region.getRegionInfo().getReplicaId() != 1
295      ) {
296        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
297      }
298
299      /**
300       * Simulate the first cell replicating error.
301       */
302      int count = callCounter.incrementAndGet();
303      if (count > 1) {
304        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
305      }
306      throw new ServiceException(new DoNotRetryIOException("Inject error!"));
307    }
308  }
309
310  public static final class RSForTest
311    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
312
313    public RSForTest(Configuration conf) throws IOException, InterruptedException {
314      super(conf);
315    }
316
317    @Override
318    protected RSRpcServices createRpcServices() throws IOException {
319      return new ErrorReplayRSRpcServices(this);
320    }
321  }
322}