/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALClosedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs periodically to determine if the WAL should be rolled.
 * <p/>
 * NOTE: This class extends Thread rather than Chore because its sleep time can be interrupted when
 * there is something to do, whereas a Chore's sleep time is invariant.
 * <p/>
 * {@link #scheduleFlush(String, List)} is abstract here because sometimes we hold a region without
 * a region server but still want to roll its WAL.
 * <p/>
 * TODO: change to a pool of threads
 */
@InterfaceAudience.Private
public abstract class AbstractWALRoller<T extends Abortable> extends Thread implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);

  protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";

  /**
   * Configuration key for the timeout of log rolling retries.
   */
  public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
  public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000;

  /**
   * Configuration key for the maximum number of log rolling retries. The real retry count is also
   * limited by the log rolling timeout set via {@link #WAL_ROLL_WAIT_TIMEOUT}.
   */
  protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries";
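
  // A hedged example of tuning these keys in hbase-site.xml; the values shown are simply the
  // defaults this class falls back to, not a recommendation:
  //
  //   <property>
  //     <name>hbase.regionserver.logroll.period</name>
  //     <value>3600000</value> <!-- roll each WAL at least once per hour -->
  //   </property>
  //   <property>
  //     <name>hbase.regionserver.logroll.wait.timeout.ms</name>
  //     <value>30000</value>
  //   </property>
  //   <property>
  //     <name>hbase.regionserver.logroll.retries</name>
  //     <value>0</value> <!-- no roll retries by default -->
  //   </property>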

  protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
  protected final T abortable;
  // Period to roll log.
  private final long rollPeriod;
  private final int threadWakeFrequency;
  // The interval to check low replication on the hlog's pipeline
  private final long checkLowReplicationInterval;
  // Wait period for a log roll
  private final long rollWaitTimeout;
  // Max retries for a log roll
  private final int maxRollRetry;

  private volatile boolean running = true;

  public void addWAL(WAL wal) {
    // check without lock first
    if (wals.containsKey(wal)) {
      return;
    }
    // this is to avoid a race between addWAL and requestRollAll.
    synchronized (this) {
      if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
        wal.registerWALActionsListener(new WALActionsListener() {
          @Override
          public void logRollRequested(WALActionsListener.RollRequestReason reason) {
            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
            synchronized (AbstractWALRoller.this) {
              RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
              controller.requestRoll();
              AbstractWALRoller.this.notifyAll();
            }
          }

          @Override
          public void postLogArchive(Path oldPath, Path newPath) throws IOException {
            afterWALArchive(oldPath, newPath);
          }
        });
      }
    }
  }

  public void requestRollAll() {
    synchronized (this) {
      for (RollController controller : wals.values()) {
        controller.requestRoll();
      }
      notifyAll();
    }
  }

  protected AbstractWALRoller(String name, Configuration conf, T abortable) {
    super(name);
    this.abortable = abortable;
    this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.checkLowReplicationInterval =
      conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
    this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
    // retry rolling does not have to be the default behavior, so the default value is 0 here
    this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
  }

  /**
   * We need to check low replication periodically; see HBASE-18132.
   */
  private void checkLowReplication(long now) {
    try {
      for (Entry<WAL, RollController> entry : wals.entrySet()) {
        WAL wal = entry.getKey();
        boolean needRollAlready = entry.getValue().needsRoll(now);
        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
          continue;
        }
        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
      }
    } catch (Throwable e) {
      LOG.warn("Failed checking low replication", e);
    }
  }

  private void abort(String reason, Throwable cause) {
    // close all WALs before calling abort on RS.
    // This is because AsyncFSWAL relies on us to roll a new writer to make progress, and if we
    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
    // is already broken.
    for (WAL wal : wals.keySet()) {
      // shutdown rather than close here since we are going to abort the RS and the wals need to
      // be split during recovery
      try {
        wal.shutdown();
      } catch (IOException e) {
        LOG.warn("Failed to shutdown wal", e);
      }
    }
    abortable.abort(reason, cause);
  }

  @Override
  public void run() {
    while (running) {
      long now = EnvironmentEdgeManager.currentTime();
      checkLowReplication(now);
      synchronized (this) {
        if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
          try {
            wait(this.threadWakeFrequency);
          } catch (InterruptedException e) {
            // restore the interrupt state
            Thread.currentThread().interrupt();
          }
          // go back to the beginning to check again whether we should fall through to roll
          // several WALs, and also check whether we should quit.
          continue;
        }
      }
      try {
        for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator(); iter
          .hasNext();) {
          Entry<WAL, RollController> entry = iter.next();
          WAL wal = entry.getKey();
          RollController controller = entry.getValue();
          if (controller.isRollRequested()) {
            // WAL roll requested, fall through
            LOG.debug("WAL {} roll requested", wal);
          } else if (controller.needsPeriodicRoll(now)) {
            // Time for periodic roll, fall through
            LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
          } else {
            continue;
          }
          Map<byte[], List<byte[]>> regionsToFlush = null;
          int nAttempts = 0;
          long startWaiting = EnvironmentEdgeManager.currentTime();
          do {
            try {
              // Force the roll if the logroll.period has elapsed or if a roll was requested.
              // The returned value is a collection of actual region and family names.
              regionsToFlush = controller.rollWal(EnvironmentEdgeManager.currentTime());
              break;
            } catch (IOException ioe) {
              if (ioe instanceof WALClosedException) {
                LOG.warn("WAL has been closed. Skipping rolling of writer and just removing it",
                  ioe);
                iter.remove();
                break;
              }
              long waitingTime = EnvironmentEdgeManager.currentTime() - startWaiting;
              if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) {
                nAttempts++;
                LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
                  + " last exception", nAttempts, waitingTime, ioe);
                sleep(1000);
              } else {
                LOG.error("Roll wal failed and waiting timed out, will not retry", ioe);
                throw ioe;
              }
            }
          } while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout);
          if (regionsToFlush != null) {
            for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
              scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
            }
          }
        }
      } catch (FailedLogCloseException | ConnectException e) {
        abort("Failed log close in log roller", e);
      } catch (IOException ex) {
        // Abort if we get here. We probably won't recover an IOE. HBASE-1132
        abort("IOE in log roller",
          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
      } catch (Exception ex) {
        LOG.error("Log rolling failed", ex);
        abort("Log rolling failed", ex);
      }
    }
    LOG.info("LogRoller exiting.");
  }

  protected void afterWALArchive(Path oldPath, Path newPath) {
  }

  /**
   * @param encodedRegionName Encoded name of region to flush.
   * @param families          stores of region to flush.
   */
  protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families);
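
  // A minimal sketch (not part of HBase) of how a subclass might implement the hook above. It
  // assumes a hypothetical host type MyWalHost that implements Abortable and exposes a
  // requestFlush(String, List<byte[]>) method; all of these names are made up for illustration:
  //
  //   public class MyWALRoller extends AbstractWALRoller<MyWalHost> {
  //     MyWALRoller(Configuration conf, MyWalHost host) {
  //       super("MyWALRoller", conf, host);
  //     }
  //
  //     @Override
  //     protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
  //       // Flushing the named stores lets the WAL files still carrying their edits be archived.
  //       abortable.requestFlush(encodedRegionName, families);
  //     }
  //   }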

  private boolean isWaiting() {
    Thread.State state = getState();
    return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
  }

  /** Returns true if all WAL rolls have finished */
  public boolean walRollFinished() {
    // TODO add a status field of roll in RollController
    return wals.values().stream()
      .noneMatch(rc -> rc.needsRoll(EnvironmentEdgeManager.currentTime())) && isWaiting();
  }

  /**
   * Wait until all WALs have been rolled after calling {@link #requestRollAll()}.
   */
  public void waitUntilWalRollFinished() throws InterruptedException {
    while (!walRollFinished()) {
      Thread.sleep(100);
    }
  }

  @Override
  public void close() {
    running = false;
    interrupt();
  }

  /**
   * Independently controls the roll of each WAL. When using multiwal, this avoids rolling all WALs
   * together; see HBASE-24665 for details.
   */
  protected class RollController {
    private final WAL wal;
    private final AtomicBoolean rollRequest;
    private long lastRollTime;

    RollController(WAL wal) {
      this.wal = wal;
      this.rollRequest = new AtomicBoolean(false);
      this.lastRollTime = EnvironmentEdgeManager.currentTime();
    }

    public void requestRoll() {
      this.rollRequest.set(true);
    }

    public Map<byte[], List<byte[]>> rollWal(long now) throws IOException {
      this.lastRollTime = now;
      // reset the flag up front to avoid missing a roll request that arrives before we return
      // from rollWriter.
      this.rollRequest.set(false);
      return wal.rollWriter(true);
    }

    public boolean isRollRequested() {
      return rollRequest.get();
    }

    public boolean needsPeriodicRoll(long now) {
      return (now - this.lastRollTime) > rollPeriod;
    }

    public boolean needsRoll(long now) {
      return isRollRequested() || needsPeriodicRoll(now);
    }
  }
}
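
/*
 * A rough usage sketch of the lifecycle, continuing the hypothetical MyWALRoller subclass from the
 * comment above scheduleFlush; the conf, host and wal variables are assumed to exist and nothing
 * here is prescribed by HBase itself:
 *
 *   AbstractWALRoller<MyWalHost> roller = new MyWALRoller(conf, host);
 *   roller.addWAL(wal);                  // register each WAL; its listener will request rolls
 *   roller.start();                      // the roller is a Thread, so start it to begin the loop
 *   ...
 *   roller.requestRollAll();             // force a roll of every registered WAL
 *   roller.waitUntilWalRollFinished();   // block until the requested rolls are done
 *   ...
 *   roller.close();                      // stop the roller thread on shutdown
 */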