Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 6. #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions lua/mdp/book_builder.lua
Original file line number Diff line number Diff line change
@@ -163,11 +163,12 @@ local function price_level_alloc(alloc)
end

-- Create a new book
local function new_book(symbol, max_entries)
local function new_book(symbol, depth)
local self = setmetatable({
bid = { nr_entries = 0 },
ask = { nr_entries = 0 },
price_levels = new_price_levels_alloc(max_entries * 2),
price_levels = new_price_levels_alloc(depth * 3),
depth = depth,
symbol = symbol,
next_update = nil,
}, Book)
@@ -184,7 +185,7 @@ local function Book_find_price_level(book_side, level)

for i=1,level do
cur_lev = cur_lev.next_itm
if cur_lev == book_side.entries then
if cur_lev == book_side then
Logger.debug("WRAP-AROUND", "Wrapped around, yikes.")
cur_lev = nil
break
@@ -267,8 +268,8 @@ function Book.insert_new_price_level(self, side, price, size, nr_orders, level)
list_insert_after(new_lev, cur_lev)
book_side.nr_entries = book_side.nr_entries + 1

if book_side.nr_entries > 10 then
Book_prune(self.price_levels, book_side, 10)
if book_side.nr_entries > self.depth then
Book_prune(self.price_levels, book_side, self.depth)
end

book_side.needs_update = level <= 3
@@ -331,7 +332,7 @@ function Book.clear_all_price_levels(self, side)
local cur_level = book_side.next_itm
local alloc = self.price_levels

while cur_level ~= book_side.entries do
while cur_level ~= book_side do
local next_level = cur_level.next_itm
list_remove(cur_level)
price_level_free(alloc, cur_level)
@@ -361,7 +362,7 @@ function Book.remove_price_levels_from(self, side, levels)
local cur_level = book_side.next_itm
local rem = levels

while cur_level ~= book_side.entries and level > 0 do
while cur_level ~= book_side and level > 0 do
local next_level = cur_level.next_itm
list_del(cur_level)
end
159 changes: 156 additions & 3 deletions lua/mdp/pkt_def.lua
Original file line number Diff line number Diff line change
@@ -100,6 +100,49 @@ ffi.cdef[[
int8_t flow_schedule_type;
mdp_price_null_t min_price_increment_amount;
int8_t user_defined_instrument;
uint16_t trading_reference_date;
} __attribute__((__packed__));

struct mdp_instrument_definition_spread {
uint8_t match_event_indicator;
uint32_t tot_num_reports;
uint8_t security_update_action;
uint64_t last_update_time;
uint8_t md_security_trading_status;
int16_t appl_id;
uint8_t market_segment_id;
uint8_t underlying_product;
uint32_t security_exchange;
char security_group[6];
char asset[6];
char symbol[20];
int32_t security_id;
uint8_t security_id_source;
char security_type[6];
char cfi_code[6];
char maturity_month_year[5];
char currency[3];
int8_t security_sub_type;
int8_t user_defined_instrument;
uint8_t match_algorithm;
uint32_t min_trade_vol;
uint32_t max_trade_vol;
int64_t min_price_increment;
int64_t display_factor;
uint8_t price_display_format;
mdp_price_null_t price_ratio;
int8_t tick_rule;
char unit_of_measure[30];
mdp_price_null_t trading_reference_price;
uint8_t settl_price_type;
int32_t open_interest_qty;
int32_t cleared_volume;
mdp_price_null_t high_limit_price;
mdp_price_null_t low_limit_price;
mdp_price_null_t max_price_variation;
uint8_t main_fraction;
uint8_t sub_fraction;
uint16_t trading_reference_date;
} __attribute__((__packed__));

struct mdp_security_status {
@@ -116,10 +159,10 @@ ffi.cdef[[

struct mdp_md_incremental_refresh_book_md_entries {
mdp_price_null_t md_entry_px;
uint32_t md_entry_size;
int32_t md_entry_size;
int32_t security_id;
uint32_t rpt_seq;
uint32_t number_of_orders;
int32_t number_of_orders;
uint8_t md_price_level;
uint8_t md_update_action;
uint8_t md_entry_type;
@@ -129,10 +172,120 @@ ffi.cdef[[
struct mdp_md_incremental_refresh_book {
uint64_t transact_time;
uint8_t match_event_indicator;
uint16_t wtf;
uint8_t __padding__[2];
struct mdp_group_size md_entries_size;
struct mdp_md_incremental_refresh_book_md_entries md_entries[];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_daily_statistics_md_entries {
mdp_price_null_t md_entry_px;
int32_t md_entry_size;
int32_t security_id;
uint32_t rpt_seq;
uint16_t trading_reference_date;
uint8_t settle_price_type;
uint8_t md_update_action;
uint8_t md_entry_type;
uint8_t __padding__[7];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_daily_statistics {
uint64_t transact_time;
uint8_t match_event_indicator;
uint8_t __padding__[2];
struct mdp_group_size md_entries_size;
struct mdp_md_incremental_refresh_daily_statistics_md_entries md_entries[];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_limit_banding_md_entries {
mdp_price_null_t high_limit_price;
mdp_price_null_t low_limit_price;
mdp_price_null_t max_price_variation;
int32_t security_id;
uint32_t rpt_seq;
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_limit_banding {
uint64_t transact_time;
uint8_t match_event_indicator;
uint8_t __padding__[2];
struct mdp_group_size md_entries_size;
struct mdp_md_incremental_refresh_limit_banding_md_entries md_entries[];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_session_statistics_md_entries {
mdp_price_null_t md_entry_px;
int32_t security_id;
uint32_t rpt_seq;
uint8_t open_close_settl_flag;
uint8_t md_update_action;
uint8_t md_entry_type;
uint8_t __padding__[5];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_session_statistics {
uint64_t transact_time;
uint8_t match_event_indicator;
uint8_t __padding__[2];
struct mdp_group_size md_entries_size;
struct mdp_md_incremental_refresh_session_statistics_md_entries md_entries[];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_trade_md_entries {
mdp_price_null_t md_entry_px;
int32_t md_entry_size;
int32_t security_id;
uint32_t rpt_seq;
int32_t number_of_orders;
int32_t trade_id;
uint8_t aggressor_side;
uint8_t md_update_action;
uint8_t __padding__[2];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_trade {
uint64_t transact_time;
uint8_t match_event_indicator;
uint8_t __padding__[2];
struct mdp_group_size md_entries_size;
struct mdp_md_incremental_refresh_trade_md_entries md_entries[];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_volume_md_entries {
int32_t md_entry_size;
int32_t security_id;
uint32_t rpt_seq;
uint8_t md_update_action;
uint8_t __padding__[3];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_volume {
uint64_t transact_time;
uint8_t match_event_indicator;
uint8_t __padding__[2];
struct mdp_group_size md_entries_size;
struct mdp_md_incremental_refresh_volume_md_entries md_entries[];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_trade_summary_md_entries {
mdp_price_null_t md_entry_px;
int32_t md_entry_size;
int32_t security_id;
uint32_t rpt_seq;
int32_t number_of_orders;
uint8_t aggressor_side;
uint8_t md_update_action;
uint8_t __padding__[6];
} __attribute__((__packed__));

struct mdp_md_incremental_refresh_trade_summary {
uint64_t transact_time;
uint8_t match_event_indicator;
uint8_t __padding__[2];
struct mdp_group_size md_entries_size;
struct mdp_md_incremental_refresh_trade_summary_md_entries md_entries[];
// TODO NoOrderIDEntries
} __attribute__((__packed__));

]]

42 changes: 34 additions & 8 deletions lua/mdp/proto_mdp3_sbe.lua
Original file line number Diff line number Diff line change
@@ -50,12 +50,21 @@ local kMDP_SECURITY_STATUS_PTR = ffi.typeof("struct mdp_security_status *")
-- Instrument Definition: Future
local kMDP_INSTRUMENT_DEFINITION_FUTURE = ffi.typeof("struct mdp_instrument_definition_future *")

-- Instrument Definition: Spread
local kMDP_INSTRUMENT_DEFINITION_SPREAD = ffi.typeof("struct mdp_instrument_definition_spread *")

-- char ptr for pointer arithmetic
local kUINT8_PTR = ffi.typeof("uint8_t *")

local kMDP_MD_INCREMENTAL_REFRESH_BOOK_PTR = ffi.typeof("struct mdp_md_incremental_refresh_book *")
local kMDP_MD_INCREMENTAL_REFRESH_DAILY_STATISTICS_PTR = ffi.sizeof("struct mdp_md_incremental_refresh_daily_statistics *")
local kMDP_MD_INCREMENTAL_REFRESH_LIMIT_BANDING_PTR = ffi.typeof("struct mdp_md_incremental_refresh_limit_banding *")
local kMDP_MD_INCREMENTAL_REFRESH_SESSION_STATISTICS_PTR = ffi.typeof("struct mdp_md_incremental_refresh_session_statistics *")
local kMDP_MD_INCREMENTAL_REFRESH_TRADE_PTR = ffi.typeof("struct mdp_md_incremental_refresh_trade *")
local kMDP_MD_INCREMENTAL_REFRESH_VOLUME_PTR = ffi.typeof("struct mdp_md_incremental_refresh_volume *")
local kMDP_MD_INCREMENTAL_REFRESH_TRADE_SUMMARY_PTR = ffi.typeof("struct mdp_md_incremental_refresh_trade_summary *")

local kMDP_VERSION_SUPPORTED = 5
local kMDP_VERSION_SUPPORTED = 6

-- Update action constants
local kUPDATE_ACTION_NEW = 0
@@ -70,7 +79,7 @@ local kCHANNEL_RESET = 4
local kADMIN_HEARTBEAT = 12
local kADMIN_LOGOUT = 16
local kMD_INSTRUMENT_DEFINITION_FUTURE = 27
local kMD_INSTURMENT_DEFINITION_SPREAD = 29
local kMD_INSTRUMENT_DEFINITION_SPREAD = 29
local kSECURITY_STATUS = 30
local kMD_INCREMENTAL_REFRESH_BOOK = 32
local kMD_INCREMENTAL_REFRESH_DAILY_STATS = 33
@@ -105,7 +114,7 @@ local function handle_admin_logout(mdstate, msg_hdr)
return true
end

local function handle_instrument_definition(mdstate, msg_hdr)
local function handle_instrument_definition_future(mdstate, msg_hdr)
local idf = cast_payload(msg_hdr, kMDP_INSTRUMENT_DEFINITION_FUTURE)

Logger.info("INSTRUMENT-DEFINITION-FUTURE", "Symbol: %s Group: %s Asset: %s Currency: %s SettlCurrency: %s Units of %d %s",
@@ -115,7 +124,19 @@ local function handle_instrument_definition(mdstate, msg_hdr)
ffi.string(idf.currency, 3),
ffi.string(idf.settl_currency, 3),
tonumber(idf.unit_of_measure_quantity),
ffi.strong(idf.unit_of_measure, 30))
ffi.string(idf.unit_of_measure, 30))

return true
end

local function handle_instrument_definition_spread(mdstate, msg_hdr)
local idf = cast_payload(msg_hdr, kMDP_INSTRUMENT_DEFINITION_SPREAD)

Logger.info("INSTRUMENT-DEFINITION-SPREAD", "Symbol: %s Group: %s Asset: %s Currency: %s",
ffi.string(idf.symbol, 20),
ffi.string(idf.security_group, 6),
ffi.string(idf.asset, 6),
ffi.string(idf.currency, 3))

return true
end
@@ -175,9 +196,10 @@ end

local function handle_incremental_refresh(mdstate, msg_hdr)
local md_inc = cast_payload(msg_hdr, kMDP_MD_INCREMENTAL_REFRESH_BOOK_PTR)
local num_in_group = md_inc.md_entries_size.num_in_group

if md_inc.md_entries_size.num_in_group > 0 then
for i = 0,md_inc.md_entries_size.num_in_group-1 do
if num_in_group > 0 then
for i = 0,num_in_group-1 do
local entry = md_inc.md_entries[i]
if entry.md_update_action == kUPDATE_ACTION_NEW then
mdstate.on_new_price_level(mdstate, entry)
@@ -232,7 +254,7 @@ end

-- Given the current message header and the remaining bytes, return the next message header
local function get_next_message_header(cur_msg_hdr, bytes_remain)
if bytes_remain <= cur_msg_hdr.msg_size then
if bytes_remain <= cur_msg_hdr.msg_size + kMDP_MESSAGE_HDR_LEN then
return nil, 0
else
return ffi.cast(kMDP_MESSAGE_HDR_PTR, ffi.cast(kUINT8_PTR, cur_msg_hdr) + cur_msg_hdr.msg_size), bytes_remain - cur_msg_hdr.msg_size
@@ -302,7 +324,9 @@ function MarketDataHandler.on_message(self, ll_payload)
elseif hdr.template_id == kSECURITY_STATUS then
handle_security_status(self, hdr)
elseif hdr.template_id == kMD_INSTRUMENT_DEFINITION_FUTURE then
handle_instrument_definition(self, hdr)
handle_instrument_definition_future(self, hdr)
elseif hdr.template_id == kMD_INSTRUMENT_DEFINITION_SPREAD then
handle_instrument_definition_spread(self, hdr)
elseif hdr.template_id == kADMIN_HEARTBEAT then
handle_heartbeat(self, hdr)
elseif hdr.template_id == kSNAPSHOT_FULL_REFRESH then
@@ -319,6 +343,8 @@ function MarketDataHandler.on_message(self, ll_payload)
handle_incremental_refresh_daily_stats(self, hdr)
elseif hdr.template_id == kMD_INCREMENTAL_REFRESH_LIMITS_BANDING then
handle_incremental_refresh_limits_banding(self, hdr)
elseif hdr.template_id == kMD_INCREMENTAL_REFRESH_TRADE then
handle_incremental_refresh_trade(self, hdr)
else
handle_unhandled(self, hdr)
end