001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.List;
025import java.util.SortedSet;
026import java.util.TreeSet;
027
028import org.apache.hadoop.hbase.CoprocessorEnvironment;
029import org.apache.hadoop.hbase.HBaseInterfaceAudience;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.client.Mutation;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
034import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
035import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
036import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
037import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
038import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
039import org.apache.hadoop.hbase.regionserver.HRegion;
040import org.apache.hadoop.hbase.regionserver.WrongRegionException;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.yetus.audience.InterfaceStability;
044
045import com.google.protobuf.RpcCallback;
046import com.google.protobuf.RpcController;
047import com.google.protobuf.Service;
048
049/**
050 * This class demonstrates how to implement atomic multi row transactions using
051 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)}
052 * and Coprocessor endpoints.
053 *
054 * Defines a protocol to perform multi row transactions.
055 * See {@link MultiRowMutationEndpoint} for the implementation.
056 * <br>
057 * See
058 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)}
059 * for details and limitations.
060 * <br>
061 * Example:
062 * <code>
063 * List&lt;Mutation&gt; mutations = ...;
064 * Put p1 = new Put(row1);
065 * Put p2 = new Put(row2);
066 * ...
067 * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p1);
068 * Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p2);
069 * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
070 * mrmBuilder.addMutationRequest(m1);
071 * mrmBuilder.addMutationRequest(m2);
072 * CoprocessorRpcChannel channel = t.coprocessorService(ROW);
073 * MultiRowMutationService.BlockingInterface service =
074 *    MultiRowMutationService.newBlockingStub(channel);
075 * MutateRowsRequest mrm = mrmBuilder.build();
076 * service.mutateRows(null, mrm);
077 * </code>
078 */
079@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
080@InterfaceStability.Evolving
081public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
082  private RegionCoprocessorEnvironment env;
083  @Override
084  public void mutateRows(RpcController controller, MutateRowsRequest request,
085      RpcCallback<MutateRowsResponse> done) {
086    MutateRowsResponse response = MutateRowsResponse.getDefaultInstance();
087    try {
088      // set of rows to lock, sorted to avoid deadlocks
089      SortedSet<byte[]> rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR);
090      List<MutationProto> mutateRequestList = request.getMutationRequestList();
091      List<Mutation> mutations = new ArrayList<>(mutateRequestList.size());
092      for (MutationProto m : mutateRequestList) {
093        mutations.add(ProtobufUtil.toMutation(m));
094      }
095
096      RegionInfo regionInfo = env.getRegion().getRegionInfo();
097      for (Mutation m : mutations) {
098        // check whether rows are in range for this region
099        if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
100          String msg = "Requested row out of range '"
101              + Bytes.toStringBinary(m.getRow()) + "'";
102          if (rowsToLock.isEmpty()) {
103            // if this is the first row, region might have moved,
104            // allow client to retry
105            throw new WrongRegionException(msg);
106          } else {
107            // rows are split between regions, do not retry
108            throw new org.apache.hadoop.hbase.DoNotRetryIOException(msg);
109          }
110        }
111        rowsToLock.add(m.getRow());
112      }
113      // call utility method on region
114      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
115      long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
116      env.getRegion().mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
117    } catch (IOException e) {
118      CoprocessorRpcUtils.setControllerException(controller, e);
119    }
120    done.run(response);
121  }
122
123  @Override
124  public Iterable<Service> getServices() {
125    return Collections.singleton(this);
126  }
127
128  /**
129   * Stores a reference to the coprocessor environment provided by the
130   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
131   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
132   * on a table region, so always expects this to be an instance of
133   * {@link RegionCoprocessorEnvironment}.
134   * @param env the environment provided by the coprocessor host
135   * @throws IOException if the provided environment is not an instance of
136   * {@code RegionCoprocessorEnvironment}
137   */
138  @Override
139  public void start(CoprocessorEnvironment env) throws IOException {
140    if (env instanceof RegionCoprocessorEnvironment) {
141      this.env = (RegionCoprocessorEnvironment)env;
142    } else {
143      throw new CoprocessorException("Must be loaded on a table region!");
144    }
145  }
146
147  @Override
148  public void stop(CoprocessorEnvironment env) throws IOException {
149    // nothing to do
150  }
151}