001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertNull; 021import static org.mockito.Mockito.mock; 022import static org.mockito.Mockito.when; 023 024import java.io.IOException; 025import java.io.UncheckedIOException; 026import java.util.List; 027import java.util.NavigableMap; 028import java.util.TreeMap; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.client.RegionInfoBuilder; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 045import org.apache.hadoop.hbase.regionserver.LogRoller; 046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 047import org.apache.hadoop.hbase.regionserver.RegionServerServices; 048import org.apache.hadoop.hbase.regionserver.SequenceId; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.testclassification.RegionServerTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.FutureUtils; 055import org.apache.hadoop.hbase.util.Threads; 056import org.apache.hadoop.hbase.wal.WALEdit; 057import org.apache.hadoop.hbase.wal.WALKey; 058import org.apache.hadoop.hbase.wal.WALKeyImpl; 059import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 060import org.junit.AfterClass; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065 066import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 067import org.apache.hbase.thirdparty.io.netty.channel.Channel; 068import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 069import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 070import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 071 072/** 073 * Provides AsyncFSWAL test cases. 074 */ 075@Category({ RegionServerTests.class, LargeTests.class }) 076public class TestAsyncFSWAL extends AbstractTestFSWAL { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestAsyncFSWAL.class); 081 082 private static EventLoopGroup GROUP; 083 084 private static Class<? extends Channel> CHANNEL_CLASS; 085 086 @BeforeClass 087 public static void setUpBeforeClass() throws Exception { 088 GROUP = 089 new NioEventLoopGroup(1, new ThreadFactoryBuilder().setNameFormat("TestAsyncFSWAL-pool-%d") 090 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 091 CHANNEL_CLASS = NioSocketChannel.class; 092 AbstractTestFSWAL.setUpBeforeClass(); 093 } 094 095 @AfterClass 096 public static void tearDownAfterClass() throws Exception { 097 AbstractTestFSWAL.tearDownAfterClass(); 098 GROUP.shutdownGracefully(); 099 } 100 101 @Override 102 protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, 103 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, 104 String suffix) throws IOException { 105 AsyncFSWAL wal = new AsyncFSWAL(fs, null, rootDir, logDir, archiveDir, conf, listeners, 106 failIfWALExists, prefix, suffix, null, null, GROUP, CHANNEL_CLASS, 107 StreamSlowMonitor.create(conf, "monitor")); 108 wal.init(); 109 return wal; 110 } 111 112 @Override 113 protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir, 114 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 115 boolean failIfWALExists, String prefix, String suffix, final Runnable action) 116 throws IOException { 117 AsyncFSWAL wal = new AsyncFSWAL(fs, null, rootDir, logDir, archiveDir, conf, listeners, 118 failIfWALExists, prefix, suffix, null, null, GROUP, CHANNEL_CLASS, 119 StreamSlowMonitor.create(conf, "monitor")) { 120 121 @Override 122 protected void atHeadOfRingBufferEventHandlerAppend() { 123 action.run(); 124 super.atHeadOfRingBufferEventHandlerAppend(); 125 } 126 }; 127 wal.init(); 128 return wal; 129 } 130 131 @Test 132 public void testBrokenWriter() throws Exception { 133 RegionServerServices services = mock(RegionServerServices.class); 134 when(services.getConfiguration()).thenReturn(CONF); 135 TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) 136 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 137 RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build(); 138 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 139 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 140 for (byte[] fam : td.getColumnFamilyNames()) { 141 scopes.put(fam, 0); 142 } 143 long timestamp = EnvironmentEdgeManager.currentTime(); 144 String testName = currentTest.getMethodName(); 145 AtomicInteger failedCount = new AtomicInteger(0); 146 try (LogRoller roller = new LogRoller(services); 147 AsyncFSWAL wal = new AsyncFSWAL(FS, null, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 148 testName, CONF, null, true, null, null, null, null, GROUP, CHANNEL_CLASS, 149 StreamSlowMonitor.create(CONF, "monitorForSuffix")) { 150 151 @Override 152 protected AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException { 153 AsyncWriter writer = super.createWriterInstance(fs, path); 154 return new AsyncWriter() { 155 156 @Override 157 public void close() throws IOException { 158 writer.close(); 159 } 160 161 @Override 162 public long getLength() { 163 return writer.getLength(); 164 } 165 166 @Override 167 public long getSyncedLength() { 168 return writer.getSyncedLength(); 169 } 170 171 @Override 172 public CompletableFuture<Long> sync(boolean forceSync) { 173 CompletableFuture<Long> result = writer.sync(forceSync); 174 if (failedCount.incrementAndGet() < 1000) { 175 CompletableFuture<Long> future = new CompletableFuture<>(); 176 FutureUtils.addListener(result, 177 (r, e) -> future.completeExceptionally(new IOException("Inject Error"))); 178 return future; 179 } else { 180 return result; 181 } 182 } 183 184 @Override 185 public void append(Entry entry) { 186 writer.append(entry); 187 } 188 }; 189 } 190 }) { 191 wal.init(); 192 roller.addWAL(wal); 193 roller.start(); 194 int numThreads = 10; 195 AtomicReference<Exception> error = new AtomicReference<>(); 196 Thread[] threads = new Thread[numThreads]; 197 for (int i = 0; i < 10; i++) { 198 final int index = i; 199 threads[index] = new Thread("Write-Thread-" + index) { 200 201 @Override 202 public void run() { 203 byte[] row = Bytes.toBytes("row" + index); 204 WALEdit cols = new WALEdit(); 205 cols.add(new KeyValue(row, row, row, timestamp + index, row)); 206 WALKeyImpl key = new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), 207 SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, 208 HConstants.NO_NONCE, mvcc, scopes); 209 try { 210 wal.append(ri, key, cols, true); 211 } catch (IOException e) { 212 // should not happen 213 throw new UncheckedIOException(e); 214 } 215 try { 216 wal.sync(); 217 } catch (IOException e) { 218 error.set(e); 219 } 220 } 221 }; 222 } 223 for (Thread t : threads) { 224 t.start(); 225 } 226 for (Thread t : threads) { 227 t.join(); 228 } 229 assertNull(error.get()); 230 } 231 } 232}