View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.protobuf;
21  
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.UUID;
30  
31  import org.apache.hadoop.hbase.util.ByteStringer;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellScanner;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.io.SizedCellScanner;
38  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
39  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
40  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
41  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
42  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
43  import org.apache.hadoop.hbase.regionserver.wal.HLog;
44  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
45  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
46  import org.apache.hadoop.hbase.util.Pair;
47  
48  import com.google.protobuf.ServiceException;
49  
50  @InterfaceAudience.Private
51  public class ReplicationProtbufUtil {
52    /**
53     * A helper to replicate a list of HLog entries using admin protocol.
54     *
55     * @param admin
56     * @param entries
57     * @throws java.io.IOException
58     */
59    public static void replicateWALEntry(final AdminService.BlockingInterface admin,
60        final HLog.Entry[] entries) throws IOException {
61      Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
62        buildReplicateWALEntryRequest(entries);
63      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
64      try {
65        admin.replicateWALEntry(controller, p.getFirst());
66      } catch (ServiceException se) {
67        throw ProtobufUtil.getRemoteException(se);
68      }
69    }
70  
71    /**
72     * Create a new ReplicateWALEntryRequest from a list of HLog entries
73     *
74     * @param entries the HLog entries to be replicated
75     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
76     * found.
77     */
78    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
79        buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
80      // Accumulate all the KVs seen in here.
81      List<List<? extends Cell>> allkvs = new ArrayList<List<? extends Cell>>(entries.length);
82      int size = 0;
83      WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
84      AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
85      AdminProtos.ReplicateWALEntryRequest.Builder builder =
86        AdminProtos.ReplicateWALEntryRequest.newBuilder();
87      HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
88      for (HLog.Entry entry: entries) {
89        entryBuilder.clear();
90        // TODO: this duplicates a lot in HLogKey#getBuilder
91        WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
92        HLogKey key = entry.getKey();
93        keyBuilder.setEncodedRegionName(
94          ByteStringer.wrap(key.getEncodedRegionName()));
95        keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
96        keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
97        keyBuilder.setWriteTime(key.getWriteTime());
98        if (key.getNonce() != HConstants.NO_NONCE) {
99          keyBuilder.setNonce(key.getNonce());
100       }
101       if (key.getNonceGroup() != HConstants.NO_NONCE) {
102         keyBuilder.setNonceGroup(key.getNonceGroup());
103       }
104       for(UUID clusterId : key.getClusterIds()) {
105         uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
106         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
107         keyBuilder.addClusterIds(uuidBuilder.build());
108       }
109       if(key.getOrigLogSeqNum() > 0) {
110         keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
111       }
112       WALEdit edit = entry.getEdit();
113       NavigableMap<byte[], Integer> scopes = key.getScopes();
114       if (scopes != null && !scopes.isEmpty()) {
115         for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
116           scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
117           WALProtos.ScopeType scopeType =
118               WALProtos.ScopeType.valueOf(scope.getValue().intValue());
119           scopeBuilder.setScopeType(scopeType);
120           keyBuilder.addScopes(scopeBuilder.build());
121         }
122       }
123       List<KeyValue> kvs = edit.getKeyValues();
124       // Add up the size.  It is used later serializing out the kvs.
125       for (KeyValue kv: kvs) {
126         size += kv.getLength();
127       }
128       // Collect up the kvs
129       allkvs.add(kvs);
130       // Write out how many kvs associated with this entry.
131       entryBuilder.setAssociatedCellCount(kvs.size());
132       builder.addEntry(entryBuilder.build());
133     }
134     return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
135       getCellScanner(allkvs, size));
136   }
137 
138   /**
139    * @param cells
140    * @return <code>cells</code> packaged as a CellScanner
141    */
142   static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
143     return new SizedCellScanner() {
144       private final Iterator<List<? extends Cell>> entries = cells.iterator();
145       private Iterator<? extends Cell> currentIterator = null;
146       private Cell currentCell;
147 
148       @Override
149       public Cell current() {
150         return this.currentCell;
151       }
152 
153       @Override
154       public boolean advance() {
155         if (this.currentIterator == null) {
156           if (!this.entries.hasNext()) return false;
157           this.currentIterator = this.entries.next().iterator();
158         }
159         if (this.currentIterator.hasNext()) {
160           this.currentCell = this.currentIterator.next();
161           return true;
162         }
163         this.currentCell = null;
164         this.currentIterator = null;
165         return advance();
166       }
167 
168       @Override
169       public long heapSize() {
170         return size;
171       }
172     };
173   }
174 }