View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.coprocessor;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.SortedSet;
24  import java.util.TreeSet;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.Coprocessor;
29  import org.apache.hadoop.hbase.CoprocessorEnvironment;
30  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.client.Mutation;
34  import org.apache.hadoop.hbase.regionserver.HRegion;
35  import org.apache.hadoop.hbase.regionserver.WrongRegionException;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
39  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
40  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
41  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
42  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
43  
44  import com.google.protobuf.RpcCallback;
45  import com.google.protobuf.RpcController;
46  import com.google.protobuf.Service;
47  
48  /**
49   * This class demonstrates how to implement atomic multi row transactions using
50   * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
51   * and Coprocessor endpoints.
52   *
53   * Defines a protocol to perform multi row transactions.
54   * See {@link MultiRowMutationEndpoint} for the implementation.
55   * <br>
56   * See
57   * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
58   * for details and limitations.
59   * <br>
60   * Example:
61   * <code>
62   * List&lt;Mutation&gt; mutations = ...;
63   * Put p1 = new Put(row1);
64   * Put p2 = new Put(row2);
65   * ...
66   * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p1);
67   * Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p2);
68   * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
69   * mrmBuilder.addMutationRequest(m1);
70   * mrmBuilder.addMutationRequest(m2);
71   * CoprocessorRpcChannel channel = t.coprocessorService(ROW);
72   * MultiRowMutationService.BlockingInterface service = 
73   *    MultiRowMutationService.newBlockingStub(channel);
74   * MutateRowsRequest mrm = mrmBuilder.build();
75   * service.mutateRows(null, mrm);
76   * </code>
77   */
78  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
79  @InterfaceStability.Evolving
80  public class MultiRowMutationEndpoint extends MultiRowMutationService implements
81  CoprocessorService, Coprocessor {
82    private RegionCoprocessorEnvironment env;
83    @Override
84    public void mutateRows(RpcController controller, MutateRowsRequest request,
85        RpcCallback<MutateRowsResponse> done) {
86      MutateRowsResponse response = MutateRowsResponse.getDefaultInstance();
87      try {
88        // set of rows to lock, sorted to avoid deadlocks
89        SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
90        List<MutationProto> mutateRequestList = request.getMutationRequestList();
91        List<Mutation> mutations = new ArrayList<Mutation>(mutateRequestList.size());
92        for (MutationProto m : mutateRequestList) {
93          mutations.add(ProtobufUtil.toMutation(m));
94        }
95  
96        HRegionInfo regionInfo = env.getRegion().getRegionInfo();
97        for (Mutation m : mutations) {
98          // check whether rows are in range for this region
99          if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
100           String msg = "Requested row out of range '"
101               + Bytes.toStringBinary(m.getRow()) + "'";
102           if (rowsToLock.isEmpty()) {
103             // if this is the first row, region might have moved,
104             // allow client to retry
105             throw new WrongRegionException(msg);
106           } else {
107             // rows are split between regions, do not retry
108             throw new org.apache.hadoop.hbase.DoNotRetryIOException(msg);
109           }
110         }
111         rowsToLock.add(m.getRow());
112       }
113       // call utility method on region
114       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
115       long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
116       env.getRegion().mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
117     } catch (IOException e) {
118       ResponseConverter.setControllerException(controller, e);
119     }
120     done.run(response);
121   }
122 
123 
124   @Override
125   public Service getService() {
126     return this;
127   }
128 
129   /**
130    * Stores a reference to the coprocessor environment provided by the
131    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
132    * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
133    * on a table region, so always expects this to be an instance of
134    * {@link RegionCoprocessorEnvironment}.
135    * @param env the environment provided by the coprocessor host
136    * @throws IOException if the provided environment is not an instance of
137    * {@code RegionCoprocessorEnvironment}
138    */
139   @Override
140   public void start(CoprocessorEnvironment env) throws IOException {
141     if (env instanceof RegionCoprocessorEnvironment) {
142       this.env = (RegionCoprocessorEnvironment)env;
143     } else {
144       throw new CoprocessorException("Must be loaded on a table region!");
145     }
146   }
147 
148   @Override
149   public void stop(CoprocessorEnvironment env) throws IOException {
150     // nothing to do
151   }
152 }