Skip to content

Commit

Permalink
MInor fixes
Browse files Browse the repository at this point in the history
- Update ssl related code and nest it into describes
- Updates tests to support new broker version
- Update docker-compose to create topics
- fix check
  • Loading branch information
Argonus committed Apr 21, 2022
1 parent 9ad7bd4 commit 52568e7
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 241 deletions.
86 changes: 46 additions & 40 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,55 @@ jobs:
erlang: ['24.3']

steps:
- name: Cancel previous runs
uses: styfle/cancel-workflow-action@0.9.0
with:
access_token: ${{ github.token }}
- name: Checkout Github repo
uses: actions/checkout@v2
- name: Setup elixir & erlang enviroment
uses: actions/setup-elixir@v1
with:
elixir-version: ${{matrix.elixir}} # Define the elixir version [required]
otp-version: ${{matrix.erlang}} # Define the OTP version [required]
experimental-otp: true # More info https://github.com/actions/setup-elixir/issues/31
- name: Retrieve cached dependencies
uses: actions/cache@v2
id: mix-cache
with:
path: |
deps
_build
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }}
- name: Install dependencies
if: steps.mix-cache.outputs.cache-hit != 'true'
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
mix deps.compile
- name: Cancel previous runs
uses: styfle/cancel-workflow-action@0.9.0
with:
access_token: ${{ github.token }}
- name: Checkout Github repo
uses: actions/checkout@v2
- name: Setup elixir & erlang enviroment
uses: actions/setup-elixir@v1
with:
elixir-version: ${{matrix.elixir}} # Define the elixir version [required]
otp-version: ${{matrix.erlang}} # Define the OTP version [required]
experimental-otp: true # More info https://github.com/actions/setup-elixir/issues/31

- name: Retrieve Mix Dependencies Cache
uses: actions/cache@v2
id: mix-cache # id to use in retrieve action
with:
path: deps
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}

- name: Retrieve Mix Dependencies Compilation Cache
uses: actions/cache@v2
id: mix-deps-compile-cache # id to use in retrieve action
with:
path: _build
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
- name: Install Mix Dependencies
if: steps.mix-cache.outputs.cache-hit != 'true'
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
- name: Compile Mix Dependencies
if: steps.mix-deps-compile-cache.outputs.cache-hit != 'true'
run: mix deps.compile

run-checks:
name: check
runs-on: ubuntu-20.04
needs: [setup]
needs: [dependencies]
env:
MIX_ENV: dev

strategy:
fail-fast: false
matrix:
include:
- elixir: 1.10.2
erlang: 22.3
checks:
- mix credo
- mix compile --warnings-as-errors --force
- mix format --check-formatted
elixir: ['1.12.3']
erlang: ['24.3']

steps:
- uses: actions/checkout@v2
Expand All @@ -88,20 +92,22 @@ jobs:
path: _build
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}

- run: ${{ matrix.checks }}
- run: |
mix compile --force
mix credo
mix format --check-formatted
dialyzer:
name: check / dialyzer
needs: [setup]
needs: [dependencies]
runs-on: ubuntu-latest
env:
MIX_ENV: dev

strategy:
matrix:
include:
- elixir: 1.10.2
erlang: 22.3
elixir: ['1.12.3']
erlang: ['24.3']

steps:
- uses: actions/checkout@v2
Expand Down
10 changes: 6 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ services:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

ready:
image: busybox:1.31-musl
command: tail -f /dev/null
kafka_setup:
image: confluentinc/cp-kafka:5.5.1
depends_on:
kafka-1:
condition: service_healthy
kafka-2:
condition: service_healthy
kafka-3:
condition: service_healthy
condition: service_healthy
command: "bash -c 'kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic consumer_group_implementation_test && \
kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic test0p8p0 && \
kafka-topics --zookeeper zookeeper:32181 --list'"
4 changes: 1 addition & 3 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -810,9 +810,7 @@ defmodule KafkaEx.GenConsumer do
KafkaEx.latest_offset(topic, partition, worker_name)

_ ->
raise "Offset out of range while consuming topic #{topic}, partition #{
partition
}."
raise "Offset out of range while consuming topic #{topic}, partition #{partition}."
end

%State{
Expand Down
16 changes: 4 additions & 12 deletions lib/kafka_ex/network_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ defmodule KafkaEx.NetworkClient do
err ->
Logger.log(
:error,
"Could not connect to broker #{inspect(host)}:#{inspect(port)} because of error #{
inspect(err)
}"
"Could not connect to broker #{inspect(host)}:#{inspect(port)} because of error #{inspect(err)}"
)

nil
Expand All @@ -50,9 +48,7 @@ defmodule KafkaEx.NetworkClient do
{_, reason} ->
Logger.log(
:error,
"Asynchronously sending data to broker #{inspect(broker.host)}:#{
inspect(broker.port)
} failed with #{inspect(reason)}"
"Asynchronously sending data to broker #{inspect(broker.host)}:#{inspect(broker.port)} failed with #{inspect(reason)}"
)

reason
Expand All @@ -77,9 +73,7 @@ defmodule KafkaEx.NetworkClient do
{:error, reason} ->
Logger.log(
:error,
"Receiving data from broker #{inspect(broker.host)}:#{
inspect(broker.port)
} failed with #{inspect(reason)}"
"Receiving data from broker #{inspect(broker.host)}:#{inspect(broker.port)} failed with #{inspect(reason)}"
)

Socket.close(socket)
Expand All @@ -90,9 +84,7 @@ defmodule KafkaEx.NetworkClient do
{_, reason} ->
Logger.log(
:error,
"Sending data to broker #{inspect(broker.host)}:#{
inspect(broker.port)
} failed with #{inspect(reason)}"
"Sending data to broker #{inspect(broker.host)}:#{inspect(broker.port)} failed with #{inspect(reason)}"
)

Socket.close(socket)
Expand Down
4 changes: 1 addition & 3 deletions lib/kafka_ex/protocol/fetch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,7 @@ defmodule KafkaEx.Protocol.Fetch do
)
when byte_size(partial_message_data) < msg_size do
raise RuntimeError,
"Insufficient data fetched at offset #{offset}. Message size is #{
msg_size
} but only received #{byte_size(partial_message_data)} bytes. Try increasing max_bytes."
"Insufficient data fetched at offset #{offset}. Message size is #{msg_size} but only received #{byte_size(partial_message_data)} bytes. Try increasing max_bytes."
end

# handles the single message case and the batch (compression) case
Expand Down
36 changes: 9 additions & 27 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,7 @@ defmodule KafkaEx.Server do
def terminate(reason, state) do
Logger.log(
:debug,
"Shutting down worker #{inspect(state.worker_name)}, reason: #{
inspect(reason)
}"
"Shutting down worker #{inspect(state.worker_name)}, reason: #{inspect(reason)}"
)

if state.event_pid do
Expand Down Expand Up @@ -471,9 +469,7 @@ defmodule KafkaEx.Server do
nil ->
Logger.log(
:error,
"kafka_server_produce_send_request: leader for topic #{
produce_request.topic
}/#{produce_request.partition} is not available"
"kafka_server_produce_send_request: leader for topic #{produce_request.topic}/#{produce_request.partition} is not available"
)

:leader_not_available
Expand Down Expand Up @@ -678,9 +674,7 @@ defmodule KafkaEx.Server do
) do
Logger.log(
:error,
"Metadata request for topic #{inspect(topic)} failed with error_code #{
inspect(error_code)
}"
"Metadata request for topic #{inspect(topic)} failed with error_code #{inspect(error_code)}"
)

{correlation_id, %Metadata.Response{}}
Expand Down Expand Up @@ -731,9 +725,7 @@ defmodule KafkaEx.Server do
end
else
message =
"Unable to fetch metadata from any brokers. Timeout is #{
sync_timeout
}."
"Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."

Logger.log(:error, message)
raise message
Expand Down Expand Up @@ -862,9 +854,7 @@ defmodule KafkaEx.Server do
case broker do
nil ->
Logger.error(fn ->
"network_request: leader for topic #{request.topic}/#{
request.partition
} is not available"
"network_request: leader for topic #{request.topic}/#{request.partition} is not available"
end)

{{:error, :topic_not_found}, updated_state}
Expand All @@ -891,15 +881,11 @@ defmodule KafkaEx.Server do
rescue
_ ->
Logger.error(
"Failed to parse a response from the server: #{
inspect(response)
}"
"Failed to parse a response from the server: #{inspect(response)}"
)

Kernel.reraise(
"Parse error during #{inspect(module)}.parse_response. Couldn't parse: #{
inspect(response)
}",
"Parse error during #{inspect(module)}.parse_response. Couldn't parse: #{inspect(response)}",
System.stacktrace()
)
end
Expand Down Expand Up @@ -932,9 +918,7 @@ defmodule KafkaEx.Server do
Enum.each(brokers_to_remove, fn broker ->
Logger.log(
:debug,
"Closing connection to broker #{broker.node_id}: #{
inspect(broker.host)
} on port #{inspect(broker.port)}"
"Closing connection to broker #{broker.node_id}: #{inspect(broker.host)} on port #{inspect(broker.port)}"
)

NetworkClient.close_socket(broker.socket)
Expand All @@ -956,9 +940,7 @@ defmodule KafkaEx.Server do
nil ->
Logger.log(
:debug,
"Establishing connection to broker #{metadata_broker.node_id}: #{
inspect(metadata_broker.host)
} on port #{inspect(metadata_broker.port)}"
"Establishing connection to broker #{metadata_broker.node_id}: #{inspect(metadata_broker.host)} on port #{inspect(metadata_broker.port)}"
)

add_new_brokers(
Expand Down
4 changes: 1 addition & 3 deletions lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,7 @@ defmodule KafkaEx.Server0P8P2 do
) do
Logger.log(
:error,
"Fetching consumer_group #{consumer_group} metadata failed with error_code #{
inspect(error_code)
}"
"Fetching consumer_group #{consumer_group} metadata failed with error_code #{inspect(error_code)}"
)

{%ConsumerMetadataResponse{error_code: error_code}, state}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule KafkaEx.Mixfile do
main_deps = [
{:kayrock, "~> 0.1.12"},
{:credo, "~> 1.1", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0.0-rc.3", only: :dev, runtime: false},
{:dialyxir, "~> 1.0", only: :dev, runtime: false},
{:excoveralls, "~> 0.7", only: :test, runtime: false},
{:snappy,
git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]}
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"},
"crc32cer": {:hex, :crc32cer, "0.1.10", "fb87abbf34b72f180f8c3a908cd1826c6cb9a59787d156a29e05de9e98be385e", [:rebar3], [], "hexpm", "5b1f47efd0a1b4b7411f1f35e14d3c8c6da6e6a2a725ec8f2cf1ab13703e5f38"},
"credo": {:hex, :credo, "1.1.5", "caec7a3cadd2e58609d7ee25b3931b129e739e070539ad1a0cd7efeeb47014f4", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d0bbd3222607ccaaac5c0340f7f525c627ae4d7aee6c8c8c108922620c5b6446"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.7", "6287f8f2cb45df8584317a4be1075b8c9b8a69de8eeb82b4d9e6c761cf2664cd", [:mix], [{:erlex, ">= 0.2.5", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "506294d6c543e4e5282d4852aead19ace8a35bedeb043f9256a06a6336827122"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"},
"erlex": {:hex, :erlex, "0.2.5", "e51132f2f472e13d606d808f0574508eeea2030d487fc002b46ad97e738b0510", [:mix], [], "hexpm", "756d3e19b056339af674b715fdd752c5dac468cf9d0e2d1a03abf4574e99fbf8"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f1155337ae17ff7a1255217b4c1ceefcd1860b7ceb1a1874031e7a861b052e39"},
"excoveralls": {:hex, :excoveralls, "0.12.1", "a553c59f6850d0aff3770e4729515762ba7c8e41eedde03208182a8dc9d0ce07", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "5c1f717066a299b1b732249e736c5da96bb4120d1e55dc2e6f442d251e18a812"},
"hackney": {:hex, :hackney, "1.15.2", "07e33c794f8f8964ee86cebec1a8ed88db5070e52e904b8f12209773c1036085", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.5", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "e0100f8ef7d1124222c11ad362c857d3df7cb5f4204054f9f0f4a728666591fc"},
Expand Down
Loading

0 comments on commit 52568e7

Please sign in to comment.