001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.CountDownLatch;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
028import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
029import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
030import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
031
032import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
033import org.apache.hbase.thirdparty.io.netty.channel.Channel;
034import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
035
036class DualAsyncFSWALForTest extends DualAsyncFSWAL {
037
038  private boolean localBroken;
039
040  private boolean remoteBroken;
041
042  private CountDownLatch arrive;
043
044  private CountDownLatch resume;
045
046  private final class MyCombinedAsyncWriter implements AsyncWriter {
047
048    private final AsyncWriter localWriter;
049
050    private final AsyncWriter remoteWriter;
051
052    public MyCombinedAsyncWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) {
053      this.localWriter = localWriter;
054      this.remoteWriter = remoteWriter;
055    }
056
057    @Override
058    public long getLength() {
059      return localWriter.getLength();
060    }
061
062    @Override
063    public long getSyncedLength() {
064      return this.localWriter.getSyncedLength();
065    }
066
067    @Override
068    public void close() throws IOException {
069      Closeables.close(localWriter, true);
070      Closeables.close(remoteWriter, true);
071    }
072
073    @Override
074    public CompletableFuture<Long> sync(boolean forceSync) {
075      CompletableFuture<Long> localFuture;
076      CompletableFuture<Long> remoteFuture;
077      if (!localBroken) {
078        localFuture = localWriter.sync(forceSync);
079      } else {
080        localFuture = new CompletableFuture<>();
081        localFuture.completeExceptionally(new IOException("Inject error"));
082      }
083      if (!remoteBroken) {
084        remoteFuture = remoteWriter.sync(forceSync);
085      } else {
086        remoteFuture = new CompletableFuture<>();
087        remoteFuture.completeExceptionally(new IOException("Inject error"));
088      }
089      return CompletableFuture.allOf(localFuture, remoteFuture).thenApply(v -> {
090        return localFuture.getNow(0L);
091      });
092    }
093
094    @Override
095    public void append(Entry entry) {
096      if (!localBroken) {
097        localWriter.append(entry);
098      }
099      if (!remoteBroken) {
100        remoteWriter.append(entry);
101      }
102    }
103  }
104
105  public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir,
106    String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
107    boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
108    Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
109    super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists,
110      prefix, suffix, eventLoopGroup, channelClass);
111  }
112
113  @Override
114  protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
115    AsyncWriter remoteWriter) {
116    return new MyCombinedAsyncWriter(localWriter, remoteWriter);
117  }
118
119  @Override
120  protected AsyncWriter createWriterInstance(Path path) throws IOException {
121    if (arrive != null) {
122      arrive.countDown();
123      try {
124        resume.await();
125      } catch (InterruptedException e) {
126      }
127    }
128    if (localBroken || remoteBroken) {
129      throw new IOException("WAL broken");
130    }
131    return super.createWriterInstance(path);
132  }
133
134  public void setLocalBroken() {
135    this.localBroken = true;
136  }
137
138  public void setRemoteBroken() {
139    this.remoteBroken = true;
140  }
141
142  public void suspendLogRoll() {
143    arrive = new CountDownLatch(1);
144    resume = new CountDownLatch(1);
145  }
146
147  public void waitUntilArrive() throws InterruptedException {
148    arrive.await();
149  }
150
151  public void resumeLogRoll() {
152    resume.countDown();
153  }
154}