telegraf tcp_listener can corrupt data #991

Closed
mb-m opened this issue Apr 7, 2016 · 8 comments

Comments

@mb-m

mb-m commented Apr 7, 2016

The telegraf tcp_listener input plugin is great for injecting measurements into telegraf from an outside scheduler (e.g. sensu-client). However, its use of bufio.Scanner appears to be problematic.

From the code:

    scanner := bufio.NewScanner(conn)
    for {
        select {
        case <-t.done:
            return
        default:
            if !scanner.Scan() {
                return
            }
            select {
            case t.in <- scanner.Bytes():
            default:
                log.Printf(dropwarn)
            }
        }
    }

The problem comes with the select { case t.in <- scanner.Bytes(): }. From the bufio docs:

Bytes returns the most recent token generated by a call to Scan. The underlying array may point to data that will be overwritten by a subsequent call to Scan. It does no allocation.

It's quite possible for several lines to arrive on conn at once, in which case the for loop may go round more than once before the tcpParser goroutine gets to its select and receives from t.in. By then, a later scanner.Scan() call may already have overwritten the data that the slice sent on t.in points to. The fix would be to change this to something like:

    scanner := bufio.NewScanner(conn)
    for {
        select {
        case <-t.done:
            return
        default:
            if !scanner.Scan() {
                return
            }
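            // Copy the token: scanner.Bytes() points into a buffer that
            // a later call to Scan may overwrite.
            s_bytes := scanner.Bytes()
            t_bytes := make([]byte, len(s_bytes))
            copy(t_bytes, s_bytes)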
            select {
            case t.in <- t_bytes:
            default:
                log.Printf(dropwarn)
            }
        }
    }

This means that each []byte sent on the t.in channel has its own backing array, which the scanner (at least) can no longer modify.
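
The independence of the copy can be seen in a minimal sketch that involves no scanner at all. Here `shared` stands in for the scanner's internal buffer, and `cloneBytes` is a hypothetical helper name (not part of telegraf) that wraps the same make-and-copy steps as the fix above.

```go
package main

import "fmt"

// cloneBytes returns a copy of b backed by its own array.
// The name is hypothetical; it wraps the make+copy steps from the fix.
func cloneBytes(b []byte) []byte {
	out := make([]byte, len(b))
	copy(out, b)
	return out
}

func main() {
	shared := []byte("cpu=1") // stands in for the scanner's internal buffer
	alias := shared[:]        // shares the backing array, like scanner.Bytes()
	owned := cloneBytes(shared)

	// Simulate a later Scan call overwriting the buffer in place.
	copy(shared, "mem=2")

	fmt.Printf("alias=%s owned=%s\n", alias, owned) // alias=mem=2 owned=cpu=1
}
```

The alias follows the overwrite while the clone keeps the original line, which is the property the receiver on the channel needs.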

(edited to correct the go syntax as described in the comment below)

@sparrc
Contributor

sparrc commented Apr 7, 2016

Good idea, thanks for pointing this out. I very recently changed the TCP and UDP listeners in an attempt to reduce allocations. I'm surprised my testing did not expose this...

@mb-m
Author

mb-m commented Apr 7, 2016

@sparrc it's a race. I'm throwing a lot of data at the TCP listener, and even when I tried changing the packet sizes there was still a lot of data coming in at once. I started seeing data being dropped after I switched earlier today from sending directly over HTTP to influxdb to using telegraf's TCP listener. Call it luck ;-) (or perhaps not). I'm not sure there's even an easy way to set up a test case for this, because it depends on how fast the parser goroutine picks up off the channel and when exactly bufio.Scanner decides to memmove (or its equivalent) the underlying array data. Either way, my pretty trivial fix should work (though I admit I don't actually have a compile environment set up, hence no PR).

@mb-m
Author

mb-m commented Apr 7, 2016

(oh, and thanks for jumping on this so quickly too!)

@sparrc
Contributor

sparrc commented Apr 7, 2016

@mb-m, are you building off of master? I don't think that release 0.12 should have this bug, but I could be wrong (I'm likely going to remove bufio.Scanner completely, because it's a bit of a black box and I'd rather just allocate the buffers explicitly).

sparrc added a commit that referenced this issue Apr 7, 2016
@mb-m
Author

mb-m commented Apr 7, 2016

I'm not - I believe we're using a pre-packaged 0.12.0, but I just came to GitHub to read the code and realised that the []byte slice would have that effect because of the docs for bufio.Scanner.Bytes(). If the code for 0.12.0 is different, then there's a different bug corrupting lines somewhere, and I have to admit to not having looked into that.

@mb-m
Author

mb-m commented Apr 7, 2016

Oops, I've just realised that the "s_bytes" and "t_bytes" lines in my example should actually use := (I blame not having done much Go this year for that mistake). I also notice that 0.12.0 doesn't change much: you have the variable buf as an intermediary, but you're still not creating a copy, which is what you need to do.

@mb-m
Author

mb-m commented Apr 7, 2016

And if you're going to remove the bufio.Scanner, I think the right way to deal with it is to read all the data until the socket is closed, and then pass that on to t.in.

sparrc added a commit that referenced this issue Apr 7, 2016
sparrc added a commit that referenced this issue Apr 7, 2016
sparrc added a commit that referenced this issue Apr 7, 2016
@sparrc sparrc closed this as completed in be379f3 Apr 8, 2016
@mb-m
Author

mb-m commented Apr 11, 2016

👍
