Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quic: various additional cleanups, fixes in Endpoint #51340

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@
'test/cctest/test_node_crypto.cc',
'test/cctest/test_node_crypto_env.cc',
'test/cctest/test_quic_cid.cc',
'test/cctest/test_quic_error.cc',
'test/cctest/test_quic_tokens.cc',
],
'node_cctest_inspector_sources': [
Expand Down
6 changes: 3 additions & 3 deletions src/crypto/crypto_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static const char system_cert_path[] = NODE_OPENSSL_SYSTEM_CERT_PATH;

static bool extra_root_certs_loaded = false;

inline X509_STORE* GetOrCreateRootCertStore() {
X509_STORE* GetOrCreateRootCertStore() {
// Guaranteed thread-safe by standard, just don't use -fno-threadsafe-statics.
static X509_STORE* store = NewRootCertStore();
return store;
Expand Down Expand Up @@ -140,6 +140,8 @@ int SSL_CTX_use_certificate_chain(SSL_CTX* ctx,
return ret;
}

} // namespace

// Read a file that contains our certificate in "PEM" format,
// possibly followed by a sequence of CA certificates that should be
// sent to the peer in the Certificate message.
Expand Down Expand Up @@ -194,8 +196,6 @@ int SSL_CTX_use_certificate_chain(SSL_CTX* ctx,
issuer);
}

} // namespace

X509_STORE* NewRootCertStore() {
static std::vector<X509*> root_certs_vector;
static Mutex root_certs_vector_mutex;
Expand Down
7 changes: 7 additions & 0 deletions src/crypto/crypto_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ void IsExtraRootCertsFileLoaded(

X509_STORE* NewRootCertStore();

X509_STORE* GetOrCreateRootCertStore();

BIOPointer LoadBIO(Environment* env, v8::Local<v8::Value> v);

class SecureContext final : public BaseObject {
Expand Down Expand Up @@ -153,6 +155,11 @@ class SecureContext final : public BaseObject {
unsigned char ticket_key_hmac_[16];
};

int SSL_CTX_use_certificate_chain(SSL_CTX* ctx,
BIOPointer&& in,
X509Pointer* cert,
X509Pointer* issuer);

} // namespace crypto
} // namespace node

Expand Down
240 changes: 151 additions & 89 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "application.h"
#include <async_wrap-inl.h>
#include <debug_utils-inl.h>
#include <ngtcp2/ngtcp2.h>
#include <node_bob.h>
#include <node_sockaddr-inl.h>
#include <uv.h>
Expand Down Expand Up @@ -95,6 +96,20 @@ Maybe<Session::Application_Options> Session::Application_Options::From(
return Just<Application_Options>(options);
}

// ============================================================================

std::string Session::Application::StreamData::ToString() const {
DebugIndentScope indent;
auto prefix = indent.Prefix();
std::string res("{");
res += prefix + "count: " + std::to_string(count);
res += prefix + "remaining: " + std::to_string(remaining);
res += prefix + "id: " + std::to_string(id);
res += prefix + "fin: " + std::to_string(fin);
res += indent.Close();
return res;
}

Session::Application::Application(Session* session, const Options& options)
: session_(session) {}

Expand Down Expand Up @@ -189,7 +204,7 @@ Packet* Session::Application::CreateStreamDataPacket() {
return Packet::Create(env(),
session_->endpoint_.get(),
session_->remote_address_,
ngtcp2_conn_get_max_tx_udp_payload_size(*session_),
session_->max_packet_size(),
"stream data");
}

Expand Down Expand Up @@ -221,141 +236,188 @@ void Session::Application::StreamReset(Stream* stream,
}

void Session::Application::SendPendingData() {
static constexpr size_t kMaxPackets = 32;
Debug(session_, "Application sending pending data");
PathStorage path;
StreamData stream_data;

Packet* packet = nullptr;
uint8_t* pos = nullptr;
int err = 0;
// The maximum size of packet to create.
const size_t max_packet_size = session_->max_packet_size();

size_t maxPacketCount = std::min(static_cast<size_t>(64000),
ngtcp2_conn_get_send_quantum(*session_));
size_t packetSendCount = 0;
// The maximum number of packets to send in this call to SendPendingData.
const size_t max_packet_count = std::min(
kMaxPackets, ngtcp2_conn_get_send_quantum(*session_) / max_packet_size);

const auto updateTimer = [&] {
Debug(session_, "Application updating the session timer");
ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime());
session_->UpdateTimer();
};
// The number of packets that have been sent in this call to SendPendingData.
size_t packet_send_count = 0;

const auto congestionLimited = [&](auto packet) {
auto len = pos - ngtcp2_vec(*packet).base;
// We are either congestion limited or done.
if (len) {
// Some data was serialized into the packet. We need to send it.
packet->Truncate(len);
session_->Send(std::move(packet), path);
}
Packet* packet = nullptr;
uint8_t* pos = nullptr;
uint8_t* begin = nullptr;

updateTimer();
auto ensure_packet = [&] {
if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (packet == nullptr) return false;
pos = begin = ngtcp2_vec(*packet).base;
}
DCHECK_NOT_NULL(packet);
DCHECK_NOT_NULL(pos);
DCHECK_NOT_NULL(begin);
return true;
};

// We're going to enter a loop here to prepare and send no more than
// max_packet_count packets.
for (;;) {
ssize_t ndatalen;
StreamData stream_data;

err = GetStreamData(&stream_data);
// ndatalen is the amount of stream data that was accepted into the packet.
ssize_t ndatalen = 0;

if (err < 0) {
// Make sure we have a packet to write data into.
if (!ensure_packet()) {
Debug(session_, "Failed to create packet for stream data");
// Doh! Could not create a packet. Time to bail.
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}

if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (packet == nullptr) {
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}
pos = ngtcp2_vec(*packet).base;
// The stream_data is the next block of data from the application stream.
if (GetStreamData(&stream_data) < 0) {
Debug(session_, "Application failed to get stream data");
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
packet->Done(UV_ECANCELED);
return session_->Close(Session::CloseMethod::SILENT);
}

ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data);
// If we got here, we were at least successful in checking for stream data.
// There might not be any stream data to send.
Debug(session_, "Application using stream data: %s", stream_data);

// Awesome, let's write our packet!
ssize_t nwrite =
WriteVStream(&path, pos, &ndatalen, max_packet_size, stream_data);
Debug(session_, "Application accepted %zu bytes into packet", ndatalen);

if (nwrite <= 0) {
// A negative nwrite value indicates either an error or that there is more
// data to write into the packet.
if (nwrite < 0) {
switch (nwrite) {
case 0:
if (stream_data.id >= 0) ResumeStream(stream_data.id);
return congestionLimited(std::move(packet));
case NGTCP2_ERR_STREAM_DATA_BLOCKED: {
session().StreamDataBlocked(stream_data.id);
if (session().max_data_left() == 0) {
if (stream_data.id >= 0) ResumeStream(stream_data.id);
return congestionLimited(std::move(packet));
}
CHECK_LE(ndatalen, 0);
// We could not write any data for this stream into the packet because
// the flow control for the stream itself indicates that the stream
// is blocked. We'll skip and move on to the next stream.
// ndatalen = -1 means that no stream data was accepted into the
// packet, which is what we want here.
DCHECK_EQ(ndatalen, -1);
DCHECK(stream_data.stream);
session_->StreamDataBlocked(stream_data.id);
continue;
}
case NGTCP2_ERR_STREAM_SHUT_WR: {
// Indicates that the writable side of the stream has been closed
// Indicates that the writable side of the stream should be closed
// locally or the stream is being reset. In either case, we can't send
// any stream data!
CHECK_GE(stream_data.id, 0);
// We need to notify the stream that the writable side has been closed
// and no more outbound data can be sent.
CHECK_LE(ndatalen, 0);
auto stream = session_->FindStream(stream_data.id);
if (stream) stream->EndWritable();
Debug(session_,
"Stream %" PRIi64 " should be closed for writing",
stream_data.id);
// ndatalen = -1 means that no stream data was accepted into the
// packet, which is what we want here.
DCHECK_EQ(ndatalen, -1);
DCHECK(stream_data.stream);
stream_data.stream->EndWritable();
continue;
}
case NGTCP2_ERR_WRITE_MORE: {
CHECK_GT(ndatalen, 0);
if (!StreamCommit(&stream_data, ndatalen)) return session_->Close();
pos += ndatalen;
// This return value indicates that we should call into WriteVStream
// again to write more data into the same packet.
Debug(session_, "Application should write more to packet");
DCHECK_GE(ndatalen, 0);
if (!StreamCommit(&stream_data, ndatalen)) {
packet->Done(UV_ECANCELED);
return session_->Close(CloseMethod::SILENT);
}
continue;
}
}

packet->Done(UV_ECANCELED);
session_->last_error_ = QuicError::ForNgtcp2Error(nwrite);
return session_->Close(Session::CloseMethod::SILENT);
}

pos += nwrite;
if (ndatalen > 0 && !StreamCommit(&stream_data, ndatalen)) {
// Since we are closing the session here, we don't worry about updating
// the pkt tx time. The failed StreamCommit should have updated the
// last_error_ appropriately.
// Some other type of error happened.
DCHECK_EQ(ndatalen, -1);
Debug(session_,
"Application encountered error while writing packet: %s",
ngtcp2_strerror(nwrite));
session_->SetLastError(QuicError::ForNgtcp2Error(nwrite));
packet->Done(UV_ECANCELED);
return session_->Close(Session::CloseMethod::SILENT);
} else if (ndatalen >= 0) {
// We wrote some data into the packet. We need to update the flow control
// by committing the data.
if (!StreamCommit(&stream_data, ndatalen)) {
packet->Done(UV_ECANCELED);
return session_->Close(CloseMethod::SILENT);
}
}

if (stream_data.id >= 0 && ndatalen < 0) ResumeStream(stream_data.id);
// When nwrite is zero, it means we are congestion limited.
// We should stop trying to send additional packets.
if (nwrite == 0) {
Debug(session_, "Congestion limited.");
// There might be a partial packet already prepared. If so, send it.
size_t datalen = pos - begin;
if (datalen) {
Debug(session_, "Packet has %zu bytes to send", datalen);
// At least some data had been written into the packet. We should send
// it.
packet->Truncate(datalen);
session_->Send(packet, path);
} else {
packet->Done(UV_ECANCELED);
}

packet->Truncate(nwrite);
session_->Send(std::move(packet), path);
// If there was stream data selected, we should reschedule it to try
// sending again.
if (stream_data.id >= 0) ResumeStream(stream_data.id);

pos = nullptr;
return session_->UpdatePacketTxTime();
}

if (++packetSendCount == maxPacketCount) {
break;
// At this point we have a packet prepared to send.
pos += nwrite;
size_t datalen = pos - begin;
Debug(session_, "Sending packet with %zu bytes", datalen);
packet->Truncate(datalen);
session_->Send(packet, path);

// If we have sent the maximum number of packets, we're done.
if (++packet_send_count == max_packet_count) {
return session_->UpdatePacketTxTime();
}
}

updateTimer();
// Prepare to loop back around to prepare a new packet.
packet = nullptr;
pos = begin = nullptr;
}
}

ssize_t Session::Application::WriteVStream(PathStorage* path,
uint8_t* buf,
uint8_t* dest,
ssize_t* ndatalen,
size_t max_packet_size,
const StreamData& stream_data) {
CHECK_LE(stream_data.count, kMaxVectorCount);
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE;
if (stream_data.remaining > 0) flags |= NGTCP2_WRITE_STREAM_FLAG_MORE;
DCHECK_LE(stream_data.count, kMaxVectorCount);
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
ssize_t ret = ngtcp2_conn_writev_stream(
*session_,
&path->path,
nullptr,
buf,
ngtcp2_conn_get_max_tx_udp_payload_size(*session_),
ndatalen,
flags,
stream_data.id,
stream_data.buf,
stream_data.count,
uv_hrtime());
return ret;
ngtcp2_pkt_info pi;
return ngtcp2_conn_writev_stream(*session_,
&path->path,
&pi,
dest,
max_packet_size,
ndatalen,
flags,
stream_data.id,
stream_data.buf,
stream_data.count,
uv_hrtime());
}

// The DefaultApplication is the default implementation of Session::Application
Expand Down
Loading
Loading