001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.concurrent.ArrayBlockingQueue; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.CompletableFuture; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.ScheduledExecutorService; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicInteger; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell.Type; 034import org.apache.hadoop.hbase.CellBuilderFactory; 035import org.apache.hadoop.hbase.CellBuilderType; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 042import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 043import org.apache.hadoop.hbase.testclassification.MediumTests; 044import org.apache.hadoop.hbase.testclassification.RegionServerTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 048import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter; 049import org.apache.hadoop.hbase.wal.WALEdit; 050import org.apache.hadoop.hbase.wal.WALKeyImpl; 051import org.junit.AfterClass; 052import org.junit.BeforeClass; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061import org.apache.hbase.thirdparty.io.netty.channel.Channel; 062import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 063import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 064import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 065 066/** 067 * Testcase for HBASE-25905 068 */ 069@Category({ RegionServerTests.class, MediumTests.class }) 070public class TestAsyncFSWALRollStuck { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class); 075 076 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class); 077 078 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 079 080 private static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup(); 081 082 private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class; 083 084 private static ScheduledExecutorService EXECUTOR; 085 086 private static BlockingQueue<CompletableFuture<Long>> FUTURES = new ArrayBlockingQueue<>(3); 087 088 private static AtomicInteger SYNC_COUNT = new AtomicInteger(0); 089 090 private static CountDownLatch ARRIVE = new CountDownLatch(1); 091 092 private static CountDownLatch RESUME = new CountDownLatch(1); 093 094 public static final class TestAsyncWriter extends AsyncProtobufLogWriter { 095 096 public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) { 097 super(eventLoopGroup, channelClass); 098 } 099 100 @Override 101 public CompletableFuture<Long> sync(boolean forceSync) { 102 int count = SYNC_COUNT.incrementAndGet(); 103 if (count < 3) { 104 // we will mark these two futures as failure, to make sure that we have 2 edits in 105 // unackedAppends, and trigger a WAL roll 106 CompletableFuture<Long> f = new CompletableFuture<>(); 107 FUTURES.offer(f); 108 return f; 109 } else if (count == 3) { 110 // for this future, we will mark it as succeeded, but before returning from this method, we 111 // need to request a roll, to enter the special corner case, where we have left some edits 112 // in unackedAppends but never tries to write them out which leads to a hang 113 ARRIVE.countDown(); 114 try { 115 RESUME.await(); 116 } catch (InterruptedException e) { 117 } 118 return super.sync(forceSync); 119 } else { 120 return super.sync(forceSync); 121 } 122 } 123 } 124 125 private static TableName TN; 126 127 private static RegionInfo RI; 128 129 private static MultiVersionConcurrencyControl MVCC; 130 131 private static AsyncFSWAL WAL; 132 133 private static ExecutorService ROLL_EXEC; 134 135 @BeforeClass 136 public static void setUp() throws Exception { 137 Configuration conf = UTIL.getConfiguration(); 138 conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class); 139 // set a very small size so we will reach the batch size when writing out a single edit 140 conf.setLong(AsyncFSWAL.WAL_BATCH_SIZE, 1); 141 142 TN = TableName.valueOf("test"); 143 RI = RegionInfoBuilder.newBuilder(TN).build(); 144 MVCC = new MultiVersionConcurrencyControl(); 145 146 EXECUTOR = 147 Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build()); 148 149 Path rootDir = UTIL.getDataTestDir(); 150 ROLL_EXEC = 151 Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build()); 152 WALActionsListener listener = new WALActionsListener() { 153 154 @Override 155 public void logRollRequested(RollRequestReason reason) { 156 ROLL_EXEC.execute(() -> { 157 try { 158 WAL.rollWriter(); 159 } catch (Exception e) { 160 LOG.warn("failed to roll writer", e); 161 } 162 }); 163 } 164 165 }; 166 WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), null, rootDir, "log", "oldlog", conf, 167 Arrays.asList(listener), true, null, null, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS, 168 StreamSlowMonitor.create(conf, "monitor")); 169 WAL.init(); 170 } 171 172 @AfterClass 173 public static void tearDown() throws Exception { 174 EXECUTOR.shutdownNow(); 175 ROLL_EXEC.shutdownNow(); 176 Closeables.close(WAL, true); 177 UTIL.cleanupTestDir(); 178 } 179 180 @Test 181 public void testRoll() throws Exception { 182 byte[] row = Bytes.toBytes("family"); 183 WALEdit edit = new WALEdit(); 184 edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row) 185 .setQualifier(row).setRow(row).setValue(row) 186 .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build()); 187 WALKeyImpl key1 = 188 new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC); 189 WAL.appendData(RI, key1, edit); 190 191 WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC); 192 long txid = WAL.appendData(RI, key2, edit); 193 194 // we need to make sure the two edits have both been added unackedAppends, so we have two syncs 195 UTIL.waitFor(10000, () -> FUTURES.size() == 2); 196 FUTURES.poll().completeExceptionally(new IOException("inject error")); 197 FUTURES.poll().completeExceptionally(new IOException("inject error")); 198 ARRIVE.await(); 199 // resume after 1 seconds, to give us enough time to enter the roll state 200 EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS); 201 // let's roll the wal, before the fix in HBASE-25905, it will hang forever inside 202 // waitForSafePoint 203 WAL.rollWriter(); 204 // make sure we can finally succeed 205 WAL.sync(txid); 206 } 207}