View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.List;
24  import java.util.Map;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.DoNotRetryIOException;
29  import org.apache.hadoop.hbase.client.Delete;
30  import org.apache.hadoop.hbase.client.Durability;
31  import org.apache.hadoop.hbase.client.Mutation;
32  import org.apache.hadoop.hbase.client.Put;
33  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest;
34  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse;
35  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36  import org.apache.hadoop.hbase.util.Bytes;
37  
38  /**
39   * A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
40   */
41  @InterfaceAudience.Private
42  class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
43  MultiRowMutationProcessorResponse> {
44    Collection<byte[]> rowsToLock;
45    Collection<Mutation> mutations;
46    MiniBatchOperationInProgress<Mutation> miniBatch;
47  
48    MultiRowMutationProcessor(Collection<Mutation> mutations,
49                              Collection<byte[]> rowsToLock) {
50      this.rowsToLock = rowsToLock;
51      this.mutations = mutations;
52    }
53  
54    @Override
55    public Collection<byte[]> getRowsToLock() {
56      return rowsToLock;
57    }
58  
59    @Override
60    public boolean readOnly() {
61      return false;
62    }
63    
64    @Override
65    public MultiRowMutationProcessorResponse getResult() {
66      return MultiRowMutationProcessorResponse.getDefaultInstance();
67    }
68  
69    @Override
70    public void process(long now,
71                        HRegion region,
72                        List<Mutation> mutationsToApply,
73                        WALEdit walEdit) throws IOException {
74      byte[] byteNow = Bytes.toBytes(now);
75      // Check mutations
76      for (Mutation m : this.mutations) {
77        if (m instanceof Put) {
78          Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
79          region.checkFamilies(familyMap.keySet());
80          region.checkTimestamps(familyMap, now);
81          region.updateCellTimestamps(familyMap.values(), byteNow);
82        } else if (m instanceof Delete) {
83          Delete d = (Delete) m;
84          region.prepareDelete(d);
85          region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
86        } else {
87          throw new DoNotRetryIOException("Action must be Put or Delete. But was: "
88              + m.getClass().getName());
89        }
90        mutationsToApply.add(m);
91      }
92      // Apply edits to a single WALEdit
93      for (Mutation m : mutations) {
94        for (List<Cell> cells : m.getFamilyCellMap().values()) {
95          boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
96          for (Cell cell : cells) {
97            if (writeToWAL) walEdit.add(cell);
98          }
99        }
100     }
101   }
102 
103   @Override
104   public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
105     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
106     if (coprocessorHost != null) {
107       for (Mutation m : mutations) {
108         if (m instanceof Put) {
109           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
110             // by pass everything
111             return;
112           }
113         } else if (m instanceof Delete) {
114           Delete d = (Delete) m;
115           region.prepareDelete(d);
116           if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) {
117             // by pass everything
118             return;
119           }
120         }
121       }
122     }
123   }
124 
125   @Override
126   public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
127     // TODO we should return back the status of this hook run to HRegion so that those Mutations
128     // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore.
129     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
130     OperationStatus[] opStatus = new OperationStatus[mutations.size()];
131     Arrays.fill(opStatus, OperationStatus.NOT_RUN);
132     WALEdit[] walEditsFromCP = new WALEdit[mutations.size()];
133     if (coprocessorHost != null) {
134       miniBatch = new MiniBatchOperationInProgress<Mutation>(
135           mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
136           mutations.size());
137       coprocessorHost.preBatchMutate(miniBatch);
138     }
139     // Apply edits to a single WALEdit
140     for (int i = 0; i < mutations.size(); i++) {
141       if (opStatus[i] == OperationStatus.NOT_RUN) {
142         // Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook
143         // itself. No need to apply again to region
144         if (walEditsFromCP[i] != null) {
145           // Add the WALEdit created by CP hook
146           for (Cell walCell : walEditsFromCP[i].getCells()) {
147             walEdit.add(walCell);
148           }
149         }
150       }
151     }
152   }
153 
154   @Override
155   public void postBatchMutate(HRegion region) throws IOException {
156     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
157     if (coprocessorHost != null) {
158       assert miniBatch != null;
159       // Use the same miniBatch state used to call the preBatchMutate()
160       coprocessorHost.postBatchMutate(miniBatch);
161     }
162   }
163 
164   @Override
165   public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
166     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
167     if (coprocessorHost != null) {
168       for (Mutation m : mutations) {
169         if (m instanceof Put) {
170           coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
171         } else if (m instanceof Delete) {
172           coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
173         }
174       }
175       // At the end call the CP hook postBatchMutateIndispensably
176       if (miniBatch != null) {
177         // Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a
178         // read only process. Then no need to call this batch based CP hook also.
179         coprocessorHost.postBatchMutateIndispensably(miniBatch, success);
180       }
181     }
182   }
183 
184   @Override
185   public MultiRowMutationProcessorRequest getRequestData() {
186     return MultiRowMutationProcessorRequest.getDefaultInstance();
187   }
188 
189   @Override
190   public void initialize(MultiRowMutationProcessorRequest msg) {
191     //nothing
192   }
193 
194   @Override
195   public Durability useDurability() {
196     // return true when at least one mutation requested a WAL flush (default)
197     Durability durability = Durability.USE_DEFAULT;
198     for (Mutation m : mutations) {
199       if (m.getDurability().ordinal() > durability.ordinal()) {
200         durability = m.getDurability();
201       }
202     }
203     return durability;
204   }
205 }