001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.region; 019 020import static org.junit.Assert.assertThrows; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.Mockito.verify; 023 024import java.io.IOException; 025import java.time.Duration; 026import java.util.List; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Abortable; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 035import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; 036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 037import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException; 038import org.apache.hadoop.hbase.testclassification.MasterTests; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.CommonFSUtils; 042import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 043import org.apache.hadoop.hbase.wal.WALFactory; 044import org.apache.hadoop.hbase.wal.WALProvider; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048 049import org.apache.hbase.thirdparty.io.netty.channel.Channel; 050import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 051 052@Category({ MasterTests.class, MediumTests.class }) 053public class TestMasterRegionWALSyncTimeoutIOException extends MasterRegionTestBase { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestMasterRegionWALSyncTimeoutIOException.class); 058 059 private static final Duration WAL_SYNC_TIMEOUT = Duration.ofSeconds(3); 060 061 private static volatile boolean testWalTimeout = false; 062 063 @Override 064 protected void configure(Configuration conf) throws IOException { 065 conf.setClass(WALFactory.WAL_PROVIDER, SlowAsyncFSWALProvider.class, WALProvider.class); 066 conf.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, WAL_SYNC_TIMEOUT.toMillis()); 067 } 068 069 @Override 070 protected void configure(MasterRegionParams params) { 071 params.flushIntervalMs(Duration.ofSeconds(1).toMillis()); 072 } 073 074 @Test 075 public void testUpdateWalSyncWriteException() { 076 testWalTimeout = true; 077 assertThrows(WALSyncTimeoutIOException.class, () -> { 078 for (int i = 0; i < 10; i++) { 079 region.update( 080 r -> r.put(new Put(Bytes.toBytes("0")).addColumn(CF1, QUALIFIER, Bytes.toBytes("0")))); 081 Thread.sleep(Duration.ofSeconds(1).toMillis()); 082 } 083 }); 084 verify(server).abort(any(), any()); 085 } 086 087 public static class SlowAsyncFSWAL extends AsyncFSWAL { 088 089 public SlowAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, 090 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 091 boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, 092 Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException { 093 super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, 094 suffix, null, null, eventLoopGroup, channelClass, monitor); 095 } 096 097 @Override 098 protected void atHeadOfRingBufferEventHandlerAppend() { 099 if (testWalTimeout) { 100 try { 101 Thread.sleep(WAL_SYNC_TIMEOUT.plusSeconds(1).toMillis()); 102 } catch (InterruptedException e) { 103 throw new RuntimeException(e); 104 } 105 } 106 super.atHeadOfRingBufferEventHandlerAppend(); 107 } 108 } 109 110 public static class SlowAsyncFSWALProvider extends AsyncFSWALProvider { 111 112 @Override 113 protected AsyncFSWAL createWAL() throws IOException { 114 return new SlowAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, 115 CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()), 116 getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix, 117 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, 118 channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); 119 } 120 } 121}