/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

/**
 * Coprocessor endpoint showing how atomic multi-row transactions can be built on top of
 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)}.
 * <p>
 * The endpoint implements the {@link MultiRowMutationService} protocol: a client sends a
 * {@link MutateRowsRequest} holding several mutations, and all of them are handed to the hosting
 * region in a single {@code mutateRowsWithLocks} call. Every row touched by the request must
 * belong to the region this endpoint is loaded on. See
 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} for details and
 * limitations.
 * <p>
 * Example:
 * <pre>{@code
 * List<Mutation> mutations = ...;
 * Put p1 = new Put(row1);
 * Put p2 = new Put(row2);
 * ...
 * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p1);
 * Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p2);
 * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
 * mrmBuilder.addMutationRequest(m1);
 * mrmBuilder.addMutationRequest(m2);
 * CoprocessorRpcChannel channel = t.coprocessorService(ROW);
 * MultiRowMutationService.BlockingInterface service =
 *     MultiRowMutationService.newBlockingStub(channel);
 * MutateRowsRequest mrm = mrmBuilder.build();
 * service.mutateRows(null, mrm);
 * }</pre>
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
  /** Environment of the hosting region; assigned in {@link #start(CoprocessorEnvironment)}. */
  private RegionCoprocessorEnvironment env;

  /**
   * Applies every mutation carried by {@code request} to the hosting region through one
   * {@code mutateRowsWithLocks} call.
   * <p>
   * The callback always receives the default {@link MutateRowsResponse} instance. An
   * {@link IOException} raised while converting, validating or applying the mutations is not
   * rethrown; it is recorded on {@code controller} and the callback is still run.
   *
   * @param controller RPC controller on which an {@link IOException} is reported
   * @param request the mutations to apply, plus an optional nonce group and nonce
   * @param done callback run with the response once processing has finished
   */
  @Override
  public void mutateRows(RpcController controller, MutateRowsRequest request,
      RpcCallback<MutateRowsResponse> done) {
    MutateRowsResponse reply = MutateRowsResponse.getDefaultInstance();
    try {
      List<Mutation> pending = toMutations(request.getMutationRequestList());
      SortedSet<byte[]> lockRows = collectRowsToLock(pending);

      // Fall back to NO_NONCE whenever the client did not supply a value.
      long nonceGroup = HConstants.NO_NONCE;
      if (request.hasNonceGroup()) {
        nonceGroup = request.getNonceGroup();
      }
      long nonce = HConstants.NO_NONCE;
      if (request.hasNonce()) {
        nonce = request.getNonce();
      }

      // delegate the actual work to the region's utility method
      env.getRegion().mutateRowsWithLocks(pending, lockRows, nonceGroup, nonce);
    } catch (IOException failure) {
      CoprocessorRpcUtils.setControllerException(controller, failure);
    }
    done.run(reply);
  }

  /**
   * Converts each protobuf mutation into its client-side {@link Mutation}, keeping request order.
   *
   * @param protos the protobuf mutations taken from the request
   * @return the converted mutations, in the same order as {@code protos}
   * @throws IOException if {@link ProtobufUtil#toMutation} fails for one of the entries
   */
  private List<Mutation> toMutations(List<MutationProto> protos) throws IOException {
    List<Mutation> converted = new ArrayList<>(protos.size());
    for (MutationProto proto : protos) {
      converted.add(ProtobufUtil.toMutation(proto));
    }
    return converted;
  }

  /**
   * Checks that the row of every mutation lies inside the hosting region and gathers those rows.
   *
   * @param mutations the mutations whose rows are to be checked
   * @return the distinct rows, ordered by {@link Bytes#BYTES_COMPARATOR}
   * @throws IOException a {@link WrongRegionException} if the very first row is out of range, or
   *         a {@code DoNotRetryIOException} if an out-of-range row follows accepted ones
   */
  private SortedSet<byte[]> collectRowsToLock(List<Mutation> mutations) throws IOException {
    // kept sorted so that locks are requested in a consistent order, which avoids deadlocks
    SortedSet<byte[]> accepted = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    RegionInfo hostRegion = env.getRegion().getRegionInfo();
    for (Mutation mutation : mutations) {
      byte[] row = mutation.getRow();
      if (HRegion.rowIsInRange(hostRegion, row)) {
        accepted.add(row);
        continue;
      }

      String msg = "Requested row out of range '" + Bytes.toStringBinary(row) + "'";
      if (!accepted.isEmpty()) {
        // earlier rows belonged here, so the request spans regions: retrying cannot help
        throw new org.apache.hadoop.hbase.DoNotRetryIOException(msg);
      }
      // nothing accepted yet: the region might have moved, so let the client retry
      throw new WrongRegionException(msg);
    }
    return accepted;
  }

  /**
   * Exposes this endpoint as the only service of the coprocessor.
   *
   * @return a singleton collection containing this instance
   */
  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  /**
   * Keeps a reference to the coprocessor environment handed over by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} of the region on which this
   * coprocessor is loaded. As a coprocessor endpoint it expects to be loaded on a table region,
   * hence the environment has to be a {@link RegionCoprocessorEnvironment}.
   *
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   *         {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (!(env instanceof RegionCoprocessorEnvironment)) {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
    this.env = (RegionCoprocessorEnvironment) env;
  }

  /**
   * Does nothing; there is nothing to release on shutdown.
   *
   * @param env the environment provided by the coprocessor host (unused)
   * @throws IOException never thrown by this implementation
   */
  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}