1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ExecutorService;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.Server;
32 import org.apache.hadoop.hbase.ServerName;
33
34
35
36
37 @InterfaceAudience.Private
38 public class BulkReOpen extends BulkAssigner {
39 private final Map<ServerName, List<HRegionInfo>> rsToRegions;
40 private final AssignmentManager assignmentManager;
41 private static final Log LOG = LogFactory.getLog(BulkReOpen.class);
42
43 public BulkReOpen(final Server server,
44 final Map<ServerName, List<HRegionInfo>> serverToRegions,
45 final AssignmentManager am) {
46 super(server);
47 this.assignmentManager = am;
48 this.rsToRegions = serverToRegions;
49 }
50
51
52
53
54
55 @Override
56 protected void populatePool(ExecutorService pool) {
57 LOG.debug("Creating threads for each region server ");
58 for (Map.Entry<ServerName, List<HRegionInfo>> e : rsToRegions
59 .entrySet()) {
60 final List<HRegionInfo> hris = e.getValue();
61
62 Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>();
63 for (HRegionInfo hri : hris) {
64 RegionPlan reOpenPlan = assignmentManager.getRegionReopenPlan(hri);
65 plans.put(hri.getEncodedName(), reOpenPlan);
66 }
67 assignmentManager.addPlans(plans);
68 pool.execute(new Runnable() {
69 public void run() {
70 try {
71 unassign(hris);
72 } catch (Throwable t) {
73 LOG.warn("Failed bulking re-open " + hris.size()
74 + " region(s)", t);
75 }
76 }
77 });
78 }
79 }
80
81
82
83
84
85 @Override
86 protected boolean waitUntilDone(long timeout) {
87 return true;
88 }
89
90
91
92
93
94
95
96 protected int getThreadCount() {
97 int defaultThreadCount = super.getThreadCount();
98 return this.server.getConfiguration().getInt(
99 "hbase.bulk.reopen.threadpool.size", defaultThreadCount);
100 }
101
102 public boolean bulkReOpen() throws InterruptedException, IOException {
103 return bulkAssign();
104 }
105
106
107
108
109
110
111
112
113
114 private void unassign(
115 List<HRegionInfo> regions) throws InterruptedException {
116 int waitTime = this.server.getConfiguration().getInt(
117 "hbase.bulk.waitbetween.reopen", 0);
118 RegionStates regionStates = assignmentManager.getRegionStates();
119 for (HRegionInfo region : regions) {
120 if (server.isStopped()) {
121 return;
122 }
123 if (regionStates.isRegionInTransition(region)) {
124 continue;
125 }
126 assignmentManager.unassign(region, false);
127 while (regionStates.isRegionInTransition(region)
128 && !server.isStopped()) {
129 regionStates.waitForUpdate(100);
130 }
131 if (waitTime > 0 && !server.isStopped()) {
132 Thread.sleep(waitTime);
133 }
134 }
135 }
136 }