001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.List;
025import java.util.SortedSet;
026import java.util.TreeSet;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CompareOperator;
029import org.apache.hadoop.hbase.CoprocessorEnvironment;
030import org.apache.hadoop.hbase.HBaseInterfaceAudience;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.PrivateCellUtil;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.client.Mutation;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.filter.ByteArrayComparable;
038import org.apache.hadoop.hbase.filter.Filter;
039import org.apache.hadoop.hbase.io.TimeRange;
040import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
041import org.apache.hadoop.hbase.regionserver.HRegion;
042import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
043import org.apache.hadoop.hbase.regionserver.Region;
044import org.apache.hadoop.hbase.regionserver.RegionScanner;
045import org.apache.hadoop.hbase.regionserver.WrongRegionException;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.apache.yetus.audience.InterfaceStability;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
054import org.apache.hbase.thirdparty.com.google.protobuf.Service;
055
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
062
063/**
064 * This class implements atomic multi row transactions using
065 * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} and Coprocessor
066 * endpoints. We can also specify some conditions to perform conditional update. Defines a protocol
067 * to perform multi row transactions. See {@link MultiRowMutationEndpoint} for the implementation.
068 * <br>
069 * See {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} for details and
070 * limitations. <br>
071 * Example: <code>
072 * Put p = new Put(row1);
073 * Delete d = new Delete(row2);
074 * Increment i = new Increment(row3);
075 * Append a = new Append(row4);
076 * ...
077 * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p);
078 * Mutate m2 = ProtobufUtil.toMutate(MutateType.DELETE, d);
079 * Mutate m3 = ProtobufUtil.toMutate(MutateType.INCREMENT, i);
080 * Mutate m4 = ProtobufUtil.toMutate(MutateType.Append, a);
081 *
082 * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
083 * mrmBuilder.addMutationRequest(m1);
084 * mrmBuilder.addMutationRequest(m2);
085 * mrmBuilder.addMutationRequest(m3);
086 * mrmBuilder.addMutationRequest(m4);
087 *
088 * // We can also specify conditions to preform conditional update
089 * mrmBuilder.addCondition(ProtobufUtil.toCondition(row, FAMILY, QUALIFIER,
090 *  CompareOperator.EQUAL, value, TimeRange.allTime()));
091 *
092 * CoprocessorRpcChannel channel = t.coprocessorService(ROW);
093 * MultiRowMutationService.BlockingInterface service =
094 *   MultiRowMutationService.newBlockingStub(channel);
095 * MutateRowsRequest mrm = mrmBuilder.build();
096 * MutateRowsResponse response = service.mutateRows(null, mrm);
097 *
098 * // We can get the result of the conditional update
099 * boolean processed = response.getProcessed();
100 * </code>
101 */
102@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
103@InterfaceStability.Evolving
104public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
105  private static final Logger LOGGER = LoggerFactory.getLogger(HRegion.class);
106
107  private RegionCoprocessorEnvironment env;
108
109  @Override
110  public void mutateRows(RpcController controller, MutateRowsRequest request,
111    RpcCallback<MutateRowsResponse> done) {
112    boolean matches = true;
113    List<Region.RowLock> rowLocks = null;
114    try {
115      // set of rows to lock, sorted to avoid deadlocks
116      SortedSet<byte[]> rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR);
117      List<MutationProto> mutateRequestList = request.getMutationRequestList();
118      List<Mutation> mutations = new ArrayList<>(mutateRequestList.size());
119      for (MutationProto m : mutateRequestList) {
120        mutations.add(ProtobufUtil.toMutation(m));
121      }
122
123      Region region = env.getRegion();
124
125      RegionInfo regionInfo = region.getRegionInfo();
126      for (Mutation m : mutations) {
127        // check whether rows are in range for this region
128        if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
129          String msg = "Requested row out of range '" + Bytes.toStringBinary(m.getRow()) + "'";
130          if (rowsToLock.isEmpty()) {
131            // if this is the first row, region might have moved,
132            // allow client to retry
133            throw new WrongRegionException(msg);
134          } else {
135            // rows are split between regions, do not retry
136            throw new org.apache.hadoop.hbase.DoNotRetryIOException(msg);
137          }
138        }
139        rowsToLock.add(m.getRow());
140      }
141
142      if (request.getConditionCount() > 0) {
143        // Get row locks for the mutations and the conditions
144        rowLocks = new ArrayList<>();
145        for (ClientProtos.Condition condition : request.getConditionList()) {
146          rowsToLock.add(condition.getRow().toByteArray());
147        }
148        for (byte[] row : rowsToLock) {
149          try {
150            Region.RowLock rowLock = region.getRowLock(row, false); // write lock
151            rowLocks.add(rowLock);
152          } catch (IOException ioe) {
153            LOGGER.warn("Failed getting lock, row={}, in region {}", Bytes.toStringBinary(row),
154              this, ioe);
155            throw ioe;
156          }
157        }
158
159        // Check if all the conditions match
160        for (ClientProtos.Condition condition : request.getConditionList()) {
161          if (!matches(region, condition)) {
162            matches = false;
163            break;
164          }
165        }
166      }
167
168      if (matches) {
169        // call utility method on region
170        long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
171        long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
172        region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
173      }
174    } catch (IOException e) {
175      CoprocessorRpcUtils.setControllerException(controller, e);
176    } finally {
177      if (rowLocks != null) {
178        // Release the acquired row locks
179        for (Region.RowLock rowLock : rowLocks) {
180          rowLock.release();
181        }
182      }
183    }
184    done.run(MutateRowsResponse.newBuilder().setProcessed(matches).build());
185  }
186
187  private boolean matches(Region region, ClientProtos.Condition condition) throws IOException {
188    byte[] row = condition.getRow().toByteArray();
189
190    Filter filter = null;
191    byte[] family = null;
192    byte[] qualifier = null;
193    CompareOperator op = null;
194    ByteArrayComparable comparator = null;
195
196    if (condition.hasFilter()) {
197      filter = ProtobufUtil.toFilter(condition.getFilter());
198    } else {
199      family = condition.getFamily().toByteArray();
200      qualifier = condition.getQualifier().toByteArray();
201      op = CompareOperator.valueOf(condition.getCompareType().name());
202      comparator = ProtobufUtil.toComparator(condition.getComparator());
203    }
204
205    TimeRange timeRange = condition.hasTimeRange()
206      ? ProtobufUtil.toTimeRange(condition.getTimeRange())
207      : TimeRange.allTime();
208
209    Get get = new Get(row);
210    if (family != null) {
211      checkFamily(region, family);
212      get.addColumn(family, qualifier);
213    }
214    if (filter != null) {
215      get.setFilter(filter);
216    }
217    if (timeRange != null) {
218      get.setTimeRange(timeRange.getMin(), timeRange.getMax());
219    }
220
221    boolean matches = false;
222    try (RegionScanner scanner = region.getScanner(new Scan(get))) {
223      // NOTE: Please don't use HRegion.get() instead,
224      // because it will copy cells to heap. See HBASE-26036
225      List<Cell> result = new ArrayList<>();
226      scanner.next(result);
227      if (filter != null) {
228        if (!result.isEmpty()) {
229          matches = true;
230        }
231      } else {
232        boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
233        if (result.isEmpty() && valueIsNull) {
234          matches = true;
235        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
236          matches = true;
237        } else if (result.size() == 1 && !valueIsNull) {
238          Cell kv = result.get(0);
239          int compareResult = PrivateCellUtil.compareValue(kv, comparator);
240          matches = matches(op, compareResult);
241        }
242      }
243    }
244    return matches;
245  }
246
247  private void checkFamily(Region region, byte[] family) throws NoSuchColumnFamilyException {
248    if (!region.getTableDescriptor().hasColumnFamily(family)) {
249      throw new NoSuchColumnFamilyException("Column family " + Bytes.toString(family)
250        + " does not exist in region " + this + " in table " + region.getTableDescriptor());
251    }
252  }
253
254  private boolean matches(CompareOperator op, int compareResult) {
255    switch (op) {
256      case LESS:
257        return compareResult < 0;
258      case LESS_OR_EQUAL:
259        return compareResult <= 0;
260      case EQUAL:
261        return compareResult == 0;
262      case NOT_EQUAL:
263        return compareResult != 0;
264      case GREATER_OR_EQUAL:
265        return compareResult >= 0;
266      case GREATER:
267        return compareResult > 0;
268      default:
269        throw new RuntimeException("Unknown Compare op " + op.name());
270    }
271  }
272
273  @Override
274  public Iterable<Service> getServices() {
275    return Collections.singleton(this);
276  }
277
278  /**
279   * Stores a reference to the coprocessor environment provided by the
280   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
281   * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on
282   * a table region, so always expects this to be an instance of
283   * {@link RegionCoprocessorEnvironment}.
284   * @param env the environment provided by the coprocessor host
285   * @throws IOException if the provided environment is not an instance of
286   *                     {@code RegionCoprocessorEnvironment}
287   */
288  @Override
289  public void start(CoprocessorEnvironment env) throws IOException {
290    if (env instanceof RegionCoprocessorEnvironment) {
291      this.env = (RegionCoprocessorEnvironment) env;
292    } else {
293      throw new CoprocessorException("Must be loaded on a table region!");
294    }
295  }
296
297  @Override
298  public void stop(CoprocessorEnvironment env) throws IOException {
299    // nothing to do
300  }
301}