001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Optional; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.locks.Condition; 028import java.util.concurrent.locks.Lock; 029import java.util.concurrent.locks.ReentrantLock; 030import java.util.function.BiPredicate; 031import java.util.regex.Matcher; 032import java.util.regex.Pattern; 033import java.util.stream.Collectors; 034import java.util.stream.Stream; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.Abortable; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 042import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 043import org.apache.hadoop.hbase.replication.ReplicationUtils; 044import org.apache.hadoop.hbase.replication.SyncReplicationState; 045import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; 046import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.hadoop.hbase.util.IOExceptionConsumer; 049import org.apache.hadoop.hbase.util.IOExceptionRunnable; 050import org.apache.hadoop.hbase.util.KeyLocker; 051import org.apache.hadoop.hbase.util.Pair; 052import org.apache.hadoop.io.MultipleIOException; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.Streams; 058 059/** 060 * Base class for a WAL Provider. 061 * <p> 062 * We will put some common logic here, especially for sync replication implementation, as it must do 063 * some hacks before the normal wal creation operation. 064 * <p> 065 * All {@link WALProvider} implementations should extends this class instead of implement 066 * {@link WALProvider} directly, except {@link DisabledWALProvider}. 067 */ 068@InterfaceAudience.Private 069public abstract class AbstractWALProvider implements WALProvider, PeerActionListener { 070 071 private static final Logger LOG = LoggerFactory.getLogger(AbstractWALProvider.class); 072 073 // should be package private; more visible for use in AbstractFSWAL 074 public static final String WAL_FILE_NAME_DELIMITER = "."; 075 076 protected WALFactory factory; 077 protected Configuration conf; 078 protected List<WALActionsListener> listeners = new ArrayList<>(); 079 protected String providerId; 080 protected AtomicBoolean initialized = new AtomicBoolean(false); 081 // for default wal provider, logPrefix won't change 082 protected String logPrefix; 083 protected Abortable abortable; 084 085 // when switching from A to DA, we will put a Optional.empty into this map if there is no WAL for 086 // the peer yet. When getting WAL from this map the caller should know that it should not use 087 // the remote WAL any more. 088 private final ConcurrentMap<String, Optional<WAL>> peerId2WAL = new ConcurrentHashMap<>(); 089 090 private final KeyLocker<String> createLock = new KeyLocker<>(); 091 092 // in getWALs we can not throw any exceptions out, so we use lock and condition here as it 093 // supports awaitUninterruptibly which will not throw a InterruptedException 094 private final Lock numRemoteWALUnderCreationLock = new ReentrantLock(); 095 private final Condition noRemoteWALUnderCreationCond = 096 numRemoteWALUnderCreationLock.newCondition(); 097 // record the number of remote WALs which are under creation. This is very important to not 098 // missing a WAL instance in getWALs method. See HBASE-28140 and related issues for more details. 099 private int numRemoteWALUnderCreation; 100 101 // we need to have this because when getting meta wal, there is no peer info provider yet. 102 private SyncReplicationPeerInfoProvider peerInfoProvider = new SyncReplicationPeerInfoProvider() { 103 104 @Override 105 public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) { 106 return Optional.empty(); 107 } 108 109 @Override 110 public boolean checkState(TableName table, 111 BiPredicate<SyncReplicationState, SyncReplicationState> checker) { 112 return false; 113 } 114 115 }; 116 117 @Override 118 public final void init(WALFactory factory, Configuration conf, String providerId, 119 Abortable server) throws IOException { 120 if (!initialized.compareAndSet(false, true)) { 121 throw new IllegalStateException("WALProvider.init should only be called once."); 122 } 123 this.factory = factory; 124 this.conf = conf; 125 this.abortable = server; 126 doInit(factory, conf, providerId); 127 } 128 129 protected final void initWAL(WAL wal) throws IOException { 130 boolean succ = false; 131 try { 132 wal.init(); 133 succ = true; 134 } finally { 135 if (!succ) { 136 safeClose(wal); 137 } 138 } 139 } 140 141 // Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then 142 // back to A, the log prefix will be changed. This is used to simplify the implementation for 143 // replication source, where we do not need to consider that a terminated shipper could be added 144 // back. 145 private String getRemoteWALPrefix(String peerId) { 146 return factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + peerId; 147 } 148 149 private WAL getRemoteWAL(RegionInfo region, String peerId, String remoteWALDir) 150 throws IOException { 151 Optional<WAL> opt = peerId2WAL.get(peerId); 152 if (opt != null) { 153 return opt.orElse(null); 154 } 155 Lock lock = createLock.acquireLock(peerId); 156 try { 157 opt = peerId2WAL.get(peerId); 158 if (opt != null) { 159 return opt.orElse(null); 160 } 161 WAL wal = createRemoteWAL(region, ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir), 162 ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId), getRemoteWALPrefix(peerId), 163 ReplicationUtils.SYNC_WAL_SUFFIX); 164 numRemoteWALUnderCreationLock.lock(); 165 try { 166 numRemoteWALUnderCreation++; 167 } finally { 168 numRemoteWALUnderCreationLock.unlock(); 169 } 170 initWAL(wal); 171 peerId2WAL.put(peerId, Optional.of(wal)); 172 return wal; 173 } finally { 174 lock.unlock(); 175 numRemoteWALUnderCreationLock.lock(); 176 try { 177 numRemoteWALUnderCreation--; 178 if (numRemoteWALUnderCreation == 0) { 179 noRemoteWALUnderCreationCond.signalAll(); 180 } 181 } finally { 182 numRemoteWALUnderCreationLock.unlock(); 183 } 184 } 185 } 186 187 @Override 188 public final WAL getWAL(RegionInfo region) throws IOException { 189 if (region == null) { 190 return getWAL0(null); 191 } 192 // deal with sync replication 193 Optional<Pair<String, String>> peerIdAndRemoteWALDir = 194 peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable()); 195 if (peerIdAndRemoteWALDir.isPresent()) { 196 Pair<String, String> pair = peerIdAndRemoteWALDir.get(); 197 WAL wal = getRemoteWAL(region, pair.getFirst(), pair.getSecond()); 198 if (wal != null) { 199 return wal; 200 } 201 } 202 // fallback to normal WALProvider logic 203 return getWAL0(region); 204 } 205 206 @Override 207 public final List<WAL> getWALs() { 208 List<WAL> wals = new ArrayList<WAL>(); 209 numRemoteWALUnderCreationLock.lock(); 210 try { 211 while (numRemoteWALUnderCreation > 0) { 212 noRemoteWALUnderCreationCond.awaitUninterruptibly(); 213 } 214 peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get) 215 .forEach(wals::add); 216 } finally { 217 numRemoteWALUnderCreationLock.unlock(); 218 } 219 return Streams 220 .concat(peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get), 221 getWALs0().stream()) 222 .collect(Collectors.toList()); 223 } 224 225 @Override 226 public PeerActionListener getPeerActionListener() { 227 return this; 228 } 229 230 @Override 231 public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, 232 SyncReplicationState to, int stage) { 233 if (from == SyncReplicationState.ACTIVE) { 234 if (stage == 0) { 235 Lock lock = createLock.acquireLock(peerId); 236 try { 237 Optional<WAL> opt = peerId2WAL.get(peerId); 238 if (opt != null) { 239 opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY)); 240 } else { 241 // add a place holder to tell the getWAL caller do not use the remote WAL any more. 242 peerId2WAL.put(peerId, Optional.empty()); 243 } 244 } finally { 245 lock.unlock(); 246 } 247 } else if (stage == 1) { 248 peerId2WAL.remove(peerId).ifPresent(AbstractWALProvider::safeClose); 249 } 250 } 251 } 252 253 @Override 254 public void setSyncReplicationPeerInfoProvider(SyncReplicationPeerInfoProvider provider) { 255 this.peerInfoProvider = provider; 256 } 257 258 private static void safeClose(WAL wal) { 259 if (wal != null) { 260 try { 261 wal.close(); 262 } catch (IOException e) { 263 LOG.error("Close WAL failed", e); 264 } 265 } 266 } 267 268 @Override 269 public void addWALActionsListener(WALActionsListener listener) { 270 listeners.add(listener); 271 } 272 273 private void cleanup(IOExceptionConsumer<WAL> cleanupWAL, IOExceptionRunnable finalCleanup) 274 throws IOException { 275 MultipleIOException.Builder builder = new MultipleIOException.Builder(); 276 for (Optional<WAL> wal : peerId2WAL.values()) { 277 if (wal.isPresent()) { 278 try { 279 cleanupWAL.accept(wal.get()); 280 } catch (IOException e) { 281 LOG.error("cleanup WAL failed", e); 282 builder.add(e); 283 } 284 } 285 } 286 try { 287 finalCleanup.run(); 288 } catch (IOException e) { 289 LOG.error("cleanup WAL failed", e); 290 builder.add(e); 291 } 292 if (!builder.isEmpty()) { 293 throw builder.build(); 294 } 295 } 296 297 @Override 298 public final void shutdown() throws IOException { 299 cleanup(WAL::shutdown, this::shutdown0); 300 } 301 302 @Override 303 public final void close() throws IOException { 304 cleanup(WAL::close, this::close0); 305 } 306 307 private Stream<AbstractFSWAL<?>> remoteWALStream() { 308 return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get) 309 .filter(w -> w instanceof AbstractFSWAL).map(w -> (AbstractFSWAL<?>) w); 310 } 311 312 @Override 313 public final long getNumLogFiles() { 314 return remoteWALStream().mapToLong(AbstractFSWAL::getNumLogFiles).sum() + getNumLogFiles0(); 315 } 316 317 @Override 318 public final long getLogFileSize() { 319 return remoteWALStream().mapToLong(AbstractFSWAL::getLogFileSize).sum() + getLogFileSize0(); 320 } 321 322 private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)"); 323 324 /** 325 * <p> 326 * Returns the peer id if the wal file name is in the special group for a sync replication peer. 327 * </p> 328 * <p> 329 * The prefix format is <factoryId>-<ts>-<peerId>. 330 * </p> 331 */ 332 public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) { 333 if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) { 334 // fast path to return earlier if the name is not for a sync replication peer. 335 return Optional.empty(); 336 } 337 String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); 338 Matcher matcher = LOG_PREFIX_PATTERN.matcher(logPrefix); 339 if (matcher.matches()) { 340 return Optional.of(matcher.group(1)); 341 } else { 342 return Optional.empty(); 343 } 344 } 345 346 protected abstract WAL createRemoteWAL(RegionInfo region, FileSystem remoteFs, Path remoteWALDir, 347 String prefix, String suffix) throws IOException; 348 349 protected abstract void doInit(WALFactory factory, Configuration conf, String providerId) 350 throws IOException; 351 352 protected abstract WAL getWAL0(RegionInfo region) throws IOException; 353 354 protected abstract List<WAL> getWALs0(); 355 356 protected abstract void shutdown0() throws IOException; 357 358 protected abstract void close0() throws IOException; 359 360 protected abstract long getNumLogFiles0(); 361 362 protected abstract long getLogFileSize0(); 363 364}