001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.CountDownLatch;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
030import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
031import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
032import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
033import org.apache.hadoop.hbase.util.CommonFSUtils;
034import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
035import org.apache.hadoop.hbase.wal.WAL;
036import org.apache.hadoop.hbase.wal.WALProvider;
037
038import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
039import org.apache.hbase.thirdparty.io.netty.channel.Channel;
040import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
041
042public class BrokenRemoteAsyncFSWALProvider extends AsyncFSWALProvider {
043
044  static class BrokenRemoteAsyncFSWAL extends AsyncFSWAL {
045
046    private final class MyCombinedAsyncWriter implements WALProvider.AsyncWriter {
047
048      private final WALProvider.AsyncWriter localWriter;
049
050      private final WALProvider.AsyncWriter remoteWriter;
051
052      // remoteWriter on the first
053      public MyCombinedAsyncWriter(WALProvider.AsyncWriter localWriter,
054        WALProvider.AsyncWriter remoteWriter) {
055        this.localWriter = localWriter;
056        this.remoteWriter = remoteWriter;
057      }
058
059      @Override
060      public long getLength() {
061        return localWriter.getLength();
062      }
063
064      @Override
065      public long getSyncedLength() {
066        return this.localWriter.getSyncedLength();
067      }
068
069      @Override
070      public void close() throws IOException {
071        Closeables.close(localWriter, true);
072        Closeables.close(remoteWriter, true);
073      }
074
075      @Override
076      public CompletableFuture<Long> sync(boolean forceSync) {
077        CompletableFuture<Long> localFuture;
078        CompletableFuture<Long> remoteFuture;
079
080        if (!localBroken) {
081          localFuture = localWriter.sync(forceSync);
082        } else {
083          localFuture = new CompletableFuture<>();
084          localFuture.completeExceptionally(new IOException("Inject error"));
085        }
086        if (!remoteBroken) {
087          remoteFuture = remoteWriter.sync(forceSync);
088        } else {
089          remoteFuture = new CompletableFuture<>();
090          remoteFuture.completeExceptionally(new IOException("Inject error"));
091        }
092        return CompletableFuture.allOf(localFuture, remoteFuture).thenApply(v -> {
093          return localFuture.getNow(0L);
094        });
095      }
096
097      @Override
098      public void append(WAL.Entry entry) {
099        if (!localBroken) {
100          localWriter.append(entry);
101        }
102        if (!remoteBroken) {
103          remoteWriter.append(entry);
104        }
105      }
106    }
107
108    private volatile boolean localBroken;
109
110    private volatile boolean remoteBroken;
111
112    private CountDownLatch arrive;
113
114    private CountDownLatch resume;
115
116    public void setLocalBroken() {
117      this.localBroken = true;
118    }
119
120    public void setRemoteBroken() {
121      this.remoteBroken = true;
122    }
123
124    public void suspendLogRoll() {
125      arrive = new CountDownLatch(1);
126      resume = new CountDownLatch(1);
127    }
128
129    public void waitUntilArrive() throws InterruptedException {
130      arrive.await();
131    }
132
133    public void resumeLogRoll() {
134      resume.countDown();
135    }
136
137    public BrokenRemoteAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
138      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
139      boolean failIfWALExists, String prefix, String suffix, FileSystem remoteFs, Path remoteWALDir,
140      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
141      StreamSlowMonitor monitor) throws FailedLogCloseException, IOException {
142      super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
143        suffix, remoteFs, remoteWALDir, eventLoopGroup, channelClass, monitor);
144    }
145
146    @Override
147    protected WALProvider.AsyncWriter createCombinedWriter(WALProvider.AsyncWriter localWriter,
148      WALProvider.AsyncWriter remoteWriter) {
149      return new MyCombinedAsyncWriter(localWriter, remoteWriter);
150    }
151
152    @Override
153    protected WALProvider.AsyncWriter createWriterInstance(FileSystem fs, Path path)
154      throws IOException {
155      if (arrive != null) {
156        arrive.countDown();
157        try {
158          resume.await();
159        } catch (InterruptedException e) {
160        }
161      }
162      if (localBroken || remoteBroken) {
163        throw new IOException("WAL broken");
164      }
165      return super.createWriterInstance(fs, path);
166    }
167  }
168
169  @Override
170  protected AsyncFSWAL createWAL() throws IOException {
171    return new BrokenRemoteAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
172      CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()),
173      getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix,
174      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, null, null,
175      eventLoopGroup, channelClass,
176      factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
177
178  }
179
180  @Override
181  protected WAL createRemoteWAL(RegionInfo region, FileSystem remoteFs, Path remoteWALDir,
182    String prefix, String suffix) throws IOException {
183    return new BrokenRemoteAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
184      CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()),
185      getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, prefix,
186      suffix, remoteFs, remoteWALDir, eventLoopGroup, channelClass,
187      factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
188  }
189
190}