/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class implements atomic multi-row transactions using
 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} and Coprocessor
 * endpoints. Conditions can also be supplied, in which case the mutations are applied only if
 * every condition matches. See
 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} for details and
 * limitations. <br>
 * Example: <code>
 * Put p = new Put(row1);
 * Delete d = new Delete(row2);
 * Increment i = new Increment(row3);
 * Append a = new Append(row4);
 * ...
 * MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
 * MutationProto m2 = ProtobufUtil.toMutation(MutationType.DELETE, d);
 * MutationProto m3 = ProtobufUtil.toMutation(MutationType.INCREMENT, i);
 * MutationProto m4 = ProtobufUtil.toMutation(MutationType.APPEND, a);
 *
 * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
 * mrmBuilder.addMutationRequest(m1);
 * mrmBuilder.addMutationRequest(m2);
 * mrmBuilder.addMutationRequest(m3);
 * mrmBuilder.addMutationRequest(m4);
 *
 * // We can also specify conditions to perform a conditional update
 * mrmBuilder.addCondition(ProtobufUtil.toCondition(row, FAMILY, QUALIFIER,
 *   CompareOperator.EQUAL, value, TimeRange.allTime()));
 *
 * CoprocessorRpcChannel channel = t.coprocessorService(ROW);
 * MultiRowMutationService.BlockingInterface service =
 *   MultiRowMutationService.newBlockingStub(channel);
 * MutateRowsRequest mrm = mrmBuilder.build();
 * MutateRowsResponse response = service.mutateRows(null, mrm);
 *
 * // We can get the result of the conditional update
 * boolean processed = response.getProcessed();
 * </code>
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
  private static final Logger LOGGER = LoggerFactory.getLogger(MultiRowMutationEndpoint.class);

  private RegionCoprocessorEnvironment env;

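  /**
   * Applies the mutations in the request atomically. Every target row must belong to this region;
   * otherwise a {@link WrongRegionException} (retryable) or {@link DoNotRetryIOException} is
   * raised. When conditions are present, row locks for all affected rows are acquired up front, in
   * sorted order to avoid deadlocks, and the mutations are applied only if every condition
   * matches; the outcome is reported through {@code processed} in the response.
   */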
  @Override
  public void mutateRows(RpcController controller, MutateRowsRequest request,
    RpcCallback<MutateRowsResponse> done) {
    boolean matches = true;
    List<Region.RowLock> rowLocks = null;
    try {
      // set of rows to lock, sorted to avoid deadlocks
      SortedSet<byte[]> rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      List<MutationProto> mutateRequestList = request.getMutationRequestList();
      List<Mutation> mutations = new ArrayList<>(mutateRequestList.size());
      for (MutationProto m : mutateRequestList) {
        mutations.add(ProtobufUtil.toMutation(m));
      }

      Region region = env.getRegion();

      RegionInfo regionInfo = region.getRegionInfo();
      for (Mutation m : mutations) {
        // check whether rows are in range for this region
        if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
          String msg = "Requested row out of range '" + Bytes.toStringBinary(m.getRow()) + "'";
          if (rowsToLock.isEmpty()) {
            // if this is the first row, the region might have moved; allow the client to retry
            throw new WrongRegionException(msg);
          } else {
            // rows are split between regions, do not retry
            throw new DoNotRetryIOException(msg);
          }
        }
        rowsToLock.add(m.getRow());
      }

      if (request.getConditionCount() > 0) {
        // Get row locks for the mutations and the conditions
        rowLocks = new ArrayList<>();
        for (ClientProtos.Condition condition : request.getConditionList()) {
          rowsToLock.add(condition.getRow().toByteArray());
        }
        for (byte[] row : rowsToLock) {
          try {
            Region.RowLock rowLock = region.getRowLock(row, false); // write lock
            rowLocks.add(rowLock);
          } catch (IOException ioe) {
            LOGGER.warn("Failed getting lock, row={}, in region {}", Bytes.toStringBinary(row),
              regionInfo.getRegionNameAsString(), ioe);
            throw ioe;
          }
        }

        // Check if all the conditions match
        for (ClientProtos.Condition condition : request.getConditionList()) {
          if (!matches(region, condition)) {
            matches = false;
            break;
          }
        }
      }

      if (matches) {
        // call utility method on region
        long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
        long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
        region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
      }
    } catch (IOException e) {
      CoprocessorRpcUtils.setControllerException(controller, e);
    } finally {
      if (rowLocks != null) {
        // Release the acquired row locks
        for (Region.RowLock rowLock : rowLocks) {
          rowLock.release();
        }
      }
    }
    done.run(MutateRowsResponse.newBuilder().setProcessed(matches).build());
  }

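  /**
   * Checks whether the given condition matches the current contents of its row. The check uses
   * either the condition's filter or its family/qualifier/comparator triple, restricted to the
   * condition's time range. The read goes through a {@link RegionScanner} rather than a region
   * get so that cells are not copied onto the heap; see HBASE-26036.
   */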
  private boolean matches(Region region, ClientProtos.Condition condition) throws IOException {
    byte[] row = condition.getRow().toByteArray();

    Filter filter = null;
    byte[] family = null;
    byte[] qualifier = null;
    CompareOperator op = null;
    ByteArrayComparable comparator = null;

    if (condition.hasFilter()) {
      filter = ProtobufUtil.toFilter(condition.getFilter());
    } else {
      family = condition.getFamily().toByteArray();
      qualifier = condition.getQualifier().toByteArray();
      op = CompareOperator.valueOf(condition.getCompareType().name());
      comparator = ProtobufUtil.toComparator(condition.getComparator());
    }

    TimeRange timeRange = condition.hasTimeRange()
      ? ProtobufUtil.toTimeRange(condition.getTimeRange())
      : TimeRange.allTime();

    Get get = new Get(row);
    if (family != null) {
      checkFamily(region, family);
      get.addColumn(family, qualifier);
    }
    if (filter != null) {
      get.setFilter(filter);
    }
    get.setTimeRange(timeRange.getMin(), timeRange.getMax());

    boolean matches = false;
    try (RegionScanner scanner = region.getScanner(new Scan(get))) {
      // NOTE: Please don't use HRegion.get() instead, because it will copy cells to heap.
      // See HBASE-26036
      List<Cell> result = new ArrayList<>();
      scanner.next(result);
      if (filter != null) {
        if (!result.isEmpty()) {
          matches = true;
        }
      } else {
        boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
        if (result.isEmpty() && valueIsNull) {
          matches = true;
        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
          matches = true;
        } else if (result.size() == 1 && !valueIsNull) {
          Cell kv = result.get(0);
          int compareResult = PrivateCellUtil.compareValue(kv, comparator);
          matches = matches(op, compareResult);
        }
      }
    }
    return matches;
  }

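  /**
   * Verifies that the given column family exists in this region's table, throwing
   * {@link NoSuchColumnFamilyException} if it does not.
   */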
  private void checkFamily(Region region, byte[] family) throws NoSuchColumnFamilyException {
    if (!region.getTableDescriptor().hasColumnFamily(family)) {
      throw new NoSuchColumnFamilyException(
        "Column family " + Bytes.toString(family) + " does not exist in region "
          + region.getRegionInfo().getRegionNameAsString() + " in table "
          + region.getTableDescriptor());
    }
  }

  private boolean matches(CompareOperator op, int compareResult) {
    switch (op) {
      case LESS:
        return compareResult < 0;
      case LESS_OR_EQUAL:
        return compareResult <= 0;
      case EQUAL:
        return compareResult == 0;
      case NOT_EQUAL:
        return compareResult != 0;
      case GREATER_OR_EQUAL:
        return compareResult >= 0;
      case GREATER:
        return compareResult > 0;
      default:
        throw new RuntimeException("Unknown Compare op " + op.name());
    }
  }

  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  /**
   * Stores a reference to the coprocessor environment provided by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
   * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on
   * a table region, so it always expects this to be an instance of
   * {@link RegionCoprocessorEnvironment}.
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   *           {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}