Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: e2e rel poc - reconnection, new lamport Ts, logging #1220

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions examples/chat2-reliable/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func (c *Chat) receiveMessages() {

func (c *Chat) parseInput() {
defer c.wg.Done()

var disconnectedPeers []peer.ID

for {
select {
case <-c.ctx.Done():
Expand Down Expand Up @@ -267,10 +270,32 @@ func (c *Chat) parseInput() {
/connect multiaddress - dials a node adding it to the list of connected peers
/peers - list of peers connected to this node
/nick newNick - change the user's nickname
/disconnect - disconnect from all currently connected peers
/reconnect - attempt to reconnect to previously disconnected peers
/exit - closes the app`)
return
}

// Disconnect from peers
if line == "/disconnect" {
disconnectedPeers = c.disconnectFromPeers()
c.ui.InfoMessage("Disconnected from all peers. Use /reconnect to reconnect.")
return
}

// Reconnect to peers
if line == "/reconnect" {
if len(disconnectedPeers) == 0 {
c.ui.InfoMessage("No disconnection active. Use /disconnect first.")
} else {
c.reconnectToPeers(disconnectedPeers)
disconnectedPeers = nil
c.ui.InfoMessage("Reconnection initiated.")
}
return
}

// If no command matched, send as a regular message
c.SendMessage(line)
}()
}
Expand Down Expand Up @@ -521,6 +546,29 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
}
}

func (c *Chat) disconnectFromPeers() []peer.ID {
disconnectedPeers := c.node.Host().Network().Peers()
for _, peerID := range disconnectedPeers {
c.node.Host().Network().ClosePeer(peerID)
}
return disconnectedPeers
}

func (c *Chat) reconnectToPeers(peers []peer.ID) {
for _, peerID := range peers {
// We're using a goroutine here to avoid blocking if a peer is unreachable
go func(p peer.ID) {
ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()
if _, err := c.node.Host().Network().DialPeer(ctx, p); err != nil {
c.ui.ErrorMessage(fmt.Errorf("failed to reconnect to peer %s: %w", p, err))
} else {
c.ui.InfoMessage(fmt.Sprintf("Successfully reconnected to peer %s", p))
}
}(peerID)
}
}

func generateUniqueID() string {
return uuid.New().String()
}
Expand Down
2 changes: 1 addition & 1 deletion examples/chat2-reliable/chat_reliability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestLamportTimestamps(t *testing.T) {
return true
}, 30*time.Second, 1*time.Second, "Message propagation failed")

assert.Equal(t, int32(1), env.chats[0].getLamportTimestamp(), "Sender's Lamport timestamp should be 1")
assert.Greater(t, env.chats[0].getLamportTimestamp(), int32(0), "Sender's Lamport timestamp should be greater than 0")
assert.Greater(t, env.chats[1].getLamportTimestamp(), int32(0), "Node 1's Lamport timestamp should be greater than 0")
assert.Greater(t, env.chats[2].getLamportTimestamp(), int32(0), "Node 2's Lamport timestamp should be greater than 0")

Expand Down
60 changes: 51 additions & 9 deletions examples/chat2-reliable/reliability.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"context"
"errors"
"fmt"
"log"
"os"
"strings"
"time"

"github.com/bits-and-blooms/bloom/v3"
Expand All @@ -18,15 +21,17 @@ const (
bufferSweepInterval = 5 * time.Second
syncMessageInterval = 30 * time.Second
messageAckTimeout = 60 * time.Second
maxRetries = 3
retryBaseDelay = 1 * time.Second
maxRetryDelay = 10 * time.Second
maxRetries = 5
retryBaseDelay = 3 * time.Second
maxRetryDelay = 30 * time.Second
ackTimeout = 5 * time.Second
maxResendAttempts = 5
resendBaseDelay = 1 * time.Second
maxResendDelay = 30 * time.Second
)

var reliabilityLogger *log.Logger

func (c *Chat) initReliabilityProtocol() {
c.wg.Add(4)
c.setupMessageRequestHandler()
Expand All @@ -37,6 +42,18 @@ func (c *Chat) initReliabilityProtocol() {
go c.startEagerPushMechanism()
}

func init() {
file, err := os.OpenFile("reliability.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
reliabilityLogger = log.New(file, "", log.LstdFlags)
}

func (c *Chat) logReliabilityEvent(message string) {
reliabilityLogger.Println(message)
}

func (c *Chat) startEagerPushMechanism() {
defer c.wg.Done()

Expand Down Expand Up @@ -140,6 +157,7 @@ func (c *Chat) processReceivedMessage(msg *pb.Message) {
c.ui.ChatMessage(int64(c.getLamportTimestamp()), msg.SenderId, msg.Content)
// Add to message history
c.addToMessageHistory(msg)
c.logReliabilityEvent(fmt.Sprintf("Processed message %s with Lamport timestamp %d", msg.MessageId, msg.LamportTimestamp))
}

// Process any messages in the buffer that now have their dependencies met
Expand All @@ -151,6 +169,7 @@ func (c *Chat) processReceivedMessage(msg *pb.Message) {
}
// Add to incoming buffer
c.addToIncomingBuffer(msg)
c.logReliabilityEvent(fmt.Sprintf("Message %s buffered due to missing dependencies: %v", msg.MessageId, missingDeps))
}
}

Expand Down Expand Up @@ -218,6 +237,7 @@ func (c *Chat) requestMissingMessage(messageID string) {
missedMsg, err := c.doRequestMissingMessageFromPeers(messageID)
if err == nil {
c.processReceivedMessage(missedMsg)
c.logReliabilityEvent(fmt.Sprintf("Successfully retrieved missing message %s", messageID))
return
}

Expand All @@ -229,7 +249,7 @@ func (c *Chat) requestMissingMessage(messageID string) {
time.Sleep(delay)
}

c.ui.ErrorMessage(fmt.Errorf("failed to retrieve missing message %s after %d attempts", messageID, maxRetries))
c.logReliabilityEvent(fmt.Sprintf("Failed to retrieve missing message %s after %d attempts", messageID, maxRetries))
}

func (c *Chat) checkCausalDependencies(msg *pb.Message) []string {
Expand Down Expand Up @@ -279,6 +299,21 @@ func (c *Chat) addToMessageHistory(msg *pb.Message) {
if len(c.messageHistory) > maxMessageHistory {
c.messageHistory = c.messageHistory[len(c.messageHistory)-maxMessageHistory:]
}

c.logReliabilityEvent(fmt.Sprintf("Added message %s to history at position %d with Lamport timestamp %d", msg.MessageId, insertIndex, msg.LamportTimestamp))

// Log the entire message history
c.logMessageHistory()
}

func (c *Chat) logMessageHistory() {
var historyLog strings.Builder
historyLog.WriteString("Current Message History:\n")
for i, msg := range c.messageHistory {
historyLog.WriteString(fmt.Sprintf("%d. MessageID: %s, Sender: %s, Lamport: %d, Content: %s\n",
i+1, msg.MessageId, msg.SenderId, msg.LamportTimestamp, msg.Content))
}
c.logReliabilityEvent(historyLog.String())
}

func (c *Chat) periodicBufferSweep() {
Expand Down Expand Up @@ -395,22 +430,29 @@ func (c *Chat) addToIncomingBuffer(msg *pb.Message) {
c.incomingBuffer = append(c.incomingBuffer, msg)
}

func (c *Chat) incLamportTimestamp() {
func (c *Chat) incLamportTimestamp() int32 {
c.lamportTSMutex.Lock()
defer c.lamportTSMutex.Unlock()
c.lamportTimestamp++
now := int32(time.Now().Unix())
c.lamportTimestamp = max32(now, c.lamportTimestamp+1)
return c.lamportTimestamp
}

func (c *Chat) updateLamportTimestamp(msgTs int32) {
c.lamportTSMutex.Lock()
defer c.lamportTSMutex.Unlock()
if msgTs > c.lamportTimestamp {
c.lamportTimestamp = msgTs
}
c.lamportTimestamp = max32(msgTs, c.lamportTimestamp)
}

func (c *Chat) getLamportTimestamp() int32 {
c.lamportTSMutex.Lock()
defer c.lamportTSMutex.Unlock()
return c.lamportTimestamp
}

func max32(a, b int32) int32 {
if a > b {
return a
}
return b
}
Loading