001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.StartTestingClusterOption;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Consistency;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Mutation;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.RegionReplicaUtil;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.coprocessor.ObserverContext;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.RegionObserver;
055import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
056import org.apache.hadoop.hbase.regionserver.HRegion;
057import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
058import org.apache.hadoop.hbase.regionserver.HRegionServer;
059import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
060import org.apache.hadoop.hbase.regionserver.RegionServerServices;
061import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
062import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
063import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
065import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.testclassification.RegionServerTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.CommonFSUtils;
070import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
071import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
072import org.apache.hadoop.hbase.wal.WAL;
073import org.apache.hadoop.hbase.wal.WALFactory;
074import org.apache.hadoop.hbase.wal.WALKeyImpl;
075import org.apache.hadoop.hbase.wal.WALProvider;
076import org.junit.AfterClass;
077import org.junit.BeforeClass;
078import org.junit.ClassRule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.mockito.Mockito;
082
083import org.apache.hbase.thirdparty.io.netty.channel.Channel;
084import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
085
086@Category({ RegionServerTests.class, LargeTests.class })
087public class TestRegionReplicationForWriteException {
088
089  @ClassRule
090  public static final HBaseClassTestRule CLASS_RULE =
091    HBaseClassTestRule.forClass(TestRegionReplicationForWriteException.class);
092
093  private static final byte[] FAMILY = Bytes.toBytes("family_test");
094
095  private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
096
097  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
098  private static final int NB_SERVERS = 2;
099
100  private static TableName tableName = TableName.valueOf("TestRegionReplicationForWriteException");
101  private static volatile boolean testWALTimout = false;
102  private static volatile boolean testCP = false;
103  private static final long timeoutMIlliseconds = 3000;
104  private static final String USER_THREAD_NAME = tableName.getNameAsString();
105
106  @BeforeClass
107  public static void setUp() throws Exception {
108    Configuration conf = HTU.getConfiguration();
109    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
110    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
111    conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1);
112    conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000);
113    conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
114    conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000);
115    conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
116    conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
117    conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3);
118    conf.setClass(WALFactory.WAL_PROVIDER, SlowAsyncFSWALProvider.class, WALProvider.class);
119    conf.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, timeoutMIlliseconds);
120    HTU.startMiniCluster(StartTestingClusterOption.builder().numRegionServers(NB_SERVERS).build());
121
122  }
123
124  @AfterClass
125  public static void tearDown() throws Exception {
126    HTU.shutdownMiniCluster();
127  }
128
129  /**
130   * This test is for HBASE-27303.
131   */
132  @Test
133  public void testWriteException() throws Exception {
134    final HRegionForTest[] regions = this.createTable();
135    RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get();
136    assertTrue(regionReplicationSink != null);
137    final AtomicInteger replicateCounter = new AtomicInteger(0);
138    this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], replicateCounter);
139
140    String oldThreadName = Thread.currentThread().getName();
141    Thread.currentThread().setName(USER_THREAD_NAME);
142    try {
143      testCP = true;
144      try {
145        byte[] rowKey1 = Bytes.toBytes(1);
146        byte[] value1 = Bytes.toBytes(3);
147        /**
148         * Write first put,{@link WAL#sync} is successful but {@link RegionObserver#postBatchMutate}
149         * throws exception,the rowkey1 is applied to primary and secondary replicas successfully.
150         */
151        try {
152          regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, value1));
153          fail();
154        } catch (DoNotRetryIOException e) {
155          assertTrue(e.getMessage().equals(MyRegionObserver.ERROR_MESSAGE));
156        }
157
158        try (Table table = HTU.getConnection().getTable(tableName)) {
159          assertTrue(checkReplica(table, FAMILY, QUAL, rowKey1, value1, 0));
160          HTU.waitFor(30000, () -> checkReplica(table, FAMILY, QUAL, rowKey1, value1, 1));
161        }
162      } finally {
163        testCP = false;
164      }
165
166      byte[] rowKey2 = Bytes.toBytes(2);
167      byte[] value2 = Bytes.toBytes(6);
168      replicateCounter.set(0);
169      testWALTimout = true;
170      try {
171        /**
172         * Write second put,the {@link WAL#sync} timeout and throws
173         * {@link WALSyncTimeoutIOException},{@link HRegion#put} is failed and rowKey2 is not
174         * applied to primary and secondary replicas.
175         */
176        try {
177          regions[0].put(new Put(rowKey2).addColumn(FAMILY, QUAL, value2));
178          fail();
179        } catch (WALSyncTimeoutIOException e) {
180          assertTrue(e != null);
181        }
182
183        assertTrue(regions[0].getRSServices().isAborted());
184        assertTrue(replicateCounter.get() == 0);
185        Thread.sleep(2000);
186        try (Table table = HTU.getConnection().getTable(tableName)) {
187          assertFalse(checkReplica(table, FAMILY, QUAL, rowKey2, value2, 1));
188        }
189      } finally {
190        testWALTimout = false;
191      }
192    } finally {
193      Thread.currentThread().setName(oldThreadName);
194    }
195  }
196
197  private RegionReplicationSink setUpSpiedRegionReplicationSink(
198    final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion,
199    final AtomicInteger counter) {
200    RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink);
201
202    Mockito.doAnswer((invocationOnMock) -> {
203      if (!testWALTimout || !USER_THREAD_NAME.equals(Thread.currentThread().getName())) {
204        invocationOnMock.callRealMethod();
205        return null;
206      }
207      WALKeyImpl walKey = invocationOnMock.getArgument(0);
208      if (!walKey.getTableName().equals(tableName)) {
209        invocationOnMock.callRealMethod();
210        return null;
211      }
212      counter.incrementAndGet();
213      invocationOnMock.callRealMethod();
214      return null;
215    }).when(spiedRegionReplicationSink).add(Mockito.any(), Mockito.any(), Mockito.any());
216    primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink);
217    return spiedRegionReplicationSink;
218  }
219
220  private static boolean checkReplica(Table table, byte[] fam, byte[] qual, byte[] rowKey,
221    byte[] expectValue, int replicaId) throws IOException {
222    Get get = new Get(rowKey).setConsistency(Consistency.TIMELINE).setReplicaId(replicaId);
223    Result result = table.get(get);
224    byte[] value = result.getValue(fam, qual);
225    return value != null && value.length > 0 && Arrays.equals(expectValue, value);
226  }
227
228  private HRegionForTest[] createTable() throws Exception {
229    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
230      .setRegionReplication(NB_SERVERS).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
231      .setCoprocessor(MyRegionObserver.class.getName()).build();
232    HTU.getAdmin().createTable(tableDescriptor);
233    final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS];
234    for (int i = 0; i < NB_SERVERS; i++) {
235      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
236      List<HRegion> onlineRegions = rs.getRegions(tableName);
237      for (HRegion region : onlineRegions) {
238        int replicaId = region.getRegionInfo().getReplicaId();
239        assertTrue(regions[replicaId] == null);
240        regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region;
241      }
242    }
243    for (HRegionForTest region : regions) {
244      assertNotNull(region);
245    }
246    return regions;
247  }
248
249  public static final class HRegionForTest extends HRegion {
250
251    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
252      TableDescriptor htd, RegionServerServices rsServices) {
253      super(fs, wal, confParam, htd, rsServices);
254    }
255
256    @SuppressWarnings("deprecation")
257    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
258      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
259      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
260    }
261
262    public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) {
263      this.regionReplicationSink = Optional.of(regionReplicationSink);
264    }
265
266    public RegionServerServices getRSServices() {
267      return this.rsServices;
268    }
269  }
270
271  public static class SlowAsyncFSWAL extends AsyncFSWAL {
272
273    public SlowAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
274      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
275      boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
276      Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
277      throws FailedLogCloseException, IOException {
278      super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
279        suffix, null, null, eventLoopGroup, channelClass, monitor);
280    }
281
282    @Override
283    protected void atHeadOfRingBufferEventHandlerAppend() {
284      if (testWALTimout) {
285        try {
286          Thread.sleep(timeoutMIlliseconds + 1000);
287        } catch (InterruptedException e) {
288          throw new RuntimeException(e);
289        }
290      }
291      super.atHeadOfRingBufferEventHandlerAppend();
292    }
293
294  }
295
296  public static class SlowAsyncFSWALProvider extends AsyncFSWALProvider {
297
298    @Override
299    protected AsyncFSWAL createWAL() throws IOException {
300      return new SlowAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
301        CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()),
302        getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix,
303        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
304        channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
305    }
306
307  }
308
309  public static class MyRegionObserver implements RegionCoprocessor, RegionObserver {
310
311    private static final String ERROR_MESSAGE = "Inject error!";
312
313    @Override
314    public Optional<RegionObserver> getRegionObserver() {
315      return Optional.of(this);
316    }
317
318    @Override
319    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
320      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
321      if (!testCP || !RegionReplicaUtil.isDefaultReplica(c.getEnvironment().getRegionInfo())) {
322        return;
323      }
324      throw new DoNotRetryIOException(ERROR_MESSAGE);
325    }
326  }
327
328}