From fc5265b7a365f9e4d73240d71077c4b844c6aade Mon Sep 17 00:00:00 2001 From: Fazal Majid Date: Fri, 2 Jul 2021 11:17:52 +0200 Subject: [PATCH] mitigation for #28 possible data corruption if the metadata for a queue was not synced on previous shutdown --- diskqueue.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index 26b34380..4f3abb8b 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -473,6 +473,29 @@ func (d *diskQueue) retrieveMetaData() error { d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos + // if the metadata was not sync'd at the last shutdown of nsqd + // then the actual file size might actually be larger than the writePos, + // in which case the safest thing to do is skip to the next file for + // writes, and let the reader salvage what it can from the messages in the + // diskqueue beyond the metadata's likely also stale readPos + fileName = d.fileName(d.writeFileNum) + fileInfo, err := os.Stat(fileName) + if err != nil { + return err + } + fileSize := fileInfo.Size() + if d.writePos < fileSize { + d.logf(WARN, + "DISKQUEUE(%s) %s metadata writePos %d < file size of %d, skipping to new file", + d.name, fileName, d.writePos, fileSize) + d.writeFileNum += 1 + d.writePos = 0 + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + } + return nil }