001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.StartTestingClusterOption;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Consistency;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.client.Mutation;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionReplicaUtil;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.coprocessor.ObserverContext;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
053import org.apache.hadoop.hbase.coprocessor.RegionObserver;
054import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
055import org.apache.hadoop.hbase.regionserver.HRegion;
056import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
057import org.apache.hadoop.hbase.regionserver.HRegionServer;
058import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
059import org.apache.hadoop.hbase.regionserver.RegionServerServices;
060import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
061import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
062import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
063import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
064import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
065import org.apache.hadoop.hbase.testclassification.LargeTests;
066import org.apache.hadoop.hbase.testclassification.RegionServerTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.CommonFSUtils;
069import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
070import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
071import org.apache.hadoop.hbase.wal.WAL;
072import org.apache.hadoop.hbase.wal.WALFactory;
073import org.apache.hadoop.hbase.wal.WALKeyImpl;
074import org.apache.hadoop.hbase.wal.WALProvider;
075import org.junit.jupiter.api.AfterAll;
076import org.junit.jupiter.api.BeforeAll;
077import org.junit.jupiter.api.Tag;
078import org.junit.jupiter.api.Test;
079import org.mockito.Mockito;
080
081import org.apache.hbase.thirdparty.io.netty.channel.Channel;
082import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
083
084@Tag(RegionServerTests.TAG)
085@Tag(LargeTests.TAG)
086public class TestRegionReplicationForWriteException {
087
088  private static final byte[] FAMILY = Bytes.toBytes("family_test");
089
090  private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
091
092  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
093  private static final int NB_SERVERS = 2;
094
095  private static TableName tableName = TableName.valueOf("TestRegionReplicationForWriteException");
096  private static volatile boolean testWALTimout = false;
097  private static volatile boolean testCP = false;
098  private static final long timeoutMIlliseconds = 3000;
099  private static final String USER_THREAD_NAME = tableName.getNameAsString();
100
101  @BeforeAll
102  public static void setUp() throws Exception {
103    Configuration conf = HTU.getConfiguration();
104    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
105    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
106    conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1);
107    conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000);
108    conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
109    conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000);
110    conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
111    conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
112    conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3);
113    conf.setClass(WALFactory.WAL_PROVIDER, SlowAsyncFSWALProvider.class, WALProvider.class);
114    conf.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, timeoutMIlliseconds);
115    HTU.startMiniCluster(StartTestingClusterOption.builder().numRegionServers(NB_SERVERS).build());
116
117  }
118
119  @AfterAll
120  public static void tearDown() throws Exception {
121    HTU.shutdownMiniCluster();
122  }
123
124  /**
125   * This test is for HBASE-27303.
126   */
127  @Test
128  public void testWriteException() throws Exception {
129    final HRegionForTest[] regions = this.createTable();
130    RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get();
131    assertTrue(regionReplicationSink != null);
132    final AtomicInteger replicateCounter = new AtomicInteger(0);
133    this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], replicateCounter);
134
135    String oldThreadName = Thread.currentThread().getName();
136    Thread.currentThread().setName(USER_THREAD_NAME);
137    try {
138      testCP = true;
139      try {
140        byte[] rowKey1 = Bytes.toBytes(1);
141        byte[] value1 = Bytes.toBytes(3);
142        /**
143         * Write first put,{@link WAL#sync} is successful but {@link RegionObserver#postBatchMutate}
144         * throws exception,the rowkey1 is applied to primary and secondary replicas successfully.
145         */
146        try {
147          regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, value1));
148          fail();
149        } catch (DoNotRetryIOException e) {
150          assertTrue(e.getMessage().equals(MyRegionObserver.ERROR_MESSAGE));
151        }
152
153        try (Table table = HTU.getConnection().getTable(tableName)) {
154          assertTrue(checkReplica(table, FAMILY, QUAL, rowKey1, value1, 0));
155          HTU.waitFor(30000, () -> checkReplica(table, FAMILY, QUAL, rowKey1, value1, 1));
156        }
157      } finally {
158        testCP = false;
159      }
160
161      byte[] rowKey2 = Bytes.toBytes(2);
162      byte[] value2 = Bytes.toBytes(6);
163      replicateCounter.set(0);
164      testWALTimout = true;
165      try {
166        /**
167         * Write second put,the {@link WAL#sync} timeout and throws
168         * {@link WALSyncTimeoutIOException},{@link HRegion#put} is failed and rowKey2 is not
169         * applied to primary and secondary replicas.
170         */
171        try {
172          regions[0].put(new Put(rowKey2).addColumn(FAMILY, QUAL, value2));
173          fail();
174        } catch (WALSyncTimeoutIOException e) {
175          assertTrue(e != null);
176        }
177
178        assertTrue(regions[0].getRSServices().isAborted());
179        assertTrue(replicateCounter.get() == 0);
180        Thread.sleep(2000);
181        try (Table table = HTU.getConnection().getTable(tableName)) {
182          assertFalse(checkReplica(table, FAMILY, QUAL, rowKey2, value2, 1));
183        }
184      } finally {
185        testWALTimout = false;
186      }
187    } finally {
188      Thread.currentThread().setName(oldThreadName);
189    }
190  }
191
192  private RegionReplicationSink setUpSpiedRegionReplicationSink(
193    final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion,
194    final AtomicInteger counter) {
195    RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink);
196
197    Mockito.doAnswer((invocationOnMock) -> {
198      if (!testWALTimout || !USER_THREAD_NAME.equals(Thread.currentThread().getName())) {
199        invocationOnMock.callRealMethod();
200        return null;
201      }
202      WALKeyImpl walKey = invocationOnMock.getArgument(0);
203      if (!walKey.getTableName().equals(tableName)) {
204        invocationOnMock.callRealMethod();
205        return null;
206      }
207      counter.incrementAndGet();
208      invocationOnMock.callRealMethod();
209      return null;
210    }).when(spiedRegionReplicationSink).add(Mockito.any(), Mockito.any(), Mockito.any());
211    primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink);
212    return spiedRegionReplicationSink;
213  }
214
215  private static boolean checkReplica(Table table, byte[] fam, byte[] qual, byte[] rowKey,
216    byte[] expectValue, int replicaId) throws IOException {
217    Get get = new Get(rowKey).setConsistency(Consistency.TIMELINE).setReplicaId(replicaId);
218    Result result = table.get(get);
219    byte[] value = result.getValue(fam, qual);
220    return value != null && value.length > 0 && Arrays.equals(expectValue, value);
221  }
222
223  private HRegionForTest[] createTable() throws Exception {
224    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
225      .setRegionReplication(NB_SERVERS).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
226      .setCoprocessor(MyRegionObserver.class.getName()).build();
227    HTU.getAdmin().createTable(tableDescriptor);
228    final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS];
229    for (int i = 0; i < NB_SERVERS; i++) {
230      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
231      List<HRegion> onlineRegions = rs.getRegions(tableName);
232      for (HRegion region : onlineRegions) {
233        int replicaId = region.getRegionInfo().getReplicaId();
234        assertTrue(regions[replicaId] == null);
235        regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region;
236      }
237    }
238    for (HRegionForTest region : regions) {
239      assertNotNull(region);
240    }
241    return regions;
242  }
243
244  public static final class HRegionForTest extends HRegion {
245
246    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
247      TableDescriptor htd, RegionServerServices rsServices) {
248      super(fs, wal, confParam, htd, rsServices);
249    }
250
251    @SuppressWarnings("deprecation")
252    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
253      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
254      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
255    }
256
257    public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) {
258      this.regionReplicationSink = Optional.of(regionReplicationSink);
259    }
260
261    public RegionServerServices getRSServices() {
262      return this.rsServices;
263    }
264  }
265
266  public static class SlowAsyncFSWAL extends AsyncFSWAL {
267
268    public SlowAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
269      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
270      boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
271      Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
272      throws FailedLogCloseException, IOException {
273      super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
274        suffix, null, null, eventLoopGroup, channelClass, monitor);
275    }
276
277    @Override
278    protected void atHeadOfRingBufferEventHandlerAppend() {
279      if (testWALTimout) {
280        try {
281          Thread.sleep(timeoutMIlliseconds + 1000);
282        } catch (InterruptedException e) {
283          throw new RuntimeException(e);
284        }
285      }
286      super.atHeadOfRingBufferEventHandlerAppend();
287    }
288
289  }
290
291  public static class SlowAsyncFSWALProvider extends AsyncFSWALProvider {
292
293    @Override
294    protected AsyncFSWAL createWAL() throws IOException {
295      return new SlowAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
296        CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()),
297        getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix,
298        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
299        channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
300    }
301
302  }
303
304  public static class MyRegionObserver implements RegionCoprocessor, RegionObserver {
305
306    private static final String ERROR_MESSAGE = "Inject error!";
307
308    @Override
309    public Optional<RegionObserver> getRegionObserver() {
310      return Optional.of(this);
311    }
312
313    @Override
314    public void postBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c,
315      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
316      if (!testCP || !RegionReplicaUtil.isDefaultReplica(c.getEnvironment().getRegionInfo())) {
317        return;
318      }
319      throw new DoNotRetryIOException(ERROR_MESSAGE);
320    }
321  }
322
323}