001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025
026import org.apache.hadoop.hbase.CellScannable;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.DoNotRetryIOException;
029import org.apache.hadoop.hbase.HRegionInfo;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
035import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
040import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
041
042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
043
044/**
045 * Callable that handles the <code>multi</code> method call going against a single
046 * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a
047 * RegionServerCallable that goes against multiple regions).
048 * @param <R>
049 */
050@InterfaceAudience.Private
051class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse> {
052  private MultiAction multiAction;
053  private boolean cellBlock;
054
055  MultiServerCallable(final ClusterConnection connection, final TableName tableName,
056      final ServerName location, final MultiAction multi, RpcController rpcController,
057      int rpcTimeout, RetryingTimeTracker tracker, int priority) {
058    super(connection, tableName, null, rpcController, rpcTimeout, tracker, priority);
059    this.multiAction = multi;
060    // RegionServerCallable has HRegionLocation field, but this is a multi-region request.
061    // Using region info from parent HRegionLocation would be a mistake for this class; so
062    // we will store the server here, and throw if someone tries to obtain location/regioninfo.
063    this.location = new HRegionLocation(null, location);
064    this.cellBlock = isCellBlock();
065  }
066
067  public void reset(ServerName location, MultiAction multiAction) {
068    this.location = new HRegionLocation(null, location);
069    this.multiAction = multiAction;
070    this.cellBlock = isCellBlock();
071  }
072
073  @Override
074  protected HRegionLocation getLocation() {
075    throw new RuntimeException("Cannot get region location for multi-region request");
076  }
077
078  @Override
079  public HRegionInfo getHRegionInfo() {
080    throw new RuntimeException("Cannot get region info for multi-region request");
081  }
082
083  MultiAction getMulti() {
084    return this.multiAction;
085  }
086
087  @Override
088  protected MultiResponse rpcCall() throws Exception {
089    int countOfActions = this.multiAction.size();
090    if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
091    MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
092    RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
093    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
094    MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
095
096    // Pre-size. Presume at least a KV per Action. There are likely more.
097    List<CellScannable> cells =
098        (this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
099
100    long nonceGroup = multiAction.getNonceGroup();
101
102    // Map from a created RegionAction to the original index for a RowMutations within
103    // the original list of actions. This will be used to process the results when there
104    // is RowMutations in the action list.
105    Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
106    // The multi object is a list of Actions by region. Iterate by region.
107    for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
108      final byte [] regionName = e.getKey();
109      final List<Action> actions = e.getValue();
110      if (this.cellBlock) {
111        // Send data in cellblocks.
112        // multiRequestBuilder will be populated with region actions.
113        // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
114        // action list.
115        RequestConverter.buildNoDataRegionActions(regionName, actions, cells, multiRequestBuilder,
116          regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
117      }
118      else {
119        // multiRequestBuilder will be populated with region actions.
120        // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
121        // action list.
122        RequestConverter.buildRegionActions(regionName, actions, multiRequestBuilder,
123          regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
124      }
125    }
126
127    if (cells != null) {
128      setRpcControllerCellScanner(CellUtil.createCellScanner(cells));
129    }
130    ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
131    ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto);
132    if (responseProto == null) return null; // Occurs on cancel
133    return ResponseConverter.getResults(requestProto, rowMutationsIndexMap, responseProto,
134      getRpcControllerCellScanner());
135  }
136
137  /**
138   * @return True if we should send data in cellblocks.  This is an expensive call.  Cache the
139   * result if you can rather than call each time.
140   */
141  private boolean isCellBlock() {
142    // This is not exact -- the configuration could have changed on us after connection was set up
143    // but it will do for now.
144    ClusterConnection conn = getConnection();
145    return conn.hasCellBlockSupport();
146  }
147
148  @Override
149  public void prepare(boolean reload) throws IOException {
150    // Use the location we were given in the constructor rather than go look it up.
151    setStub(getConnection().getClient(this.location.getServerName()));
152  }
153
154  @VisibleForTesting
155  ServerName getServerName() {
156    return location.getServerName();
157  }
158}