001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
022
023import java.io.IOException;
024import java.lang.reflect.Constructor;
025import java.lang.reflect.InvocationTargetException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Lock;
033import java.util.function.BiPredicate;
034import java.util.regex.Matcher;
035import java.util.regex.Pattern;
036import java.util.stream.Collectors;
037import java.util.stream.Stream;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
042import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
043import org.apache.hadoop.hbase.replication.ReplicationUtils;
044import org.apache.hadoop.hbase.replication.SyncReplicationState;
045import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
046import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.KeyLocker;
050import org.apache.hadoop.hbase.util.Pair;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
056import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
057import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
058import org.apache.hbase.thirdparty.io.netty.channel.Channel;
059import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
060
061/**
062 * The special {@link WALProvider} for synchronous replication.
063 * <p>
064 * It works like an interceptor, when getting WAL, first it will check if the given region should be
065 * replicated synchronously, if so it will return a special WAL for it, otherwise it will delegate
066 * the request to the normal {@link WALProvider}.
067 */
068@InterfaceAudience.Private
069public class SyncReplicationWALProvider implements WALProvider, PeerActionListener {
070
071  private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
072
073  // only for injecting errors for testcase, do not use it for other purpose.
074  @VisibleForTesting
075  public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl";
076
077  private final WALProvider provider;
078
079  private SyncReplicationPeerInfoProvider peerInfoProvider =
080    new DefaultSyncReplicationPeerInfoProvider();
081
082  private WALFactory factory;
083
084  private Configuration conf;
085
086  private List<WALActionsListener> listeners = new ArrayList<>();
087
088  private EventLoopGroup eventLoopGroup;
089
090  private Class<? extends Channel> channelClass;
091
092  private AtomicBoolean initialized = new AtomicBoolean(false);
093
094  // when switching from A to DA, we will put a Optional.empty into this map if there is no WAL for
095  // the peer yet. When getting WAL from this map the caller should know that it should not use
096  // DualAsyncFSWAL any more.
097  private final ConcurrentMap<String, Optional<DualAsyncFSWAL>> peerId2WAL =
098    new ConcurrentHashMap<>();
099
100  private final KeyLocker<String> createLock = new KeyLocker<>();
101
102  SyncReplicationWALProvider(WALProvider provider) {
103    this.provider = provider;
104  }
105
106  public void setPeerInfoProvider(SyncReplicationPeerInfoProvider peerInfoProvider) {
107    this.peerInfoProvider = peerInfoProvider;
108  }
109
110  @Override
111  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
112    if (!initialized.compareAndSet(false, true)) {
113      throw new IllegalStateException("WALProvider.init should only be called once.");
114    }
115    provider.init(factory, conf, providerId);
116    this.conf = conf;
117    this.factory = factory;
118    Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
119      NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
120    eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
121    channelClass = eventLoopGroupAndChannelClass.getSecond();
122  }
123
124  // Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then
125  // back to A, the log prefix will be changed. This is used to simplify the implementation for
126  // replication source, where we do not need to consider that a terminated shipper could be added
127  // back.
128  private String getLogPrefix(String peerId) {
129    return factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + peerId;
130  }
131
132  private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
133    Class<? extends DualAsyncFSWAL> clazz =
134      conf.getClass(DUAL_WAL_IMPL, DualAsyncFSWAL.class, DualAsyncFSWAL.class);
135    try {
136      Constructor<?> constructor = null;
137      for (Constructor<?> c : clazz.getDeclaredConstructors()) {
138        if (c.getParameterCount() > 0) {
139          constructor = c;
140          break;
141        }
142      }
143      if (constructor == null) {
144        throw new IllegalArgumentException("No valid constructor provided for class " + clazz);
145      }
146      constructor.setAccessible(true);
147      return (DualAsyncFSWAL) constructor.newInstance(
148        CommonFSUtils.getWALFileSystem(conf),
149        ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
150        CommonFSUtils.getWALRootDir(conf),
151        ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId),
152        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
153        conf, listeners, true, getLogPrefix(peerId), ReplicationUtils.SYNC_WAL_SUFFIX,
154        eventLoopGroup, channelClass);
155    } catch (InstantiationException | IllegalAccessException e) {
156      throw new RuntimeException(e);
157    } catch (InvocationTargetException e) {
158      Throwable cause = e.getTargetException();
159      Throwables.propagateIfPossible(cause, IOException.class);
160      throw new RuntimeException(cause);
161    }
162  }
163
164  private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
165    Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
166    if (opt != null) {
167      return opt.orElse(null);
168    }
169    Lock lock = createLock.acquireLock(peerId);
170    try {
171      opt = peerId2WAL.get(peerId);
172      if (opt != null) {
173        return opt.orElse(null);
174      }
175      DualAsyncFSWAL wal = createWAL(peerId, remoteWALDir);
176      boolean succ = false;
177      try {
178        wal.init();
179        succ = true;
180      } finally {
181        if (!succ) {
182          wal.close();
183        }
184      }
185      peerId2WAL.put(peerId, Optional.of(wal));
186      return wal;
187    } finally {
188      lock.unlock();
189    }
190  }
191
192  @Override
193  public WAL getWAL(RegionInfo region) throws IOException {
194    if (region == null) {
195      return provider.getWAL(null);
196    }
197    WAL wal = null;
198    Optional<Pair<String, String>> peerIdAndRemoteWALDir =
199        peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable());
200    if (peerIdAndRemoteWALDir.isPresent()) {
201      Pair<String, String> pair = peerIdAndRemoteWALDir.get();
202      wal = getWAL(pair.getFirst(), pair.getSecond());
203    }
204    return wal != null ? wal : provider.getWAL(region);
205  }
206
207  private Stream<WAL> getWALStream() {
208    return Streams.concat(
209      peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get),
210      provider.getWALs().stream());
211  }
212
213  @Override
214  public List<WAL> getWALs() {
215    return getWALStream().collect(Collectors.toList());
216  }
217
218  @Override
219  public void shutdown() throws IOException {
220    // save the last exception and rethrow
221    IOException failure = null;
222    for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
223      if (wal.isPresent()) {
224        try {
225          wal.get().shutdown();
226        } catch (IOException e) {
227          LOG.error("Shutdown WAL failed", e);
228          failure = e;
229        }
230      }
231    }
232    provider.shutdown();
233    if (failure != null) {
234      throw failure;
235    }
236  }
237
238  @Override
239  public void close() throws IOException {
240    // save the last exception and rethrow
241    IOException failure = null;
242    for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
243      if (wal.isPresent()) {
244        try {
245          wal.get().close();
246        } catch (IOException e) {
247          LOG.error("Close WAL failed", e);
248          failure = e;
249        }
250      }
251    }
252    provider.close();
253    if (failure != null) {
254      throw failure;
255    }
256  }
257
258  @Override
259  public long getNumLogFiles() {
260    return peerId2WAL.size() + provider.getNumLogFiles();
261  }
262
263  @Override
264  public long getLogFileSize() {
265    return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get)
266      .mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + provider.getLogFileSize();
267  }
268
269  private void safeClose(WAL wal) {
270    if (wal != null) {
271      try {
272        wal.close();
273      } catch (IOException e) {
274        LOG.error("Close WAL failed", e);
275      }
276    }
277  }
278
279  @Override
280  public void addWALActionsListener(WALActionsListener listener) {
281    listeners.add(listener);
282    provider.addWALActionsListener(listener);
283  }
284
285  @Override
286  public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
287      SyncReplicationState to, int stage) {
288    if (from == SyncReplicationState.ACTIVE) {
289      if (stage == 0) {
290        Lock lock = createLock.acquireLock(peerId);
291        try {
292          Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
293          if (opt != null) {
294            opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY));
295          } else {
296            // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more.
297            peerId2WAL.put(peerId, Optional.empty());
298          }
299        } finally {
300          lock.unlock();
301        }
302      } else if (stage == 1) {
303        peerId2WAL.remove(peerId).ifPresent(this::safeClose);
304      }
305    }
306  }
307
308  private static class DefaultSyncReplicationPeerInfoProvider
309      implements SyncReplicationPeerInfoProvider {
310
311    @Override
312    public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
313      return Optional.empty();
314    }
315
316    @Override
317    public boolean checkState(TableName table,
318        BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
319      return false;
320    }
321  }
322
323  private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)");
324
325  /**
326   * <p>
327   * Returns the peer id if the wal file name is in the special group for a sync replication peer.
328   * </p>
329   * <p>
330   * The prefix format is &lt;factoryId&gt;-&lt;ts&gt;-&lt;peerId&gt;.
331   * </p>
332   */
333  public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
334    if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
335      // fast path to return earlier if the name is not for a sync replication peer.
336      return Optional.empty();
337    }
338    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
339    Matcher matcher = LOG_PREFIX_PATTERN.matcher(logPrefix);
340    if (matcher.matches()) {
341      return Optional.of(matcher.group(1));
342    } else {
343      return Optional.empty();
344    }
345  }
346
347  @VisibleForTesting
348  WALProvider getWrappedProvider() {
349    return provider;
350  }
351
352}