Skip to content

Commit

Permalink
Update kayrock, more compression tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dantswain committed Sep 10, 2019
1 parent d29e912 commit 7197a90
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 6 deletions.
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule KafkaEx.Mixfile do

defp deps do
[
{:kayrock, "~> 0.1.5"},
{:kayrock, "~> 0.1.7"},
{:credo, "~> 0.8.10", only: :dev},
{:dialyxir, "~> 1.0.0-rc.3", only: :dev},
{:excoveralls, "~> 0.7", only: :test},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"hackney": {:hex, :hackney, "1.8.6", "21a725db3569b3fb11a6af17d5c5f654052ce9624219f1317e8639183de4a423", [:rebar3], [{:certifi, "1.2.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "5.0.2", "ac203208ada855d95dc591a764b6e87259cb0e2a364218f215ad662daa8cd6b4", [:rebar3], [{:unicode_util_compat, "0.2.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"},
"kayrock": {:hex, :kayrock, "0.1.5", "df4159d2be8a89c0aabc6d65d2c87a711d3c857fc2d09cafe80755b32f02d15a", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"},
"kayrock": {:hex, :kayrock, "0.1.7", "6b04b5181cf358c31ba16807dc2dd8b25e5df442b9190e6762c03f42c6ab545f", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
Expand Down
256 changes: 252 additions & 4 deletions test/integration/kayrock/record_batch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
assert message.offset == offset
end

test "compression - produce v0, read v3", %{client: client} do
test "gzip compression - produce v0, fetch v3", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

Expand All @@ -206,7 +206,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: offset - 2,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 3
Expand All @@ -220,7 +220,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
assert message.offset == offset
end

test "compression - produce v0, read v5", %{client: client} do
test "gzip compression - produce v0, fetch v5", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

Expand All @@ -237,7 +237,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: offset - 2,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 5
Expand All @@ -250,4 +250,252 @@ defmodule KafkaEx.KayrockRecordBatchTest do
assert message.value == msg
assert message.offset == offset
end

test "gzip compression - produce v3, fetch v0", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
compression: :gzip,
protocol_version: 3
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 0
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.value == msg
assert message.offset == offset
end

test "gzip compression - produce v3, fetch v3", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
compression: :gzip,
protocol_version: 3
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 0
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.value == msg
assert message.offset == offset
end

test "gzip compression - produce v3, fetch v5", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
compression: :gzip,
protocol_version: 3
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 0
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.value == msg
assert message.offset == offset
end

test "snappy compression - produce v0, fetch v3", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
compression: :snappy,
protocol_version: 0
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 3
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.value == msg
assert message.offset == offset
end

test "snappy compression - produce v0, fetch v5", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
compression: :snappy,
protocol_version: 0
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 5
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.value == msg
assert message.offset == offset
end

test "snappy compression - produce v3, fetch v0", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
compression: :snappy,
protocol_version: 3
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 0
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.value == msg
assert message.offset == offset
end

test "snappy compression - produce v3, fetch v3", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
compression: :snappy,
protocol_version: 3
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 0
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.value == msg
assert message.offset == offset
end

test "snappy compression - produce v3, fetch v5", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()

{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
compression: :snappy,
protocol_version: 3
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
auto_commit: false,
worker_name: client,
protocol_version: 0
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.value == msg
assert message.offset == offset
end
end

0 comments on commit 7197a90

Please sign in to comment.