001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.snapshot; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.concurrent.Callable; 023import org.apache.hadoop.hbase.client.IsolationLevel; 024import org.apache.hadoop.hbase.errorhandling.ForeignException; 025import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 026import org.apache.hadoop.hbase.procedure.ProcedureMember; 027import org.apache.hadoop.hbase.procedure.Subprocedure; 028import org.apache.hadoop.hbase.regionserver.HRegion; 029import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 030import org.apache.hadoop.hbase.regionserver.Region.Operation; 031import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; 032import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.apache.yetus.audience.InterfaceStability; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 039 040/** 041 * This online snapshot implementation uses the distributed procedure framework to force a store 042 * flush and then records the hfiles. Its enter stage does nothing. Its leave stage then flushes the 043 * memstore, builds the region server's snapshot manifest from its hfiles list, and copies 044 * .regioninfos into the snapshot working directory. At the master side, there is an atomic rename 045 * of the working dir into the proper snapshot directory. 046 */ 047@InterfaceAudience.Private 048@InterfaceStability.Unstable 049public class FlushSnapshotSubprocedure extends Subprocedure { 050 private static final Logger LOG = LoggerFactory.getLogger(FlushSnapshotSubprocedure.class); 051 052 private final List<HRegion> regions; 053 private final SnapshotDescription snapshot; 054 private final SnapshotSubprocedurePool taskManager; 055 private boolean snapshotSkipFlush = false; 056 057 // the maximum number of attempts we flush 058 final static int MAX_RETRIES = 3; 059 060 public FlushSnapshotSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, 061 long wakeFrequency, long timeout, List<HRegion> regions, SnapshotDescription snapshot, 062 SnapshotSubprocedurePool taskManager) { 063 super(member, snapshot.getName(), errorListener, wakeFrequency, timeout); 064 this.snapshot = snapshot; 065 066 if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) { 067 snapshotSkipFlush = true; 068 } 069 this.regions = regions; 070 this.taskManager = taskManager; 071 } 072 073 /** 074 * Callable for adding files to snapshot manifest working dir. Ready for multithreading. 075 */ 076 public static class RegionSnapshotTask implements Callable<Void> { 077 private HRegion region; 078 private boolean skipFlush; 079 private ForeignExceptionDispatcher monitor; 080 private SnapshotDescription snapshotDesc; 081 082 public RegionSnapshotTask(HRegion region, SnapshotDescription snapshotDesc, boolean skipFlush, 083 ForeignExceptionDispatcher monitor) { 084 this.region = region; 085 this.skipFlush = skipFlush; 086 this.monitor = monitor; 087 this.snapshotDesc = snapshotDesc; 088 } 089 090 @Override 091 public Void call() throws Exception { 092 // Taking the region read lock prevents the individual region from being closed while a 093 // snapshot is in progress. This is helpful but not sufficient for preventing races with 094 // snapshots that involve multiple regions and regionservers. It is still possible to have 095 // an interleaving such that globally regions are missing, so we still need the verification 096 // step. 097 LOG.debug("Starting snapshot operation on " + region); 098 region.startRegionOperation(Operation.SNAPSHOT); 099 try { 100 if (skipFlush) { 101 /* 102 * This is to take an online-snapshot without force a coordinated flush to prevent pause 103 * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure 104 * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be 105 * turned on/off based on the flush type. To minimized the code change, class name is not 106 * changed. 107 */ 108 LOG.debug("take snapshot without flush memstore first"); 109 } else { 110 LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); 111 boolean succeeded = false; 112 long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED); 113 for (int i = 0; i < MAX_RETRIES; i++) { 114 FlushResult res = region.flush(true); 115 if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) { 116 // CANNOT_FLUSH may mean that a flush is already on-going 117 // we need to wait for that flush to complete 118 region.waitForFlushes(); 119 if (region.getMaxFlushedSeqId() >= readPt) { 120 // writes at the start of the snapshot have been persisted 121 succeeded = true; 122 break; 123 } 124 } else { 125 succeeded = true; 126 break; 127 } 128 } 129 if (!succeeded) { 130 throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts"); 131 } 132 } 133 region.addRegionToSnapshot(snapshotDesc, monitor); 134 if (skipFlush) { 135 LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed."); 136 } else { 137 LOG.debug("... Flush Snapshotting region " + region.toString() + " completed."); 138 } 139 } finally { 140 LOG.debug("Closing snapshot operation on " + region); 141 region.closeRegionOperation(Operation.SNAPSHOT); 142 } 143 return null; 144 } 145 } 146 147 private void flushSnapshot() throws ForeignException { 148 if (regions.isEmpty()) { 149 // No regions on this RS, we are basically done. 150 return; 151 } 152 153 monitor.rethrowException(); 154 155 // assert that the taskManager is empty. 156 if (taskManager.hasTasks()) { 157 throw new IllegalStateException( 158 "Attempting to take snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot) 159 + " but we currently have outstanding tasks"); 160 } 161 162 // Add all hfiles already existing in region. 163 for (HRegion region : regions) { 164 // submit one task per region for parallelize by region. 165 taskManager.submitTask(new RegionSnapshotTask(region, snapshot, snapshotSkipFlush, monitor)); 166 monitor.rethrowException(); 167 } 168 169 // wait for everything to complete. 170 LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions"); 171 try { 172 taskManager.waitForOutstandingTasks(); 173 } catch (InterruptedException e) { 174 LOG.error("got interrupted exception for " + getMemberName()); 175 throw new ForeignException(getMemberName(), e); 176 } 177 } 178 179 /** 180 * do nothing, core of snapshot is executed in {@link #insideBarrier} step. 181 */ 182 @Override 183 public void acquireBarrier() throws ForeignException { 184 // NO OP 185 } 186 187 /** 188 * do a flush snapshot of every region on this rs from the target table. 189 */ 190 @Override 191 public byte[] insideBarrier() throws ForeignException { 192 flushSnapshot(); 193 return new byte[0]; 194 } 195 196 /** 197 * Cancel threads if they haven't finished. 198 */ 199 @Override 200 public void cleanup(Exception e) { 201 LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '" 202 + snapshot.getName() + "' due to error", e); 203 try { 204 taskManager.cancelTasks(); 205 } catch (InterruptedException e1) { 206 Thread.currentThread().interrupt(); 207 } 208 } 209 210 /** 211 * Hooray! 212 */ 213 public void releaseBarrier() { 214 // NO OP 215 } 216 217}