Skip to content

✨ feat: Add multi-threaded socket acceptor and initiator support #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Changelog

## v0.2.0

### Breaking changes

Add support for multi threaded socket acceptor and initiator.

To create socket acceptor and initiator you must change below code:

```diff
- let mut acceptor = SocketInitiator::try_new(&settings, &app, &store_factory, &log_factory)?;
+ let mut acceptor = Initiator::try_new(
+ &settings,
+ &app,
+ &store_factory,
+ &log_factory,
+ FixSocketServerKind::SingleThreaded,
+ )?;

- let mut initiator = SocketInitiator::try_new(&settings, &app, &message_store, &logger)?;
+ let mut initiator = Initiator::try_new(
+ &settings,
+ &app,
+ &message_store,
+ &logger,
+ FixSocketServerKind::default(),
+ )?;
```
15 changes: 5 additions & 10 deletions doc/ABOUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,12 @@ What I do **not** plan to bind from this crate:

Use original library instead obviously.

3. Threaded versions of socket acceptor / initiator.

Multithreading model is just too different between Rust / C++.
It is much more simple to handle correctly multithreading from Rust side and use single thread C++ socket handler.

4. Autotools build toolchain.
3. Autotools build toolchain.

Just use `cmake` once and for all !
We are in 2023+ and not targeting OS from the 80s.

5. FIX 5x messages generated code.
4. FIX 5x messages generated code.

FIX 5x XML definition is a little bit weird ...
For example:
Expand All @@ -48,20 +43,20 @@ What I do **not** plan to bind from this crate:
You can edit XML spec to your need and create a package with desired spec locally.\
Check FAQ for more info on this.

6. All binding of `LogFactory`.
5. All binding of `LogFactory`.

I just provide Rust standard trait.
You can implement whatever you want using standard Rust crate and impl 3 callbacks (logger / redis / syslog / sql / ...).

Moreover Rust file descriptor are protected by mutex, so this avoid mixing log from C++ / Rust in the same program.

7. Custom `MessageStoreFactory` from rust.
6. Custom `MessageStoreFactory` from rust.

For now, only `FileMessageStoreFactory` and `MemoryMessageStoreFactory` are bind.
You can use also use `MySqlMessageStoreFactory` and `PostgresMessageStoreFactory` when enabling crate feature flag.
Implementing message store from rust side is a little bit tricky and I am not 100% sure of the correct way to proceed.

8. Exotic operating system.
7. Exotic operating system.

AIX / Solaris are not targeted.
They are not Rust [Tier1](https://doc.rust-lang.org/nightly/rustc/platform-support.html) for now.
8 changes: 7 additions & 1 deletion examples/coinbase-example/examples/cb-order-sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,13 @@ fn main() -> anyhow::Result<()> {
let log_factory = LogFactory::try_new(&StdLogger::Stdout)?;
let app = Application::try_new(&my_app)?;

let mut acceptor = SocketInitiator::try_new(&settings, &app, &store_factory, &log_factory)?;
let mut acceptor = Initiator::try_new(
&settings,
&app,
&store_factory,
&log_factory,
FixSocketServerKind::SingleThreaded,
)?;

// Start the engine.
println!(">> Starting FIX engine 🚀");
Expand Down
8 changes: 7 additions & 1 deletion examples/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,13 @@ fn main() -> Result<(), QuickFixError> {
let log_factory = LogFactory::try_new(&StdLogger::Stdout)?;
let app = Application::try_new(&executor)?;

let mut acceptor = SocketAcceptor::try_new(&settings, &app, &store_factory, &log_factory)?;
let mut acceptor = Acceptor::try_new(
&settings,
&app,
&store_factory,
&log_factory,
FixSocketServerKind::SingleThreaded,
)?;
acceptor.start()?;

println!(">> App running, press 'q' to quit");
Expand Down
8 changes: 7 additions & 1 deletion examples/single-order-sender/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ fn main() -> anyhow::Result<()> {
let log_factory = LogFactory::try_new(&StdLogger::Stdout)?;
let app = Application::try_new(&single_order_sender)?;

let mut acceptor = SocketInitiator::try_new(&settings, &app, &store_factory, &log_factory)?;
let mut acceptor = Initiator::try_new(
&settings,
&app,
&store_factory,
&log_factory,
FixSocketServerKind::SingleThreaded,
)?;

// Start the engine.
println!(">> Starting FIX engine 🚀");
Expand Down
2 changes: 0 additions & 2 deletions quickfix-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ include = [
"!/libquickfix/**/*.sh",
# Exclude SSL related code (we will inject dummy files to replace them in build.rs)
"!/libquickfix/src/C++/*SSL*.*",
# Exclude Threaded socket application related code (we will inject dummy files to replace them in build.rs)
"!/libquickfix/src/C++/ThreadedSocket*.*",
# Do not include auto generated message files.
"!/libquickfix/src/C++/fix40",
"!/libquickfix/src/C++/fix41",
Expand Down
7 changes: 0 additions & 7 deletions quickfix-ffi/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,6 @@ fn main() {
touch_file(libquickfix_cpp_dir.join("ThreadedSSLSocketInitiator.h"));
touch_file(libquickfix_cpp_dir.join("UtilitySSL.h"));

touch_file(libquickfix_cpp_dir.join("ThreadedSocketAcceptor.h"));
touch_file(libquickfix_cpp_dir.join("ThreadedSocketAcceptor.cpp"));
touch_file(libquickfix_cpp_dir.join("ThreadedSocketConnection.h"));
touch_file(libquickfix_cpp_dir.join("ThreadedSocketConnection.cpp"));
touch_file(libquickfix_cpp_dir.join("ThreadedSocketInitiator.h"));
touch_file(libquickfix_cpp_dir.join("ThreadedSocketInitiator.cpp"));

// Build quickfix as a static library
let quickfix_dst = Config::new(libquickfix_build_dir)
.define("CMAKE_POLICY_VERSION_MINIMUM", "3.10")
Expand Down
8 changes: 4 additions & 4 deletions quickfix-ffi/examples/demo_basic_binding.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,20 @@ int main(int argc, char **argv) {
FixMessageStoreFactory_t *storeFactory = FixFileMessageStoreFactory_new(settings);
FixLogFactory_t *logFactory = FixLogFactory_new((void *)0xFEED, &LOG_CALLBACKS);
FixApplication_t *application = FixApplication_new((void *)0xBEEF, &APP_CALLBACKS);
FixSocketAcceptor_t *acceptor = FixSocketAcceptor_new(application, storeFactory, settings, logFactory);
FixAcceptor_t *acceptor = FixAcceptor_new(application, storeFactory, settings, logFactory, false);

printf(">> Acceptor START\n");
FixSocketAcceptor_start(acceptor);
FixAcceptor_start(acceptor);

printf(">> Press Q to exit\n");
while (getchar() != 'q') {
}

printf(">> Acceptor STOP\n");
FixSocketAcceptor_stop(acceptor);
FixAcceptor_stop(acceptor);

printf(">> Cleaning resources\n");
FixSocketAcceptor_delete(acceptor);
FixAcceptor_delete(acceptor);
FixApplication_delete(application);
FixLogFactory_delete(logFactory);
FixMessageStoreFactory_delete(storeFactory);
Expand Down
8 changes: 4 additions & 4 deletions quickfix-ffi/examples/demo_basic_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ fn main() {
.expect("Fail to build log factory");
let application = FixApplication_new(0xFEED as *const ffi::c_void, &APP_CALLBACKS)
.expect("Fail to build application");
let acceptor = FixSocketAcceptor_new(application, store_factory, settings, log_factory)
let acceptor = FixAcceptor_new(application, store_factory, settings, log_factory, 0)
.expect("Fail to build acceptor");

println!(">> Acceptor START");
assert_eq!(FixSocketAcceptor_start(acceptor), 0);
assert_eq!(FixAcceptor_start(acceptor), 0);

println!(">> Press Q to exit");
loop {
Expand All @@ -132,10 +132,10 @@ fn main() {
}

println!(">> Acceptor STOP");
assert_eq!(FixSocketAcceptor_stop(acceptor), 0);
assert_eq!(FixAcceptor_stop(acceptor), 0);

println!(">> Cleaning resources");
FixSocketAcceptor_delete(acceptor);
FixAcceptor_delete(acceptor);
FixApplication_delete(application);
FixLogFactory_delete(log_factory);
FixMessageStoreFactory_delete(store_factory);
Expand Down
48 changes: 25 additions & 23 deletions quickfix-ffi/quickfix-bind/include/quickfix_bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ typedef struct DataDictionary FixDataDictionary_t;
typedef struct MessageStoreFactory FixMessageStoreFactory_t;
typedef struct LogFactory FixLogFactory_t;
typedef struct Application FixApplication_t;
typedef struct SocketAcceptor FixSocketAcceptor_t;
typedef struct SocketInitiator FixSocketInitiator_t;
typedef struct Acceptor FixAcceptor_t;
typedef struct Initiator FixInitiator_t;
typedef struct Session FixSession_t;
typedef struct SessionID FixSessionID_t;
typedef struct Message FixMessage_t;
Expand Down Expand Up @@ -136,27 +136,29 @@ void FixLogFactory_delete(const FixLogFactory_t *obj);
FixApplication_t *FixApplication_new(const void *data, const FixApplicationCallbacks_t *callbacks);
void FixApplication_delete(const FixApplication_t *obj);

FixSocketAcceptor_t *FixSocketAcceptor_new(FixApplication_t *application, FixMessageStoreFactory_t *storeFactory,
const FixSessionSettings_t *settings, FixLogFactory_t *logFactory);
int8_t FixSocketAcceptor_start(FixSocketAcceptor_t *obj);
int8_t FixSocketAcceptor_block(FixSocketAcceptor_t *obj);
int8_t FixSocketAcceptor_poll(FixSocketAcceptor_t *obj);
int8_t FixSocketAcceptor_stop(FixSocketAcceptor_t *obj);
int8_t FixSocketAcceptor_isLoggedOn(const FixSocketAcceptor_t *obj);
int8_t FixSocketAcceptor_isStopped(const FixSocketAcceptor_t *obj);
FixSession_t *FixSocketAcceptor_getSession(const FixSocketAcceptor_t *obj, const FixSessionID_t *sessionId);
void FixSocketAcceptor_delete(const FixSocketAcceptor_t *obj);

FixSocketInitiator_t *FixSocketInitiator_new(FixApplication_t *application, FixMessageStoreFactory_t *storeFactory,
const FixSessionSettings_t *settings, FixLogFactory_t *logFactory);
int8_t FixSocketInitiator_start(FixSocketInitiator_t *obj);
int8_t FixSocketInitiator_block(FixSocketInitiator_t *obj);
int8_t FixSocketInitiator_poll(FixSocketInitiator_t *obj);
int8_t FixSocketInitiator_stop(FixSocketInitiator_t *obj);
int8_t FixSocketInitiator_isLoggedOn(const FixSocketInitiator_t *obj);
int8_t FixSocketInitiator_isStopped(const FixSocketInitiator_t *obj);
FixSession_t *FixSocketInitiator_getSession(const FixSocketInitiator_t *obj, const FixSessionID_t *sessionId);
void FixSocketInitiator_delete(const FixSocketInitiator_t *obj);
FixAcceptor_t *FixAcceptor_new(FixApplication_t *application, FixMessageStoreFactory_t *storeFactory,
const FixSessionSettings_t *settings, FixLogFactory_t *logFactory,
int8_t isMultiThreaded);
int8_t FixAcceptor_start(FixAcceptor_t *obj);
int8_t FixAcceptor_block(FixAcceptor_t *obj);
int8_t FixAcceptor_poll(FixAcceptor_t *obj);
int8_t FixAcceptor_stop(FixAcceptor_t *obj);
int8_t FixAcceptor_isLoggedOn(const FixAcceptor_t *obj);
int8_t FixAcceptor_isStopped(const FixAcceptor_t *obj);
FixSession_t *FixAcceptor_getSession(const FixAcceptor_t *obj, const FixSessionID_t *sessionId);
void FixAcceptor_delete(const FixAcceptor_t *obj);

FixInitiator_t *FixInitiator_new(FixApplication_t *application, FixMessageStoreFactory_t *storeFactory,
const FixSessionSettings_t *settings, FixLogFactory_t *logFactory,
int8_t isMultiThreaded);
int8_t FixInitiator_start(FixInitiator_t *obj);
int8_t FixInitiator_block(FixInitiator_t *obj);
int8_t FixInitiator_poll(FixInitiator_t *obj);
int8_t FixInitiator_stop(FixInitiator_t *obj);
int8_t FixInitiator_isLoggedOn(const FixInitiator_t *obj);
int8_t FixInitiator_isStopped(const FixInitiator_t *obj);
FixSession_t *FixInitiator_getSession(const FixInitiator_t *obj, const FixSessionID_t *sessionId);
void FixInitiator_delete(const FixInitiator_t *obj);

FixSessionID_t *FixSessionID_new(const char *beginString, const char *senderCompID, const char *targetCompID,
const char *sessionQualifier);
Expand Down
Loading