/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs periodically to determine if the WAL should be rolled.
 * <p/>
 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
 * there is something to do, rather than the Chore sleep time which is invariant.
 * <p/>
 * The {@link #scheduleFlush(String, List)} is abstract here, as sometimes we hold a region without
 * a region server but we still want to roll its WAL.
 * <p/>
 * All timestamps used by this class are read through {@link EnvironmentEdgeManager#currentTime()}
 * so that roll-period checks, retry timeouts and the recorded last roll time share one time base.
 * <p/>
 * TODO: change to a pool of threads
 */
@InterfaceAudience.Private
public abstract class AbstractWALRoller<T extends Abortable> extends Thread implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);

  protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";

  /**
   * Configure for the timeout of log rolling retry.
   */
  public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
  public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000;

  /**
   * Configure for the max count of log rolling retry. The real retry count is also limited by the
   * timeout of log rolling via {@link #WAL_ROLL_WAIT_TIMEOUT}
   */
  protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries";

  protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
  protected final T abortable;
  // Period to roll log.
  private final long rollPeriod;
  private final int threadWakeFrequency;
  // The interval to check low replication on hlog's pipeline
  private final long checkLowReplicationInterval;
  // Wait period for roll log
  private final long rollWaitTimeout;
  // Max retry for roll log
  private final int maxRollRetry;

  private volatile boolean running = true;

  /**
   * Starts tracking the given WAL if it is not tracked yet. On first registration a
   * {@link WALActionsListener} is attached to the WAL which marks a roll as requested and wakes up
   * the roller thread when the WAL asks for a roll, and which forwards archive notifications to
   * {@link #afterWALArchive(Path, Path)}.
   * @param wal the WAL to be managed by this roller
   */
  public void addWAL(WAL wal) {
    // check without lock first
    if (wals.containsKey(wal)) {
      return;
    }
    // this is to avoid race between addWAL and requestRollAll.
    synchronized (this) {
      if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
        wal.registerWALActionsListener(new WALActionsListener() {
          @Override
          public void logRollRequested(WALActionsListener.RollRequestReason reason) {
            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
            synchronized (AbstractWALRoller.this) {
              RollController controller =
                wals.computeIfAbsent(wal, key -> new RollController(wal));
              controller.requestRoll();
              AbstractWALRoller.this.notifyAll();
            }
          }

          @Override
          public void postLogArchive(Path oldPath, Path newPath) throws IOException {
            afterWALArchive(oldPath, newPath);
          }
        });
      }
    }
  }

  /**
   * Marks every tracked WAL as needing a roll and wakes up the roller thread.
   */
  public void requestRollAll() {
    synchronized (this) {
      for (RollController controller : wals.values()) {
        controller.requestRoll();
      }
      notifyAll();
    }
  }

  /**
   * Reads the roll period, wake frequency, low replication check interval, roll wait timeout and
   * max roll retry count from the given configuration.
   * @param name      the thread name
   * @param conf      the configuration to read the settings from
   * @param abortable aborted when rolling fails in a way this roller does not recover from
   */
  protected AbstractWALRoller(String name, Configuration conf, T abortable) {
    super(name);
    this.abortable = abortable;
    this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.checkLowReplicationInterval =
      conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
    this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
    // retry rolling does not have to be the default behavior, so the default value is 0 here
    this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
  }

  /**
   * we need to check low replication in period, see HBASE-18132
   * <p/>
   * WALs which already need a roll, or which are not an {@link AbstractFSWAL}, are skipped. Any
   * failure is logged and otherwise ignored so that it never stops the roller loop.
   */
  private void checkLowReplication(long now) {
    try {
      for (Entry<WAL, RollController> entry : wals.entrySet()) {
        WAL wal = entry.getKey();
        boolean needRollAlready = entry.getValue().needsRoll(now);
        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
          continue;
        }
        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
      }
    } catch (Throwable e) {
      LOG.warn("Failed checking low replication", e);
    }
  }

  /**
   * Shuts down all tracked WALs and then aborts the {@link #abortable}.
   */
  private void abort(String reason, Throwable cause) {
    // close all WALs before calling abort on RS.
    // This is because AsyncFSWAL relies on us for rolling a new writer to make progress, and if we
    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
    // is already broken.
    for (WAL wal : wals.keySet()) {
      // shutdown rather than close here since we are going to abort the RS and the wals need to be
      // split when recovery
      try {
        wal.shutdown();
      } catch (IOException e) {
        LOG.warn("Failed to shutdown wal", e);
      }
    }
    abortable.abort(reason, cause);
  }

  /**
   * Main loop of the roller. While running, it checks low replication, waits up to the configured
   * wake frequency when no WAL needs a roll, and otherwise rolls every WAL for which a roll was
   * requested or whose roll period has elapsed, scheduling a flush for each region returned by the
   * roll. Failures that escape the retry logic lead to {@link #abort(String, Throwable)}.
   */
  @Override
  public void run() {
    while (running) {
      long now = EnvironmentEdgeManager.currentTime();
      checkLowReplication(now);
      synchronized (this) {
        if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
          try {
            wait(this.threadWakeFrequency);
          } catch (InterruptedException e) {
            // restore the interrupt state
            Thread.currentThread().interrupt();
          }
          // go to the beginning to check again whether we should fall through to roll
          // several WALs, and also check whether we should quit.
          continue;
        }
      }
      try {
        for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator(); iter
          .hasNext();) {
          Entry<WAL, RollController> entry = iter.next();
          WAL wal = entry.getKey();
          RollController controller = entry.getValue();
          if (controller.isRollRequested()) {
            // WAL roll requested, fall through
            LOG.debug("WAL {} roll requested", wal);
          } else if (controller.needsPeriodicRoll(now)) {
            // Time for periodic roll, fall through
            LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
          } else {
            continue;
          }
          Map<byte[], List<byte[]>> regionsToFlush = null;
          int nAttempts = 0;
          // Use the same clock as the loop condition below and as needsPeriodicRoll, otherwise
          // the timeout and the recorded last roll time would be measured on different time bases
          // whenever a non default EnvironmentEdge is injected.
          long startWaiting = EnvironmentEdgeManager.currentTime();
          // NOTE(review): if an attempt fails, we sleep, and the timeout has elapsed when the
          // loop condition is evaluated, the loop ends without a successful roll and without an
          // exception, so this roll is silently skipped. Confirm whether that is intended.
          do {
            try {
              // Force the roll if the logroll.period is elapsed or if a roll was requested.
              // The returned value is a collection of actual region and family names.
              regionsToFlush = controller.rollWal(EnvironmentEdgeManager.currentTime());
              break;
            } catch (IOException ioe) {
              long waitingTime = EnvironmentEdgeManager.currentTime() - startWaiting;
              if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) {
                nAttempts++;
                LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
                  + " last exception", nAttempts, waitingTime, ioe);
                Thread.sleep(1000);
              } else {
                LOG.error("Roll wal failed and waiting timeout, will not retry", ioe);
                throw ioe;
              }
            }
          } while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout);
          if (regionsToFlush != null) {
            for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
              scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
            }
          }
        }
      } catch (FailedLogCloseException | ConnectException e) {
        abort("Failed log close in log roller", e);
      } catch (IOException ex) {
        // Abort if we get here. We probably won't recover an IOE. HBASE-1132
        abort("IOE in log roller",
          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
      } catch (Exception ex) {
        LOG.error("Log rolling failed", ex);
        abort("Log rolling failed", ex);
      }
    }
    LOG.info("LogRoller exiting.");
  }

  /**
   * Hook invoked when a tracked WAL reports that a log file has been archived. The default
   * implementation does nothing.
   * @param oldPath the path passed as the old location by the WAL listener callback
   * @param newPath the path passed as the new location by the WAL listener callback
   */
  protected void afterWALArchive(Path oldPath, Path newPath) {
  }

  /**
   * @param encodedRegionName Encoded name of region to flush.
   * @param families          stores of region to flush.
   */
  protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families);

  /** Returns true if this roller thread is currently in a waiting or timed waiting state. */
  private boolean isWaiting() {
    Thread.State state = getState();
    return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
  }

  /** Returns true if all WAL roll finished */
  public boolean walRollFinished() {
    // TODO add a status field of roll in RollController
    return wals.values().stream()
      .noneMatch(rc -> rc.needsRoll(EnvironmentEdgeManager.currentTime())) && isWaiting();
  }

  /**
   * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
   * @throws InterruptedException if the calling thread is interrupted while sleeping between checks
   */
  public void waitUntilWalRollFinished() throws InterruptedException {
    while (!walRollFinished()) {
      Thread.sleep(100);
    }
  }

  /**
   * Asks the roller loop to stop and interrupts the roller thread.
   */
  @Override
  public void close() {
    running = false;
    interrupt();
  }

  /**
   * Independently control the roll of each wal. When use multiwal, can avoid all wal roll together.
   * see HBASE-24665 for detail
   */
  protected class RollController {
    private final WAL wal;
    private final AtomicBoolean rollRequest;
    private long lastRollTime;

    RollController(WAL wal) {
      this.wal = wal;
      this.rollRequest = new AtomicBoolean(false);
      this.lastRollTime = EnvironmentEdgeManager.currentTime();
    }

    /** Flags that a roll of this WAL has been requested. */
    public void requestRoll() {
      this.rollRequest.set(true);
    }

    /**
     * Records {@code now} as the last roll time, clears the roll request flag and force-rolls the
     * writer of the WAL.
     * @param now the timestamp to record as the last roll time
     * @return whatever {@link WAL#rollWriter(boolean)} returns; the caller treats the keys as
     *         region names and the values as the families to flush
     * @throws IOException if rolling the writer fails
     */
    public Map<byte[], List<byte[]>> rollWal(long now) throws IOException {
      this.lastRollTime = now;
      // reset the flag in front to avoid missing roll request before we return from rollWriter.
      this.rollRequest.set(false);
      return wal.rollWriter(true);
    }

    /** Returns true if a roll has been requested and not yet been started. */
    public boolean isRollRequested() {
      return rollRequest.get();
    }

    /** Returns true if more than the roll period has elapsed between the last roll and now. */
    public boolean needsPeriodicRoll(long now) {
      return (now - this.lastRollTime) > rollPeriod;
    }

    /** Returns true if a roll has been requested or the roll period has elapsed. */
    public boolean needsRoll(long now) {
      return isRollRequested() || needsPeriodicRoll(now);
    }
  }
}