View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.master.handler;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.Map;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.HRegionInfo;
29  import org.apache.hadoop.hbase.RegionLoad;
30  import org.apache.hadoop.hbase.ServerLoad;
31  import org.apache.hadoop.hbase.ServerName;
32  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
33  import org.apache.hadoop.hbase.executor.EventHandler;
34  import org.apache.hadoop.hbase.executor.EventType;
35  import org.apache.hadoop.hbase.master.CatalogJanitor;
36  import org.apache.hadoop.hbase.master.MasterServices;
37  import org.apache.hadoop.hbase.master.RegionPlan;
38  import org.apache.hadoop.hbase.master.RegionStates;
39  import org.apache.hadoop.hbase.master.ServerManager;
40  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41  
42  /**
43   * Handles MERGE regions request on master: move the regions together(on the
44   * same regionserver) and send MERGE RPC to regionserver.
45   *
46   * NOTE:The real merge is executed on the regionserver
47   *
48   */
49  @InterfaceAudience.Private
50  public class DispatchMergingRegionHandler extends EventHandler {
51    private static final Log LOG = LogFactory.getLog(DispatchMergingRegionHandler.class);
52    private final MasterServices masterServices;
53    private final CatalogJanitor catalogJanitor;
54    private HRegionInfo region_a;
55    private HRegionInfo region_b;
56    private final boolean forcible;
57    private final int timeout;
58  
59    public DispatchMergingRegionHandler(final MasterServices services,
60        final CatalogJanitor catalogJanitor, final HRegionInfo region_a,
61        final HRegionInfo region_b, final boolean forcible) {
62      super(services, EventType.C_M_MERGE_REGION);
63      this.masterServices = services;
64      this.catalogJanitor = catalogJanitor;
65      this.region_a = region_a;
66      this.region_b = region_b;
67      this.forcible = forcible;
68      this.timeout = server.getConfiguration().getInt(
69          "hbase.master.regionmerge.timeout", 120 * 1000);
70    }
71  
72    @Override
73    public void process() throws IOException {
74      boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(region_a);
75      if (regionAHasMergeQualifier
76          || !catalogJanitor.cleanMergeQualifier(region_b)) {
77        LOG.info("Skip merging regions " + region_a.getRegionNameAsString()
78            + ", " + region_b.getRegionNameAsString() + ", because region "
79            + (regionAHasMergeQualifier ? region_a.getEncodedName() : region_b
80                .getEncodedName()) + " has merge qualifier");
81        return;
82      }
83  
84      RegionStates regionStates = masterServices.getAssignmentManager()
85          .getRegionStates();
86      ServerName region_a_location = regionStates.getRegionServerOfRegion(region_a);
87      ServerName region_b_location = regionStates.getRegionServerOfRegion(region_b);
88      if (region_a_location == null || region_b_location == null) {
89        LOG.info("Skip merging regions " + region_a.getRegionNameAsString()
90            + ", " + region_b.getRegionNameAsString() + ", because region "
91            + (region_a_location == null ? region_a.getEncodedName() : region_b
92                .getEncodedName()) + " is not online now");
93        return;
94      }
95      long startTime = EnvironmentEdgeManager.currentTime();
96      boolean onSameRS = region_a_location.equals(region_b_location);
97  
98      // Make sure regions are on the same regionserver before send merge
99      // regions request to regionserver
100     if (!onSameRS) {
101       // Move region_b to region a's location, switch region_a and region_b if
102       // region_a's load lower than region_b's, so we will always move lower
103       // load region
104       RegionLoad loadOfRegionA = getRegionLoad(region_a_location, region_a);
105       RegionLoad loadOfRegionB = getRegionLoad(region_b_location, region_b);
106       if (loadOfRegionA != null && loadOfRegionB != null
107           && loadOfRegionA.getRequestsCount() < loadOfRegionB
108               .getRequestsCount()) {
109         // switch region_a and region_b
110         HRegionInfo tmpRegion = this.region_a;
111         this.region_a = this.region_b;
112         this.region_b = tmpRegion;
113         ServerName tmpLocation = region_a_location;
114         region_a_location = region_b_location;
115         region_b_location = tmpLocation;
116       }
117 
118       RegionPlan regionPlan = new RegionPlan(region_b, region_b_location,
119           region_a_location);
120       LOG.info("Moving regions to same server for merge: " + regionPlan.toString());
121       masterServices.getAssignmentManager().balance(regionPlan);
122       while (!masterServices.isStopped()) {
123         try {
124           Thread.sleep(20);
125           // Make sure check RIT first, then get region location, otherwise
126           // we would make a wrong result if region is online between getting
127           // region location and checking RIT
128           boolean isRIT = regionStates.isRegionInTransition(region_b);
129           region_b_location = masterServices.getAssignmentManager()
130               .getRegionStates().getRegionServerOfRegion(region_b);
131           onSameRS = region_a_location.equals(region_b_location);
132           if (onSameRS || !isRIT) {
133             // Regions are on the same RS, or region_b is not in
134             // RegionInTransition any more
135             break;
136           }
137           if ((EnvironmentEdgeManager.currentTime() - startTime) > timeout) break;
138         } catch (InterruptedException e) {
139           InterruptedIOException iioe = new InterruptedIOException();
140           iioe.initCause(e);
141           throw iioe;
142         }
143       }
144     }
145 
146     if (onSameRS) {
147       startTime = EnvironmentEdgeManager.currentTime();
148       while (!masterServices.isStopped()) {
149         try {
150           masterServices.getServerManager().sendRegionsMerge(region_a_location,
151               region_a, region_b, forcible);
152           LOG.info("Sent merge to server " + region_a_location + " for region " +
153             region_a.getEncodedName() + "," + region_b.getEncodedName() + ", focible=" + forcible);
154           break;
155         } catch (RegionOpeningException roe) {
156           if ((EnvironmentEdgeManager.currentTime() - startTime) > timeout) {
157             LOG.warn("Failed sending merge to " + region_a_location + " after " + timeout + "ms",
158               roe);
159             break;
160           }
161           // Do a retry since region should be online on RS immediately
162         } catch (IOException ie) {
163           LOG.warn("Failed sending merge to " + region_a_location + " for region " +
164             region_a.getEncodedName() + "," + region_b.getEncodedName() + ", focible=" + forcible,
165             ie);
166           break;
167         }
168       }
169     } else {
170       LOG.info("Cancel merging regions " + region_a.getRegionNameAsString()
171           + ", " + region_b.getRegionNameAsString()
172           + ", because can't move them together after "
173           + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
174     }
175   }
176 
177   private RegionLoad getRegionLoad(ServerName sn, HRegionInfo hri) {
178     ServerManager serverManager =  masterServices.getServerManager();
179     ServerLoad load = serverManager.getLoad(sn);
180     if (load != null) {
181       Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad();
182       if (regionsLoad != null) {
183         return regionsLoad.get(hri.getRegionName());
184       }
185     }
186     return null;
187   }
188 }