001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.util.ArrayList; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map.Entry; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.Server; 032import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 033import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 034import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.HasThread; 037import org.apache.hadoop.hbase.wal.WAL; 038import org.apache.hadoop.ipc.RemoteException; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 044 045/** 046 * Runs periodically to determine if the WAL should be rolled. 047 * 048 * NOTE: This class extends Thread rather than Chore because the sleep time 049 * can be interrupted when there is something to do, rather than the Chore 050 * sleep time which is invariant. 051 * 052 * TODO: change to a pool of threads 053 */ 054@InterfaceAudience.Private 055@VisibleForTesting 056public class LogRoller extends HasThread implements Closeable { 057 private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class); 058 private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>(); 059 private final Server server; 060 protected final RegionServerServices services; 061 private volatile long lastRollTime = System.currentTimeMillis(); 062 // Period to roll log. 063 private final long rollPeriod; 064 private final int threadWakeFrequency; 065 // The interval to check low replication on hlog's pipeline 066 private long checkLowReplicationInterval; 067 068 private volatile boolean running = true; 069 070 public void addWAL(WAL wal) { 071 // check without lock first 072 if (walNeedsRoll.containsKey(wal)) { 073 return; 074 } 075 // this is to avoid race between addWAL and requestRollAll. 076 synchronized (this) { 077 if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) { 078 wal.registerWALActionsListener(new WALActionsListener() { 079 @Override 080 public void logRollRequested(boolean lowReplicas) { 081 // TODO logs will contend with each other here, replace with e.g. DelayedQueue 082 synchronized (LogRoller.this) { 083 walNeedsRoll.put(wal, Boolean.TRUE); 084 LogRoller.this.notifyAll(); 085 } 086 } 087 }); 088 } 089 } 090 } 091 092 public void requestRollAll() { 093 synchronized (this) { 094 List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet()); 095 for (WAL wal : wals) { 096 walNeedsRoll.put(wal, Boolean.TRUE); 097 } 098 notifyAll(); 099 } 100 } 101 102 /** @param server */ 103 public LogRoller(final Server server, final RegionServerServices services) { 104 super("LogRoller"); 105 this.server = server; 106 this.services = services; 107 this.rollPeriod = this.server.getConfiguration(). 108 getLong("hbase.regionserver.logroll.period", 3600000); 109 this.threadWakeFrequency = this.server.getConfiguration(). 110 getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 111 this.checkLowReplicationInterval = this.server.getConfiguration().getLong( 112 "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000); 113 } 114 115 /** 116 * we need to check low replication in period, see HBASE-18132 117 */ 118 private void checkLowReplication(long now) { 119 try { 120 for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) { 121 WAL wal = entry.getKey(); 122 boolean needRollAlready = entry.getValue(); 123 if (needRollAlready || !(wal instanceof AbstractFSWAL)) { 124 continue; 125 } 126 ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval); 127 } 128 } catch (Throwable e) { 129 LOG.warn("Failed checking low replication", e); 130 } 131 } 132 133 private void abort(String reason, Throwable cause) { 134 // close all WALs before calling abort on RS. 135 // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we 136 // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it 137 // is already broken. 138 for (WAL wal : walNeedsRoll.keySet()) { 139 // shutdown rather than close here since we are going to abort the RS and the wals need to be 140 // split when recovery 141 try { 142 wal.shutdown(); 143 } catch (IOException e) { 144 LOG.warn("Failed to shutdown wal", e); 145 } 146 } 147 server.abort(reason, cause); 148 } 149 150 @Override 151 public void run() { 152 while (running) { 153 boolean periodic = false; 154 long now = System.currentTimeMillis(); 155 checkLowReplication(now); 156 periodic = (now - this.lastRollTime) > this.rollPeriod; 157 if (periodic) { 158 // Time for periodic roll, fall through 159 LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod); 160 } else { 161 synchronized (this) { 162 if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) { 163 // WAL roll requested, fall through 164 LOG.debug("WAL roll requested"); 165 } else { 166 try { 167 wait(this.threadWakeFrequency); 168 } catch (InterruptedException e) { 169 // restore the interrupt state 170 Thread.currentThread().interrupt(); 171 } 172 // goto the beginning to check whether again whether we should fall through to roll 173 // several WALs, and also check whether we should quit. 174 continue; 175 } 176 } 177 } 178 try { 179 this.lastRollTime = System.currentTimeMillis(); 180 for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter 181 .hasNext();) { 182 Entry<WAL, Boolean> entry = iter.next(); 183 WAL wal = entry.getKey(); 184 // reset the flag in front to avoid missing roll request before we return from rollWriter. 185 walNeedsRoll.put(wal, Boolean.FALSE); 186 // Force the roll if the logroll.period is elapsed or if a roll was requested. 187 // The returned value is an array of actual region names. 188 byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue()); 189 if (regionsToFlush != null) { 190 for (byte[] r : regionsToFlush) { 191 scheduleFlush(Bytes.toString(r)); 192 } 193 } 194 } 195 } catch (FailedLogCloseException | ConnectException e) { 196 abort("Failed log close in log roller", e); 197 } catch (IOException ex) { 198 // Abort if we get here. We probably won't recover an IOE. HBASE-1132 199 abort("IOE in log roller", 200 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex); 201 } catch (Exception ex) { 202 LOG.error("Log rolling failed", ex); 203 abort("Log rolling failed", ex); 204 } 205 } 206 LOG.info("LogRoller exiting."); 207 } 208 209 /** 210 * @param encodedRegionName Encoded name of region to flush. 211 */ 212 private void scheduleFlush(String encodedRegionName) { 213 HRegion r = (HRegion) this.services.getRegion(encodedRegionName); 214 if (r == null) { 215 LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName); 216 return; 217 } 218 FlushRequester requester = this.services.getFlushRequester(); 219 if (requester == null) { 220 LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null", 221 encodedRegionName, r); 222 return; 223 } 224 // force flushing all stores to clean old logs 225 requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY); 226 } 227 228 /** 229 * For testing only 230 * @return true if all WAL roll finished 231 */ 232 @VisibleForTesting 233 public boolean walRollFinished() { 234 return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll); 235 } 236 237 @Override 238 public void close() { 239 running = false; 240 interrupt(); 241 } 242}