Skip to content

Commit

Permalink
Merge pull request #42 from mreiferson/config_42
Browse files Browse the repository at this point in the history
unintuitive behavior when using &nsq.Config{}
  • Loading branch information
jehiah committed Jun 10, 2014
2 parents cb45642 + 71f18d9 commit 1eb2787
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 38 deletions.
89 changes: 51 additions & 38 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
// (see Config.Set() for available parameters)
type Config struct {
sync.RWMutex
initOnce sync.Once

verbose bool `opt:"verbose"`

Expand Down Expand Up @@ -62,41 +63,40 @@ type Config struct {

// NewConfig returns a new default configuration
func NewConfig() *Config {
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}
conf := &Config{
maxInFlight: 1,
maxAttempts: 5,

lookupdPollInterval: 60 * time.Second,
lookupdPollJitter: 0.3,

lowRdyIdleTimeout: 10 * time.Second,

defaultRequeueDelay: 90 * time.Second,
maxRequeueDelay: 15 * time.Minute,

backoffMultiplier: time.Second,
maxBackoffDuration: 120 * time.Second,

readTimeout: DefaultClientTimeout,
writeTimeout: time.Second,

deflateLevel: 6,
outputBufferSize: 16 * 1024,
outputBufferTimeout: 250 * time.Millisecond,

heartbeatInterval: DefaultClientTimeout / 2,

clientID: strings.Split(hostname, ".")[0],
hostname: hostname,
userAgent: fmt.Sprintf("go-nsq/%s", VERSION),
}
conf := &Config{}
conf.initialize()
return conf
}

// initialize is used to ensure that a Config has a baseline set of defaults
// despite how it might have been insantiated
func (c *Config) initialize() {
c.initOnce.Do(func() {
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}
c.maxInFlight = 1
c.maxAttempts = 5
c.lookupdPollInterval = 60 * time.Second
c.lookupdPollJitter = 0.3
c.lowRdyIdleTimeout = 10 * time.Second
c.defaultRequeueDelay = 90 * time.Second
c.maxRequeueDelay = 15 * time.Minute
c.backoffMultiplier = time.Second
c.maxBackoffDuration = 120 * time.Second
c.readTimeout = DefaultClientTimeout
c.writeTimeout = time.Second
c.deflateLevel = 6
c.outputBufferSize = 16 * 1024
c.outputBufferTimeout = 250 * time.Millisecond
c.heartbeatInterval = DefaultClientTimeout / 2
c.clientID = strings.Split(hostname, ".")[0]
c.hostname = hostname
c.userAgent = fmt.Sprintf("go-nsq/%s", VERSION)
})
}

// Set takes an option as a string and a value as an interface and
// attempts to set the appropriate configuration option.
//
Expand Down Expand Up @@ -171,6 +171,8 @@ func (c *Config) Set(option string, value interface{}) error {
c.Lock()
defer c.Unlock()

c.initialize()

val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
Expand Down Expand Up @@ -298,7 +300,7 @@ func coerceString(v interface{}) (string, error) {
switch v.(type) {
case string:
return v.(string), nil
case int, int16, uint16, int32, uint32, int64, uint64:
case int, int16, int32, int64, uint, uint16, uint32, uint64:
return fmt.Sprintf("%d", v), nil
case float64:
return fmt.Sprintf("%f", v), nil
Expand All @@ -312,9 +314,12 @@ func coerceDuration(v interface{}) (time.Duration, error) {
switch v.(type) {
case string:
return time.ParseDuration(v.(string))
case int, int16, uint16, int32, uint32, int64, uint64:
case int, int16, int32, int64:
// treat like ms
return time.Duration(reflect.ValueOf(v).Int()) * time.Millisecond, nil
case uint, uint16, uint32, uint64:
// treat like ms
return time.Duration(reflect.ValueOf(v).Uint()) * time.Millisecond, nil
case time.Duration:
return v.(time.Duration), nil
}
Expand All @@ -327,8 +332,10 @@ func coerceBool(v interface{}) (bool, error) {
return v.(bool), nil
case string:
return strconv.ParseBool(v.(string))
case int, int16, uint16, int32, uint32, int64, uint64:
case int, int16, int32, int64:
return reflect.ValueOf(v).Int() != 0, nil
case uint, uint16, uint32, uint64:
return reflect.ValueOf(v).Uint() != 0, nil
}
return false, errors.New("invalid value type")
}
Expand All @@ -337,8 +344,10 @@ func coerceFloat64(v interface{}) (float64, error) {
switch v.(type) {
case string:
return strconv.ParseFloat(v.(string), 64)
case int, int16, uint16, int32, uint32, int64, uint64:
case int, int16, int32, int64:
return float64(reflect.ValueOf(v).Int()), nil
case uint, uint16, uint32, uint64:
return float64(reflect.ValueOf(v).Uint()), nil
case float64:
return v.(float64), nil
}
Expand All @@ -349,8 +358,10 @@ func coerceInt64(v interface{}) (int64, error) {
switch v.(type) {
case string:
return strconv.ParseInt(v.(string), 10, 64)
case int, int16, uint16, int32, uint32, int64, uint64:
case int, int16, int32, int64:
return reflect.ValueOf(v).Int(), nil
case uint, uint16, uint32, uint64:
return int64(reflect.ValueOf(v).Uint()), nil
}
return 0, errors.New("invalid value type")
}
Expand All @@ -359,7 +370,9 @@ func coerceUint64(v interface{}) (uint64, error) {
switch v.(type) {
case string:
return strconv.ParseUint(v.(string), 10, 64)
case int, int16, uint16, int32, uint32, int64, uint64:
case int, int16, int32, int64:
return uint64(reflect.ValueOf(v).Int()), nil
case uint, uint16, uint32, uint64:
return reflect.ValueOf(v).Uint(), nil
}
return 0, errors.New("invalid value type")
Expand Down
1 change: 1 addition & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Conn struct {

// NewConn returns a new Conn instance
func NewConn(addr string, config *Config) *Conn {
config.initialize()
return &Conn{
addr: addr,

Expand Down
2 changes: 2 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type Consumer struct {
// The returned Consumer instance is setup with sane default values. To modify
// configuration, update the values on the returned instance before connecting.
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {
config.initialize()

if !IsValidTopicName(topic) {
return nil, errors.New("invalid topic name")
}
Expand Down
1 change: 1 addition & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (t *ProducerTransaction) finish() {

// NewProducer returns an instance of Producer for the specified address
func NewProducer(addr string, config *Config) *Producer {
config.initialize()
return &Producer{
id: atomic.AddInt64(&instCount, 1),

Expand Down

0 comments on commit 1eb2787

Please sign in to comment.