1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master;
19
20 import java.lang.Thread.UncaughtExceptionHandler;
21 import java.util.ArrayList;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.TimeUnit;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.Server;
36 import org.apache.hadoop.hbase.ServerName;
37
38
39
40
41
42 @InterfaceAudience.Private
43 public class GeneralBulkAssigner extends BulkAssigner {
44 private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class);
45
46 private Map<ServerName, List<HRegionInfo>> failedPlans
47 = new ConcurrentHashMap<ServerName, List<HRegionInfo>>();
48 private ExecutorService pool;
49
50 final Map<ServerName, List<HRegionInfo>> bulkPlan;
51 final AssignmentManager assignmentManager;
52 final boolean waitTillAllAssigned;
53
54 public GeneralBulkAssigner(final Server server,
55 final Map<ServerName, List<HRegionInfo>> bulkPlan,
56 final AssignmentManager am, final boolean waitTillAllAssigned) {
57 super(server);
58 this.bulkPlan = bulkPlan;
59 this.assignmentManager = am;
60 this.waitTillAllAssigned = waitTillAllAssigned;
61 }
62
63 @Override
64 protected String getThreadNamePrefix() {
65 return this.server.getServerName() + "-GeneralBulkAssigner";
66 }
67
68 @Override
69 protected void populatePool(ExecutorService pool) {
70 this.pool = pool;
71 for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
72 pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
73 this.assignmentManager, this.failedPlans));
74 }
75 }
76
77
78
79
80
81
82 @Override
83 protected boolean waitUntilDone(final long timeout)
84 throws InterruptedException {
85 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
86 for (List<HRegionInfo> regionList : bulkPlan.values()) {
87 regionSet.addAll(regionList);
88 }
89
90 pool.shutdown();
91 int serverCount = bulkPlan.size();
92 int regionCount = regionSet.size();
93 long startTime = System.currentTimeMillis();
94 long rpcWaitTime = startTime + timeout;
95 while (!server.isStopped() && !pool.isTerminated()
96 && rpcWaitTime > System.currentTimeMillis()) {
97 if (failedPlans.isEmpty()) {
98 pool.awaitTermination(100, TimeUnit.MILLISECONDS);
99 } else {
100 reassignFailedPlans();
101 }
102 }
103 if (!pool.isTerminated()) {
104 LOG.warn("bulk assigner is still running after "
105 + (System.currentTimeMillis() - startTime) + "ms, shut it down now");
106
107 List<Runnable> notStarted = pool.shutdownNow();
108 if (notStarted != null && !notStarted.isEmpty()) {
109 server.abort("some single server assigner hasn't started yet"
110 + " when the bulk assigner timed out", null);
111 return false;
112 }
113 }
114
115 int reassigningRegions = 0;
116 if (!failedPlans.isEmpty() && !server.isStopped()) {
117 reassigningRegions = reassignFailedPlans();
118 }
119 assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
120 reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
121
122 if (LOG.isDebugEnabled()) {
123 long elapsedTime = System.currentTimeMillis() - startTime;
124 String status = "successfully";
125 if (!regionSet.isEmpty()) {
126 status = "with " + regionSet.size() + " regions still in transition";
127 }
128 LOG.debug("bulk assigning total " + regionCount + " regions to "
129 + serverCount + " servers, took " + elapsedTime + "ms, " + status);
130 }
131 return regionSet.isEmpty();
132 }
133
134 @Override
135 protected long getTimeoutOnRIT() {
136
137
138 Configuration conf = server.getConfiguration();
139 long perRegionOpenTimeGuesstimate =
140 conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
141 int maxRegionsPerServer = 1;
142 for (List<HRegionInfo> regionList : bulkPlan.values()) {
143 int size = regionList.size();
144 if (size > maxRegionsPerServer) {
145 maxRegionsPerServer = size;
146 }
147 }
148 long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer
149 + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000)
150 + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime",
151 30000) * bulkPlan.size();
152 LOG.debug("Timeout-on-RIT=" + timeout);
153 return timeout;
154 }
155
156 @Override
157 protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
158 return new UncaughtExceptionHandler() {
159 @Override
160 public void uncaughtException(Thread t, Throwable e) {
161 LOG.warn("Assigning regions in " + t.getName(), e);
162 }
163 };
164 }
165
166 private int reassignFailedPlans() {
167 List<HRegionInfo> reassigningRegions = new ArrayList<HRegionInfo>();
168 for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
169 LOG.info("Failed assigning " + e.getValue().size()
170 + " regions to server " + e.getKey() + ", reassigning them");
171 reassigningRegions.addAll(failedPlans.remove(e.getKey()));
172 }
173 RegionStates regionStates = assignmentManager.getRegionStates();
174 for (HRegionInfo region : reassigningRegions) {
175 if (!regionStates.isRegionOnline(region)) {
176 assignmentManager.invokeAssign(region);
177 }
178 }
179 return reassigningRegions.size();
180 }
181
182
183
184
185 static class SingleServerBulkAssigner implements Runnable {
186 private final ServerName regionserver;
187 private final List<HRegionInfo> regions;
188 private final AssignmentManager assignmentManager;
189 private final Map<ServerName, List<HRegionInfo>> failedPlans;
190
191 SingleServerBulkAssigner(final ServerName regionserver,
192 final List<HRegionInfo> regions, final AssignmentManager am,
193 final Map<ServerName, List<HRegionInfo>> failedPlans) {
194 this.regionserver = regionserver;
195 this.regions = regions;
196 this.assignmentManager = am;
197 this.failedPlans = failedPlans;
198 }
199
200 @Override
201 public void run() {
202 try {
203 if (!assignmentManager.assign(regionserver, regions)) {
204 failedPlans.put(regionserver, regions);
205 }
206 } catch (Throwable t) {
207 LOG.warn("Failed bulking assigning " + regions.size()
208 + " region(s) to " + regionserver.getServerName()
209 + ", and continue to bulk assign others", t);
210 failedPlans.put(regionserver, regions);
211 }
212 }
213 }
214 }