001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.List; 025import java.util.SortedSet; 026import java.util.TreeSet; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CompareOperator; 029import org.apache.hadoop.hbase.CoprocessorEnvironment; 030import org.apache.hadoop.hbase.HBaseInterfaceAudience; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.PrivateCellUtil; 033import org.apache.hadoop.hbase.client.Get; 034import org.apache.hadoop.hbase.client.Mutation; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.filter.ByteArrayComparable; 038import org.apache.hadoop.hbase.filter.Filter; 039import org.apache.hadoop.hbase.io.TimeRange; 040import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 041import org.apache.hadoop.hbase.regionserver.HRegion; 042import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 043import org.apache.hadoop.hbase.regionserver.Region; 044import org.apache.hadoop.hbase.regionserver.RegionScanner; 045import org.apache.hadoop.hbase.regionserver.WrongRegionException; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.apache.yetus.audience.InterfaceStability; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 053import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 054import org.apache.hbase.thirdparty.com.google.protobuf.Service; 055 056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; 062 063/** 064 * This class implements atomic multi row transactions using 065 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} and Coprocessor 066 * endpoints. We can also specify some conditions to perform conditional update. Defines a protocol 067 * to perform multi row transactions. See {@link MultiRowMutationEndpoint} for the implementation. 068 * <br> 069 * See {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} for details and 070 * limitations. <br> 071 * Example: <code> 072 * Put p = new Put(row1); 073 * Delete d = new Delete(row2); 074 * Increment i = new Increment(row3); 075 * Append a = new Append(row4); 076 * ... 077 * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p); 078 * Mutate m2 = ProtobufUtil.toMutate(MutateType.DELETE, d); 079 * Mutate m3 = ProtobufUtil.toMutate(MutateType.INCREMENT, i); 080 * Mutate m4 = ProtobufUtil.toMutate(MutateType.Append, a); 081 * 082 * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); 083 * mrmBuilder.addMutationRequest(m1); 084 * mrmBuilder.addMutationRequest(m2); 085 * mrmBuilder.addMutationRequest(m3); 086 * mrmBuilder.addMutationRequest(m4); 087 * 088 * // We can also specify conditions to preform conditional update 089 * mrmBuilder.addCondition(ProtobufUtil.toCondition(row, FAMILY, QUALIFIER, 090 * CompareOperator.EQUAL, value, TimeRange.allTime())); 091 * 092 * CoprocessorRpcChannel channel = t.coprocessorService(ROW); 093 * MultiRowMutationService.BlockingInterface service = 094 * MultiRowMutationService.newBlockingStub(channel); 095 * MutateRowsRequest mrm = mrmBuilder.build(); 096 * MutateRowsResponse response = service.mutateRows(null, mrm); 097 * 098 * // We can get the result of the conditional update 099 * boolean processed = response.getProcessed(); 100 * </code> 101 */ 102@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) 103@InterfaceStability.Evolving 104public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor { 105 private static final Logger LOGGER = LoggerFactory.getLogger(HRegion.class); 106 107 private RegionCoprocessorEnvironment env; 108 109 @Override 110 public void mutateRows(RpcController controller, MutateRowsRequest request, 111 RpcCallback<MutateRowsResponse> done) { 112 boolean matches = true; 113 List<Region.RowLock> rowLocks = null; 114 try { 115 // set of rows to lock, sorted to avoid deadlocks 116 SortedSet<byte[]> rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR); 117 List<MutationProto> mutateRequestList = request.getMutationRequestList(); 118 List<Mutation> mutations = new ArrayList<>(mutateRequestList.size()); 119 for (MutationProto m : mutateRequestList) { 120 mutations.add(ProtobufUtil.toMutation(m)); 121 } 122 123 Region region = env.getRegion(); 124 125 RegionInfo regionInfo = region.getRegionInfo(); 126 for (Mutation m : mutations) { 127 // check whether rows are in range for this region 128 if (!HRegion.rowIsInRange(regionInfo, m.getRow())) { 129 String msg = "Requested row out of range '" + Bytes.toStringBinary(m.getRow()) + "'"; 130 if (rowsToLock.isEmpty()) { 131 // if this is the first row, region might have moved, 132 // allow client to retry 133 throw new WrongRegionException(msg); 134 } else { 135 // rows are split between regions, do not retry 136 throw new org.apache.hadoop.hbase.DoNotRetryIOException(msg); 137 } 138 } 139 rowsToLock.add(m.getRow()); 140 } 141 142 if (request.getConditionCount() > 0) { 143 // Get row locks for the mutations and the conditions 144 rowLocks = new ArrayList<>(); 145 for (ClientProtos.Condition condition : request.getConditionList()) { 146 rowsToLock.add(condition.getRow().toByteArray()); 147 } 148 for (byte[] row : rowsToLock) { 149 try { 150 Region.RowLock rowLock = region.getRowLock(row, false); // write lock 151 rowLocks.add(rowLock); 152 } catch (IOException ioe) { 153 LOGGER.warn("Failed getting lock, row={}, in region {}", Bytes.toStringBinary(row), 154 this, ioe); 155 throw ioe; 156 } 157 } 158 159 // Check if all the conditions match 160 for (ClientProtos.Condition condition : request.getConditionList()) { 161 if (!matches(region, condition)) { 162 matches = false; 163 break; 164 } 165 } 166 } 167 168 if (matches) { 169 // call utility method on region 170 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 171 long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE; 172 region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce); 173 } 174 } catch (IOException e) { 175 CoprocessorRpcUtils.setControllerException(controller, e); 176 } finally { 177 if (rowLocks != null) { 178 // Release the acquired row locks 179 for (Region.RowLock rowLock : rowLocks) { 180 rowLock.release(); 181 } 182 } 183 } 184 done.run(MutateRowsResponse.newBuilder().setProcessed(matches).build()); 185 } 186 187 private boolean matches(Region region, ClientProtos.Condition condition) throws IOException { 188 byte[] row = condition.getRow().toByteArray(); 189 190 Filter filter = null; 191 byte[] family = null; 192 byte[] qualifier = null; 193 CompareOperator op = null; 194 ByteArrayComparable comparator = null; 195 196 if (condition.hasFilter()) { 197 filter = ProtobufUtil.toFilter(condition.getFilter()); 198 } else { 199 family = condition.getFamily().toByteArray(); 200 qualifier = condition.getQualifier().toByteArray(); 201 op = CompareOperator.valueOf(condition.getCompareType().name()); 202 comparator = ProtobufUtil.toComparator(condition.getComparator()); 203 } 204 205 TimeRange timeRange = condition.hasTimeRange() 206 ? ProtobufUtil.toTimeRange(condition.getTimeRange()) 207 : TimeRange.allTime(); 208 209 Get get = new Get(row); 210 if (family != null) { 211 checkFamily(region, family); 212 get.addColumn(family, qualifier); 213 } 214 if (filter != null) { 215 get.setFilter(filter); 216 } 217 if (timeRange != null) { 218 get.setTimeRange(timeRange.getMin(), timeRange.getMax()); 219 } 220 221 boolean matches = false; 222 try (RegionScanner scanner = region.getScanner(new Scan(get))) { 223 // NOTE: Please don't use HRegion.get() instead, 224 // because it will copy cells to heap. See HBASE-26036 225 List<Cell> result = new ArrayList<>(); 226 scanner.next(result); 227 if (filter != null) { 228 if (!result.isEmpty()) { 229 matches = true; 230 } 231 } else { 232 boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; 233 if (result.isEmpty() && valueIsNull) { 234 matches = true; 235 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { 236 matches = true; 237 } else if (result.size() == 1 && !valueIsNull) { 238 Cell kv = result.get(0); 239 int compareResult = PrivateCellUtil.compareValue(kv, comparator); 240 matches = matches(op, compareResult); 241 } 242 } 243 } 244 return matches; 245 } 246 247 private void checkFamily(Region region, byte[] family) throws NoSuchColumnFamilyException { 248 if (!region.getTableDescriptor().hasColumnFamily(family)) { 249 throw new NoSuchColumnFamilyException("Column family " + Bytes.toString(family) 250 + " does not exist in region " + this + " in table " + region.getTableDescriptor()); 251 } 252 } 253 254 private boolean matches(CompareOperator op, int compareResult) { 255 switch (op) { 256 case LESS: 257 return compareResult < 0; 258 case LESS_OR_EQUAL: 259 return compareResult <= 0; 260 case EQUAL: 261 return compareResult == 0; 262 case NOT_EQUAL: 263 return compareResult != 0; 264 case GREATER_OR_EQUAL: 265 return compareResult >= 0; 266 case GREATER: 267 return compareResult > 0; 268 default: 269 throw new RuntimeException("Unknown Compare op " + op.name()); 270 } 271 } 272 273 @Override 274 public Iterable<Service> getServices() { 275 return Collections.singleton(this); 276 } 277 278 /** 279 * Stores a reference to the coprocessor environment provided by the 280 * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this 281 * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on 282 * a table region, so always expects this to be an instance of 283 * {@link RegionCoprocessorEnvironment}. 284 * @param env the environment provided by the coprocessor host 285 * @throws IOException if the provided environment is not an instance of 286 * {@code RegionCoprocessorEnvironment} 287 */ 288 @Override 289 public void start(CoprocessorEnvironment env) throws IOException { 290 if (env instanceof RegionCoprocessorEnvironment) { 291 this.env = (RegionCoprocessorEnvironment) env; 292 } else { 293 throw new CoprocessorException("Must be loaded on a table region!"); 294 } 295 } 296 297 @Override 298 public void stop(CoprocessorEnvironment env) throws IOException { 299 // nothing to do 300 } 301}