diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 6fc9cade1..c358c22e6 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -71,6 +71,10 @@ func (id trackingMessageID) ack() bool { return true } +func (id messageID) isEntryIDValid() bool { + return id.entryID >= 0 +} + func (id messageID) greater(other messageID) bool { if id.ledgerID != other.ledgerID { return id.ledgerID > other.ledgerID diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 8caaff4c9..d76865ef8 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -161,15 +161,15 @@ func (r *reader) HasNext() bool { func (r *reader) hasMoreMessages() bool { if !r.pc.lastDequeuedMsg.Undefined() { - return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID) + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID) } if r.pc.options.startMessageIDInclusive { - return r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID) + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID) } // Non-inclusive - return r.lastMessageInBroker.greater(r.pc.startMessageID.messageID) + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.startMessageID.messageID) } func (r *reader) Close() { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 793dc8dc8..36204cf28 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -312,6 +312,26 @@ func TestReaderOnLatestWithBatching(t *testing.T) { cancel() } +func TestReaderHasNextAgainstEmptyTopic(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + // create reader on 5th message (not included) + reader, err := client.CreateReader(ReaderOptions{ + Topic: "an-empty-topic", + StartMessageID: EarliestMessageID(), + }) + + assert.Nil(t, err) + defer reader.Close() + + assert.Equal(t, reader.HasNext(), false) +} + func TestReaderHasNext(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL,