1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.io.IOException;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Map;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.DoNotRetryIOException;
29 import org.apache.hadoop.hbase.client.Delete;
30 import org.apache.hadoop.hbase.client.Durability;
31 import org.apache.hadoop.hbase.client.Mutation;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest;
34 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse;
35 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36 import org.apache.hadoop.hbase.util.Bytes;
37
38
39
40
41 @InterfaceAudience.Private
42 class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
43 MultiRowMutationProcessorResponse> {
44 Collection<byte[]> rowsToLock;
45 Collection<Mutation> mutations;
46 MiniBatchOperationInProgress<Mutation> miniBatch;
47
48 MultiRowMutationProcessor(Collection<Mutation> mutations,
49 Collection<byte[]> rowsToLock) {
50 this.rowsToLock = rowsToLock;
51 this.mutations = mutations;
52 }
53
54 @Override
55 public Collection<byte[]> getRowsToLock() {
56 return rowsToLock;
57 }
58
59 @Override
60 public boolean readOnly() {
61 return false;
62 }
63
64 @Override
65 public MultiRowMutationProcessorResponse getResult() {
66 return MultiRowMutationProcessorResponse.getDefaultInstance();
67 }
68
69 @Override
70 public void process(long now,
71 HRegion region,
72 List<Mutation> mutationsToApply,
73 WALEdit walEdit) throws IOException {
74 byte[] byteNow = Bytes.toBytes(now);
75
76 for (Mutation m : this.mutations) {
77 if (m instanceof Put) {
78 Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
79 region.checkFamilies(familyMap.keySet());
80 region.checkTimestamps(familyMap, now);
81 region.updateCellTimestamps(familyMap.values(), byteNow);
82 } else if (m instanceof Delete) {
83 Delete d = (Delete) m;
84 region.prepareDelete(d);
85 region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
86 } else {
87 throw new DoNotRetryIOException("Action must be Put or Delete. But was: "
88 + m.getClass().getName());
89 }
90 mutationsToApply.add(m);
91 }
92
93 for (Mutation m : mutations) {
94 for (List<Cell> cells : m.getFamilyCellMap().values()) {
95 boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
96 for (Cell cell : cells) {
97 if (writeToWAL) walEdit.add(cell);
98 }
99 }
100 }
101 }
102
103 @Override
104 public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
105 RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
106 if (coprocessorHost != null) {
107 for (Mutation m : mutations) {
108 if (m instanceof Put) {
109 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
110
111 return;
112 }
113 } else if (m instanceof Delete) {
114 Delete d = (Delete) m;
115 region.prepareDelete(d);
116 if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) {
117
118 return;
119 }
120 }
121 }
122 }
123 }
124
125 @Override
126 public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
127
128
129 RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
130 OperationStatus[] opStatus = new OperationStatus[mutations.size()];
131 Arrays.fill(opStatus, OperationStatus.NOT_RUN);
132 WALEdit[] walEditsFromCP = new WALEdit[mutations.size()];
133 if (coprocessorHost != null) {
134 miniBatch = new MiniBatchOperationInProgress<Mutation>(
135 mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
136 mutations.size());
137 coprocessorHost.preBatchMutate(miniBatch);
138 }
139
140 for (int i = 0; i < mutations.size(); i++) {
141 if (opStatus[i] == OperationStatus.NOT_RUN) {
142
143
144 if (walEditsFromCP[i] != null) {
145
146 for (Cell walCell : walEditsFromCP[i].getCells()) {
147 walEdit.add(walCell);
148 }
149 }
150 }
151 }
152 }
153
154 @Override
155 public void postBatchMutate(HRegion region) throws IOException {
156 RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
157 if (coprocessorHost != null) {
158 assert miniBatch != null;
159
160 coprocessorHost.postBatchMutate(miniBatch);
161 }
162 }
163
164 @Override
165 public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
166 RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
167 if (coprocessorHost != null) {
168 for (Mutation m : mutations) {
169 if (m instanceof Put) {
170 coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
171 } else if (m instanceof Delete) {
172 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
173 }
174 }
175
176 if (miniBatch != null) {
177
178
179 coprocessorHost.postBatchMutateIndispensably(miniBatch, success);
180 }
181 }
182 }
183
184 @Override
185 public MultiRowMutationProcessorRequest getRequestData() {
186 return MultiRowMutationProcessorRequest.getDefaultInstance();
187 }
188
189 @Override
190 public void initialize(MultiRowMutationProcessorRequest msg) {
191
192 }
193
194 @Override
195 public Durability useDurability() {
196
197 Durability durability = Durability.USE_DEFAULT;
198 for (Mutation m : mutations) {
199 if (m.getDurability().ordinal() > durability.ordinal()) {
200 durability = m.getDurability();
201 }
202 }
203 return durability;
204 }
205 }