/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Testcase for HBASE-25905.
 * <p>
 * The test installs a custom {@link AsyncWriter} whose first two {@code sync} calls hand back
 * futures that the test later fails by hand, and whose third {@code sync} call blocks until the
 * test releases it. While that third call is blocked the test rolls the WAL and then verifies
 * that both the roll and a subsequent {@code sync} of the last appended edit complete.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAsyncFSWALRollStuck {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();

  private static final Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;

  private static ScheduledExecutorService EXECUTOR;

  /** Futures returned by the first two sync calls; the test completes them exceptionally. */
  private static final BlockingQueue<CompletableFuture<Long>> FUTURES =
    new ArrayBlockingQueue<>(3);

  /** Number of times {@link TestAsyncWriter#sync(boolean)} has been invoked. */
  private static final AtomicInteger SYNC_COUNT = new AtomicInteger(0);

  /** Counted down when the third sync call has been entered. */
  private static final CountDownLatch ARRIVE = new CountDownLatch(1);

  /** Counted down by the test to let the third sync call proceed. */
  private static final CountDownLatch RESUME = new CountDownLatch(1);

  /**
   * Writer that intercepts the first three sync calls so the test can control when, and with
   * which outcome, each of them finishes. From the fourth call on it simply delegates to the
   * parent implementation.
   */
  public static final class TestAsyncWriter extends AsyncProtobufLogWriter {

    public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
      super(eventLoopGroup, channelClass);
    }

    @Override
    public CompletableFuture<Long> sync(boolean forceSync) {
      int count = SYNC_COUNT.incrementAndGet();
      if (count < 3) {
        // we will mark these two futures as failure, to make sure that we have 2 edits in
        // unackedAppends, and trigger a WAL roll
        CompletableFuture<Long> f = new CompletableFuture<>();
        FUTURES.offer(f);
        return f;
      } else if (count == 3) {
        // for this future, we will mark it as succeeded, but before returning from this method, we
        // need to request a roll, to enter the special corner case, where we have left some edits
        // in unackedAppends but never try to write them out, which leads to a hang
        ARRIVE.countDown();
        try {
          RESUME.await();
        } catch (InterruptedException e) {
          // do not silently drop the interruption: record it and restore the flag so that code
          // further up the stack can still observe it
          LOG.warn("interrupted while waiting to resume the third sync", e);
          Thread.currentThread().interrupt();
        }
        return super.sync(forceSync);
      } else {
        return super.sync(forceSync);
      }
    }
  }

  private static TableName TN;

  private static RegionInfo RI;

  private static MultiVersionConcurrencyControl MVCC;

  private static AsyncFSWAL WAL;

  /** Single thread used to run the WAL rolls requested through the listener. */
  private static ExecutorService ROLL_EXEC;

  /**
   * Creates the WAL under test, wired to {@link TestAsyncWriter} and to a listener which rolls
   * the writer on a separate thread whenever a roll is requested.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class);
    // set a very small size so we will reach the batch size when writing out a single edit
    conf.setLong(AsyncFSWAL.WAL_BATCH_SIZE, 1);

    TN = TableName.valueOf("test");
    RI = RegionInfoBuilder.newBuilder(TN).build();
    MVCC = new MultiVersionConcurrencyControl();

    EXECUTOR =
      Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());

    Path rootDir = UTIL.getDataTestDir();
    ROLL_EXEC =
      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
    WALActionsListener listener = new WALActionsListener() {

      @Override
      public void logRollRequested(RollRequestReason reason) {
        ROLL_EXEC.execute(() -> {
          try {
            WAL.rollWriter();
          } catch (Exception e) {
            LOG.warn("failed to roll writer", e);
          }
        });
      }

    };
    WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), rootDir, "log", "oldlog", conf,
      Arrays.asList(listener), true, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS);
    WAL.init();
  }

  /**
   * Stops the helper executors, closes the WAL, releases the event loop threads and removes the
   * test directory.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    EXECUTOR.shutdownNow();
    ROLL_EXEC.shutdownNow();
    Closeables.close(WAL, true);
    // the event loop group is created by this class, so it has to be released here as well,
    // otherwise its threads outlive the test
    EVENT_LOOP_GROUP.shutdownGracefully();
    UTIL.cleanupTestDir();
  }

  /**
   * Appends two edits, fails the syncs of both, then rolls the WAL while the third sync is still
   * blocked inside the writer, and finally checks that the last edit can be synced.
   */
  @Test
  public void testRoll() throws Exception {
    byte[] row = Bytes.toBytes("family");
    WALEdit edit = new WALEdit();
    edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row)
      .setQualifier(row).setRow(row).setValue(row)
      .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build());
    WALKeyImpl key1 =
      new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
    WAL.appendData(RI, key1, edit);

    WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC);
    long txid = WAL.appendData(RI, key2, edit);

    // we need to make sure the two edits have both been added to unackedAppends, so we have two
    // syncs
    UTIL.waitFor(10000, () -> FUTURES.size() == 2);
    FUTURES.poll().completeExceptionally(new IOException("inject error"));
    FUTURES.poll().completeExceptionally(new IOException("inject error"));
    ARRIVE.await();
    // resume after 1 second, to give us enough time to enter the roll state
    EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS);
    // let's roll the wal, before the fix in HBASE-25905, it will hang forever inside
    // waitForSafePoint
    WAL.rollWriter();
    // make sure we can finally succeed
    WAL.sync(txid);
  }
}