/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Testcase for HBASE-25905.
 * <p>
 * The test installs a custom {@link TestAsyncWriter} whose first two syncs never complete on their
 * own and whose third sync blocks until the test releases it. The test then fails the first two
 * syncs, requests a roll while the third sync is still blocked, and finally verifies that both the
 * roll and a subsequent {@code sync(txid)} return instead of hanging.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestAsyncFSWALRollStuck {

  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static final EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();

  private static final Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;

  /** Used to release {@link #RESUME} with a delay, see {@link #testRoll()}. */
  private static ScheduledExecutorService EXECUTOR;

  /** Futures handed out by the first two syncs; the test completes them exceptionally. */
  private static final BlockingQueue<CompletableFuture<Long>> FUTURES =
    new ArrayBlockingQueue<>(3);

  /** Number of times {@link TestAsyncWriter#sync(boolean)} has been invoked so far. */
  private static final AtomicInteger SYNC_COUNT = new AtomicInteger(0);

  /** Counted down by the writer once the third sync has been entered. */
  private static final CountDownLatch ARRIVE = new CountDownLatch(1);

  /** Counted down by the test to let the third sync proceed. */
  private static final CountDownLatch RESUME = new CountDownLatch(1);

  /**
   * Writer whose {@link #sync(boolean)} behaviour depends on how many syncs have been issued so
   * far, so that the test can steer the WAL into the state described in HBASE-25905.
   */
  public static final class TestAsyncWriter extends AsyncProtobufLogWriter {

    public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
      super(eventLoopGroup, channelClass);
    }

    /**
     * Syncs number one and two return a future that is only published through {@link #FUTURES};
     * sync number three signals {@link #ARRIVE}, waits for {@link #RESUME} and then delegates;
     * every later sync delegates directly.
     * @param forceSync passed through to the super implementation
     * @return the future representing this sync
     */
    @Override
    public CompletableFuture<Long> sync(boolean forceSync) {
      int count = SYNC_COUNT.incrementAndGet();
      if (count < 3) {
        // we will mark these two futures as failure, to make sure that we have 2 edits in
        // unackedAppends, and trigger a WAL roll
        CompletableFuture<Long> f = new CompletableFuture<>();
        FUTURES.offer(f);
        return f;
      }
      if (count == 3) {
        // for this future, we will mark it as succeeded, but before returning from this method, we
        // need to request a roll, to enter the special corner case, where we have left some edits
        // in unackedAppends but never try to write them out, which leads to a hang
        ARRIVE.countDown();
        try {
          RESUME.await();
        } catch (InterruptedException e) {
          // Do not swallow the interrupt: restore the flag so that the owner of this thread can
          // still observe it, then fall through to the real sync.
          Thread.currentThread().interrupt();
        }
      }
      return super.sync(forceSync);
    }
  }

  private static TableName TN;

  private static RegionInfo RI;

  private static MultiVersionConcurrencyControl MVCC;

  private static AsyncFSWAL WAL;

  /** Single thread on which roll requests raised by the WAL listener are executed. */
  private static ExecutorService ROLL_EXEC;

  /**
   * Creates the WAL under test, configured to use {@link TestAsyncWriter} and a batch size of 1,
   * with a listener that rolls the writer on {@link #ROLL_EXEC} whenever a roll is requested.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class);
    // set a very small size so we will reach the batch size when writing out a single edit
    conf.setLong(AbstractFSWAL.WAL_BATCH_SIZE, 1);

    TN = TableName.valueOf("test");
    RI = RegionInfoBuilder.newBuilder(TN).build();
    MVCC = new MultiVersionConcurrencyControl();

    EXECUTOR =
      Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());

    Path rootDir = UTIL.getDataTestDir();
    ROLL_EXEC =
      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
    WALActionsListener listener = new WALActionsListener() {

      @Override
      public void logRollRequested(RollRequestReason reason) {
        ROLL_EXEC.execute(() -> {
          try {
            WAL.rollWriter();
          } catch (Exception e) {
            LOG.warn("failed to roll writer", e);
          }
        });
      }

    };
    UTIL.getTestFileSystem().mkdirs(new Path(rootDir, "log"));
    WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), null, rootDir, "log", "oldlog", conf,
      Arrays.asList(listener), true, null, null, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS,
      StreamSlowMonitor.create(conf, "monitor"));
    WAL.init();
  }

  /**
   * Stops the helper executors, closes the WAL, removes the test directory and finally shuts down
   * the event loop group created by this class.
   */
  @AfterAll
  public static void tearDown() throws Exception {
    EXECUTOR.shutdownNow();
    ROLL_EXEC.shutdownNow();
    Closeables.close(WAL, true);
    UTIL.cleanupTestDir();
    // the group is created by this class, so it is our job to release it; do it only after the
    // WAL has been closed since the WAL was constructed with this group
    EVENT_LOOP_GROUP.shutdownGracefully();
  }

  /**
   * Appends two edits, fails both of their syncs, and then rolls the WAL while the third sync is
   * still blocked inside the writer. Passes if {@code rollWriter()} and {@code sync(txid)} return.
   */
  @Test
  public void testRoll() throws Exception {
    // the same bytes are reused for row, family, qualifier and value
    byte[] row = Bytes.toBytes("family");
    WALEdit edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit,
      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row)
        .setQualifier(row).setRow(row).setValue(row)
        .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build());
    WALKeyImpl key1 =
      new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
    WAL.appendData(RI, key1, edit);

    WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC);
    long txid = WAL.appendData(RI, key2, edit);

    // we need to make sure the two edits have both been added to unackedAppends, so we have two
    // syncs
    UTIL.waitFor(10000, () -> FUTURES.size() == 2);
    FUTURES.poll().completeExceptionally(new IOException("inject error"));
    FUTURES.poll().completeExceptionally(new IOException("inject error"));
    ARRIVE.await();
    // resume after 1 second, to give us enough time to enter the roll state
    EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS);
    // let's roll the wal, before the fix in HBASE-25905, it will hang forever inside
    // waitForSafePoint
    WAL.rollWriter();
    // make sure we can finally succeed
    WAL.sync(txid);
  }
}