/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.snapshot;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;

/**
 * This online snapshot implementation uses the distributed procedure framework to force a
 * store flush and then records the hfiles. Its enter stage does nothing. Its leave stage then
 * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
 * copies .regioninfos into the snapshot working directory. At the master side, there is an atomic
 * rename of the working dir into the proper snapshot directory.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FlushSnapshotSubprocedure extends Subprocedure {
  private static final Logger LOG = LoggerFactory.getLogger(FlushSnapshotSubprocedure.class);

  private final List<HRegion> regions;
  private final SnapshotDescription snapshot;
  private final SnapshotSubprocedurePool taskManager;
  // true when the snapshot description asks for a SKIPFLUSH snapshot
  private final boolean snapshotSkipFlush;

  // the maximum number of times we attempt to flush a region
  static final int MAX_RETRIES = 3;

  /**
   * Creates the subprocedure for one snapshot; the snapshot name is used as the procedure name.
   *
   * @param member procedure member passed through to {@link Subprocedure}
   * @param errorListener error dispatcher passed through to {@link Subprocedure}
   * @param wakeFrequency passed through to {@link Subprocedure}
   * @param timeout passed through to {@link Subprocedure}
   * @param regions regions to include in the snapshot; one task is submitted per region
   * @param snapshot description of the snapshot; its type decides whether the flush is skipped
   * @param taskManager pool that the per-region snapshot tasks are submitted to
   */
  public FlushSnapshotSubprocedure(ProcedureMember member,
      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
      List<HRegion> regions, SnapshotDescription snapshot,
      SnapshotSubprocedurePool taskManager) {
    super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
    this.snapshot = snapshot;
    this.snapshotSkipFlush = snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH;
    this.regions = regions;
    this.taskManager = taskManager;
  }

  /**
   * Callable for adding files to snapshot manifest working dir. Ready for multithreading.
   */
  public static class RegionSnapshotTask implements Callable<Void> {
    private final HRegion region;
    private final boolean skipFlush;
    private final ForeignExceptionDispatcher monitor;
    private final SnapshotDescription snapshotDesc;

    /**
     * @param region region to snapshot
     * @param snapshotDesc description of the snapshot the region is added to
     * @param skipFlush when true the region is added to the snapshot without flushing first
     * @param monitor error dispatcher handed to {@code HRegion#addRegionToSnapshot}
     */
    public RegionSnapshotTask(HRegion region, SnapshotDescription snapshotDesc,
        boolean skipFlush, ForeignExceptionDispatcher monitor) {
      this.region = region;
      this.skipFlush = skipFlush;
      this.monitor = monitor;
      this.snapshotDesc = snapshotDesc;
    }

    /**
     * Flushes the region (unless {@code skipFlush} is set) and adds it to the snapshot, all
     * inside a {@link Operation#SNAPSHOT} region operation.
     *
     * @return always {@code null}
     * @throws IOException if no flush attempt succeeded within {@link #MAX_RETRIES} attempts
     * @throws Exception anything thrown by the underlying region calls
     */
    @Override
    public Void call() throws Exception {
      // Taking the region read lock prevents the individual region from being closed while a
      // snapshot is in progress.  This is helpful but not sufficient for preventing races with
      // snapshots that involve multiple regions and regionservers.  It is still possible to have
      // an interleaving such that globally regions are missing, so we still need the verification
      // step.
      LOG.debug("Starting snapshot operation on {}", region);
      region.startRegionOperation(Operation.SNAPSHOT);
      try {
        if (skipFlush) {
          /*
           * This takes an online snapshot without forcing a coordinated flush, to avoid a pause.
           * The snapshot type is defined inside the snapshot description.
           * FlushSnapshotSubprocedure should be renamed to distributedSnapshotSubprocedure, and
           * the flush() behavior can be turned on/off based on the flush type.
           * To minimize the code change, the class name is not changed.
           */
          LOG.debug("take snapshot without flush memstore first");
        } else {
          LOG.debug("Flush Snapshotting region {} started...", region);
          boolean succeeded = false;
          // Read point captured before the first attempt; used below to decide whether a flush
          // performed by someone else already covers the writes we care about.
          long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED);
          for (int i = 0; i < MAX_RETRIES; i++) {
            FlushResult res = region.flush(true);
            if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
              // CANNOT_FLUSH may mean that a flush is already on-going
              // we need to wait for that flush to complete
              region.waitForFlushes();
              if (region.getMaxFlushedSeqId() >= readPt) {
                // writes at the start of the snapshot have been persisted
                succeeded = true;
                break;
              }
              // otherwise fall through and retry the flush
            } else {
              succeeded = true;
              break;
            }
          }
          if (!succeeded) {
            throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
          }
        }
        region.addRegionToSnapshot(snapshotDesc, monitor);
        if (skipFlush) {
          LOG.debug("... SkipFlush Snapshotting region {} completed.", region);
        } else {
          LOG.debug("... Flush Snapshotting region {} completed.", region);
        }
      } finally {
        // Always close the region operation opened above, even on failure.
        LOG.debug("Closing snapshot operation on {}", region);
        region.closeRegionOperation(Operation.SNAPSHOT);
      }
      return null;
    }
  }

  /**
   * Submits one {@link RegionSnapshotTask} per region to the task manager and waits for all of
   * them to finish. Returns immediately when there are no regions.
   *
   * @throws ForeignException if the monitor has recorded an error, or if the wait for the
   *           outstanding tasks is interrupted
   * @throws IllegalStateException if the task manager already has tasks when this is called
   */
  private void flushSnapshot() throws ForeignException {
    if (regions.isEmpty()) {
      // No regions on this RS, we are basically done.
      return;
    }

    monitor.rethrowException();

    // assert that the taskManager is empty.
    if (taskManager.hasTasks()) {
      throw new IllegalStateException("Attempting to take snapshot "
          + ClientSnapshotDescriptionUtils.toString(snapshot)
          + " but we currently have outstanding tasks");
    }

    // Add all hfiles already existing in region.
    for (HRegion region : regions) {
      // submit one task per region to parallelize by region.
      taskManager.submitTask(new RegionSnapshotTask(region, snapshot, snapshotSkipFlush, monitor));
      // stop submitting as soon as an error has been reported
      monitor.rethrowException();
    }

    // wait for everything to complete.
    LOG.debug("Flush Snapshot Tasks submitted for {} regions", regions.size());
    try {
      taskManager.waitForOutstandingTasks();
    } catch (InterruptedException e) {
      // Pass the exception to the logger so the stack trace is not lost.
      LOG.error("got interrupted exception for {}", getMemberName(), e);
      // NOTE(review): unlike cleanup(), the interrupt status is not restored here; confirm
      // whether that is intentional before changing it.
      throw new ForeignException(getMemberName(), e);
    }
  }

  /**
   * do nothing, core of snapshot is executed in {@link #insideBarrier} step.
   */
  @Override
  public void acquireBarrier() throws ForeignException {
    // NO OP
  }

  /**
   * do a flush snapshot of every region on this rs from the target table.
   *
   * @return an empty byte array
   */
  @Override
  public byte[] insideBarrier() throws ForeignException {
    flushSnapshot();
    return new byte[0];
  }

  /**
   * Cancel threads if they haven't finished.
   *
   * @param e the error that triggered the cleanup; it is only logged
   */
  @Override
  public void cleanup(Exception e) {
    LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '{}' due to error",
        snapshot.getName(), e);
    try {
      taskManager.cancelTasks();
    } catch (InterruptedException e1) {
      // keep the interrupt visible to whoever runs this thread
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Nothing to do when the barrier is released.
   */
  public void releaseBarrier() {
    // NO OP
  }

}