View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.List;
24  import java.util.Map;
25  
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.DoNotRetryIOException;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.KeyValueUtil;
31  import org.apache.hadoop.hbase.client.Delete;
32  import org.apache.hadoop.hbase.client.Durability;
33  import org.apache.hadoop.hbase.client.Mutation;
34  import org.apache.hadoop.hbase.client.Put;
35  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest;
36  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse;
37  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
38  import org.apache.hadoop.hbase.util.Bytes;
39  
40  /**
41   * A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
42   */
43  @InterfaceAudience.Private
44  class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
45  MultiRowMutationProcessorResponse> {
46    Collection<byte[]> rowsToLock;
47    Collection<Mutation> mutations;
48    MiniBatchOperationInProgress<Mutation> miniBatch;
49  
50    MultiRowMutationProcessor(Collection<Mutation> mutations,
51                              Collection<byte[]> rowsToLock) {
52      this.rowsToLock = rowsToLock;
53      this.mutations = mutations;
54    }
55  
56    @Override
57    public Collection<byte[]> getRowsToLock() {
58      return rowsToLock;
59    }
60  
61    @Override
62    public boolean readOnly() {
63      return false;
64    }
65    
66    @Override
67    public MultiRowMutationProcessorResponse getResult() {
68      return MultiRowMutationProcessorResponse.getDefaultInstance();
69    }
70  
71    @Override
72    public void process(long now,
73                        HRegion region,
74                        List<Mutation> mutationsToApply,
75                        WALEdit walEdit) throws IOException {
76      byte[] byteNow = Bytes.toBytes(now);
77      // Check mutations
78      for (Mutation m : this.mutations) {
79        if (m instanceof Put) {
80          Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
81          region.checkFamilies(familyMap.keySet());
82          region.checkTimestamps(familyMap, now);
83          region.updateKVTimestamps(familyMap.values(), byteNow);
84        } else if (m instanceof Delete) {
85          Delete d = (Delete) m;
86          region.prepareDelete(d);
87          region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
88        } else {
89          throw new DoNotRetryIOException("Action must be Put or Delete. But was: "
90              + m.getClass().getName());
91        }
92        mutationsToApply.add(m);
93      }
94      // Apply edits to a single WALEdit
95      for (Mutation m : mutations) {
96        for (List<Cell> cells : m.getFamilyCellMap().values()) {
97          boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
98          for (Cell cell : cells) {
99            if (writeToWAL) walEdit.add(cell);
100         }
101       }
102     }
103   }
104 
105   @Override
106   public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
107     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
108     if (coprocessorHost != null) {
109       for (Mutation m : mutations) {
110         if (m instanceof Put) {
111           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
112             // by pass everything
113             return;
114           }
115         } else if (m instanceof Delete) {
116           Delete d = (Delete) m;
117           region.prepareDelete(d);
118           if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) {
119             // by pass everything
120             return;
121           }
122         }
123       }
124     }
125   }
126 
127   @Override
128   public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
129     // TODO we should return back the status of this hook run to HRegion so that those Mutations
130     // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore.
131     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
132     OperationStatus[] opStatus = new OperationStatus[mutations.size()];
133     Arrays.fill(opStatus, OperationStatus.NOT_RUN);
134     WALEdit[] walEditsFromCP = new WALEdit[mutations.size()];
135     if (coprocessorHost != null) {
136       miniBatch = new MiniBatchOperationInProgress<Mutation>(
137           mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
138           mutations.size());
139       coprocessorHost.preBatchMutate(miniBatch);
140     }
141     // Apply edits to a single WALEdit
142     for (int i = 0; i < mutations.size(); i++) {
143       if (opStatus[i] == OperationStatus.NOT_RUN) {
144         // Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook
145         // itself. No need to apply again to region
146         if (walEditsFromCP[i] != null) {
147           // Add the WALEdit created by CP hook
148           for (Cell walCell : walEditsFromCP[i].getCells()) {
149             walEdit.add(walCell);
150           }
151         }
152       }
153     }
154   }
155 
156   @Override
157   public void postBatchMutate(HRegion region) throws IOException {
158     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
159     if (coprocessorHost != null) {
160       assert miniBatch != null;
161       // Use the same miniBatch state used to call the preBatchMutate()
162       coprocessorHost.postBatchMutate(miniBatch);
163     }
164   }
165 
166   @Override
167   public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
168     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
169     if (coprocessorHost != null) {
170       for (Mutation m : mutations) {
171         if (m instanceof Put) {
172           coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
173         } else if (m instanceof Delete) {
174           coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
175         }
176       }
177       // At the end call the CP hook postBatchMutateIndispensably
178       if (miniBatch != null) {
179         // Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a
180         // read only process. Then no need to call this batch based CP hook also.
181         coprocessorHost.postBatchMutateIndispensably(miniBatch, success);
182       }
183     }
184   }
185 
186   @Override
187   public MultiRowMutationProcessorRequest getRequestData() {
188     return MultiRowMutationProcessorRequest.getDefaultInstance();
189   }
190 
191   @Override
192   public void initialize(MultiRowMutationProcessorRequest msg) {
193     //nothing
194   }
195 
196   @Override
197   public Durability useDurability() {
198     // return true when at least one mutation requested a WAL flush (default)
199     Durability durability = Durability.USE_DEFAULT;
200     for (Mutation m : mutations) {
201       if (m.getDurability().ordinal() > durability.ordinal()) {
202         durability = m.getDurability();
203       }
204     }
205     return durability;
206   }
207 }