1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.protobuf;
21
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableMap;
29 import java.util.UUID;
30
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellScanner;
34 import org.apache.hadoop.hbase.CellUtil;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.io.SizedCellScanner;
37 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
38 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
39 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
40 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
41 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
42 import org.apache.hadoop.hbase.wal.WAL.Entry;
43 import org.apache.hadoop.hbase.wal.WALKey;
44 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
45 import org.apache.hadoop.hbase.util.ByteStringer;
46 import org.apache.hadoop.hbase.util.Pair;
47
48 import com.google.protobuf.ServiceException;
49
50 @InterfaceAudience.Private
51 public class ReplicationProtbufUtil {
52
53
54
55
56
57
58
59 public static void replicateWALEntry(final AdminService.BlockingInterface admin,
60 final Entry[] entries) throws IOException {
61 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
62 buildReplicateWALEntryRequest(entries, null);
63 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
64 try {
65 admin.replicateWALEntry(controller, p.getFirst());
66 } catch (ServiceException se) {
67 throw ProtobufUtil.getRemoteException(se);
68 }
69 }
70
71
72
73
74
75
76
77
78 public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
79 buildReplicateWALEntryRequest(final Entry[] entries) {
80
81 return buildReplicateWALEntryRequest(entries, null);
82 }
83
84
85
86
87
88
89
90
91
92 public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
93 buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
94
95 List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
96 int size = 0;
97 WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
98 AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
99 AdminProtos.ReplicateWALEntryRequest.Builder builder =
100 AdminProtos.ReplicateWALEntryRequest.newBuilder();
101 HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
102 for (Entry entry: entries) {
103 entryBuilder.clear();
104
105 WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
106 WALKey key = entry.getKey();
107 keyBuilder.setEncodedRegionName(
108 ByteStringer.wrap(encodedRegionName == null
109 ? key.getEncodedRegionName()
110 : encodedRegionName));
111 keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
112 keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
113 keyBuilder.setWriteTime(key.getWriteTime());
114 if (key.getNonce() != HConstants.NO_NONCE) {
115 keyBuilder.setNonce(key.getNonce());
116 }
117 if (key.getNonceGroup() != HConstants.NO_NONCE) {
118 keyBuilder.setNonceGroup(key.getNonceGroup());
119 }
120 for(UUID clusterId : key.getClusterIds()) {
121 uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
122 uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
123 keyBuilder.addClusterIds(uuidBuilder.build());
124 }
125 if(key.getOrigLogSeqNum() > 0) {
126 keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
127 }
128 WALEdit edit = entry.getEdit();
129 NavigableMap<byte[], Integer> scopes = key.getScopes();
130 if (scopes != null && !scopes.isEmpty()) {
131 for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
132 scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
133 WALProtos.ScopeType scopeType =
134 WALProtos.ScopeType.valueOf(scope.getValue().intValue());
135 scopeBuilder.setScopeType(scopeType);
136 keyBuilder.addScopes(scopeBuilder.build());
137 }
138 }
139 List<Cell> cells = edit.getCells();
140
141 for (Cell cell: cells) {
142 size += CellUtil.estimatedSerializedSizeOf(cell);
143 }
144
145 allCells.add(cells);
146
147 entryBuilder.setAssociatedCellCount(cells.size());
148 builder.addEntry(entryBuilder.build());
149 }
150 return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
151 getCellScanner(allCells, size));
152 }
153
154
155
156
157
158 static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
159 return new SizedCellScanner() {
160 private final Iterator<List<? extends Cell>> entries = cells.iterator();
161 private Iterator<? extends Cell> currentIterator = null;
162 private Cell currentCell;
163
164 @Override
165 public Cell current() {
166 return this.currentCell;
167 }
168
169 @Override
170 public boolean advance() {
171 if (this.currentIterator == null) {
172 if (!this.entries.hasNext()) return false;
173 this.currentIterator = this.entries.next().iterator();
174 }
175 if (this.currentIterator.hasNext()) {
176 this.currentCell = this.currentIterator.next();
177 return true;
178 }
179 this.currentCell = null;
180 this.currentIterator = null;
181 return advance();
182 }
183
184 @Override
185 public long heapSize() {
186 return size;
187 }
188 };
189 }
190 }