1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.snapshot;
19
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.concurrent.Callable;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.client.IsolationLevel;
29 import org.apache.hadoop.hbase.errorhandling.ForeignException;
30 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
31 import org.apache.hadoop.hbase.procedure.ProcedureMember;
32 import org.apache.hadoop.hbase.procedure.Subprocedure;
33 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
34 import org.apache.hadoop.hbase.regionserver.HRegion;
35 import org.apache.hadoop.hbase.regionserver.Region;
36 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
37 import org.apache.hadoop.hbase.regionserver.Region.Operation;
38 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
39 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 @InterfaceStability.Unstable
50 public class FlushSnapshotSubprocedure extends Subprocedure {
51 private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);
52
53 private final List<Region> regions;
54 private final SnapshotDescription snapshot;
55 private final SnapshotSubprocedurePool taskManager;
56 private boolean snapshotSkipFlush = false;
57
58
59 final static int MAX_RETRIES = 3;
60
61 public FlushSnapshotSubprocedure(ProcedureMember member,
62 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
63 List<Region> regions, SnapshotDescription snapshot,
64 SnapshotSubprocedurePool taskManager) {
65 super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
66 this.snapshot = snapshot;
67
68 if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
69 snapshotSkipFlush = true;
70 }
71 this.regions = regions;
72 this.taskManager = taskManager;
73 }
74
75
76
77
78 private class RegionSnapshotTask implements Callable<Void> {
79 Region region;
80 RegionSnapshotTask(Region region) {
81 this.region = region;
82 }
83
84 @Override
85 public Void call() throws Exception {
86
87
88
89
90
91 LOG.debug("Starting region operation on " + region);
92 region.startRegionOperation();
93 try {
94 if (snapshotSkipFlush) {
95
96
97
98
99
100
101
102 LOG.debug("take snapshot without flush memstore first");
103 } else {
104 LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
105 boolean succeeded = false;
106 long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
107 for (int i = 0; i < MAX_RETRIES; i++) {
108 FlushResult res = region.flush(true);
109 if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
110 if (region instanceof HRegion) {
111 HRegion hreg = (HRegion) region;
112
113
114 hreg.waitForFlushes();
115 }
116 if (region.getMaxFlushedSeqId() >= readPt) {
117
118 succeeded = true;
119 break;
120 }
121 } else {
122 succeeded = true;
123 break;
124 }
125 }
126 if (!succeeded) {
127 throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
128 }
129 }
130 ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
131 if (snapshotSkipFlush) {
132 LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
133 } else {
134 LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
135 }
136 } finally {
137 LOG.debug("Closing region operation on " + region);
138 region.closeRegionOperation();
139 }
140 return null;
141 }
142 }
143
144 private void flushSnapshot() throws ForeignException {
145 if (regions.isEmpty()) {
146
147 return;
148 }
149
150 monitor.rethrowException();
151
152
153 if (taskManager.hasTasks()) {
154 throw new IllegalStateException("Attempting to take snapshot "
155 + ClientSnapshotDescriptionUtils.toString(snapshot)
156 + " but we currently have outstanding tasks");
157 }
158
159
160 for (Region region : regions) {
161
162 taskManager.submitTask(new RegionSnapshotTask(region));
163 monitor.rethrowException();
164 }
165
166
167 LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
168 try {
169 taskManager.waitForOutstandingTasks();
170 } catch (InterruptedException e) {
171 LOG.error("got interrupted exception for " + getMemberName());
172 throw new ForeignException(getMemberName(), e);
173 }
174 }
175
176
177
178
179 @Override
180 public void acquireBarrier() throws ForeignException {
181
182 }
183
184
185
186
187 @Override
188 public byte[] insideBarrier() throws ForeignException {
189 flushSnapshot();
190 return new byte[0];
191 }
192
193
194
195
196 @Override
197 public void cleanup(Exception e) {
198 LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
199 + snapshot.getName() + "' due to error", e);
200 try {
201 taskManager.cancelTasks();
202 } catch (InterruptedException e1) {
203 Thread.currentThread().interrupt();
204 }
205 }
206
207
208
209
210 public void releaseBarrier() {
211
212 }
213
214 }