1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure.flush;
19
20 import java.io.IOException;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ThreadPoolExecutor;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
33 import org.apache.hadoop.hbase.HRegionInfo;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.MetaTableAccessor;
37 import org.apache.hadoop.hbase.errorhandling.ForeignException;
38 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
39 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
40 import org.apache.hadoop.hbase.master.MasterServices;
41 import org.apache.hadoop.hbase.master.MetricsMaster;
42 import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
43 import org.apache.hadoop.hbase.procedure.Procedure;
44 import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
45 import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
46 import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
47 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
50 import org.apache.zookeeper.KeeperException;
51
52 import com.google.common.collect.Lists;
53
54 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
55 public class MasterFlushTableProcedureManager extends MasterProcedureManager {
56
57 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
58
59 private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis";
60 private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
61 private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis";
62 private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500;
63
64 private static final String FLUSH_PROC_POOL_THREADS_KEY =
65 "hbase.flush.procedure.master.threads";
66 private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1;
67
68 private static final Log LOG = LogFactory.getLog(MasterFlushTableProcedureManager.class);
69
70 private MasterServices master;
71 private ProcedureCoordinator coordinator;
72 private Map<TableName, Procedure> procMap = new HashMap<TableName, Procedure>();
73 private boolean stopped;
74
75 public MasterFlushTableProcedureManager() {};
76
77 @Override
78 public void stop(String why) {
79 LOG.info("stop: " + why);
80 this.stopped = true;
81 }
82
83 @Override
84 public boolean isStopped() {
85 return this.stopped;
86 }
87
88 @Override
89 public void initialize(MasterServices master, MetricsMaster metricsMaster)
90 throws KeeperException, IOException, UnsupportedOperationException {
91 this.master = master;
92
93
94 Configuration conf = master.getConfiguration();
95 long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT);
96 long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
97 int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT);
98
99
100 String name = master.getServerName().toString();
101 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads);
102 ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
103 master.getZooKeeper(), getProcedureSignature(), name);
104
105 this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
106 }
107
108 @Override
109 public String getProcedureSignature() {
110 return FLUSH_TABLE_PROCEDURE_SIGNATURE;
111 }
112
113 @Override
114 public void execProcedure(ProcedureDescription desc) throws IOException {
115
116 TableName tableName = TableName.valueOf(desc.getInstance());
117
118
119 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
120 if (cpHost != null) {
121 cpHost.preTableFlush(tableName);
122 }
123
124
125
126
127
128
129 List<Pair<HRegionInfo, ServerName>> regionsAndLocations;
130
131 if (TableName.META_TABLE_NAME.equals(tableName)) {
132 regionsAndLocations = new MetaTableLocator().getMetaRegionsAndLocations(
133 master.getZooKeeper());
134 } else {
135 regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
136 master.getZooKeeper(), master.getConnection(), tableName, false);
137 }
138
139 Set<String> regionServers = new HashSet<String>(regionsAndLocations.size());
140 for (Pair<HRegionInfo, ServerName> region : regionsAndLocations) {
141 if (region != null && region.getFirst() != null && region.getSecond() != null) {
142 HRegionInfo hri = region.getFirst();
143 if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
144 regionServers.add(region.getSecond().toString());
145 }
146 }
147
148 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
149
150
151
152
153 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
154 new byte[0], Lists.newArrayList(regionServers));
155 monitor.rethrowException();
156 if (proc == null) {
157 String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
158 + desc.getInstance() + "'. " + "Another flush procedure is running?";
159 LOG.error(msg);
160 throw new IOException(msg);
161 }
162
163 procMap.put(tableName, proc);
164
165 try {
166
167
168 proc.waitForCompleted();
169 LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '"
170 + desc.getInstance() + "'");
171 LOG.info("Master flush table procedure is successful!");
172 } catch (InterruptedException e) {
173 ForeignException ee =
174 new ForeignException("Interrupted while waiting for flush table procdure to finish", e);
175 monitor.receive(ee);
176 Thread.currentThread().interrupt();
177 } catch (ForeignException e) {
178 ForeignException ee =
179 new ForeignException("Exception while waiting for flush table procdure to finish", e);
180 monitor.receive(ee);
181 }
182 monitor.rethrowException();
183 }
184
185 @Override
186 public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException {
187
188 TableName tableName = TableName.valueOf(desc.getInstance());
189 Procedure proc = procMap.get(tableName);
190 if (proc == null) {
191
192
193
194 return false;
195 }
196
197 return proc.isCompleted();
198 }
199
200 }