View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.List;
24  import java.util.Map;
25  
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.DoNotRetryIOException;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.KeyValueUtil;
30  import org.apache.hadoop.hbase.client.Delete;
31  import org.apache.hadoop.hbase.client.Durability;
32  import org.apache.hadoop.hbase.client.Mutation;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest;
35  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse;
36  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
37  import org.apache.hadoop.hbase.util.Bytes;
38  
39  /**
40   * A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
41   */
42  class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
43  MultiRowMutationProcessorResponse> {
44    Collection<byte[]> rowsToLock;
45    Collection<Mutation> mutations;
46    MiniBatchOperationInProgress<Mutation> miniBatch;
47  
48    MultiRowMutationProcessor(Collection<Mutation> mutations,
49                              Collection<byte[]> rowsToLock) {
50      this.rowsToLock = rowsToLock;
51      this.mutations = mutations;
52    }
53  
54    @Override
55    public Collection<byte[]> getRowsToLock() {
56      return rowsToLock;
57    }
58  
59    @Override
60    public boolean readOnly() {
61      return false;
62    }
63    
64    @Override
65    public MultiRowMutationProcessorResponse getResult() {
66      return MultiRowMutationProcessorResponse.getDefaultInstance();
67    }
68  
69    @Override
70    public void process(long now,
71                        HRegion region,
72                        List<Mutation> mutationsToApply,
73                        WALEdit walEdit) throws IOException {
74      byte[] byteNow = Bytes.toBytes(now);
75      // Check mutations
76      for (Mutation m : this.mutations) {
77        if (m instanceof Put) {
78          Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
79          region.checkFamilies(familyMap.keySet());
80          region.checkTimestamps(familyMap, now);
81          region.updateKVTimestamps(familyMap.values(), byteNow);
82        } else if (m instanceof Delete) {
83          Delete d = (Delete) m;
84          region.prepareDelete(d);
85          region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
86        } else {
87          throw new DoNotRetryIOException("Action must be Put or Delete. But was: "
88              + m.getClass().getName());
89        }
90        mutationsToApply.add(m);
91      }
92      // Apply edits to a single WALEdit
93      for (Mutation m : mutations) {
94        for (List<Cell> cells : m.getFamilyCellMap().values()) {
95          boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
96          for (Cell cell : cells) {
97            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
98            if (writeToWAL) walEdit.add(kv);
99          }
100       }
101     }
102   }
103 
104   @Override
105   public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
106     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
107     if (coprocessorHost != null) {
108       for (Mutation m : mutations) {
109         if (m instanceof Put) {
110           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
111             // by pass everything
112             return;
113           }
114         } else if (m instanceof Delete) {
115           Delete d = (Delete) m;
116           region.prepareDelete(d);
117           if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) {
118             // by pass everything
119             return;
120           }
121         }
122       }
123     }
124   }
125 
126   @Override
127   public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
128     // TODO we should return back the status of this hook run to HRegion so that those Mutations
129     // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore.
130     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
131     OperationStatus[] opStatus = new OperationStatus[mutations.size()];
132     Arrays.fill(opStatus, OperationStatus.NOT_RUN);
133     WALEdit[] walEditsFromCP = new WALEdit[mutations.size()];
134     if (coprocessorHost != null) {
135       miniBatch = new MiniBatchOperationInProgress<Mutation>(
136           mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
137           mutations.size());
138       coprocessorHost.preBatchMutate(miniBatch);
139     }
140     // Apply edits to a single WALEdit
141     for (int i = 0; i < mutations.size(); i++) {
142       if (opStatus[i] == OperationStatus.NOT_RUN) {
143         // Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook
144         // itself. No need to apply again to region
145         if (walEditsFromCP[i] != null) {
146           // Add the WALEdit created by CP hook
147           for (KeyValue walKv : walEditsFromCP[i].getKeyValues()) {
148             walEdit.add(walKv);
149           }
150         }
151       }
152     }
153   }
154 
155   @Override
156   public void postBatchMutate(HRegion region) throws IOException {
157     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
158     if (coprocessorHost != null) {
159       assert miniBatch != null;
160       // Use the same miniBatch state used to call the preBatchMutate()
161       coprocessorHost.postBatchMutate(miniBatch);
162     }
163   }
164 
165   @Override
166   public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
167     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
168     if (coprocessorHost != null) {
169       for (Mutation m : mutations) {
170         if (m instanceof Put) {
171           coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
172         } else if (m instanceof Delete) {
173           coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
174         }
175       }
176       // At the end call the CP hook postBatchMutateIndispensably
177       if (miniBatch != null) {
178         // Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a
179         // read only process. Then no need to call this batch based CP hook also.
180         coprocessorHost.postBatchMutateIndispensably(miniBatch, success);
181       }
182     }
183   }
184 
185   @Override
186   public MultiRowMutationProcessorRequest getRequestData() {
187     return MultiRowMutationProcessorRequest.getDefaultInstance();
188   }
189 
190   @Override
191   public void initialize(MultiRowMutationProcessorRequest msg) {
192     //nothing
193   }
194 
195   @Override
196   public Durability useDurability() {
197     // return true when at least one mutation requested a WAL flush (default)
198     Durability durability = Durability.USE_DEFAULT;
199     for (Mutation m : mutations) {
200       if (m.getDurability().ordinal() > durability.ordinal()) {
201         durability = m.getDurability();
202       }
203     }
204     return durability;
205   }
206 }