1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master.handler;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.ExecutorService;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.HRegionInfo;
30  import org.apache.hadoop.hbase.Server;
31  import org.apache.hadoop.hbase.ServerName;
32  import org.apache.hadoop.hbase.exceptions.TableNotDisabledException;
33  import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
34  import org.apache.hadoop.hbase.catalog.CatalogTracker;
35  import org.apache.hadoop.hbase.catalog.MetaReader;
36  import org.apache.hadoop.hbase.executor.EventHandler;
37  import org.apache.hadoop.hbase.executor.EventType;
38  import org.apache.hadoop.hbase.master.AssignmentManager;
39  import org.apache.hadoop.hbase.master.BulkAssigner;
40  import org.apache.hadoop.hbase.master.HMaster;
41  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
42  import org.apache.hadoop.hbase.master.RegionPlan;
43  import org.apache.hadoop.hbase.master.RegionStates;
44  import org.apache.hadoop.hbase.master.ServerManager;
45  import org.apache.hadoop.hbase.master.TableLockManager;
46  import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.zookeeper.KeeperException;
50  import org.cloudera.htrace.Trace;
51  
52  /**
53   * Handler to run enable of a table.
54   */
55  @InterfaceAudience.Private
56  public class EnableTableHandler extends EventHandler {
57    private static final Log LOG = LogFactory.getLog(EnableTableHandler.class);
58    private final byte [] tableName;
59    private final String tableNameStr;
60    private final AssignmentManager assignmentManager;
61    private final TableLockManager tableLockManager;
62    private final CatalogTracker catalogTracker;
63    private boolean retainAssignment = false;
64    private TableLock tableLock;
65  
66    public EnableTableHandler(Server server, byte [] tableName,
67        CatalogTracker catalogTracker, AssignmentManager assignmentManager,
68        TableLockManager tableLockManager, boolean skipTableStateCheck) {
69      super(server, EventType.C_M_ENABLE_TABLE);
70      this.tableName = tableName;
71      this.tableNameStr = Bytes.toString(tableName);
72      this.catalogTracker = catalogTracker;
73      this.assignmentManager = assignmentManager;
74      this.tableLockManager = tableLockManager;
75      this.retainAssignment = skipTableStateCheck;
76    }
77  
78    public EnableTableHandler prepare()
79        throws TableNotFoundException, TableNotDisabledException, IOException {
80      //acquire the table write lock, blocking
81      this.tableLock = this.tableLockManager.writeLock(tableName,
82          EventType.C_M_ENABLE_TABLE.toString());
83      this.tableLock.acquire();
84  
85      boolean success = false;
86      try {
87        // Check if table exists
88        if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
89          // retainAssignment is true only during recovery.  In normal case it is false
90          if (!this.retainAssignment) {
91            throw new TableNotFoundException(tableNameStr);
92          } 
93          try {
94            this.assignmentManager.getZKTable().removeEnablingTable(tableNameStr, true);
95          } catch (KeeperException e) {
96            // TODO : Use HBCK to clear such nodes
97            LOG.warn("Failed to delete the ENABLING node for the table " + tableNameStr
98                + ".  The table will remain unusable. Run HBCK to manually fix the problem.");
99          }
100       }
101 
102       // There could be multiple client requests trying to disable or enable
103       // the table at the same time. Ensure only the first request is honored
104       // After that, no other requests can be accepted until the table reaches
105       // DISABLED or ENABLED.
106       if (!retainAssignment) {
107         try {
108           if (!this.assignmentManager.getZKTable().checkDisabledAndSetEnablingTable
109             (this.tableNameStr)) {
110             LOG.info("Table " + tableNameStr + " isn't disabled; skipping enable");
111             throw new TableNotDisabledException(this.tableNameStr);
112           }
113         } catch (KeeperException e) {
114           throw new IOException("Unable to ensure that the table will be" +
115             " enabling because of a ZooKeeper issue", e);
116         }
117       }
118       success = true;
119     } finally {
120       if (!success) {
121         releaseTableLock();
122       }
123     }
124     return this;
125   }
126 
127   @Override
128   public String toString() {
129     String name = "UnknownServerName";
130     if(server != null && server.getServerName() != null) {
131       name = server.getServerName().toString();
132     }
133     return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" +
134       tableNameStr;
135   }
136 
137   @Override
138   public void process() {
139     try {
140       LOG.info("Attempting to enable the table " + this.tableNameStr);
141       MasterCoprocessorHost cpHost = ((HMaster) this.server)
142           .getCoprocessorHost();
143       if (cpHost != null) {
144         cpHost.preEnableTableHandler(this.tableName);
145       }
146       handleEnableTable();
147       if (cpHost != null) {
148         cpHost.postEnableTableHandler(this.tableName);
149       }
150     } catch (IOException e) {
151       LOG.error("Error trying to enable the table " + this.tableNameStr, e);
152     } catch (KeeperException e) {
153       LOG.error("Error trying to enable the table " + this.tableNameStr, e);
154     } catch (InterruptedException e) {
155       LOG.error("Error trying to enable the table " + this.tableNameStr, e);
156     } finally {
157       releaseTableLock();
158     }
159   }
160 
161   private void releaseTableLock() {
162     if (this.tableLock != null) {
163       try {
164         this.tableLock.release();
165       } catch (IOException ex) {
166         LOG.warn("Could not release the table lock", ex);
167       }
168     }
169   }
170 
171   private void handleEnableTable() throws IOException, KeeperException, InterruptedException {
172     // I could check table is disabling and if so, not enable but require
173     // that user first finish disabling but that might be obnoxious.
174 
175     // Set table enabling flag up in zk.
176     this.assignmentManager.getZKTable().setEnablingTable(this.tableNameStr);
177     boolean done = false;
178     // Get the regions of this table. We're done when all listed
179     // tables are onlined.
180     List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations = MetaReader
181         .getTableRegionsAndLocations(this.catalogTracker, tableName, true);
182     int countOfRegionsInTable = tableRegionsAndLocations.size();
183     List<HRegionInfo> regions = regionsToAssignWithServerName(tableRegionsAndLocations);
184     int regionsCount = regions.size();
185     if (regionsCount == 0) {
186       done = true;
187     }
188     LOG.info("Table '" + this.tableNameStr + "' has " + countOfRegionsInTable
189       + " regions, of which " + regionsCount + " are offline.");
190     BulkEnabler bd = new BulkEnabler(this.server, regions, countOfRegionsInTable,
191         this.retainAssignment);
192     try {
193       if (bd.bulkAssign()) {
194         done = true;
195       }
196     } catch (InterruptedException e) {
197       LOG.warn("Enable operation was interrupted when enabling table '"
198         + this.tableNameStr + "'");
199       // Preserve the interrupt.
200       Thread.currentThread().interrupt();
201     }
202     if (done) {
203       // Flip the table to enabled.
204       this.assignmentManager.getZKTable().setEnabledTable(
205         this.tableNameStr);
206       LOG.info("Table '" + this.tableNameStr
207       + "' was successfully enabled. Status: done=" + done);
208     } else {
209       LOG.warn("Table '" + this.tableNameStr
210       + "' wasn't successfully enabled. Status: done=" + done);
211     }
212   }
213 
214   /**
215    * @param regionsInMeta
216    * @return List of regions neither in transition nor assigned.
217    * @throws IOException
218    */
219   private List<HRegionInfo> regionsToAssignWithServerName(
220       final List<Pair<HRegionInfo, ServerName>> regionsInMeta) throws IOException {
221     ServerManager serverManager = ((HMaster) this.server).getServerManager();
222     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
223     RegionStates regionStates = this.assignmentManager.getRegionStates();
224     for (Pair<HRegionInfo, ServerName> regionLocation : regionsInMeta) {
225       HRegionInfo hri = regionLocation.getFirst();
226       ServerName sn = regionLocation.getSecond();
227       if (!regionStates.isRegionInTransition(hri) && !regionStates.isRegionAssigned(hri)) {
228         if (this.retainAssignment && sn != null && serverManager.isServerOnline(sn)) {
229           this.assignmentManager.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, sn));
230         }
231         regions.add(hri);
232       } else {
233         if (LOG.isDebugEnabled()) {
234           LOG.debug("Skipping assign for the region " + hri + " during enable table "
235               + hri.getTableNameAsString() + " because its already in tranition or assigned.");
236         }
237       }
238     }
239     return regions;
240   }
241 
242   /**
243    * Run bulk enable.
244    */
245   class BulkEnabler extends BulkAssigner {
246     private final List<HRegionInfo> regions;
247     // Count of regions in table at time this assign was launched.
248     private final int countOfRegionsInTable;
249     private final boolean retainAssignment;
250 
251     BulkEnabler(final Server server, final List<HRegionInfo> regions,
252         final int countOfRegionsInTable, boolean retainAssignment) {
253       super(server);
254       this.regions = regions;
255       this.countOfRegionsInTable = countOfRegionsInTable;
256       this.retainAssignment = retainAssignment;
257     }
258 
259     @Override
260     protected void populatePool(ExecutorService pool) throws IOException {
261       boolean roundRobinAssignment = this.server.getConfiguration().getBoolean(
262           "hbase.master.enabletable.roundrobin", false);
263 
264       // In case of masterRestart always go with single assign.  Going thro
265       // roundRobinAssignment will use bulkassign which may lead to double assignment.
266       if (retainAssignment || !roundRobinAssignment) {
267         for (HRegionInfo region : regions) {
268           if (assignmentManager.getRegionStates()
269               .isRegionInTransition(region)) {
270             continue;
271           }
272           final HRegionInfo hri = region;
273           pool.execute(Trace.wrap(new Runnable() {
274             public void run() {
275               assignmentManager.assign(hri, true);
276             }
277           }));
278         }
279       } else {
280         try {
281           assignmentManager.assign(regions);
282         } catch (InterruptedException e) {
283           LOG.warn("Assignment was interrupted");
284           Thread.currentThread().interrupt();
285         }
286       }
287     }
288 
289     @Override
290     protected boolean waitUntilDone(long timeout)
291     throws InterruptedException {
292       long startTime = System.currentTimeMillis();
293       long remaining = timeout;
294       List<HRegionInfo> regions = null;
295       int lastNumberOfRegions = 0;
296       while (!server.isStopped() && remaining > 0) {
297         Thread.sleep(waitingTimeForEvents);
298         regions = assignmentManager.getRegionStates()
299           .getRegionsOfTable(tableName);
300         if (isDone(regions)) break;
301 
302         // Punt on the timeout as long we make progress
303         if (regions.size() > lastNumberOfRegions) {
304           lastNumberOfRegions = regions.size();
305           timeout += waitingTimeForEvents;
306         }
307         remaining = timeout - (System.currentTimeMillis() - startTime);
308       }
309       return isDone(regions);
310     }
311 
312     private boolean isDone(final List<HRegionInfo> regions) {
313       return regions != null && regions.size() >= this.countOfRegionsInTable;
314     }
315   }
316 }