001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreFlusher;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
075
076@Tag(RegionServerTests.TAG)
077@Tag(LargeTests.TAG)
078public class TestRegionReplicationForFlushMarker {
079
080  private static final byte[] FAMILY = Bytes.toBytes("family_test");
081
082  private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
083
084  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
085  private static final int NB_SERVERS = 2;
086
087  private static TableName tableName = TableName.valueOf("TestRegionReplicationForFlushMarker");
088  private static volatile boolean startTest = false;
089
090  @BeforeAll
091  public static void setUp() throws Exception {
092    Configuration conf = HTU.getConfiguration();
093    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
094    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
095    conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1);
096    conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000);
097    conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
098    conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000);
099    conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
100    conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
101    conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3);
102    HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class)
103      .numRegionServers(NB_SERVERS).build());
104
105  }
106
107  @AfterAll
108  public static void tearDown() throws Exception {
109    HTU.shutdownMiniCluster();
110  }
111
112  /**
113   * This test is for HBASE-26960, before HBASE-26960, {@link MemStoreFlusher} does not write the
114   * {@link FlushAction#CANNOT_FLUSH} marker to the WAL when the memstore is empty,so if the
115   * {@link RegionReplicationSink} request a flush when the memstore is empty, it could not receive
116   * the {@link FlushAction#CANNOT_FLUSH} and the replication may be hanged. After HBASE-26768,when
117   * the {@link RegionReplicationSink} request a flush when the memstore is empty,even it does not
118   * writes the {@link FlushAction#CANNOT_FLUSH} marker to the WAL,we also replicate the
119   * {@link FlushAction#CANNOT_FLUSH} marker to the secondary region replica.
120   */
121  @Test
122  public void testCannotFlushMarker() throws Exception {
123    final HRegionForTest[] regions = this.createTable();
124    RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get();
125    assertTrue(regionReplicationSink != null);
126
127    String oldThreadName = Thread.currentThread().getName();
128    Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME);
129    try {
130
131      byte[] rowKey1 = Bytes.toBytes(1);
132      startTest = true;
133      /**
134       * Write First cell,replicating to secondary replica is error,and then
135       * {@link RegionReplicationSink} request flush,after {@link RegionReplicationSink} receiving
136       * the {@link FlushAction#START_FLUSH},the {@link RegionReplicationSink#failedReplicas} is
137       * cleared,but replicating {@link FlushAction#START_FLUSH} is failed again,so
138       * {@link RegionReplicationSink} request flush once more, but now memstore is empty,so the
139       * {@link MemStoreFlusher} just write a {@link FlushAction#CANNOT_FLUSH} marker to the WAL.
140       */
141      regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
142      /**
143       * Wait for the {@link FlushAction#CANNOT_FLUSH} is written and initiating replication
144       */
145      regions[0].cyclicBarrier.await();
146      assertTrue(regions[0].prepareFlushCounter.get() == 2);
147      /**
148       * The {@link RegionReplicationSink#failedReplicas} is cleared by the
149       * {@link FlushAction#CANNOT_FLUSH} marker.
150       */
151      assertTrue(regionReplicationSink.getFailedReplicas().isEmpty());
152    } finally {
153      startTest = false;
154      Thread.currentThread().setName(oldThreadName);
155    }
156  }
157
158  private HRegionForTest[] createTable() throws Exception {
159    TableDescriptor tableDescriptor =
160      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS)
161        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
162    HTU.getAdmin().createTable(tableDescriptor);
163    final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS];
164    for (int i = 0; i < NB_SERVERS; i++) {
165      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
166      List<HRegion> onlineRegions = rs.getRegions(tableName);
167      for (HRegion region : onlineRegions) {
168        int replicaId = region.getRegionInfo().getReplicaId();
169        assertTrue(regions[replicaId] == null);
170        regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region;
171      }
172    }
173    for (Region region : regions) {
174      assertNotNull(region);
175    }
176    return regions;
177  }
178
179  public static final class HRegionForTest extends HRegion {
180    static final String USER_THREAD_NAME = "TestRegionReplicationForFlushMarker";
181    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
182    final AtomicInteger prepareFlushCounter = new AtomicInteger(0);
183
184    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
185      TableDescriptor htd, RegionServerServices rsServices) {
186      super(fs, wal, confParam, htd, rsServices);
187    }
188
189    @SuppressWarnings("deprecation")
190    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
191      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
192      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
193    }
194
195    public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) {
196      this.regionReplicationSink = Optional.of(regionReplicationSink);
197    }
198
199    @Override
200    protected void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
201      // not write the region open marker to interrupt the test.
202    }
203
204    @Override
205    protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
206      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
207      FlushLifeCycleTracker tracker) throws IOException {
208      if (!startTest) {
209        return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status,
210          writeFlushWalMarker, tracker);
211      }
212
213      if (this.getRegionInfo().getReplicaId() != 0) {
214        return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status,
215          writeFlushWalMarker, tracker);
216      }
217
218      try {
219        PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
220          status, writeFlushWalMarker, tracker);
221        this.prepareFlushCounter.incrementAndGet();
222        /**
223         * First flush is {@link FlushAction#START_FLUSH} marker and the second flush is
224         * {@link FlushAction#CANNOT_FLUSH} marker because the memstore is empty.
225         */
226        if (
227          this.prepareFlushCounter.get() == 2 && result.getResult() != null
228            && result.getResult().getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY
229        ) {
230
231          cyclicBarrier.await();
232        }
233        return result;
234      } catch (BrokenBarrierException | InterruptedException e) {
235        throw new RuntimeException(e);
236      }
237
238    }
239  }
240
241  public static final class ErrorReplayRSRpcServices extends RSRpcServices {
242    private static final AtomicInteger callCounter = new AtomicInteger(0);
243
244    public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
245      super(rs);
246    }
247
248    @Override
249    public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController,
250      ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
251
252      if (!startTest) {
253        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
254      }
255
256      List<WALEntry> entries = replicateWALEntryRequest.getEntryList();
257      if (CollectionUtils.isEmpty(entries)) {
258        return ReplicateWALEntryResponse.getDefaultInstance();
259      }
260      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
261
262      HRegion region;
263      try {
264        region = server.getRegionByEncodedName(regionName.toStringUtf8());
265      } catch (NotServingRegionException e) {
266        throw new ServiceException(e);
267      }
268
269      if (
270        !region.getRegionInfo().getTable().equals(tableName)
271          || region.getRegionInfo().getReplicaId() != 1
272      ) {
273        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
274      }
275
276      /**
277       * Simulate the first cell write and {@link FlushAction#START_FLUSH} marker replicating error.
278       */
279      int count = callCounter.incrementAndGet();
280      if (count > 2) {
281        return super.replicateToReplica(rpcController, replicateWALEntryRequest);
282      }
283      throw new ServiceException(new DoNotRetryIOException("Inject error!"));
284    }
285  }
286
287  public static final class RSForTest
288    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
289
290    public RSForTest(Configuration conf) throws IOException, InterruptedException {
291      super(conf);
292    }
293
294    @Override
295    protected RSRpcServices createRpcServices() throws IOException {
296      return new ErrorReplayRSRpcServices(this);
297    }
298  }
299
300}