001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertNull;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.io.UncheckedIOException;
026import java.util.List;
027import java.util.NavigableMap;
028import java.util.TreeMap;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.concurrent.atomic.AtomicReference;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.RegionInfoBuilder;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.regionserver.LogRoller;
045import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
046import org.apache.hadoop.hbase.regionserver.RegionServerServices;
047import org.apache.hadoop.hbase.regionserver.SequenceId;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.RegionServerTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.CommonFSUtils;
052import org.apache.hadoop.hbase.util.FutureUtils;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.hadoop.hbase.wal.WALEdit;
055import org.apache.hadoop.hbase.wal.WALKey;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
058import org.junit.AfterClass;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063
064import org.apache.hbase.thirdparty.io.netty.channel.Channel;
065import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
066import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
067import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
068
069/**
070 * Provides AsyncFSWAL test cases.
071 */
072@Category({ RegionServerTests.class, LargeTests.class })
073public class TestAsyncFSWAL extends AbstractTestFSWAL {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077      HBaseClassTestRule.forClass(TestAsyncFSWAL.class);
078
079  private static EventLoopGroup GROUP;
080
081  private static Class<? extends Channel> CHANNEL_CLASS;
082
083  @BeforeClass
084  public static void setUpBeforeClass() throws Exception {
085    GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL"));
086    CHANNEL_CLASS = NioSocketChannel.class;
087    AbstractTestFSWAL.setUpBeforeClass();
088  }
089
090  @AfterClass
091  public static void tearDownAfterClass() throws Exception {
092    AbstractTestFSWAL.tearDownAfterClass();
093    GROUP.shutdownGracefully();
094  }
095
096  @Override
097  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
098      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
099      String prefix, String suffix) throws IOException {
100    AsyncFSWAL asyncFSWAL = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf,
101      listeners, failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS);
102    asyncFSWAL.init();
103    return asyncFSWAL;
104  }
105
106  @Override
107  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir,
108      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
109      boolean failIfWALExists, String prefix, String suffix, final Runnable action)
110      throws IOException {
111    AsyncFSWAL asyncFSWAL = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners,
112      failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {
113      @Override
114      protected void atHeadOfRingBufferEventHandlerAppend() {
115        action.run();
116        super.atHeadOfRingBufferEventHandlerAppend();
117      }
118
119    };
120    asyncFSWAL.init();
121    return asyncFSWAL;
122  }
123
124  @Test
125  public void testBrokenWriter() throws Exception {
126    RegionServerServices services = mock(RegionServerServices.class);
127    when(services.getConfiguration()).thenReturn(CONF);
128    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
129        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
130    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
131    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
132    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
133    for (byte[] fam : td.getColumnFamilyNames()) {
134      scopes.put(fam, 0);
135    }
136    long timestamp = System.currentTimeMillis();
137    String testName = currentTest.getMethodName();
138    AtomicInteger failedCount = new AtomicInteger(0);
139    try (LogRoller roller = new LogRoller(services);
140        AsyncFSWAL wal = new AsyncFSWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
141            testName, CONF, null, true, null, null, GROUP, CHANNEL_CLASS) {
142
143          @Override
144          protected AsyncWriter createWriterInstance(Path path) throws IOException {
145            AsyncWriter writer = super.createWriterInstance(path);
146            return new AsyncWriter() {
147
148              @Override
149              public void close() throws IOException {
150                writer.close();
151              }
152
153              @Override
154              public long getLength() {
155                return writer.getLength();
156              }
157
158              @Override
159              public CompletableFuture<Long> sync(boolean forceSync) {
160                CompletableFuture<Long> result = writer.sync(forceSync);
161                if (failedCount.incrementAndGet() < 1000) {
162                  CompletableFuture<Long> future = new CompletableFuture<>();
163                  FutureUtils.addListener(result,
164                    (r, e) -> future.completeExceptionally(new IOException("Inject Error")));
165                  return future;
166                } else {
167                  return result;
168                }
169              }
170
171              @Override
172              public void append(Entry entry) {
173                writer.append(entry);
174              }
175            };
176          }
177        }) {
178      wal.init();
179      roller.addWAL(wal);
180      roller.start();
181      int numThreads = 10;
182      AtomicReference<Exception> error = new AtomicReference<>();
183      Thread[] threads = new Thread[numThreads];
184      for (int i = 0; i < 10; i++) {
185        final int index = i;
186        threads[index] = new Thread("Write-Thread-" + index) {
187
188          @Override
189          public void run() {
190            byte[] row = Bytes.toBytes("row" + index);
191            WALEdit cols = new WALEdit();
192            cols.add(new KeyValue(row, row, row, timestamp + index, row));
193            WALKeyImpl key = new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(),
194                SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
195                HConstants.NO_NONCE, mvcc, scopes);
196            try {
197              wal.append(ri, key, cols, true);
198            } catch (IOException e) {
199              // should not happen
200              throw new UncheckedIOException(e);
201            }
202            try {
203              wal.sync();
204            } catch (IOException e) {
205              error.set(e);
206            }
207          }
208        };
209      }
210      for (Thread t : threads) {
211        t.start();
212      }
213      for (Thread t : threads) {
214        t.join();
215      }
216      assertNull(error.get());
217    }
218  }
219}