Skip to content

Commit

Permalink
Merge branch 'hive_max_length_bug' of https://github.com/atsign-found…
Browse files Browse the repository at this point in the history
…ation/at_server into hive_max_length_bug
  • Loading branch information
murali-shris committed Apr 4, 2024
2 parents 2026ab6 + 4e42405 commit da587a8
Show file tree
Hide file tree
Showing 33 changed files with 170 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/melos_bootstrap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2
- uses: subosito/flutter-action@1c5eb12d812966ca84680edc38353a0851c8fd56 # v2.14.0
- uses: subosito/flutter-action@44ac965b96f18d999802d4b807e3256d5a3f9fa1 # v2.16.0
with:
channel: "stable"
- name: flutter pub get
Expand Down
2 changes: 1 addition & 1 deletion packages/at_root_server/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM dart:3.3.1@sha256:7e0b4e5d3773c61b5d5b42908f48853fab04c33a080c6f73ee3a99bdd1f4536e AS buildimage
FROM dart:3.3.3@sha256:54685726fd2149932080c0c72d1934441bee9a96026f06920800177a2895107c AS buildimage
ENV HOMEDIR=/atsign
ENV BINARYDIR=/usr/local/at
ENV USER_ID=1024
Expand Down
2 changes: 1 addition & 1 deletion packages/at_root_server/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dependencies:
args: 2.4.2
uuid: 3.0.7
yaml: 3.1.2
at_commons: 4.0.3
at_commons: 4.0.5
at_utils: 3.0.16
at_server_spec: 4.0.1
at_persistence_root_server:
Expand Down
6 changes: 5 additions & 1 deletion packages/at_secondary_server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
## 3.0.42
- feat: allow filtering of requests in EnrollVerbHandler using enrollment approval status
- feat: allow filtering of requests in EnrollVerbHandler using enrollment
approval status
- feat: authorization changes for keys with no namespace and for reserved keys
- build(deps): dependabot changes
- fix: Improve socket handling for better server resilience
- fix: Ensure cached keys like 'cached:public:publicKey' are not considered
protected keys and can thus be deleted

## 3.0.41
- fix: bug in access control for otp put
## 3.0.40
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ abstract class BaseSocketConnection<T extends Socket> extends AtConnection {
T get underlying => _socket;

@override
void write(String data) {
Future<void> write(String data) async {
if (isInValid()) {
throw ConnectionInvalidException('Connection is invalid');
}
try {
underlying.write(data);
await underlying.flush();
metaData.lastAccessed = DateTime.now().toUtc();
} on Exception catch (e) {
metaData.isStale = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DummyInboundConnection implements InboundConnection {

String? lastWrittenData;
@override
void write(String data) {
Future<void> write(String data) async {
lastWrittenData = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection
maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed;
timeFrameInMillis = AtSecondaryConfig.timeFrameInMills;
requestTimestampQueue = Queue();

logger.info(logger.getAtConnectionLogMessage(
metaData, 'New connection ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));

socket.done.onError((error, stackTrace) {
logger.info('socket.done.onError called with $error. Calling this.close()');
this.close();
});
}

/// Returns true if the underlying socket is not null and socket's remote address and port match.
Expand Down Expand Up @@ -221,11 +232,12 @@ class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection
}

try {
var address = underlying.remoteAddress;
var port = underlying.remotePort;
logger.info(logger.getAtConnectionLogMessage(
metaData, 'destroying socket ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));
underlying.destroy();
logger.finer(logger.getAtConnectionLogMessage(
metaData, '$address:$port Disconnected'));
} catch (_) {
// Ignore exception on a connection close
metaData.isStale = true;
Expand All @@ -235,8 +247,8 @@ class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection
}

@override
void write(String data) {
super.write(data);
Future<void> write(String data) async {
await super.write(data);
if (metaData is InboundConnectionMetadata) {
logger.info(logger.getAtConnectionLogMessage(
metaData, 'SENT: ${BaseSocketConnection.truncateForLogging(data)}'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class InboundMessageListener {
void listen(callback, streamCallBack) {
onStreamCallBack = streamCallBack;
onBufferEndCallBack = callback;
logger.finest('Calling inbound underlying.listen within runZonedGuarded block');
logger.finest(
'Calling inbound underlying.listen within runZonedGuarded block');

runZonedGuarded(() {
connection.underlying.listen(_messageHandler,
Expand Down Expand Up @@ -105,13 +106,12 @@ class InboundMessageListener {

/// Closes the [InboundConnection]
Future<void> _finishedHandler() async {
logger.info('_finishedHandler called - closing connection');
await _closeConnection();
}

Future<void> _closeConnection() async {
if (!connection.isInValid()) {
await connection.close();
}
await connection.close();
// Removes the connection from the InboundConnectionPool.
InboundConnectionPool.getInstance().remove(connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class OutboundClient {
}
try {
//1. create from request
outboundConnection!.write(AtRequestFormatter.createFromRequest(
await outboundConnection!.write(AtRequestFormatter.createFromRequest(
AtSecondaryServerImpl.getInstance().currentAtSign));

//2. Receive proof
Expand All @@ -200,7 +200,7 @@ class OutboundClient {
}

//4. Create pol request
outboundConnection!.write(AtRequestFormatter.createPolRequest());
await outboundConnection!.write(AtRequestFormatter.createPolRequest());

// 5. wait for handshake result - @<current_atsign>@
var handShakeResult = await messageListener.read();
Expand Down Expand Up @@ -242,7 +242,7 @@ class OutboundClient {
}
var lookUpRequest = AtRequestFormatter.createLookUpRequest(key);
try {
outboundConnection!.write(lookUpRequest);
await outboundConnection!.write(lookUpRequest);
} on AtIOException catch (e) {
await outboundConnection!.close();
throw LookupException(
Expand Down Expand Up @@ -271,7 +271,7 @@ class OutboundClient {
scanRequest = 'scan $regex\n';
}
try {
outboundConnection!.write(scanRequest);
await outboundConnection!.write(scanRequest);
} on AtIOException catch (e) {
await outboundConnection!.close();
throw LookupException(
Expand Down Expand Up @@ -326,7 +326,7 @@ class OutboundClient {
}
try {
var notificationRequest = 'notify:$notifyCommandBody\n';
outboundConnection!.write(notificationRequest);
await outboundConnection!.write(notificationRequest);
} on AtIOException catch (e) {
await outboundConnection!.close();
throw LookupException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ class OutboundConnectionImpl<T extends Socket>
..toAtSign = toAtSign
..created = DateTime.now().toUtc()
..isCreated = true;

logger.info(logger.getAtConnectionLogMessage(
metaData, 'New connection ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));

socket.done.onError((error, stackTrace) {
logger.info('socket.done.onError called with $error. Calling this.close()');
this.close();
});
}

int _getIdleTimeMillis() {
Expand Down Expand Up @@ -47,10 +58,12 @@ class OutboundConnectionImpl<T extends Socket>

try {
var socket = underlying;
var address = socket.remoteAddress;
var port = socket.remotePort;
logger.info(logger.getAtConnectionLogMessage(
metaData, 'destroying socket ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));
socket.destroy();
logger.finer('$address:$port Disconnected');
} catch (_) {
// Ignore exception on a connection close
metaData.isStale = true;
Expand All @@ -60,8 +73,8 @@ class OutboundConnectionImpl<T extends Socket>
}

@override
void write(String data) {
super.write(data);
Future<void> write(String data) async {
await super.write(data);
logger.info(logger.getAtConnectionLogMessage(
metaData, 'SENT: ${BaseSocketConnection.truncateForLogging(data)}'));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class OutboundMessageListener {
/// Listens to the underlying connection's socket if the connection is created.
/// @throws [AtConnectException] if the connection is not yet created
void listen() async {
logger.finest('Calling outbound underlying.listen within runZonedGuarded block');
logger.finest(
'Calling outbound underlying.listen within runZonedGuarded block');

runZonedGuarded(() {
outboundClient.outboundConnection?.underlying.listen(_messageHandler,
Expand Down Expand Up @@ -131,6 +132,7 @@ class OutboundMessageListener {

/// Closes the [OutboundClient]
void _finishedHandler() async {
logger.info('_finishedHandler called - closing connection');
_closeOutboundClient();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
Future<void> initDynamicConfigListeners() async {
//only works if testingMode is set to true
if (AtSecondaryConfig.testingMode) {
logger.warning(
'UNSAFE: testingMode in config.yaml is set to true. Please set to false if not required.');
logger.warning('testingMode in config.yaml is set to true.'
' Please set to false if not required.');

//subscriber for inbound_max_limit change
logger.finest('Subscribing to dynamic changes made to inbound_max_limit');
Expand Down Expand Up @@ -442,7 +442,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
/// Throws [Exception] for any other exceptions.
/// @param - ServerSocket
void _listen(var serverSocket) {
logger.finer('serverSocket _listen : ${serverSocket.runtimeType}');
logger.info('serverSocket _listen : ${serverSocket.runtimeType}');
serverSocket.listen(((clientSocket) {
var sessionID = '_${Uuid().v4()}';
InboundConnection? connection;
Expand All @@ -459,8 +459,8 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
.handle(e, atConnection: connection, clientSocket: clientSocket);
}
}), onError: (error) {
// We've got no action to take here, let's just log a warning
logger.warning("ServerSocket.listen called onError with '$error'");
// We've got no action to take here, let's just log a message
logger.info("ServerSocket.listen called onError with '$error'");
});
}

Expand Down Expand Up @@ -644,8 +644,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
final expiryRunRandomMins =
(expiringRunFreqMins! - 2) + Random().nextInt(8);
logger.finest('Scheduling key expiry job every $expiryRunRandomMins mins');
manager.scheduleKeyExpireTask(3,
skipCommits: skipCommitsForExpiredKeys);
manager.scheduleKeyExpireTask(3, skipCommits: skipCommitsForExpiredKeys);

var atData = AtData();
atData.data = serverContext!.sharedSecret;
Expand Down
11 changes: 8 additions & 3 deletions packages/at_secondary_server/lib/src/server/bootstrapper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,17 @@ class SecondaryServerBootStrapper {
secondaryServerInstance.setExecutor(DefaultVerbExecutor());

//starting secondary in a zone
//prevents secondary from terminating due to uncaught non-fatal errors
//prevents secondary from terminating with uncaught non-fatal errors
unawaited(runZonedGuarded(() async {
await secondaryServerInstance.start();
}, (error, StackTrace stackTrace) {
logger.severe('Uncaught error: $error \n StackTrace: $stackTrace');
handleTerminateSignal(ProcessSignal.sigstop);
logger.shout('Uncaught error: $error ;'
' StackTrace follows: $stackTrace');
if (error is SocketException) {
logger.shout('Will not terminate server for $error');
} else {
handleTerminateSignal(ProcessSignal.sigstop);
}
}));
ProcessSignal.sigterm.watch().listen(handleTerminateSignal);
ProcessSignal.sigint.watch().listen(handleTerminateSignal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ abstract class AbstractVerbHandler implements VerbHandler {
@override
Future<void> process(String command, InboundConnection atConnection) async {
var response = await processInternal(command, atConnection);
var handler = responseManager.getResponseHandler(getVerb());
await handler.process(atConnection, response);
var responseHandler = responseManager.getResponseHandler(getVerb());
await responseHandler.process(atConnection, response);
}

Future<Response> processInternal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DeleteVerbHandler extends ChangeVerbHandler {
// fetch protected keys listed in config.yaml
protectedKeys ??= _getProtectedKeys(atSign);
// check to see if a key is protected. Cannot delete key if it's protected
if (_isProtectedKey(deleteKey!)) {
if (_isProtectedKey(deleteKey!, isCached: verbParams['isCached'])) {
throw UnAuthorizedException(
'Cannot delete protected key: \'$deleteKey\'');
}
Expand Down Expand Up @@ -175,8 +175,9 @@ class DeleteVerbHandler extends ChangeVerbHandler {
return protectedKeys;
}

bool _isProtectedKey(String key) {
if (protectedKeys!.contains(key)) {
bool _isProtectedKey(String key, {String? isCached}) {
isCached ??= 'false';
if (protectedKeys!.contains(key) && isCached == 'false') {
logger.severe('Cannot delete key. \'$key\' is a protected key');
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ class MonitorVerbHandler extends AbstractVerbHandler {
(notification.notification!.contains(RegExp(regex)) ||
(notification.fromAtSign != null &&
notification.fromAtSign!.contains(RegExp(regex))))) {
atConnection
.write('notification: ${jsonEncode(notification.toJson())}\n');
await atConnection.write('notification:'
' ${jsonEncode(notification.toJson())}\n');
}
} on FormatException {
logger.severe('Invalid regular expression : $regex');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ abstract class BaseResponseHandler implements ResponseHandler {
} else {
responseMessage = getResponseMessage(result, prompt)!;
}
connection.write(responseMessage);
await connection.write(responseMessage);
} on Exception catch (e, st) {
logger.severe('exception in writing response to socket:${e.toString()}');
await GlobalExceptionHandler.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StreamVerbHandler extends AbstractVerbHandler {
logger.severe('sender connection is null for stream id:$streamId');
throw UnAuthenticatedException('Invalid stream id');
}
StreamManager.senderSocketMap[streamId]!
await StreamManager.senderSocketMap[streamId]!
.write('stream:done $streamId\n');
_cleanUp(streamId);
break;
Expand Down
2 changes: 1 addition & 1 deletion packages/at_secondary_server/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies:
at_utils: 3.0.16
at_chops: 2.0.0
at_lookup: 3.0.46
at_server_spec: 4.0.1
at_server_spec: 5.0.0
at_persistence_spec: 2.0.14
at_persistence_secondary_server: 3.0.61
intl: ^0.19.0
Expand Down
Loading

0 comments on commit da587a8

Please sign in to comment.