View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.protobuf;
21  
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.UUID;
30  
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellScanner;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.io.SizedCellScanner;
37  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
38  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
39  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
40  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
41  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
42  import org.apache.hadoop.hbase.wal.WAL.Entry;
43  import org.apache.hadoop.hbase.wal.WALKey;
44  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
45  import org.apache.hadoop.hbase.util.ByteStringer;
46  import org.apache.hadoop.hbase.util.Pair;
47  
48  import com.google.protobuf.ServiceException;
49  
50  @InterfaceAudience.Private
51  public class ReplicationProtbufUtil {
52    /**
53     * A helper to replicate a list of WAL entries using admin protocol.
54     *
55     * @param admin
56     * @param entries
57     * @throws java.io.IOException
58     */
59    public static void replicateWALEntry(final AdminService.BlockingInterface admin,
60        final Entry[] entries) throws IOException {
61      Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
62        buildReplicateWALEntryRequest(entries, null);
63      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
64      try {
65        admin.replicateWALEntry(controller, p.getFirst());
66      } catch (ServiceException se) {
67        throw ProtobufUtil.getRemoteException(se);
68      }
69    }
70  
71    /**
72     * Create a new ReplicateWALEntryRequest from a list of WAL entries
73     *
74     * @param entries the WAL entries to be replicated
75     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
76     * found.
77     */
78    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
79        buildReplicateWALEntryRequest(final Entry[] entries) {
80      // Accumulate all the Cells seen in here.
81      return buildReplicateWALEntryRequest(entries, null);
82    }
83  
84    /**
85     * Create a new ReplicateWALEntryRequest from a list of HLog entries
86     *
87     * @param entries the HLog entries to be replicated
88     * @param encodedRegionName alternative region name to use if not null
89     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
90     * found.
91     */
92    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
93        buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
94      // Accumulate all the KVs seen in here.
95      List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
96      int size = 0;
97      WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
98      AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
99      AdminProtos.ReplicateWALEntryRequest.Builder builder =
100       AdminProtos.ReplicateWALEntryRequest.newBuilder();
101     HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
102     for (Entry entry: entries) {
103       entryBuilder.clear();
104       // TODO: this duplicates a lot in WALKey#getBuilder
105       WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
106       WALKey key = entry.getKey();
107       keyBuilder.setEncodedRegionName(
108         ByteStringer.wrap(encodedRegionName == null
109             ? key.getEncodedRegionName()
110             : encodedRegionName));
111       keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
112       keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
113       keyBuilder.setWriteTime(key.getWriteTime());
114       if (key.getNonce() != HConstants.NO_NONCE) {
115         keyBuilder.setNonce(key.getNonce());
116       }
117       if (key.getNonceGroup() != HConstants.NO_NONCE) {
118         keyBuilder.setNonceGroup(key.getNonceGroup());
119       }
120       for(UUID clusterId : key.getClusterIds()) {
121         uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
122         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
123         keyBuilder.addClusterIds(uuidBuilder.build());
124       }
125       if(key.getOrigLogSeqNum() > 0) {
126         keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
127       }
128       WALEdit edit = entry.getEdit();
129       NavigableMap<byte[], Integer> scopes = key.getScopes();
130       if (scopes != null && !scopes.isEmpty()) {
131         for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
132           scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
133           WALProtos.ScopeType scopeType =
134               WALProtos.ScopeType.valueOf(scope.getValue().intValue());
135           scopeBuilder.setScopeType(scopeType);
136           keyBuilder.addScopes(scopeBuilder.build());
137         }
138       }
139       List<Cell> cells = edit.getCells();
140       // Add up the size.  It is used later serializing out the cells.
141       for (Cell cell: cells) {
142         size += CellUtil.estimatedSerializedSizeOf(cell);
143       }
144       // Collect up the cells
145       allCells.add(cells);
146       // Write out how many cells associated with this entry.
147       entryBuilder.setAssociatedCellCount(cells.size());
148       builder.addEntry(entryBuilder.build());
149     }
150     return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
151       getCellScanner(allCells, size));
152   }
153 
154   /**
155    * @param cells
156    * @return <code>cells</code> packaged as a CellScanner
157    */
158   static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
159     return new SizedCellScanner() {
160       private final Iterator<List<? extends Cell>> entries = cells.iterator();
161       private Iterator<? extends Cell> currentIterator = null;
162       private Cell currentCell;
163 
164       @Override
165       public Cell current() {
166         return this.currentCell;
167       }
168 
169       @Override
170       public boolean advance() {
171         if (this.currentIterator == null) {
172           if (!this.entries.hasNext()) return false;
173           this.currentIterator = this.entries.next().iterator();
174         }
175         if (this.currentIterator.hasNext()) {
176           this.currentCell = this.currentIterator.next();
177           return true;
178         }
179         this.currentCell = null;
180         this.currentIterator = null;
181         return advance();
182       }
183 
184       @Override
185       public long heapSize() {
186         return size;
187       }
188     };
189   }
190 }