001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import com.google.protobuf.RpcCallback;
021import com.google.protobuf.RpcController;
022import com.google.protobuf.Service;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.List;
028import java.util.SortedSet;
029import java.util.TreeSet;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CompareOperator;
032import org.apache.hadoop.hbase.CoprocessorEnvironment;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.PrivateCellUtil;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Mutation;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.filter.ByteArrayComparable;
041import org.apache.hadoop.hbase.filter.Filter;
042import org.apache.hadoop.hbase.io.TimeRange;
043import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
044import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
045import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
046import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
047import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
048import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
049import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
052import org.apache.hadoop.hbase.regionserver.Region;
053import org.apache.hadoop.hbase.regionserver.RegionScanner;
054import org.apache.hadoop.hbase.regionserver.WrongRegionException;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.apache.yetus.audience.InterfaceStability;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * This class implements atomic multi row transactions using
063 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} and Coprocessor
064 * endpoints. We can also specify some conditions to perform conditional update. Defines a protocol
065 * to perform multi row transactions. See {@link MultiRowMutationEndpoint} for the implementation.
066 * <br>
067 * See {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} for details and
068 * limitations. <br>
069 * Example: <code>
070 * Put p = new Put(row1);
071 * Delete d = new Delete(row2);
072 * Increment i = new Increment(row3);
073 * Append a = new Append(row4);
074 * ...
075 * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p);
076 * Mutate m2 = ProtobufUtil.toMutate(MutateType.DELETE, d);
077 * Mutate m3 = ProtobufUtil.toMutate(MutateType.INCREMENT, i);
078 * Mutate m4 = ProtobufUtil.toMutate(MutateType.Append, a);
079 *
080 * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
081 * mrmBuilder.addMutationRequest(m1);
082 * mrmBuilder.addMutationRequest(m2);
083 * mrmBuilder.addMutationRequest(m3);
084 * mrmBuilder.addMutationRequest(m4);
085 *
086 * // We can also specify conditions to preform conditional update
087 * mrmBuilder.addCondition(ProtobufUtil.toCondition(row, FAMILY, QUALIFIER,
088 *  CompareOperator.EQUAL, value, TimeRange.allTime()));
089 *
090 * CoprocessorRpcChannel channel = t.coprocessorService(ROW);
091 * MultiRowMutationService.BlockingInterface service =
092 *   MultiRowMutationService.newBlockingStub(channel);
093 * MutateRowsRequest mrm = mrmBuilder.build();
094 * MutateRowsResponse response = service.mutateRows(null, mrm);
095 *
096 * // We can get the result of the conditional update
097 * boolean processed = response.getProcessed();
098 * </code>
099 */
100@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
101@InterfaceStability.Evolving
102public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
103  private static final Logger LOGGER = LoggerFactory.getLogger(HRegion.class);
104
105  private RegionCoprocessorEnvironment env;
106
107  @Override
108  public void mutateRows(RpcController controller, MutateRowsRequest request,
109    RpcCallback<MutateRowsResponse> done) {
110    boolean matches = true;
111    List<Region.RowLock> rowLocks = null;
112    try {
113      // set of rows to lock, sorted to avoid deadlocks
114      SortedSet<byte[]> rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR);
115      List<MutationProto> mutateRequestList = request.getMutationRequestList();
116      List<Mutation> mutations = new ArrayList<>(mutateRequestList.size());
117      for (MutationProto m : mutateRequestList) {
118        mutations.add(ProtobufUtil.toMutation(m));
119      }
120
121      Region region = env.getRegion();
122
123      RegionInfo regionInfo = region.getRegionInfo();
124      for (Mutation m : mutations) {
125        // check whether rows are in range for this region
126        if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
127          String msg = "Requested row out of range '" + Bytes.toStringBinary(m.getRow()) + "'";
128          if (rowsToLock.isEmpty()) {
129            // if this is the first row, region might have moved,
130            // allow client to retry
131            throw new WrongRegionException(msg);
132          } else {
133            // rows are split between regions, do not retry
134            throw new org.apache.hadoop.hbase.DoNotRetryIOException(msg);
135          }
136        }
137        rowsToLock.add(m.getRow());
138      }
139
140      if (request.getConditionCount() > 0) {
141        // Get row locks for the mutations and the conditions
142        rowLocks = new ArrayList<>();
143        for (ClientProtos.Condition condition : request.getConditionList()) {
144          rowsToLock.add(condition.getRow().toByteArray());
145        }
146        for (byte[] row : rowsToLock) {
147          try {
148            Region.RowLock rowLock = region.getRowLock(row, false); // write lock
149            rowLocks.add(rowLock);
150          } catch (IOException ioe) {
151            LOGGER.warn("Failed getting lock, row={}, in region {}", Bytes.toStringBinary(row),
152              this, ioe);
153            throw ioe;
154          }
155        }
156
157        // Check if all the conditions match
158        for (ClientProtos.Condition condition : request.getConditionList()) {
159          if (!matches(region, condition)) {
160            matches = false;
161            break;
162          }
163        }
164      }
165
166      if (matches) {
167        // call utility method on region
168        long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
169        long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
170        region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
171      }
172    } catch (IOException e) {
173      CoprocessorRpcUtils.setControllerException(controller, e);
174    } finally {
175      if (rowLocks != null) {
176        // Release the acquired row locks
177        for (Region.RowLock rowLock : rowLocks) {
178          rowLock.release();
179        }
180      }
181    }
182    done.run(MutateRowsResponse.newBuilder().setProcessed(matches).build());
183  }
184
185  private boolean matches(Region region, ClientProtos.Condition condition) throws IOException {
186    byte[] row = condition.getRow().toByteArray();
187
188    Filter filter = null;
189    byte[] family = null;
190    byte[] qualifier = null;
191    CompareOperator op = null;
192    ByteArrayComparable comparator = null;
193
194    if (condition.hasFilter()) {
195      filter = ProtobufUtil.toFilter(condition.getFilter());
196    } else {
197      family = condition.getFamily().toByteArray();
198      qualifier = condition.getQualifier().toByteArray();
199      op = CompareOperator.valueOf(condition.getCompareType().name());
200      comparator = ProtobufUtil.toComparator(condition.getComparator());
201    }
202
203    TimeRange timeRange = condition.hasTimeRange()
204      ? ProtobufUtil.toTimeRange(condition.getTimeRange())
205      : TimeRange.allTime();
206
207    Get get = new Get(row);
208    if (family != null) {
209      checkFamily(region, family);
210      get.addColumn(family, qualifier);
211    }
212    if (filter != null) {
213      get.setFilter(filter);
214    }
215    if (timeRange != null) {
216      get.setTimeRange(timeRange.getMin(), timeRange.getMax());
217    }
218
219    boolean matches = false;
220    try (RegionScanner scanner = region.getScanner(new Scan(get))) {
221      // NOTE: Please don't use HRegion.get() instead,
222      // because it will copy cells to heap. See HBASE-26036
223      List<Cell> result = new ArrayList<>();
224      scanner.next(result);
225      if (filter != null) {
226        if (!result.isEmpty()) {
227          matches = true;
228        }
229      } else {
230        boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
231        if (result.isEmpty() && valueIsNull) {
232          matches = true;
233        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
234          matches = true;
235        } else if (result.size() == 1 && !valueIsNull) {
236          Cell kv = result.get(0);
237          int compareResult = PrivateCellUtil.compareValue(kv, comparator);
238          matches = matches(op, compareResult);
239        }
240      }
241    }
242    return matches;
243  }
244
245  private void checkFamily(Region region, byte[] family) throws NoSuchColumnFamilyException {
246    if (!region.getTableDescriptor().hasColumnFamily(family)) {
247      throw new NoSuchColumnFamilyException("Column family " + Bytes.toString(family)
248        + " does not exist in region " + this + " in table " + region.getTableDescriptor());
249    }
250  }
251
252  private boolean matches(CompareOperator op, int compareResult) {
253    switch (op) {
254      case LESS:
255        return compareResult < 0;
256      case LESS_OR_EQUAL:
257        return compareResult <= 0;
258      case EQUAL:
259        return compareResult == 0;
260      case NOT_EQUAL:
261        return compareResult != 0;
262      case GREATER_OR_EQUAL:
263        return compareResult >= 0;
264      case GREATER:
265        return compareResult > 0;
266      default:
267        throw new RuntimeException("Unknown Compare op " + op.name());
268    }
269  }
270
271  @Override
272  public Iterable<Service> getServices() {
273    return Collections.singleton(this);
274  }
275
276  /**
277   * Stores a reference to the coprocessor environment provided by the
278   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
279   * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on
280   * a table region, so always expects this to be an instance of
281   * {@link RegionCoprocessorEnvironment}.
282   * @param env the environment provided by the coprocessor host
283   * @throws IOException if the provided environment is not an instance of
284   *                     {@code RegionCoprocessorEnvironment}
285   */
286  @Override
287  public void start(CoprocessorEnvironment env) throws IOException {
288    if (env instanceof RegionCoprocessorEnvironment) {
289      this.env = (RegionCoprocessorEnvironment) env;
290    } else {
291      throw new CoprocessorException("Must be loaded on a table region!");
292    }
293  }
294
295  @Override
296  public void stop(CoprocessorEnvironment env) throws IOException {
297    // nothing to do
298  }
299}