001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.ListIterator;
025import java.util.stream.Collectors;
026import java.util.stream.IntStream;
027import java.util.stream.Stream;
028import org.apache.commons.lang3.ArrayUtils;
029import org.apache.hadoop.hbase.HBaseIOException;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.client.RegionReplicaUtil;
033import org.apache.hadoop.hbase.favored.FavoredNodesManager;
034import org.apache.hadoop.hbase.ipc.HBaseRpcController;
035import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
036import org.apache.hadoop.hbase.wal.WALSplitUtil;
037import org.apache.yetus.audience.InterfaceAudience;
038
039import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
040
041import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
042import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
046
047/**
048 * Utility for this assignment package only.
049 */
050@InterfaceAudience.Private
051final class AssignmentManagerUtil {
  // Region replication value for which only the primary-region procedures are created; see the
  // early return in createAssignProcedures.
  private static final int DEFAULT_REGION_REPLICA = 1;

  // This class only exposes static helpers, so the constructor is private to forbid instantiation.
  private AssignmentManagerUtil() {
  }
056
057  /**
058   * Raw call to remote regionserver to get info on a particular region.
059   * @throws IOException Let it out so can report this IOE as reason for failure
060   */
061  static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env,
062      final ServerName regionLocation, final RegionInfo hri) throws IOException {
063    return getRegionInfoResponse(env, regionLocation, hri, false);
064  }
065
066  static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env,
067      final ServerName regionLocation, final RegionInfo hri, boolean includeBestSplitRow)
068      throws IOException {
069    // TODO: There is no timeout on this controller. Set one!
070    HBaseRpcController controller =
071      env.getMasterServices().getClusterConnection().getRpcControllerFactory().newController();
072    final AdminService.BlockingInterface admin =
073      env.getMasterServices().getClusterConnection().getAdmin(regionLocation);
074    GetRegionInfoRequest request = null;
075    if (includeBestSplitRow) {
076      request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName(), false, true);
077    } else {
078      request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName());
079    }
080    try {
081      return admin.getRegionInfo(controller, request);
082    } catch (ServiceException e) {
083      throw ProtobufUtil.handleRemoteException(e);
084    }
085  }
086
087  private static void lock(List<RegionStateNode> regionNodes) {
088    regionNodes.iterator().forEachRemaining(RegionStateNode::lock);
089  }
090
091  private static void unlock(List<RegionStateNode> regionNodes) {
092    for (ListIterator<RegionStateNode> iter = regionNodes.listIterator(regionNodes.size()); iter
093      .hasPrevious();) {
094      iter.previous().unlock();
095    }
096  }
097
098  static TransitRegionStateProcedure[] createUnassignProceduresForSplitOrMerge(
099      MasterProcedureEnv env, Stream<RegionInfo> regions, int regionReplication)
100      throws IOException {
101    List<RegionStateNode> regionNodes = regions
102      .flatMap(hri -> IntStream.range(0, regionReplication)
103        .mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i)))
104      .map(env.getAssignmentManager().getRegionStates()::getOrCreateRegionStateNode)
105      .collect(Collectors.toList());
106    TransitRegionStateProcedure[] procs = new TransitRegionStateProcedure[regionNodes.size()];
107    boolean rollback = true;
108    int i = 0;
109    // hold the lock at once, and then release it in finally. This is important as SCP may jump in
110    // if we release the lock in the middle when we want to do rollback, and cause problems.
111    lock(regionNodes);
112    try {
113      for (; i < procs.length; i++) {
114        RegionStateNode regionNode = regionNodes.get(i);
115        TransitRegionStateProcedure proc =
116          TransitRegionStateProcedure.unassign(env, regionNode.getRegionInfo());
117        if (regionNode.getProcedure() != null) {
118          throw new HBaseIOException(
119            "The parent region " + regionNode + " is currently in transition, give up");
120        }
121        regionNode.setProcedure(proc);
122        procs[i] = proc;
123      }
124      // all succeeded, set rollback to false
125      rollback = false;
126    } finally {
127      if (rollback) {
128        for (;;) {
129          i--;
130          if (i < 0) {
131            break;
132          }
133          RegionStateNode regionNode = regionNodes.get(i);
134          regionNode.unsetProcedure(procs[i]);
135        }
136      }
137      unlock(regionNodes);
138    }
139    return procs;
140  }
141
142  /**
143   * Create assign procedures for the give regions, according to the {@code regionReplication}.
144   * <p/>
145   * For rolling back, we will submit procedures directly to the {@code ProcedureExecutor}, so it is
146   * possible that we persist the newly scheduled procedures, and then crash before persisting the
147   * rollback state, so when we arrive here the second time, it is possible that some regions have
148   * already been associated with a TRSP.
149   * @param ignoreIfInTransition if true, will skip creating TRSP for the given region if it is
150   *          already in transition, otherwise we will add an assert that it should not in
151   *          transition.
152   */
153  private static TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env,
154      List<RegionInfo> regions, int regionReplication, ServerName targetServer,
155      boolean ignoreIfInTransition) {
156    // create the assign procs only for the primary region using the targetServer
157    TransitRegionStateProcedure[] primaryRegionProcs =
158      regions.stream().map(env.getAssignmentManager().getRegionStates()::getOrCreateRegionStateNode)
159        .map(regionNode -> {
160          TransitRegionStateProcedure proc =
161            TransitRegionStateProcedure.assign(env, regionNode.getRegionInfo(), targetServer);
162          regionNode.lock();
163          try {
164            if (ignoreIfInTransition) {
165              if (regionNode.isInTransition()) {
166                return null;
167              }
168            } else {
169              // should never fail, as we have the exclusive region lock, and the region is newly
170              // created, or has been successfully closed so should not be on any servers, so SCP
171              // will
172              // not process it either.
173              assert !regionNode.isInTransition();
174            }
175            regionNode.setProcedure(proc);
176          } finally {
177            regionNode.unlock();
178          }
179          return proc;
180        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
181    if (regionReplication == DEFAULT_REGION_REPLICA) {
182      // this is the default case
183      return primaryRegionProcs;
184    }
185    // collect the replica region infos
186    List<RegionInfo> replicaRegionInfos =
187        new ArrayList<RegionInfo>(regions.size() * (regionReplication - 1));
188    for (RegionInfo hri : regions) {
189      // start the index from 1
190      for (int i = 1; i < regionReplication; i++) {
191        replicaRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(hri, i));
192      }
193    }
194    // create round robin procs. Note that we exclude the primary region's target server
195    TransitRegionStateProcedure[] replicaRegionAssignProcs =
196        env.getAssignmentManager().createRoundRobinAssignProcedures(replicaRegionInfos,
197          Collections.singletonList(targetServer));
198    // combine both the procs and return the result
199    return ArrayUtils.addAll(primaryRegionProcs, replicaRegionAssignProcs);
200  }
201
202  static TransitRegionStateProcedure[] createAssignProceduresForOpeningNewRegions(
203      MasterProcedureEnv env, List<RegionInfo> regions, int regionReplication,
204      ServerName targetServer) {
205    return createAssignProcedures(env, regions, regionReplication, targetServer, false);
206  }
207
208  static void reopenRegionsForRollback(MasterProcedureEnv env, List<RegionInfo> regions,
209      int regionReplication, ServerName targetServer) {
210    TransitRegionStateProcedure[] procs =
211        createAssignProcedures(env, regions, regionReplication, targetServer, true);
212    if (procs.length > 0) {
213      env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
214    }
215  }
216
217  static void removeNonDefaultReplicas(MasterProcedureEnv env, Stream<RegionInfo> regions,
218      int regionReplication) {
219    // Remove from in-memory states
220    regions.flatMap(hri -> IntStream.range(1, regionReplication)
221      .mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i))).forEach(hri -> {
222        env.getAssignmentManager().getRegionStates().deleteRegion(hri);
223        env.getMasterServices().getServerManager().removeRegion(hri);
224        FavoredNodesManager fnm = env.getMasterServices().getFavoredNodesManager();
225        if (fnm != null) {
226          fnm.deleteFavoredNodesForRegions(Collections.singletonList(hri));
227        }
228      });
229  }
230
231  static void checkClosedRegion(MasterProcedureEnv env, RegionInfo regionInfo) throws IOException {
232    if (WALSplitUtil.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) {
233      throw new IOException("Recovered.edits are found in Region: " + regionInfo +
234        ", abort split/merge to prevent data loss");
235    }
236  }
237}