001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
020import static org.junit.Assert.assertNotNull;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.List;
026import java.util.Optional;
027import java.util.concurrent.BrokenBarrierException;
028import java.util.concurrent.CyclicBarrier;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
039import org.apache.hadoop.hbase.StartTestingClusterOption;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionReplicaUtil;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.monitoring.MonitoredTask;
048import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
049import org.apache.hadoop.hbase.regionserver.HRegion;
050import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.HStore;
053import org.apache.hadoop.hbase.regionserver.MemStoreFlusher;
054import org.apache.hadoop.hbase.regionserver.RSRpcServices;
055import org.apache.hadoop.hbase.regionserver.Region;
056import org.apache.hadoop.hbase.regionserver.RegionServerServices;
057import org.apache.hadoop.hbase.testclassification.LargeTests;
058import org.apache.hadoop.hbase.testclassification.RegionServerTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
061import org.apache.hadoop.hbase.wal.WAL;
062import org.junit.AfterClass;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067
068import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
069import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
070import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
071import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
072
073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
077
078@Category({ RegionServerTests.class, LargeTests.class })
079public class TestRegionReplicationForFlushMarker {
080
081  @ClassRule
082  public static final HBaseClassTestRule CLASS_RULE =
083    HBaseClassTestRule.forClass(TestRegionReplicationForFlushMarker.class);
084
085  private static final byte[] FAMILY = Bytes.toBytes("family_test");
086
087  private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
088
089  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
090  private static final int NB_SERVERS = 2;
091
092  private static TableName tableName = TableName.valueOf("TestRegionReplicationForFlushMarker");
093  private static volatile boolean startTest = false;
094
095  @BeforeClass
096  public static void setUp() throws Exception {
097    Configuration conf = HTU.getConfiguration();
098    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
099    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
100    conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1);
101    conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000);
102    conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
103    conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000);
104    conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
105    conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
106    conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3);
107    HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class)
108      .numRegionServers(NB_SERVERS).build());
109
110  }
111
112  @AfterClass
113  public static void tearDown() throws Exception {
114    HTU.shutdownMiniCluster();
115  }
116
117  /**
118   * This test is for HBASE-26960, before HBASE-26960, {@link MemStoreFlusher} does not write the
119   * {@link FlushAction#CANNOT_FLUSH} marker to the WAL when the memstore is empty,so if the
120   * {@link RegionReplicationSink} request a flush when the memstore is empty, it could not receive
121   * the {@link FlushAction#CANNOT_FLUSH} and the replication may be hanged. After HBASE-26768,when
122   * the {@link RegionReplicationSink} request a flush when the memstore is empty,even it does not
123   * writes the {@link FlushAction#CANNOT_FLUSH} marker to the WAL,we also replicate the
124   * {@link FlushAction#CANNOT_FLUSH} marker to the secondary region replica.
125   */
126  @Test
127  public void testCannotFlushMarker() throws Exception {
128    final HRegionForTest[] regions = this.createTable();
129    RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get();
130    assertTrue(regionReplicationSink != null);
131
132    String oldThreadName = Thread.currentThread().getName();
133    Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME);
134    try {
135
136      byte[] rowKey1 = Bytes.toBytes(1);
137      startTest = true;
138      /**
139       * Write First cell,replicating to secondary replica is error,and then
140       * {@link RegionReplicationSink} request flush,after {@link RegionReplicationSink} receiving
141       * the {@link FlushAction#START_FLUSH},the {@link RegionReplicationSink#failedReplicas} is
142       * cleared,but replicating {@link FlushAction#START_FLUSH} is failed again,so
143       * {@link RegionReplicationSink} request flush once more, but now memstore is empty,so the
144       * {@link MemStoreFlusher} just write a {@link FlushAction#CANNOT_FLUSH} marker to the WAL.
145       */
146      regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
147      /**
148       * Wait for the {@link FlushAction#CANNOT_FLUSH} is written and initiating replication
149       */
150      regions[0].cyclicBarrier.await();
151      assertTrue(regions[0].prepareFlushCounter.get() == 2);
152      /**
153       * The {@link RegionReplicationSink#failedReplicas} is cleared by the
154       * {@link FlushAction#CANNOT_FLUSH} marker.
155       */
156      assertTrue(regionReplicationSink.getFailedReplicas().isEmpty());
157    } finally {
158      startTest = false;
159      Thread.currentThread().setName(oldThreadName);
160    }
161  }
162
163  private HRegionForTest[] createTable() throws Exception {
164    TableDescriptor tableDescriptor =
165      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS)
166        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
167    HTU.getAdmin().createTable(tableDescriptor);
168    final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS];
169    for (int i = 0; i < NB_SERVERS; i++) {
170      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
171      List<HRegion> onlineRegions = rs.getRegions(tableName);
172      for (HRegion region : onlineRegions) {
173        int replicaId = region.getRegionInfo().getReplicaId();
174        assertTrue(regions[replicaId] == null);
175        regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region;
176      }
177    }
178    for (Region region : regions) {
179      assertNotNull(region);
180    }
181    return regions;
182  }
183
184  public static final class HRegionForTest extends HRegion {
185    static final String USER_THREAD_NAME = "TestRegionReplicationForFlushMarker";
186    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
187    final AtomicInteger prepareFlushCounter = new AtomicInteger(0);
188
189    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
190      TableDescriptor htd, RegionServerServices rsServices) {
191      super(fs, wal, confParam, htd, rsServices);
192    }
193
194    @SuppressWarnings("deprecation")
195    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
196      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
197      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
198    }
199
200    public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) {
201      this.regionReplicationSink = Optional.of(regionReplicationSink);
202    }
203
204    @Override
205    protected void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
206      // not write the region open marker to interrupt the test.
207    }
208
209    @Override
210    protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
211      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
212      FlushLifeCycleTracker tracker) throws IOException {
213      if (!startTest) {
214        return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status,
215          writeFlushWalMarker, tracker);
216      }
217
218      if (this.getRegionInfo().getReplicaId() != 0) {
219        return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status,
220          writeFlushWalMarker, tracker);
221      }
222
223      try {
224        PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
225          status, writeFlushWalMarker, tracker);
226        this.prepareFlushCounter.incrementAndGet();
227        /**
228         * First flush is {@link FlushAction#START_FLUSH} marker and the second flush is
229         * {@link FlushAction#CANNOT_FLUSH} marker because the memstore is empty.
230         */
231        if (
232          this.prepareFlushCounter.get() == 2 && result.getResult() != null
233            && result.getResult().getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY
234        ) {
235
236          cyclicBarrier.await();
237        }
238        return result;
239      } catch (BrokenBarrierException | InterruptedException e) {
240        throw new RuntimeException(e);
241      }
242
243    }
244  }
245
246  public static final class ErrorReplayRSRpcServices extends RSRpcServices {
247    private static final AtomicInteger callCounter = new AtomicInteger(0);
248
249    public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
250      super(rs);
251    }
252
253    @Override
254    public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController,
255      ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
256
257      if (!startTest) {
258        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
259      }
260
261      List<WALEntry> entries = replicateWALEntryRequest.getEntryList();
262      if (CollectionUtils.isEmpty(entries)) {
263        return ReplicateWALEntryResponse.getDefaultInstance();
264      }
265      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
266
267      HRegion region;
268      try {
269        region = server.getRegionByEncodedName(regionName.toStringUtf8());
270      } catch (NotServingRegionException e) {
271        throw new ServiceException(e);
272      }
273
274      if (
275        !region.getRegionInfo().getTable().equals(tableName)
276          || region.getRegionInfo().getReplicaId() != 1
277      ) {
278        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
279      }
280
281      /**
282       * Simulate the first cell write and {@link FlushAction#START_FLUSH} marker replicating error.
283       */
284      int count = callCounter.incrementAndGet();
285      if (count > 2) {
286        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
287      }
288      throw new ServiceException(new DoNotRetryIOException("Inject error!"));
289    }
290  }
291
292  public static final class RSForTest
293    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
294
295    public RSForTest(Configuration conf) throws IOException, InterruptedException {
296      super(conf);
297    }
298
299    @Override
300    protected RSRpcServices createRpcServices() throws IOException {
301      return new ErrorReplayRSRpcServices(this);
302    }
303  }
304
305}