Skip to content

Commit

Permalink
Merge pull request #437 from Argonus/support-snappyer-2
Browse files Browse the repository at this point in the history
Support snappyer 2
  • Loading branch information
joshuawscott committed Mar 29, 2021
2 parents 5c41cca + 8e774b6 commit ef482fc
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 7 deletions.
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ For more information on the v1.0 API, see
The standard approach for adding dependencies to an Elixir application applies:
add KafkaEx to the deps list in your project's mix.exs file.
You may also optionally add
[snappy-erlang-nif](https://github.com/fdmanana/snappy-erlang-nif) (required
[snappyer](https://hex.pm/packages/snappyer) (required
only if you want to use snappy compression).

```elixir
Expand All @@ -107,8 +107,10 @@ defmodule MyApp.Mixfile do
[
# add to your existing deps
{:kafka_ex, "~> 0.11"},
# if using snappy compression
# If using snappy-erlang-nif (snappy) compression
{:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"}
# if using snappyer (snappy) compression
{:snappyer, "~> 1.2"}
]
end
end
Expand Down Expand Up @@ -457,3 +459,13 @@ an invite via [http://bit.ly/slackelixir](http://bit.ly/slackelixir).
The Slack channel is appropriate for quick questions or general design
discussions. The Slack discussion is archived at
[http://slack.elixirhq.com/kafkaex](http://slack.elixirhq.com/kafkaex).

## default snappy algorithm use snappy-erlang-nif package

It can be changed to snappyer by using this:

``` elixir
config :kafka_ex, snappy_module: :snappyer
```

Snappy erlang nif is deprecated and will be changed to :snappyer in 1.0.0 release.
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ config :kafka_ex,
certfile: File.cwd!() <> "/ssl/cert.pem",
keyfile: File.cwd!() <> "/ssl/key.pem"
],
snappy_module: :snappy,
# set this to the version of the kafka broker that you are using
# include only major.minor.patch versions. must be at least 0.8.0
# use "kayrock" for the new client
Expand Down
5 changes: 3 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use Mix.Config

config :ex_unit, capture_log: is_nil(System.get_env("SHOW_LOGS"))

config :kafka_ex,
sync_timeout: 60_000
config :kafka_ex, snappy_module: :snappy

config :kafka_ex, sync_timeout: 60_000

# Help debug tests that are tricky to understand
config :logger, :console,
Expand Down
8 changes: 6 additions & 2 deletions lib/kafka_ex/compression.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ defmodule KafkaEx.Compression do
"""
@spec compress(compression_type_t, binary) :: {binary, attribute_t}
def compress(:snappy, data) do
{:ok, compressed_data} = :snappy.compress(data)
{:ok, compressed_data} = snappy_module().compress(data)
{compressed_data, @snappy_attribute}
end

Expand All @@ -58,7 +58,11 @@ defmodule KafkaEx.Compression do
<<valsize::32-unsigned, value::size(valsize)-binary, rest::binary>>,
so_far
) do
{:ok, decompressed_value} = :snappy.decompress(value)
{:ok, decompressed_value} = snappy_module().decompress(value)
snappy_decompress_chunk(rest, so_far <> decompressed_value)
end

defp snappy_module do
Application.get_env(:kafka_ex, :snappy_module, :snappy)
end
end
1 change: 1 addition & 0 deletions lib/kafka_ex/consumer_group/heartbeat.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule KafkaEx.ConsumerGroup.Heartbeat do
@moduledoc false

# GenServer to send heartbeats to the broker
#
# A `HeartbeatRequest` is sent periodically by each active group member (after
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ defmodule KafkaEx.Mixfile do
{:dialyxir, "~> 1.0.0-rc.3", only: :dev, runtime: false},
{:excoveralls, "~> 0.7", only: :test, runtime: false},
{:snappy,
git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]}
git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]},
{:snappyer, "~> 1.2", only: [:dev, :test]}
]

# we need a newer version of ex_doc, but it will cause problems on older
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
"varint": {:hex, :varint, "1.2.0", "61bffd9dcc2d5242d59f75694506b4d4013bb103f6a23e34b94f89cebb0c1ab3", [:mix], [], "hexpm", "d94941ed8b9d1a5fdede9103a5e52035bd0aaf35081d44e67713a36799927e47"},
"snappyer": {:hex, :snappyer, "1.2.8", "201ce9067a33c71a6a5087c0c3a49a010b17112d461e6df696c722dcb6d0934a", [:rebar3], [], "hexpm", "35518e79a28548b56d8fd6aee2f565f12f51c2d3d053f9cfa817c83be88c4f3d"},
}
6 changes: 6 additions & 0 deletions test/compression_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ defmodule CompressionTest do
<<0, 0, 0, 0, 0, 0, 0, 37, 0, 0, 3, 246, 10, 44, 16, 236, 0, 0, 255, 255,
255, 255, 0, 0, 3, 232>> <> String.duplicate("ABCDEFGHIJ", 100)

## enable :snappy module, and test it
Application.put_env(:kafka_ex, :snappy_module, :snappy)
assert expected == KafkaEx.Compression.decompress(2, data)

## enable :snappyer module, and test it
Application.put_env(:kafka_ex, :snappy_module, :snappyer)
assert expected == KafkaEx.Compression.decompress(2, data)
end
end

0 comments on commit ef482fc

Please sign in to comment.