001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Optional;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.Lock;
029import java.util.concurrent.locks.ReentrantLock;
030import java.util.function.BiPredicate;
031import java.util.regex.Matcher;
032import java.util.regex.Pattern;
033import java.util.stream.Collectors;
034import java.util.stream.Stream;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Abortable;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
042import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
043import org.apache.hadoop.hbase.replication.ReplicationUtils;
044import org.apache.hadoop.hbase.replication.SyncReplicationState;
045import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
046import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.IOExceptionConsumer;
049import org.apache.hadoop.hbase.util.IOExceptionRunnable;
050import org.apache.hadoop.hbase.util.KeyLocker;
051import org.apache.hadoop.hbase.util.Pair;
052import org.apache.hadoop.io.MultipleIOException;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
058
059/**
060 * Base class for a WAL Provider.
061 * <p>
062 * We will put some common logic here, especially for sync replication implementation, as it must do
063 * some hacks before the normal wal creation operation.
064 * <p>
 * All {@link WALProvider} implementations should extend this class instead of implementing
 * {@link WALProvider} directly, except {@link DisabledWALProvider}.
067 */
068@InterfaceAudience.Private
069public abstract class AbstractWALProvider implements WALProvider, PeerActionListener {
070
  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALProvider.class);

  // should be package private; more visible for use in AbstractFSWAL
  public static final String WAL_FILE_NAME_DELIMITER = ".";

  // assigned in init()
  protected WALFactory factory;
  // assigned in init()
  protected Configuration conf;
  // listeners registered through addWALActionsListener()
  protected List<WALActionsListener> listeners = new ArrayList<>();
  // NOTE(review): not assigned anywhere in this class; presumably set by subclasses in doInit() -
  // confirm
  protected String providerId;
  // flipped to true by the first init() call; a second init() call fails
  protected AtomicBoolean initialized = new AtomicBoolean(false);
  // for default wal provider, logPrefix won't change
  protected String logPrefix;
  // assigned in init() from the 'server' argument
  protected Abortable abortable;

  // when switching from A to DA, we will put an Optional.empty into this map if there is no WAL
  // for the peer yet. When getting a WAL from this map, an empty Optional tells the caller that it
  // should not use the remote WAL any more.
  private final ConcurrentMap<String, Optional<WAL>> peerId2WAL = new ConcurrentHashMap<>();

  // per peer id lock, held while creating the remote WAL for a peer and while handling stage 0 of
  // a sync replication state change
  private final KeyLocker<String> createLock = new KeyLocker<>();

  // in getWALs we can not throw any exceptions out, so we use a lock and a condition here as it
  // supports awaitUninterruptibly which will not throw an InterruptedException
  private final Lock numRemoteWALUnderCreationLock = new ReentrantLock();
  private final Condition noRemoteWALUnderCreationCond =
    numRemoteWALUnderCreationLock.newCondition();
  // record the number of remote WALs which are under creation. This is very important to avoid
  // missing a WAL instance in the getWALs method. See HBASE-28140 and related issues for more
  // details. Guarded by numRemoteWALUnderCreationLock.
  private int numRemoteWALUnderCreation;
100
  // we need to have this because when getting meta wal, there is no peer info provider yet.
  // This placeholder reports no sync replication peer for any table and answers false for every
  // state check; it is replaced through setSyncReplicationPeerInfoProvider().
  private SyncReplicationPeerInfoProvider peerInfoProvider = new SyncReplicationPeerInfoProvider() {

    // no table belongs to a sync replication peer as far as the placeholder is concerned
    @Override
    public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
      return Optional.empty();
    }

    // the checker is never invoked; the answer is always false
    @Override
    public boolean checkState(TableName table,
      BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
      return false;
    }

  };
116
117  @Override
118  public final void init(WALFactory factory, Configuration conf, String providerId,
119    Abortable server) throws IOException {
120    if (!initialized.compareAndSet(false, true)) {
121      throw new IllegalStateException("WALProvider.init should only be called once.");
122    }
123    this.factory = factory;
124    this.conf = conf;
125    this.abortable = server;
126    doInit(factory, conf, providerId);
127  }
128
129  protected final void initWAL(WAL wal) throws IOException {
130    boolean succ = false;
131    try {
132      wal.init();
133      succ = true;
134    } finally {
135      if (!succ) {
136        safeClose(wal);
137      }
138    }
139  }
140
141  // Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then
142  // back to A, the log prefix will be changed. This is used to simplify the implementation for
143  // replication source, where we do not need to consider that a terminated shipper could be added
144  // back.
145  private String getRemoteWALPrefix(String peerId) {
146    return factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + peerId;
147  }
148
149  private WAL getRemoteWAL(RegionInfo region, String peerId, String remoteWALDir)
150    throws IOException {
151    Optional<WAL> opt = peerId2WAL.get(peerId);
152    if (opt != null) {
153      return opt.orElse(null);
154    }
155    Lock lock = createLock.acquireLock(peerId);
156    try {
157      opt = peerId2WAL.get(peerId);
158      if (opt != null) {
159        return opt.orElse(null);
160      }
161      WAL wal = createRemoteWAL(region, ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
162        ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId), getRemoteWALPrefix(peerId),
163        ReplicationUtils.SYNC_WAL_SUFFIX);
164      numRemoteWALUnderCreationLock.lock();
165      try {
166        numRemoteWALUnderCreation++;
167      } finally {
168        numRemoteWALUnderCreationLock.unlock();
169      }
170      initWAL(wal);
171      peerId2WAL.put(peerId, Optional.of(wal));
172      return wal;
173    } finally {
174      lock.unlock();
175      numRemoteWALUnderCreationLock.lock();
176      try {
177        numRemoteWALUnderCreation--;
178        if (numRemoteWALUnderCreation == 0) {
179          noRemoteWALUnderCreationCond.signalAll();
180        }
181      } finally {
182        numRemoteWALUnderCreationLock.unlock();
183      }
184    }
185  }
186
187  @Override
188  public final WAL getWAL(RegionInfo region) throws IOException {
189    if (region == null) {
190      return getWAL0(null);
191    }
192    // deal with sync replication
193    Optional<Pair<String, String>> peerIdAndRemoteWALDir =
194      peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable());
195    if (peerIdAndRemoteWALDir.isPresent()) {
196      Pair<String, String> pair = peerIdAndRemoteWALDir.get();
197      WAL wal = getRemoteWAL(region, pair.getFirst(), pair.getSecond());
198      if (wal != null) {
199        return wal;
200      }
201    }
202    // fallback to normal WALProvider logic
203    return getWAL0(region);
204  }
205
206  @Override
207  public final List<WAL> getWALs() {
208    List<WAL> wals = new ArrayList<WAL>();
209    numRemoteWALUnderCreationLock.lock();
210    try {
211      while (numRemoteWALUnderCreation > 0) {
212        noRemoteWALUnderCreationCond.awaitUninterruptibly();
213      }
214      peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get)
215        .forEach(wals::add);
216    } finally {
217      numRemoteWALUnderCreationLock.unlock();
218    }
219    return Streams
220      .concat(peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get),
221        getWALs0().stream())
222      .collect(Collectors.toList());
223  }
224
  /**
   * Returns this provider itself, as this class implements {@link PeerActionListener}.
   */
  @Override
  public PeerActionListener getPeerActionListener() {
    return this;
  }
229
230  @Override
231  public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
232    SyncReplicationState to, int stage) {
233    if (from == SyncReplicationState.ACTIVE) {
234      if (stage == 0) {
235        Lock lock = createLock.acquireLock(peerId);
236        try {
237          Optional<WAL> opt = peerId2WAL.get(peerId);
238          if (opt != null) {
239            opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY));
240          } else {
241            // add a place holder to tell the getWAL caller do not use the remote WAL any more.
242            peerId2WAL.put(peerId, Optional.empty());
243          }
244        } finally {
245          lock.unlock();
246        }
247      } else if (stage == 1) {
248        peerId2WAL.remove(peerId).ifPresent(AbstractWALProvider::safeClose);
249      }
250    }
251  }
252
  /**
   * Replaces the peer info provider which is consulted by {@code getWAL} to find the sync
   * replication peer of a table.
   */
  @Override
  public void setSyncReplicationPeerInfoProvider(SyncReplicationPeerInfoProvider provider) {
    this.peerInfoProvider = provider;
  }
257
258  private static void safeClose(WAL wal) {
259    if (wal != null) {
260      try {
261        wal.close();
262      } catch (IOException e) {
263        LOG.error("Close WAL failed", e);
264      }
265    }
266  }
267
  /**
   * Appends the given listener to the {@code listeners} list of this provider.
   */
  @Override
  public void addWALActionsListener(WALActionsListener listener) {
    listeners.add(listener);
  }
272
273  private void cleanup(IOExceptionConsumer<WAL> cleanupWAL, IOExceptionRunnable finalCleanup)
274    throws IOException {
275    MultipleIOException.Builder builder = new MultipleIOException.Builder();
276    for (Optional<WAL> wal : peerId2WAL.values()) {
277      if (wal.isPresent()) {
278        try {
279          cleanupWAL.accept(wal.get());
280        } catch (IOException e) {
281          LOG.error("cleanup WAL failed", e);
282          builder.add(e);
283        }
284      }
285    }
286    try {
287      finalCleanup.run();
288    } catch (IOException e) {
289      LOG.error("cleanup WAL failed", e);
290      builder.add(e);
291    }
292    if (!builder.isEmpty()) {
293      throw builder.build();
294    }
295  }
296
297  @Override
298  public final void shutdown() throws IOException {
299    cleanup(WAL::shutdown, this::shutdown0);
300  }
301
302  @Override
303  public final void close() throws IOException {
304    cleanup(WAL::close, this::close0);
305  }
306
307  private Stream<AbstractFSWAL<?>> remoteWALStream() {
308    return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get)
309      .filter(w -> w instanceof AbstractFSWAL).map(w -> (AbstractFSWAL<?>) w);
310  }
311
312  @Override
313  public final long getNumLogFiles() {
314    return remoteWALStream().mapToLong(AbstractFSWAL::getNumLogFiles).sum() + getNumLogFiles0();
315  }
316
317  @Override
318  public final long getLogFileSize() {
319    return remoteWALStream().mapToLong(AbstractFSWAL::getLogFileSize).sum() + getLogFileSize0();
320  }
321
322  private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)");
323
324  /**
325   * <p>
326   * Returns the peer id if the wal file name is in the special group for a sync replication peer.
327   * </p>
328   * <p>
329   * The prefix format is &lt;factoryId&gt;-&lt;ts&gt;-&lt;peerId&gt;.
330   * </p>
331   */
332  public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
333    if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
334      // fast path to return earlier if the name is not for a sync replication peer.
335      return Optional.empty();
336    }
337    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
338    Matcher matcher = LOG_PREFIX_PATTERN.matcher(logPrefix);
339    if (matcher.matches()) {
340      return Optional.of(matcher.group(1));
341    } else {
342      return Optional.empty();
343    }
344  }
345
  /**
   * Creates the remote WAL for a sync replication peer. Called by this class with the remote file
   * system and the per peer remote WAL directory; the returned WAL is initialized by this class
   * afterwards through {@code initWAL}.
   */
  protected abstract WAL createRemoteWAL(RegionInfo region, FileSystem remoteFs, Path remoteWALDir,
    String prefix, String suffix) throws IOException;

  /**
   * Provider specific initialization, called by {@code init} after the factory, configuration
   * and abortable fields have been assigned.
   */
  protected abstract void doInit(WALFactory factory, Configuration conf, String providerId)
    throws IOException;

  /**
   * Returns the WAL for the given region when no remote WAL is used for it. {@code getWAL} passes
   * {@code null} through to this method when it is called with a {@code null} region.
   */
  protected abstract WAL getWAL0(RegionInfo region) throws IOException;

  /**
   * Returns the WALs managed by the implementation; {@code getWALs} appends them after the remote
   * WALs.
   */
  protected abstract List<WAL> getWALs0();

  /**
   * Provider specific shutdown logic, run by {@code shutdown} after the remote WALs have been
   * shut down.
   */
  protected abstract void shutdown0() throws IOException;

  /**
   * Provider specific close logic, run by {@code close} after the remote WALs have been closed.
   */
  protected abstract void close0() throws IOException;

  /**
   * Returns the number of log files of the implementation; {@code getNumLogFiles} adds it to the
   * count of the remote WALs.
   */
  protected abstract long getNumLogFiles0();

  /**
   * Returns the log file size of the implementation; {@code getLogFileSize} adds it to the size
   * of the remote WALs.
   */
  protected abstract long getLogFileSize0();
363
364}