Skip to content

Commit

Permalink
[katran]: adding iobuf writer
Browse files Browse the repository at this point in the history
today KatranMonitor storing packets on a disk in pcap format.
sometime it is more usefull to store em in memory and allow for remote client
get em (e.g. through thrift/grpc call). in this diff i'm adding such ability
by implementing IOBufWriter

Tested By:
modified katran_tester. making sure that iobuf contains proper pcap
data (by writing this data in a file in tester, and then reading
w/ tcpdump)
  • Loading branch information
tehnerd committed Sep 30, 2019
1 parent f0e86b8 commit 4b53616
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 19 deletions.
2 changes: 2 additions & 0 deletions katran/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ target_include_directories(
add_library(pcapwriter STATIC
ByteRangeWriter.cpp
FileWriter.cpp
IOBufWriter.cpp
PcapMsg.cpp
PcapMsgMeta.cpp
PcapWriter.cpp
ByteRangeWriter.h
IOBufWriter.h
DataWriter.h
FileWriter.h
PcapMsg.h
Expand Down
38 changes: 38 additions & 0 deletions katran/lib/IOBufWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#include "katran/lib/IOBufWriter.h"
#include <cstring>

namespace katran {

IOBufWriter::IOBufWriter(folly::IOBuf* iobuf)
: iobuf_(iobuf) {}

void IOBufWriter::writeData(const void* ptr, std::size_t size) {
::memcpy(static_cast<void*>(iobuf_->writableTail()), ptr, size);
iobuf_->append(size);
}

bool IOBufWriter::available(std::size_t amount) {
return iobuf_->tailroom() >= amount;
}

bool IOBufWriter::restart() {
iobuf_->clear();
return true;
}

} // namespace katran
47 changes: 47 additions & 0 deletions katran/lib/IOBufWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once

#include <memory>
#include <string>
#include <folly/io/IOBuf.h>

#include "katran/lib/DataWriter.h"

namespace katran {

/**
* IOBufWriter is used to write pcap-data into IOBuf.
*/
class IOBufWriter : public DataWriter {
public:
/**
* @param unique_ptr<IOBuf> iobuf for packets to written into
*/
explicit IOBufWriter(folly::IOBuf* iobuf);

void writeData(const void* ptr, std::size_t size) override;

bool available(std::size_t amount) override;

bool restart() override;

bool stop() override {return true;}

private:
folly::IOBuf* iobuf_;
};

} // namespace katran
9 changes: 9 additions & 0 deletions katran/lib/KatranLb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,14 @@ bool KatranLb::stopKatranMonitor() {
return true;
}

std::unique_ptr<folly::IOBuf> KatranLb::getKatranMonitorEventBuffer(int event) {
if (!features_.introspection) {
return nullptr;
}
return monitor_->getEventBuffer(event);
}


bool KatranLb::restartKatranMonitor(uint32_t limit) {
if (!features_.introspection) {
return false;
Expand All @@ -877,6 +885,7 @@ KatranMonitorStats KatranLb::getKatranMonitorStats() {
auto writer_stats = monitor_->getWriterStats();
stats.limit = writer_stats.limit;
stats.amount = writer_stats.amount;
stats.bufferExceedCount = writer_stats.bufferExceedCount;
return stats;
}

Expand Down
12 changes: 12 additions & 0 deletions katran/lib/KatranLb.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,18 @@ class KatranLb {
*/
bool restartKatranMonitor(uint32_t limit);

/**
* @param int event monitoring event it. see balancer_consts.h
* @return unique_ptr<IOBuf> on success or nullptr otherwise
*
* getKatranMonitorEventBuffer return iobuf which contains all the packets
* for specified event. if event number was not defined or
* PcapStorageFormat was not set to IOBUF nullptr would be returned.
* This function is not thread safe. underlying IOBuf, when accessed while
* monitoring is still running, could point to partially written packet
*/
std::unique_ptr<folly::IOBuf> getKatranMonitorEventBuffer(int event);

/**
* @return KatranMonitorStats stats from katran monitor
*
Expand Down
11 changes: 11 additions & 0 deletions katran/lib/KatranLbStructs.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ enum class AddressType {
NETWORK,
};

/**
* types of monitoring
*/
enum class PcapStorageFormat {
FILE,
IOBUF,
};

/**
* @param uint32_t nCpus number of cpus
* @param uint32_t pages number of pages for even pipe shared memory
Expand All @@ -107,6 +115,8 @@ struct KatranMonitorConfig {
uint32_t snapLen{kDefaultMonitorSnapLen};
uint32_t maxEvents{kDefaultMonitorMaxEvents};
std::string path{"/tmp/katran_pcap"};
PcapStorageFormat storage{PcapStorageFormat::FILE};
uint32_t bufferSize{0};
};

/**
Expand Down Expand Up @@ -188,6 +198,7 @@ struct KatranConfig {
struct KatranMonitorStats {
uint32_t limit{0};
uint32_t amount{0};
uint32_t bufferExceedCount{0};
};

/**
Expand Down
28 changes: 24 additions & 4 deletions katran/lib/KatranMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <folly/io/async/ScopedEventBaseThread.h>

#include "katran/lib/FileWriter.h"
#include "katran/lib/IOBufWriter.h"
#include "katran/lib/KatranEventReader.h"


Expand Down Expand Up @@ -49,11 +50,18 @@ KatranMonitor::KatranMonitor(const KatranMonitorConfig& config)
throw std::runtime_error("none of eventReaders were initialized");
}

std::vector<std::shared_ptr<DataWriter>> data_writers;
std::vector<std::shared_ptr<DataWriter>> data_writers;
for (int i = 0; i < config_.maxEvents; i++) {
std::string fname;
folly::toAppend(config_.path, "_", i, &fname);
data_writers.push_back(std::make_shared<FileWriter>(fname));
if (config_.storage == PcapStorageFormat::FILE) {
std::string fname;
folly::toAppend(config_.path, "_", i, &fname);
data_writers.push_back(std::make_shared<FileWriter>(fname));
} else {
// PcapStorageFormat::IOBuf
buffers_.emplace_back(folly::IOBuf::create(config_.bufferSize));
data_writers.push_back(
std::make_shared<IOBufWriter>(buffers_.back().get()));
}
}

writer_ = std::make_shared<PcapWriter>(
Expand Down Expand Up @@ -84,6 +92,18 @@ void KatranMonitor::restartMonitor(uint32_t limit) {
queue_->blockingWrite(std::move(msg));
}

std::unique_ptr<folly::IOBuf> KatranMonitor::getEventBuffer(int event) {
if (buffers_.size() == 0) {
LOG(ERROR) << "PcapStorageFormat is not set to IOBuf";
return nullptr;
}
if (event < 0 || event > (buffers_.size() - 1)) {
LOG(ERROR) << "Undefined event id";
return nullptr;
}
return buffers_[event]->cloneOne();
}

PcapWriterStats KatranMonitor::getWriterStats() {
return writer_->getStats();
}
Expand Down
9 changes: 9 additions & 0 deletions katran/lib/KatranMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <thread>
#include <vector>
#include <folly/io/IOBuf.h>
#include <folly/MPMCQueue.h>

#include "katran/lib/KatranLbStructs.h"
Expand Down Expand Up @@ -50,6 +51,8 @@ class KatranMonitor {
void restartMonitor(uint32_t limit);

PcapWriterStats getWriterStats();

std::unique_ptr<folly::IOBuf> getEventBuffer(int event);

private:
/**
Expand Down Expand Up @@ -77,6 +80,12 @@ class KatranMonitor {
* thread which runs pcap writer
*/
std::thread writerThread_;

/**
* vector of iobufs where we store packets if IOBUF storage
* is being used
*/
std::vector<std::unique_ptr<folly::IOBuf>> buffers_;
};

} // namespace katran
12 changes: 7 additions & 5 deletions katran/lib/PcapWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ bool PcapWriter::writePcapHeader(uint32_t writerId) {
LOG(ERROR) << "no writer w/ specified ID: " << writerId;
return false;
}
if (!dataWriters_[writerId]->available(sizeof(struct pcap_hdr_s))) {
LOG(ERROR) << "DataWriter failed to write a header. Too few space.";
return false;
}
if (headerExists_[writerId]) {
VLOG(4) << "header already exists";
return true;
}
if (!dataWriters_[writerId]->available(sizeof(struct pcap_hdr_s))) {
LOG(ERROR) << "DataWriter failed to write a header. Not enough space.";
return false;
}
struct pcap_hdr_s hdr {
.magic_number = kPcapWriterMagic, .version_major = kVersionMajor,
.version_minor = kVersionMinor, .thiszone = kGmt, .sigfigs = kAccuracy,
Expand Down Expand Up @@ -116,7 +116,7 @@ void PcapWriter::run(std::shared_ptr<folly::MPMCQueue<PcapMsg>> queue) {
}
if (!dataWriters_[kDefaultWriter]->available(
msg.getCapturedLen() + sizeof(pcaprec_hdr_s))) {
LOG(INFO) << "DataWriter is full.";
++bufferExceedCount_;
break;
}
writePacket(msg, kDefaultWriter);
Expand All @@ -129,6 +129,7 @@ PcapWriterStats PcapWriter::getStats() {
Guard lock(cntrLock_);
stats.limit = packetLimit_;
stats.amount = packetAmount_;
stats.bufferExceedCount = bufferExceedCount_;
return stats;
}

Expand Down Expand Up @@ -183,6 +184,7 @@ void PcapWriter::runMulti(
msg.getPcapMsg().trim(snaplen);
if (!dataWriters_[msg.getEventId()]->available(
msg.getPcapMsg().getCapturedLen() + sizeof(pcaprec_hdr_s))) {
++bufferExceedCount_;
continue;
}
writePacket(msg.getPcapMsg(), msg.getEventId());
Expand Down
7 changes: 7 additions & 0 deletions katran/lib/PcapWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
struct PcapWriterStats {
uint32_t limit{0};
uint32_t amount{0};
uint32_t bufferExceedCount{0};
};

namespace katran {
Expand Down Expand Up @@ -154,6 +155,12 @@ class PcapWriter {
*/
uint32_t packetLimit_{0};

/**
* Number of bufferExceedCount events: when writer does not have enough
* space to write packet
*/
uint32_t bufferExceedCount_{0};

/**
* Max number of bytes to be stored.
*/
Expand Down
Loading

0 comments on commit 4b53616

Please sign in to comment.