001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.net.ConnectException; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map.Entry; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Abortable; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 033import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 034import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.ipc.RemoteException; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * Runs periodically to determine if the WAL should be rolled. 043 * <p/> 044 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when 045 * there is something to do, rather than the Chore sleep time which is invariant. 046 * <p/> 047 * The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a 048 * region server but we still want to roll its WAL. 049 * <p/> 050 * TODO: change to a pool of threads 051 */ 052@InterfaceAudience.Private 053public abstract class AbstractWALRoller<T extends Abortable> extends Thread 054 implements Closeable { 055 private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class); 056 057 protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period"; 058 059 protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>(); 060 protected final T abortable; 061 private volatile long lastRollTime = System.currentTimeMillis(); 062 // Period to roll log. 063 private final long rollPeriod; 064 private final int threadWakeFrequency; 065 // The interval to check low replication on hlog's pipeline 066 private long checkLowReplicationInterval; 067 068 private volatile boolean running = true; 069 070 public void addWAL(WAL wal) { 071 // check without lock first 072 if (walNeedsRoll.containsKey(wal)) { 073 return; 074 } 075 // this is to avoid race between addWAL and requestRollAll. 076 synchronized (this) { 077 if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) { 078 wal.registerWALActionsListener(new WALActionsListener() { 079 @Override 080 public void logRollRequested(WALActionsListener.RollRequestReason reason) { 081 // TODO logs will contend with each other here, replace with e.g. DelayedQueue 082 synchronized (AbstractWALRoller.this) { 083 walNeedsRoll.put(wal, Boolean.TRUE); 084 AbstractWALRoller.this.notifyAll(); 085 } 086 } 087 }); 088 } 089 } 090 } 091 092 public void requestRollAll() { 093 synchronized (this) { 094 List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet()); 095 for (WAL wal : wals) { 096 walNeedsRoll.put(wal, Boolean.TRUE); 097 } 098 notifyAll(); 099 } 100 } 101 102 protected AbstractWALRoller(String name, Configuration conf, T abortable) { 103 super(name); 104 this.abortable = abortable; 105 this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000); 106 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 107 this.checkLowReplicationInterval = 108 conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000); 109 } 110 111 /** 112 * we need to check low replication in period, see HBASE-18132 113 */ 114 private void checkLowReplication(long now) { 115 try { 116 for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) { 117 WAL wal = entry.getKey(); 118 boolean needRollAlready = entry.getValue(); 119 if (needRollAlready || !(wal instanceof AbstractFSWAL)) { 120 continue; 121 } 122 ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval); 123 } 124 } catch (Throwable e) { 125 LOG.warn("Failed checking low replication", e); 126 } 127 } 128 129 private void abort(String reason, Throwable cause) { 130 // close all WALs before calling abort on RS. 131 // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we 132 // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it 133 // is already broken. 134 for (WAL wal : walNeedsRoll.keySet()) { 135 // shutdown rather than close here since we are going to abort the RS and the wals need to be 136 // split when recovery 137 try { 138 wal.shutdown(); 139 } catch (IOException e) { 140 LOG.warn("Failed to shutdown wal", e); 141 } 142 } 143 abortable.abort(reason, cause); 144 } 145 146 @Override 147 public void run() { 148 while (running) { 149 boolean periodic = false; 150 long now = System.currentTimeMillis(); 151 checkLowReplication(now); 152 periodic = (now - this.lastRollTime) > this.rollPeriod; 153 if (periodic) { 154 // Time for periodic roll, fall through 155 LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod); 156 } else { 157 synchronized (this) { 158 if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) { 159 // WAL roll requested, fall through 160 LOG.debug("WAL roll requested"); 161 } else { 162 try { 163 wait(this.threadWakeFrequency); 164 } catch (InterruptedException e) { 165 // restore the interrupt state 166 Thread.currentThread().interrupt(); 167 } 168 // goto the beginning to check whether again whether we should fall through to roll 169 // several WALs, and also check whether we should quit. 170 continue; 171 } 172 } 173 } 174 try { 175 this.lastRollTime = System.currentTimeMillis(); 176 for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter 177 .hasNext();) { 178 Entry<WAL, Boolean> entry = iter.next(); 179 WAL wal = entry.getKey(); 180 // reset the flag in front to avoid missing roll request before we return from rollWriter. 181 walNeedsRoll.put(wal, Boolean.FALSE); 182 // Force the roll if the logroll.period is elapsed or if a roll was requested. 183 // The returned value is an array of actual region names. 184 byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue()); 185 if (regionsToFlush != null) { 186 for (byte[] r : regionsToFlush) { 187 scheduleFlush(Bytes.toString(r)); 188 } 189 } 190 afterRoll(wal); 191 } 192 } catch (FailedLogCloseException | ConnectException e) { 193 abort("Failed log close in log roller", e); 194 } catch (IOException ex) { 195 // Abort if we get here. We probably won't recover an IOE. HBASE-1132 196 abort("IOE in log roller", 197 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex); 198 } catch (Exception ex) { 199 LOG.error("Log rolling failed", ex); 200 abort("Log rolling failed", ex); 201 } 202 } 203 LOG.info("LogRoller exiting."); 204 } 205 206 /** 207 * Called after we finish rolling the give {@code wal}. 208 */ 209 protected void afterRoll(WAL wal) { 210 } 211 212 /** 213 * @param encodedRegionName Encoded name of region to flush. 214 */ 215 protected abstract void scheduleFlush(String encodedRegionName); 216 217 private boolean isWaiting() { 218 Thread.State state = getState(); 219 return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING; 220 } 221 222 /** 223 * @return true if all WAL roll finished 224 */ 225 public boolean walRollFinished() { 226 return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting(); 227 } 228 229 /** 230 * Wait until all wals have been rolled after calling {@link #requestRollAll()}. 231 */ 232 public void waitUntilWalRollFinished() throws InterruptedException { 233 while (!walRollFinished()) { 234 Thread.sleep(100); 235 } 236 } 237 238 @Override 239 public void close() { 240 running = false; 241 interrupt(); 242 } 243}