Skip to content

Commit 51384ff

Browse files
author
Dmitry Shabanov
committed
cleanup; added asyncio version; added venv
1 parent 5220e98 commit 51384ff

File tree

6 files changed

+172
-5
lines changed

6 files changed

+172
-5
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
.idea
2+
venv
3+
4+
*.log
5+
main.py

FIX44.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515

1616

1717
class Session:
18-
def __init__(self, sender_id: str, target_id: str, target_sub=None, sender_sub=None):
18+
def __init__(self, sender_id: str, target_id: str, target_sub=None, sender_sub=None, username=None, password=None):
19+
self.password = password
20+
self.username = username
1921
self.sender_id = sender_id
2022
self.target_sub = target_sub
2123
self.target_id = target_id
@@ -255,7 +257,7 @@ def market_data_snapshot_handler(self, message: BaseMessage):
255257
return
256258

257259
ask_idx = 1 if prices[0][Field.MDEntryType] == '0' else 0
258-
bid_idx = (ask_idx + 1) % 2
260+
bid_idx = (ask_idx + 1) % 2
259261
spread = calculate_spread(
260262
prices[bid_idx][Field.MDEntryPx],
261263
prices[ask_idx][Field.MDEntryPx],
@@ -275,7 +277,7 @@ def market_data_refresh_handler(self, message: BaseMessage):
275277
results = message.get_group(Field.Groups.MDEntry_Refresh)
276278
actions = {'0': 'New', '2': 'Delete'}
277279
types = {'0': 'BID', '1': 'ASK'}
278-
280+
279281
message = "Price Update:"
280282
for r in results:
281283
if actions[r[Field.MDUpdateAction]] == 'New':
@@ -290,7 +292,7 @@ def market_data_refresh_handler(self, message: BaseMessage):
290292
message += "\n\t\t\tSymbol: {0: <7}, ID: {1}, Action: {2}".format(
291293
'none', r[Field.MDEntryID], actions[r[Field.MDUpdateAction]]
292294
)
293-
295+
294296
self.logger.info(message)
295297

296298
def execution_report_handler(self, message: Message):

Field.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
LeavesQty = 151
5050
CumQty = 14
5151
OrdRejReason = 103
52+
BuySide = 1
53+
SellSide = 2
5254

5355

5456
class Groups:

client.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
#!./venv/bin/python
2+
import asyncio
3+
import uvloop
4+
from concurrent.futures import ThreadPoolExecutor
5+
from FIX44 import *
6+
import Message
7+
import logging
8+
9+
10+
class Client:
11+
session = None
12+
writer = None
13+
reader = None
14+
logger = None
15+
loop = None
16+
executor = None
17+
buffer = b''
18+
19+
def __init__(self, loop, username, password, broker=None, max_threads=None):
20+
self.session = Session(
21+
sender_id='{}.{}'.format(broker, username) if broker else username,
22+
target_id='CSERVER',
23+
target_sub='QUOTE',
24+
username=username,
25+
password=password
26+
)
27+
self.loop = loop
28+
self.executor = ThreadPoolExecutor(
29+
max_workers=max_threads if max_threads else len(self.session.symbol_table)
30+
)
31+
logging.basicConfig(
32+
format='%(asctime)s %(threadName)s %(levelname)s: %(message)s'
33+
)
34+
self.logger = logging.getLogger('fix-client.' + self.session.sender_id)
35+
36+
async def connect(self, host=None, port=None):
37+
logging.info('Connecting ')
38+
self.writer = None
39+
self.reader = None
40+
self.buffer = b''
41+
self.session.reset_sequence()
42+
(self.reader, self.writer) = await asyncio.open_connection(host, port, loop=self.loop)
43+
self.on_connect()
44+
45+
def on_connect(self):
46+
logging.info('Connected')
47+
self.write(LogonMessage(self.session.username, self.session.password, 3, self.session))
48+
49+
def on_logon(self):
50+
logging.critical('Signed in as {}'.format(self.session.sender_id))
51+
# subscribe to all symbols
52+
for symbol_id, symbol in self.session.symbol_table.items():
53+
self.write(MarketDataRequestMessage(
54+
symbol=symbol_id, request_id=symbol_id, session=self.session, refresh=False
55+
))
56+
57+
def on_test(self, message: Message.Base):
58+
self.write(TestResponseMessage(message.get_field(Field.TestReqID), self.session))
59+
60+
def on_heartbeat(self):
61+
self.write(HeartbeatMessage(self.session))
62+
63+
def on_market_data(self, message: Message.Base):
64+
prices = message.get_group(Field.Groups.MDEntry_Snapshot)
65+
66+
if len(prices) < 2 or Field.MDEntryPx not in prices[0] or Field.MDEntryPx not in prices[1]:
67+
self.logger.error("No ask or bid in price update.")
68+
return
69+
70+
ask_idx = 1 if prices[0][Field.MDEntryType] == '0' else 0
71+
bid_idx = (ask_idx + 1) % 2
72+
spread = calculate_spread(
73+
prices[bid_idx][Field.MDEntryPx],
74+
prices[ask_idx][Field.MDEntryPx],
75+
self.session.symbol_table[int(message.get_field(Field.Symbol))]['pip_position']
76+
)
77+
name = self.session.symbol_table[int(message.get_field(Field.Symbol))]['name']
78+
79+
self.logger.info("\t{0: <10}\tSPREAD: {1}\tBID: {2: <10}\tASK: {3: <10}".format(
80+
name, spread, prices[bid_idx][Field.MDEntryPx], prices[ask_idx][Field.MDEntryPx],
81+
))
82+
83+
def process(self, buffer):
84+
self.logger.debug('<<< IN {}'.format(buffer))
85+
86+
message = Message.from_string(buffer.decode(), self.session)
87+
88+
if message.get_type() == Message.Types.Heartbeat:
89+
self.on_heartbeat()
90+
elif message.get_type() == Message.Types.Logon:
91+
self.on_logon()
92+
elif message.get_type() == Message.Types.TestRequest:
93+
self.on_test(message)
94+
elif message.get_type() == Message.Types.MarketDataSnapshot:
95+
self.on_market_data(message)
96+
97+
def feed(self, data):
98+
self.buffer += data
99+
100+
header, value = data.split(b'=')
101+
if header == b'10':
102+
self.logger.debug('Submitting task to execute')
103+
self.executor.submit(self.process, self.buffer)
104+
self.logger.debug('Submitted')
105+
self.buffer = b''
106+
107+
def write(self, message: Message.Base):
108+
self.logger.debug('>>> OUT {}'.format(bytes(message)))
109+
self.loop.call_soon_threadsafe(self.writer.write, bytes(message))
110+
111+
async def run(self, host, port):
112+
await self.connect(host, port)
113+
114+
while self.loop.is_running():
115+
try:
116+
data = await self.reader.readuntil(bytes(SOH, 'ASCII'))
117+
self.feed(data)
118+
except asyncio.streams.IncompleteReadError:
119+
self.logger.critical('!Disconnected!')
120+
self.logger.info('Trying to reconnect')
121+
await self.connect(host, port)
122+
123+
124+
if __name__ == '__main__':
125+
import argparse
126+
parser = argparse.ArgumentParser(description='CTrader FIX async client.')
127+
parser.add_argument('--version', action='version', version='%(prog)s v0.21')
128+
parser.add_argument('-b', '--broker', required=True, help='Broker name, usually first part of sender id.')
129+
parser.add_argument('-u', '--username', required=True, help='Account number.')
130+
parser.add_argument('-p', '--password', required=True, help='Account password.')
131+
parser.add_argument('-s', '--server', required=True, help='Host, ex hXX.p.ctrader.com.')
132+
parser.add_argument('-v', '--verbose', action='count', help='Increase verbosity level. '
133+
'-v to something somewhat useful, '
134+
'-vv to full debug')
135+
parser.add_argument('-t', '--max-threads', help='Thread limit in thread pool. Default to symbol table length.')
136+
args = parser.parse_args()
137+
138+
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
139+
base_loop = asyncio.get_event_loop()
140+
client = Client(loop=base_loop, broker=args.broker, username=args.username, password=args.password)
141+
verbose = min(2, args.verbose) if args.verbose else 0
142+
client.logger.setLevel(logging.WARNING - (verbose * 10))
143+
asyncio.ensure_future(client.run(args.server, 5201))
144+
base_loop.run_forever()

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pkg-resources==0.0.0
2+
uvloop==0.9.1

tests.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import FIX44
22
import unittest
3+
from FIX44 import SOH
4+
from Message import make_pair
35

46

57
class TestCalculations(unittest.TestCase):
@@ -14,5 +16,15 @@ def testPipValue(self):
1416
self.assertEqual(FIX44.calculate_pip_value('1.3348', 100000, 4), '7.49176')
1517
self.assertEqual(FIX44.calculate_pip_value('112.585', 10000, 2), '0.88822')
1618

17-
def testCommission(self):
19+
def test_commission(self):
1820
self.assertEqual(FIX44.calculate_commission(10000, 1, 0.000030), 0.6)
21+
22+
def test_make_valid_tuple(self):
23+
self.assertEqual(
24+
make_pair(('first', 'second')),
25+
'first=second{}'.format(SOH)
26+
)
27+
28+
29+
if __name__ == '__main__':
30+
unittest.main()

0 commit comments

Comments
 (0)