// Copyright (C) 2016 The GoHBase Authors. All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.

package gohbase

import (
"bytes"
"context"
"errors"
"fmt"
"strconv"
"time"
log "github.com/Sirupsen/logrus"
"github.com/golang/protobuf/proto"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/region"
"github.com/tsuna/gohbase/zk"
)

// Package-level variables, including the sentinel errors returned by the client.
var (
// Name of the meta region.
metaTableName = []byte("hbase:meta")
infoFamily = map[string][]string{
"info": nil,
}
// ErrDeadline is returned when the deadline of a request has been exceeded
ErrDeadline = errors.New("deadline exceeded")
	// ErrRegionUnavailable is returned when sending an RPC to a region that is unavailable.
ErrRegionUnavailable = errors.New("region unavailable")
// TableNotFound is returned when attempting to access a table that
// doesn't exist on this cluster.
TableNotFound = errors.New("table not found")
// ErrCannotFindRegion is returned when it took too many tries to find a
// region for the request. It's likely that hbase:meta has overlaps or some other
// inconsistency.
	ErrCannotFindRegion = errors.New("cannot find region for the rpc")
// ErrMetaLookupThrottled is returned when a lookup for the rpc's region
// has been throttled.
ErrMetaLookupThrottled = errors.New("lookup to hbase:meta has been throttled")
)
const (
// maxSendRPCTries is the maximum number of times to try to send an RPC
maxSendRPCTries = 10
// How long to wait for a region lookup (either meta lookup or finding
// meta in ZooKeeper). Should be greater than or equal to the ZooKeeper
// session timeout.
regionLookupTimeout = 30 * time.Second
backoffStart = 16 * time.Millisecond
)
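
// SendRPC routes the given RPC to the region that owns its key and sends it,
// retrying up to maxSendRPCTries times while the region is unavailable or
// being relocated.
//
// A minimal usage sketch (table and row names are made up, and hrpc.NewGetStr
// is assumed from the hrpc package; callers typically go through the
// higher-level helpers such as Get, which build the request and call SendRPC
// internally):
//
//	getRequest, err := hrpc.NewGetStr(context.Background(), "mytable", "myrow")
//	if err != nil {
//		return err
//	}
//	msg, err := c.SendRPC(getRequest) // msg is a proto.Message, e.g. a *pb.GetResponse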
func (c *client) SendRPC(rpc hrpc.Call) (proto.Message, error) {
var err error
for i := 0; i < maxSendRPCTries; i++ {
// Check the cache for a region that can handle this request
reg := c.getRegionFromCache(rpc.Table(), rpc.Key())
if reg == nil {
reg, err = c.findRegion(rpc.Context(), rpc.Table(), rpc.Key())
if err == ErrRegionUnavailable {
continue
} else if err == ErrMetaLookupThrottled {
				// The lookup for the region has been throttled. Check the cache
				// again, but don't count this as a SendRPC try, as there might
				// just be too many requests going on at a time.
i--
continue
} else if err != nil {
return nil, err
}
}
msg, err := c.sendRPCToRegion(rpc, reg)
switch err {
case ErrRegionUnavailable:
if ch := reg.AvailabilityChan(); ch != nil {
				// The region is unavailable. Wait for it to become available,
				// for a new region to replace it, or for the deadline to be exceeded.
select {
case <-rpc.Context().Done():
return nil, ErrDeadline
case <-ch:
}
}
default:
return msg, err
}
}
	return nil, ErrCannotFindRegion
}
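
// sendBlocking queues the given RPC on the region client and blocks until a
// result arrives on the RPC's result channel, or until the RPC's context is
// done, in which case ErrDeadline is returned.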
func sendBlocking(rc hrpc.RegionClient, rpc hrpc.Call) (hrpc.RPCResult, error) {
rc.QueueRPC(rpc)
var res hrpc.RPCResult
// Wait for the response
select {
case res = <-rpc.ResultChan():
return res, nil
case <-rpc.Context().Done():
return res, ErrDeadline
}
}
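
// sendRPCToRegion sends the RPC to the client serving the given region. If the
// region has no client, or the response carries a region-level or client-level
// failure, the region (or every region sharing the client) is marked
// unavailable, reestablishment is kicked off and ErrRegionUnavailable is
// returned so that the caller can retry.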
func (c *client) sendRPCToRegion(rpc hrpc.Call, reg hrpc.RegionInfo) (proto.Message, error) {
if reg.IsUnavailable() {
return nil, ErrRegionUnavailable
}
rpc.SetRegion(reg)
	// Get the client to queue the RPC on.
	client := reg.Client()
	if client == nil {
		// The region has no client established yet (or lost it).
		// Mark the region as unavailable.
if reg.MarkUnavailable() {
// If this was the first goroutine to mark the region as
// unavailable, start a goroutine to reestablish a connection
go c.reestablishRegion(reg)
}
return nil, ErrRegionUnavailable
}
res, err := sendBlocking(client, rpc)
if err != nil {
return nil, err
}
// Check for errors
switch res.Error.(type) {
case region.RetryableError:
// There's an error specific to this region, but
// our region client is fine. Mark this region as
// unavailable (as opposed to all regions sharing
// the client), and start a goroutine to reestablish
// it.
if reg.MarkUnavailable() {
go c.reestablishRegion(reg)
}
return nil, ErrRegionUnavailable
case region.UnrecoverableError:
// If it was an unrecoverable error, the region client is
// considered dead.
if reg == c.adminRegionInfo {
// If this is the admin client, mark the region
// as unavailable and start up a goroutine to
// reconnect if it wasn't already marked as such.
if reg.MarkUnavailable() {
go c.reestablishRegion(reg)
}
} else {
c.clientDown(client)
}
		// Like the retryable case above, return ErrRegionUnavailable,
		// which will result in blocking until the region is available again.
return nil, ErrRegionUnavailable
default:
// RPC was successfully sent, or an unknown type of error
// occurred. In either case, return the results.
return res.Msg, res.Error
}
}
// clientDown removes the client from the cache, marks all the regions
// sharing this client as unavailable, and starts a goroutine to
// reestablish each of them.
func (c *client) clientDown(client hrpc.RegionClient) {
downregions := c.clients.clientDown(client)
for downreg := range downregions {
if downreg.MarkUnavailable() {
downreg.SetClient(nil)
go c.reestablishRegion(downreg)
}
}
}
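
// lookupRegion finds the region and the address of the regionserver hosting
// the given key of the given table, retrying with exponential backoff until
// the lookup succeeds or the passed context is done. The master and hbase:meta
// locations are resolved through ZooKeeper; all other regions are resolved
// through hbase:meta.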
func (c *client) lookupRegion(ctx context.Context,
table, key []byte) (hrpc.RegionInfo, string, error) {
var reg hrpc.RegionInfo
var addr string
var err error
backoff := backoffStart
for {
// If it takes longer than regionLookupTimeout, fail so that we can sleep
lookupCtx, cancel := context.WithTimeout(ctx, regionLookupTimeout)
if c.clientType == adminClient {
log.WithField("resource", zk.Master).Debug("looking up master")
addr, err = c.zkLookup(lookupCtx, zk.Master)
cancel()
reg = c.adminRegionInfo
		} else if bytes.Equal(table, metaTableName) {
log.WithField("resource", zk.Meta).Debug("looking up region server of hbase:meta")
addr, err = c.zkLookup(lookupCtx, zk.Meta)
cancel()
reg = c.metaRegionInfo
} else {
log.WithFields(log.Fields{
"table": strconv.Quote(string(table)),
"key": strconv.Quote(string(key)),
}).Debug("looking up region")
reg, addr, err = c.metaLookup(lookupCtx, table, key)
cancel()
if err == TableNotFound {
log.WithFields(log.Fields{
"table": strconv.Quote(string(table)),
"key": strconv.Quote(string(key)),
"err": err,
}).Debug("hbase:meta does not know about this table/key")
return nil, "", err
} else if err == ErrMetaLookupThrottled {
return nil, "", err
}
}
if err == nil {
log.WithFields(log.Fields{
"table": strconv.Quote(string(table)),
"key": strconv.Quote(string(key)),
"region": reg,
"addr": addr,
}).Debug("looked up a region")
return reg, addr, nil
}
log.WithFields(log.Fields{
"table": strconv.Quote(string(table)),
"key": strconv.Quote(string(key)),
"backoff": backoff,
"err": err,
}).Error("failed looking up region")
// This will be hit if there was an error locating the region
backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
if err != nil {
return nil, "", err
}
}
}
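
// findRegion looks up the region for the given table and key, inserts it into
// the region cache (dropping clients of any overlapping regions it replaces),
// marks it unavailable and starts a goroutine to establish a client for it.
// Callers are expected to wait for the region to become available before
// retrying their RPC.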
func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) {
	// The region was not in the cache, so it
	// must be looked up in the meta table.
reg, addr, err := c.lookupRegion(ctx, table, key)
if err != nil {
return nil, err
}
	// We are the ones that looked up the region, so we need to
	// mark it unavailable and find a client for it.
reg.MarkUnavailable()
if reg != c.metaRegionInfo && reg != c.adminRegionInfo {
// Check that the region wasn't added to
// the cache while we were looking it up.
overlaps, replaced := c.regions.put(reg)
if !replaced {
// the same or younger regions are already in cache, retry looking up in cache
return nil, ErrRegionUnavailable
}
// otherwise, new region in cache, delete overlaps from client's cache
for _, r := range overlaps {
c.clients.del(r)
}
}
// Start a goroutine to connect to the region
go c.establishRegion(reg, addr)
// Wait for the new region to become
// available, and then send the RPC
return reg, nil
}

// getRegionFromCache searches the region cache for the region hosting the given row.
func (c *client) getRegionFromCache(table, key []byte) hrpc.RegionInfo {
if c.clientType == adminClient {
return c.adminRegionInfo
} else if bytes.Equal(table, metaTableName) {
return c.metaRegionInfo
}
regionName := createRegionSearchKey(table, key)
_, region := c.regions.get(regionName)
if region == nil {
return nil
}
// make sure the returned region is for the same table
if !bytes.Equal(fullyQualifiedTable(region), table) {
// not the same table, can happen if we got the last region
return nil
}
if len(region.StopKey()) != 0 &&
// If the stop key is an empty byte array, it means this region is the
// last region for this table and this key ought to be in that region.
bytes.Compare(key, region.StopKey()) >= 0 {
return nil
}
return region
}

// createRegionSearchKey creates the hbase:meta key to search for in order to
// locate the given key. For example, table "test" and key "row1" produce the
// search key "test,row1,:".
func createRegionSearchKey(table, key []byte) []byte {
metaKey := make([]byte, 0, len(table)+len(key)+3)
metaKey = append(metaKey, table...)
metaKey = append(metaKey, ',')
metaKey = append(metaKey, key...)
metaKey = append(metaKey, ',')
// ':' is the first byte greater than '9'. We always want to find the
// entry with the greatest timestamp, so by looking right before ':'
// we'll find it.
metaKey = append(metaKey, ':')
return metaKey
}

// metaLookupLimit throttles lookups to hbase:meta to metaLookupLimit requests
// per metaLookupInterval. It returns true if we were lucky enough to
// reserve right away and false otherwise.
func (c *client) metaLookupLimit(ctx context.Context) bool {
r := c.metaLookupLimiter.Reserve()
if !r.OK() {
panic("wtf: cannot reserve a meta lookup")
}
delay := r.Delay()
if delay <= 0 {
return true
}
// wait until it's our turn or passed context has expired
ctx, cancel := context.WithTimeout(ctx, delay)
<-ctx.Done()
cancel()
return false
}

// metaLookup checks the meta table for the region that the given row key of
// the given table belongs to, returning the region info and the address of the
// regionserver hosting it.
func (c *client) metaLookup(ctx context.Context,
table, key []byte) (hrpc.RegionInfo, string, error) {
metaKey := createRegionSearchKey(table, key)
rpc, err := hrpc.NewGetBefore(ctx, metaTableName, metaKey,
hrpc.Families(infoFamily), hrpc.SkipBatch())
if err != nil {
return nil, "", err
}
resp, err := c.Get(rpc)
if err != nil {
return nil, "", err
}
if len(resp.Cells) == 0 {
return nil, "", TableNotFound
}
reg, addr, err := region.ParseRegionInfo(resp)
if err != nil {
return nil, "", err
}
if !bytes.Equal(table, fullyQualifiedTable(reg)) {
// This would indicate a bug in HBase.
return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong table!"+
" Looked up table=%q key=%q got region=%s", table, key, reg)
} else if len(reg.StopKey()) != 0 &&
bytes.Compare(key, reg.StopKey()) >= 0 {
// This would indicate a hole in the meta table.
return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong region!"+
" Looked up table=%q key=%q got region=%s", table, key, reg)
}
return reg, addr, nil
}
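
// fullyQualifiedTable returns the region's table name prefixed with its
// namespace as "namespace:table", or just the table name if the region is in
// the default namespace.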
func fullyQualifiedTable(reg hrpc.RegionInfo) []byte {
namespace := reg.Namespace()
table := reg.Table()
if namespace == nil {
return table
}
// non-default namespace table
fqTable := make([]byte, 0, len(namespace)+1+len(table))
fqTable = append(fqTable, namespace...)
fqTable = append(fqTable, byte(':'))
fqTable = append(fqTable, table...)
return fqTable
}
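
// reestablishRegion looks up the current location of a region whose client was
// lost and establishes a new client for it.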
func (c *client) reestablishRegion(reg hrpc.RegionInfo) {
log.WithField("region", reg).Debug("reestablishing region")
c.establishRegion(reg, "")
}

// probeKey returns a key in the region that is unlikely to have data at it,
// in order to test whether the region is online. This prevents the Get request
// from actually fetching data from storage, which would consume resources of
// the regionserver.
func probeKey(reg hrpc.RegionInfo) []byte {
// now we create a probe key: reg.StartKey() + 17 zeros
probe := make([]byte, len(reg.StartKey())+17)
copy(probe, reg.StartKey())
return probe
}

// isRegionEstablished checks whether the regionserver accepts RPCs for the
// region. It returns the cause if the region is not established.
func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
probe, err := hrpc.NewGet(context.Background(), fullyQualifiedTable(reg), probeKey(reg),
hrpc.SkipBatch())
if err != nil {
panic(fmt.Sprintf("should not happen: %s", err))
}
probe.ExistsOnly()
probe.SetRegion(reg)
res, err := sendBlocking(rc, probe)
if err != nil {
panic(fmt.Sprintf("should not happen: %s", err))
}
switch res.Error.(type) {
case region.RetryableError, region.UnrecoverableError:
return res.Error
default:
return nil
}
}
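
// establishRegion connects the given region to a region client, retrying with
// exponential backoff until it succeeds or the region's context is done. If
// addr is empty, the region is looked up again first to discover which
// regionserver currently hosts it; for non-admin regions a lightweight probe
// request then verifies that the server really serves the region before it is
// marked available.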
func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
var backoff time.Duration
var err error
for {
backoff, err = sleepAndIncreaseBackoff(reg.Context(), backoff)
if err != nil {
// region is dead
reg.MarkAvailable()
return
}
if addr == "" {
// need to look up region and address of the regionserver
originalReg := reg
			// look up the region until we get it or we learn that it doesn't exist
reg, addr, err = c.lookupRegion(originalReg.Context(),
fullyQualifiedTable(originalReg), originalReg.StartKey())
if err == TableNotFound {
// region doesn't exist, delete it from caches
c.regions.del(originalReg)
c.clients.del(originalReg)
originalReg.MarkAvailable()
log.WithFields(log.Fields{
"region": originalReg.String(),
"err": err,
"backoff": backoff,
}).Info("region does not exist anymore")
return
} else if originalReg.Context().Err() != nil {
// region is dead
originalReg.MarkAvailable()
log.WithFields(log.Fields{
"region": originalReg.String(),
"err": err,
"backoff": backoff,
}).Info("region became dead while establishing client for it")
return
} else if err == ErrMetaLookupThrottled {
				// We've been throttled: back off and retry the lookup.
				// TODO: the backoff might be unnecessary here.
reg = originalReg
continue
} else if err != nil {
log.WithFields(log.Fields{
"region": originalReg.String(),
"err": err,
"backoff": backoff,
}).Fatal("unknown error occured when looking up region")
}
if !bytes.Equal(reg.Name(), originalReg.Name()) {
// put new region and remove overlapping ones.
// Should remove the original region as well.
reg.MarkUnavailable()
overlaps, replaced := c.regions.put(reg)
if !replaced {
// a region that is the same or younger is already in cache
reg.MarkAvailable()
originalReg.MarkAvailable()
return
}
// otherwise delete the overlapped regions in cache
for _, r := range overlaps {
c.clients.del(r)
}
				// Let rpcs know that they can retry and either get the newly
				// added region from the cache or look up the one they need.
originalReg.MarkAvailable()
} else {
// same region, discard the looked up one
reg = originalReg
}
}
// connect to the region's regionserver
client, err := c.establishRegionClient(reg, addr)
if err == nil {
if reg == c.adminRegionInfo {
reg.SetClient(client)
reg.MarkAvailable()
return
}
if existing := c.clients.put(client, reg); existing != client {
// a client for this regionserver is already in cache, discard this one.
client.Close()
client = existing
}
if err = isRegionEstablished(client, reg); err == nil {
// set region client so that as soon as we mark it available,
// concurrent readers are able to find the client
reg.SetClient(client)
reg.MarkAvailable()
return
} else if _, ok := err.(region.UnrecoverableError); ok {
// the client we got died
c.clientDown(client)
}
} else if err == context.Canceled {
// region is dead
reg.MarkAvailable()
return
}
log.WithFields(log.Fields{
"region": reg,
"backoff": backoff,
"err": err,
}).Debug("region was not established, retrying")
		// Reset the address because we weren't able to connect to it, or the
		// regionserver says the region is still offline; we should look it up again.
addr = ""
}
}
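
// sleepAndIncreaseBackoff sleeps for the given backoff (a zero value skips the
// sleep) and returns the next backoff to use: backoffStart at first, doubling
// while under 5 seconds, then growing by 5 seconds per retry. It returns
// ErrDeadline if the context is done while sleeping.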
func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
if backoff == 0 {
return backoffStart, nil
}
select {
case <-time.After(backoff):
case <-ctx.Done():
return 0, ErrDeadline
}
// TODO: Revisit how we back off here.
if backoff < 5000*time.Millisecond {
return backoff * 2, nil
} else {
return backoff + 5000*time.Millisecond, nil
}
}
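
// establishRegionClient returns a region client for the regionserver at the
// given address, reusing an already cached client when possible (for non-admin
// clients) and otherwise dialing a new connection bounded by
// regionLookupTimeout.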
func (c *client) establishRegionClient(reg hrpc.RegionInfo,
addr string) (hrpc.RegionClient, error) {
if c.clientType != adminClient {
// if rpc is not for hbasemaster, check if client for regionserver
// already exists
if client := c.clients.checkForClient(addr); client != nil {
// There's already a client
return client, nil
}
}
var clientType region.ClientType
if c.clientType == standardClient {
clientType = region.RegionClient
} else {
clientType = region.MasterClient
}
clientCtx, cancel := context.WithTimeout(reg.Context(), regionLookupTimeout)
defer cancel()
return region.NewClient(clientCtx, addr, clientType,
c.rpcQueueSize, c.flushInterval, c.effectiveUser)
}
// zkResult contains the result of a ZooKeeper lookup (when we're looking for
// the meta region or the HMaster).
type zkResult struct {
addr string
err error
}
// zkLookup asynchronously looks up the meta region or HMaster in ZooKeeper.
func (c *client) zkLookup(ctx context.Context, resource zk.ResourceName) (string, error) {
	// We make this a buffered channel so that if we stop waiting due to a
	// timeout, we won't block the lookup goroutine that we start below.
reschan := make(chan zkResult, 1)
go func() {
addr, err := c.zkClient.LocateResource(resource.Prepend(c.zkRoot))
// This is guaranteed to never block as the channel is always buffered.
reschan <- zkResult{addr, err}
}()
select {
case res := <-reschan:
return res.addr, res.err
case <-ctx.Done():
return "", ErrDeadline
}
}