1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.snapshot;
19
20 import java.util.List;
21 import java.util.concurrent.Callable;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.errorhandling.ForeignException;
28 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
29 import org.apache.hadoop.hbase.procedure.ProcedureMember;
30 import org.apache.hadoop.hbase.procedure.Subprocedure;
31 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
32 import org.apache.hadoop.hbase.regionserver.HRegion;
33 import org.apache.hadoop.hbase.regionserver.Region;
34 import org.apache.hadoop.hbase.regionserver.Region.Operation;
35 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
36 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
37
38
39
40
41
42
43
44
45 @InterfaceAudience.Private
46 @InterfaceStability.Unstable
47 public class FlushSnapshotSubprocedure extends Subprocedure {
48 private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);
49
50 private final List<Region> regions;
51 private final SnapshotDescription snapshot;
52 private final SnapshotSubprocedurePool taskManager;
53 private boolean snapshotSkipFlush = false;
54
55 public FlushSnapshotSubprocedure(ProcedureMember member,
56 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
57 List<Region> regions, SnapshotDescription snapshot,
58 SnapshotSubprocedurePool taskManager) {
59 super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
60 this.snapshot = snapshot;
61
62 if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
63 snapshotSkipFlush = true;
64 }
65 this.regions = regions;
66 this.taskManager = taskManager;
67 }
68
69
70
71
72 private class RegionSnapshotTask implements Callable<Void> {
73 Region region;
74 RegionSnapshotTask(Region region) {
75 this.region = region;
76 }
77
78 @Override
79 public Void call() throws Exception {
80
81
82
83
84
85 LOG.debug("Starting region operation on " + region);
86 region.startRegionOperation();
87 try {
88 if (snapshotSkipFlush) {
89
90
91
92
93
94
95
96 LOG.debug("take snapshot without flush memstore first");
97 } else {
98 LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
99 region.flush(true);
100 }
101 ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
102 if (snapshotSkipFlush) {
103 LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
104 } else {
105 LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
106 }
107 } finally {
108 LOG.debug("Closing region operation on " + region);
109 region.closeRegionOperation();
110 }
111 return null;
112 }
113 }
114
115 private void flushSnapshot() throws ForeignException {
116 if (regions.isEmpty()) {
117
118 return;
119 }
120
121 monitor.rethrowException();
122
123
124 if (taskManager.hasTasks()) {
125 throw new IllegalStateException("Attempting to take snapshot "
126 + ClientSnapshotDescriptionUtils.toString(snapshot)
127 + " but we currently have outstanding tasks");
128 }
129
130
131 for (Region region : regions) {
132
133 taskManager.submitTask(new RegionSnapshotTask(region));
134 monitor.rethrowException();
135 }
136
137
138 LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
139 try {
140 taskManager.waitForOutstandingTasks();
141 } catch (InterruptedException e) {
142 throw new ForeignException(getMemberName(), e);
143 }
144 }
145
146
147
148
149 @Override
150 public void acquireBarrier() throws ForeignException {
151
152 }
153
154
155
156
157 @Override
158 public byte[] insideBarrier() throws ForeignException {
159 flushSnapshot();
160 return new byte[0];
161 }
162
163
164
165
166 @Override
167 public void cleanup(Exception e) {
168 LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
169 + snapshot.getName() + "' due to error", e);
170 try {
171 taskManager.cancelTasks();
172 } catch (InterruptedException e1) {
173 Thread.currentThread().interrupt();
174 }
175 }
176
177
178
179
180 public void releaseBarrier() {
181
182 }
183
184 }