1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure.flush;
19
20 import java.util.List;
21 import java.util.concurrent.Callable;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.errorhandling.ForeignException;
27 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
28 import org.apache.hadoop.hbase.procedure.ProcedureMember;
29 import org.apache.hadoop.hbase.procedure.Subprocedure;
30 import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
31 import org.apache.hadoop.hbase.regionserver.Region;
32
33
34
35
36
37
38 @InterfaceAudience.Private
39 public class FlushTableSubprocedure extends Subprocedure {
40 private static final Log LOG = LogFactory.getLog(FlushTableSubprocedure.class);
41
42 private final String table;
43 private final List<Region> regions;
44 private final FlushTableSubprocedurePool taskManager;
45
46 public FlushTableSubprocedure(ProcedureMember member,
47 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
48 List<Region> regions, String table,
49 FlushTableSubprocedurePool taskManager) {
50 super(member, table, errorListener, wakeFrequency, timeout);
51 this.table = table;
52 this.regions = regions;
53 this.taskManager = taskManager;
54 }
55
56 private static class RegionFlushTask implements Callable<Void> {
57 Region region;
58 RegionFlushTask(Region region) {
59 this.region = region;
60 }
61
62 @Override
63 public Void call() throws Exception {
64 LOG.debug("Starting region operation on " + region);
65 region.startRegionOperation();
66 try {
67 LOG.debug("Flush region " + region.toString() + " started...");
68 region.flush(true);
69 } finally {
70 LOG.debug("Closing region operation on " + region);
71 region.closeRegionOperation();
72 }
73 return null;
74 }
75 }
76
77 private void flushRegions() throws ForeignException {
78 if (regions.isEmpty()) {
79
80 return;
81 }
82
83 monitor.rethrowException();
84
85
86 if (taskManager.hasTasks()) {
87 throw new IllegalStateException("Attempting to flush "
88 + table + " but we currently have outstanding tasks");
89 }
90
91
92 for (Region region : regions) {
93
94 taskManager.submitTask(new RegionFlushTask(region));
95 monitor.rethrowException();
96 }
97
98
99 LOG.debug("Flush region tasks submitted for " + regions.size() + " regions");
100 try {
101 taskManager.waitForOutstandingTasks();
102 } catch (InterruptedException e) {
103 throw new ForeignException(getMemberName(), e);
104 }
105 }
106
107
108
109
110 @Override
111 public void acquireBarrier() throws ForeignException {
112 flushRegions();
113 }
114
115 @Override
116 public byte[] insideBarrier() throws ForeignException {
117
118 return new byte[0];
119 }
120
121
122
123
124 @Override
125 public void cleanup(Exception e) {
126 LOG.info("Aborting all flush region subprocedure task threads for '"
127 + table + "' due to error", e);
128 try {
129 taskManager.cancelTasks();
130 } catch (InterruptedException e1) {
131 Thread.currentThread().interrupt();
132 }
133 }
134
135 public void releaseBarrier() {
136
137 }
138
139 }