View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Collection;
22  import java.util.List;
23  import java.util.Map;
24  
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.DoNotRetryIOException;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.KeyValueUtil;
29  import org.apache.hadoop.hbase.client.Delete;
30  import org.apache.hadoop.hbase.client.Durability;
31  import org.apache.hadoop.hbase.client.Mutation;
32  import org.apache.hadoop.hbase.client.Put;
33  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest;
34  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse;
35  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36  import org.apache.hadoop.hbase.util.Bytes;
37  
38  /**
39   * A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
40   */
41  class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
42  MultiRowMutationProcessorResponse> {
43    Collection<byte[]> rowsToLock;
44    Collection<Mutation> mutations;
45  
46    MultiRowMutationProcessor(Collection<Mutation> mutations,
47                              Collection<byte[]> rowsToLock) {
48      this.rowsToLock = rowsToLock;
49      this.mutations = mutations;
50    }
51  
52    @Override
53    public Collection<byte[]> getRowsToLock() {
54      return rowsToLock;
55    }
56  
57    @Override
58    public boolean readOnly() {
59      return false;
60    }
61    
62    @Override
63    public MultiRowMutationProcessorResponse getResult() {
64      return MultiRowMutationProcessorResponse.getDefaultInstance();
65    }
66  
67    @Override
68    public void process(long now,
69                        HRegion region,
70                        List<KeyValue> mutationKvs,
71                        WALEdit walEdit) throws IOException {
72      byte[] byteNow = Bytes.toBytes(now);
73      // Check mutations and apply edits to a single WALEdit
74      for (Mutation m : mutations) {
75        if (m instanceof Put) {
76          Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
77          region.checkFamilies(familyMap.keySet());
78          region.checkTimestamps(familyMap, now);
79          region.updateKVTimestamps(familyMap.values(), byteNow);
80        } else if (m instanceof Delete) {
81          Delete d = (Delete) m;
82          region.prepareDelete(d);
83          region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
84        } else {
85          throw new DoNotRetryIOException(
86              "Action must be Put or Delete. But was: "
87              + m.getClass().getName());
88        }
89        for (List<Cell> cells: m.getFamilyCellMap().values()) {
90          boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
91          for (Cell cell : cells) {
92            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
93            mutationKvs.add(kv);
94            if (writeToWAL) {
95              walEdit.add(kv);
96            }
97          }
98        }
99      }
100   }
101 
102   @Override
103   public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
104     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
105     if (coprocessorHost != null) {
106       for (Mutation m : mutations) {
107         if (m instanceof Put) {
108           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
109             // by pass everything
110             return;
111           }
112         } else if (m instanceof Delete) {
113           Delete d = (Delete) m;
114           region.prepareDelete(d);
115           if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) {
116             // by pass everything
117             return;
118           }
119         }
120       }
121     }
122   }
123 
124   @Override
125   public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
126     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
127     if (coprocessorHost != null) {
128       for (Mutation m : mutations) {
129         if (m instanceof Put) {
130           coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
131         } else if (m instanceof Delete) {
132           coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
133         }
134       }
135     }
136   }
137 
138   @Override
139   public MultiRowMutationProcessorRequest getRequestData() {
140     return MultiRowMutationProcessorRequest.getDefaultInstance();
141   }
142 
143   @Override
144   public void initialize(MultiRowMutationProcessorRequest msg) {
145     //nothing
146   }
147 
148   @Override
149   public Durability useDurability() {
150     // return true when at least one mutation requested a WAL flush (default)
151     Durability durability = Durability.USE_DEFAULT;
152     for (Mutation m : mutations) {
153       if (m.getDurability().ordinal() > durability.ordinal()) {
154         durability = m.getDurability();
155       }
156     }
157     return durability;
158   }
159 }