001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.net.ConnectException; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.Map.Entry; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.atomic.AtomicBoolean; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 035import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.ipc.RemoteException; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Runs periodically to determine if the WAL should be rolled. 045 * <p/> 046 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when 047 * there is something to do, rather than the Chore sleep time which is invariant. 048 * <p/> 049 * The {@link #scheduleFlush(String, List)} is abstract here, 050 * as sometimes we hold a region without a region server but we still want to roll its WAL. 051 * <p/> 052 * TODO: change to a pool of threads 053 */ 054@InterfaceAudience.Private 055public abstract class AbstractWALRoller<T extends Abortable> extends Thread 056 implements Closeable { 057 private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class); 058 059 protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period"; 060 061 protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>(); 062 protected final T abortable; 063 // Period to roll log. 064 private final long rollPeriod; 065 private final int threadWakeFrequency; 066 // The interval to check low replication on hlog's pipeline 067 private final long checkLowReplicationInterval; 068 069 private volatile boolean running = true; 070 071 public void addWAL(WAL wal) { 072 // check without lock first 073 if (wals.containsKey(wal)) { 074 return; 075 } 076 // this is to avoid race between addWAL and requestRollAll. 077 synchronized (this) { 078 if (wals.putIfAbsent(wal, new RollController(wal)) == null) { 079 wal.registerWALActionsListener(new WALActionsListener() { 080 @Override 081 public void logRollRequested(WALActionsListener.RollRequestReason reason) { 082 // TODO logs will contend with each other here, replace with e.g. DelayedQueue 083 synchronized (AbstractWALRoller.this) { 084 RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal)); 085 controller.requestRoll(); 086 AbstractWALRoller.this.notifyAll(); 087 } 088 } 089 090 @Override 091 public void postLogArchive(Path oldPath, Path newPath) throws IOException { 092 afterWALArchive(oldPath, newPath); 093 } 094 }); 095 } 096 } 097 } 098 099 public void requestRollAll() { 100 synchronized (this) { 101 for (RollController controller : wals.values()) { 102 controller.requestRoll(); 103 } 104 notifyAll(); 105 } 106 } 107 108 protected AbstractWALRoller(String name, Configuration conf, T abortable) { 109 super(name); 110 this.abortable = abortable; 111 this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000); 112 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 113 this.checkLowReplicationInterval = 114 conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000); 115 } 116 117 /** 118 * we need to check low replication in period, see HBASE-18132 119 */ 120 private void checkLowReplication(long now) { 121 try { 122 for (Entry<WAL, RollController> entry : wals.entrySet()) { 123 WAL wal = entry.getKey(); 124 boolean needRollAlready = entry.getValue().needsRoll(now); 125 if (needRollAlready || !(wal instanceof AbstractFSWAL)) { 126 continue; 127 } 128 ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval); 129 } 130 } catch (Throwable e) { 131 LOG.warn("Failed checking low replication", e); 132 } 133 } 134 135 private void abort(String reason, Throwable cause) { 136 // close all WALs before calling abort on RS. 137 // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we 138 // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it 139 // is already broken. 140 for (WAL wal : wals.keySet()) { 141 // shutdown rather than close here since we are going to abort the RS and the wals need to be 142 // split when recovery 143 try { 144 wal.shutdown(); 145 } catch (IOException e) { 146 LOG.warn("Failed to shutdown wal", e); 147 } 148 } 149 abortable.abort(reason, cause); 150 } 151 152 @Override 153 public void run() { 154 while (running) { 155 long now = System.currentTimeMillis(); 156 checkLowReplication(now); 157 synchronized (this) { 158 if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) { 159 try { 160 wait(this.threadWakeFrequency); 161 } catch (InterruptedException e) { 162 // restore the interrupt state 163 Thread.currentThread().interrupt(); 164 } 165 // goto the beginning to check whether again whether we should fall through to roll 166 // several WALs, and also check whether we should quit. 167 continue; 168 } 169 } 170 try { 171 for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator(); 172 iter.hasNext();) { 173 Entry<WAL, RollController> entry = iter.next(); 174 WAL wal = entry.getKey(); 175 RollController controller = entry.getValue(); 176 if (controller.isRollRequested()) { 177 // WAL roll requested, fall through 178 LOG.debug("WAL {} roll requested", wal); 179 } else if (controller.needsPeriodicRoll(now)) { 180 // Time for periodic roll, fall through 181 LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod); 182 } else { 183 continue; 184 } 185 // Force the roll if the logroll.period is elapsed or if a roll was requested. 186 // The returned value is an collection of actual region and family names. 187 Map<byte[], List<byte[]>> regionsToFlush = controller.rollWal(now); 188 if (regionsToFlush != null) { 189 for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) { 190 scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); 191 } 192 } 193 } 194 } catch (FailedLogCloseException | ConnectException e) { 195 abort("Failed log close in log roller", e); 196 } catch (IOException ex) { 197 // Abort if we get here. We probably won't recover an IOE. HBASE-1132 198 abort("IOE in log roller", 199 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex); 200 } catch (Exception ex) { 201 LOG.error("Log rolling failed", ex); 202 abort("Log rolling failed", ex); 203 } 204 } 205 LOG.info("LogRoller exiting."); 206 } 207 208 protected void afterWALArchive(Path oldPath, Path newPath) { 209 } 210 211 /** 212 * @param encodedRegionName Encoded name of region to flush. 213 * @param families stores of region to flush. 214 */ 215 protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families); 216 217 private boolean isWaiting() { 218 Thread.State state = getState(); 219 return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING; 220 } 221 222 /** 223 * @return true if all WAL roll finished 224 */ 225 public boolean walRollFinished() { 226 return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis())) 227 && isWaiting(); 228 } 229 230 /** 231 * Wait until all wals have been rolled after calling {@link #requestRollAll()}. 232 */ 233 public void waitUntilWalRollFinished() throws InterruptedException { 234 while (!walRollFinished()) { 235 Thread.sleep(100); 236 } 237 } 238 239 @Override 240 public void close() { 241 running = false; 242 interrupt(); 243 } 244 245 /** 246 * Independently control the roll of each wal. When use multiwal, 247 * can avoid all wal roll together. see HBASE-24665 for detail 248 */ 249 protected class RollController { 250 private final WAL wal; 251 private final AtomicBoolean rollRequest; 252 private long lastRollTime; 253 254 RollController(WAL wal) { 255 this.wal = wal; 256 this.rollRequest = new AtomicBoolean(false); 257 this.lastRollTime = System.currentTimeMillis(); 258 } 259 260 public void requestRoll() { 261 this.rollRequest.set(true); 262 } 263 264 public Map<byte[], List<byte[]>> rollWal(long now) throws IOException { 265 this.lastRollTime = now; 266 // reset the flag in front to avoid missing roll request before we return from rollWriter. 267 this.rollRequest.set(false); 268 return wal.rollWriter(true); 269 } 270 271 public boolean isRollRequested() { 272 return rollRequest.get(); 273 } 274 275 public boolean needsPeriodicRoll(long now) { 276 return (now - this.lastRollTime) > rollPeriod; 277 } 278 279 public boolean needsRoll(long now) { 280 return isRollRequested() || needsPeriodicRoll(now); 281 } 282 } 283}