diff --git a/lua/mdp/book_builder.lua b/lua/mdp/book_builder.lua index 9195723..d4ac882 100644 --- a/lua/mdp/book_builder.lua +++ b/lua/mdp/book_builder.lua @@ -163,11 +163,12 @@ local function price_level_alloc(alloc) end -- Create a new book -local function new_book(symbol, max_entries) +local function new_book(symbol, depth) local self = setmetatable({ bid = { nr_entries = 0 }, ask = { nr_entries = 0 }, - price_levels = new_price_levels_alloc(max_entries * 2), + price_levels = new_price_levels_alloc(depth * 3), + depth = depth, symbol = symbol, next_update = nil, }, Book) @@ -184,7 +185,7 @@ local function Book_find_price_level(book_side, level) for i=1,level do cur_lev = cur_lev.next_itm - if cur_lev == book_side.entries then + if cur_lev == book_side then Logger.debug("WRAP-AROUND", "Wrapped around, yikes.") cur_lev = nil break @@ -267,8 +268,8 @@ function Book.insert_new_price_level(self, side, price, size, nr_orders, level) list_insert_after(new_lev, cur_lev) book_side.nr_entries = book_side.nr_entries + 1 - if book_side.nr_entries > 10 then - Book_prune(self.price_levels, book_side, 10) + if book_side.nr_entries > self.depth then + Book_prune(self.price_levels, book_side, self.depth) end book_side.needs_update = level <= 3 @@ -331,7 +332,7 @@ function Book.clear_all_price_levels(self, side) local cur_level = book_side.next_itm local alloc = self.price_levels - while cur_level ~= book_side.entries do + while cur_level ~= book_side do local next_level = cur_level.next_itm list_remove(cur_level) price_level_free(alloc, cur_level) @@ -361,7 +362,7 @@ function Book.remove_price_levels_from(self, side, levels) local cur_level = book_side.next_itm local rem = levels - while cur_level ~= book_side.entries and level > 0 do + while cur_level ~= book_side and level > 0 do local next_level = cur_level.next_itm list_del(cur_level) end diff --git a/lua/mdp/pkt_def.lua b/lua/mdp/pkt_def.lua index f2defbc..b31452b 100644 --- a/lua/mdp/pkt_def.lua +++ b/lua/mdp/pkt_def.lua @@ -100,6 +100,49 @@ ffi.cdef[[ int8_t flow_schedule_type; mdp_price_null_t min_price_increment_amount; int8_t user_defined_instrument; + uint16_t trading_reference_date; + } __attribute__((__packed__)); + + struct mdp_instrument_definition_spread { + uint8_t match_event_indicator; + uint32_t tot_num_reports; + uint8_t security_update_action; + uint64_t last_update_time; + uint8_t md_security_trading_status; + int16_t appl_id; + uint8_t market_segment_id; + uint8_t underlying_product; + uint32_t security_exchange; + char security_group[6]; + char asset[6]; + char symbol[20]; + int32_t security_id; + uint8_t security_id_source; + char security_type[6]; + char cfi_code[6]; + char maturity_month_year[5]; + char currency[3]; + int8_t security_sub_type; + int8_t user_defined_instrument; + uint8_t match_algorithm; + uint32_t min_trade_vol; + uint32_t max_trade_vol; + int64_t min_price_increment; + int64_t display_factor; + uint8_t price_display_format; + mdp_price_null_t price_ratio; + int8_t tick_rule; + char unit_of_measure[30]; + mdp_price_null_t trading_reference_price; + uint8_t settl_price_type; + int32_t open_interest_qty; + int32_t cleared_volume; + mdp_price_null_t high_limit_price; + mdp_price_null_t low_limit_price; + mdp_price_null_t max_price_variation; + uint8_t main_fraction; + uint8_t sub_fraction; + uint16_t trading_reference_date; } __attribute__((__packed__)); struct mdp_security_status { @@ -116,10 +159,10 @@ ffi.cdef[[ struct mdp_md_incremental_refresh_book_md_entries { mdp_price_null_t md_entry_px; - uint32_t md_entry_size; + int32_t md_entry_size; int32_t security_id; uint32_t rpt_seq; - uint32_t number_of_orders; + int32_t number_of_orders; uint8_t md_price_level; uint8_t md_update_action; uint8_t md_entry_type; @@ -129,10 +172,120 @@ ffi.cdef[[ struct mdp_md_incremental_refresh_book { uint64_t transact_time; uint8_t match_event_indicator; - uint16_t wtf; + uint8_t __padding__[2]; struct mdp_group_size md_entries_size; struct mdp_md_incremental_refresh_book_md_entries md_entries[]; } __attribute__((__packed__)); + struct mdp_md_incremental_refresh_daily_statistics_md_entries { + mdp_price_null_t md_entry_px; + int32_t md_entry_size; + int32_t security_id; + uint32_t rpt_seq; + uint16_t trading_reference_date; + uint8_t settle_price_type; + uint8_t md_update_action; + uint8_t md_entry_type; + uint8_t __padding__[7]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_daily_statistics { + uint64_t transact_time; + uint8_t match_event_indicator; + uint8_t __padding__[2]; + struct mdp_group_size md_entries_size; + struct mdp_md_incremental_refresh_daily_statistics_md_entries md_entries[]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_limit_banding_md_entries { + mdp_price_null_t high_limit_price; + mdp_price_null_t low_limit_price; + mdp_price_null_t max_price_variation; + int32_t security_id; + uint32_t rpt_seq; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_limit_banding { + uint64_t transact_time; + uint8_t match_event_indicator; + uint8_t __padding__[2]; + struct mdp_group_size md_entries_size; + struct mdp_md_incremental_refresh_limit_banding_md_entries md_entries[]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_session_statistics_md_entries { + mdp_price_null_t md_entry_px; + int32_t security_id; + uint32_t rpt_seq; + uint8_t open_close_settl_flag; + uint8_t md_update_action; + uint8_t md_entry_type; + uint8_t __padding__[5]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_session_statistics { + uint64_t transact_time; + uint8_t match_event_indicator; + uint8_t __padding__[2]; + struct mdp_group_size md_entries_size; + struct mdp_md_incremental_refresh_session_statistics_md_entries md_entries[]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_trade_md_entries { + mdp_price_null_t md_entry_px; + int32_t md_entry_size; + int32_t security_id; + uint32_t rpt_seq; + int32_t number_of_orders; + int32_t trade_id; + uint8_t aggressor_side; + uint8_t md_update_action; + uint8_t __padding__[2]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_trade { + uint64_t transact_time; + uint8_t match_event_indicator; + uint8_t __padding__[2]; + struct mdp_group_size md_entries_size; + struct mdp_md_incremental_refresh_trade_md_entries md_entries[]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_volume_md_entries { + int32_t md_entry_size; + int32_t security_id; + uint32_t rpt_seq; + uint8_t md_update_action; + uint8_t __padding__[3]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_volume { + uint64_t transact_time; + uint8_t match_event_indicator; + uint8_t __padding__[2]; + struct mdp_group_size md_entries_size; + struct mdp_md_incremental_refresh_volume_md_entries md_entries[]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_trade_summary_md_entries { + mdp_price_null_t md_entry_px; + int32_t md_entry_size; + int32_t security_id; + uint32_t rpt_seq; + int32_t number_of_orders; + uint8_t aggressor_side; + uint8_t md_update_action; + uint8_t __padding__[6]; + } __attribute__((__packed__)); + + struct mdp_md_incremental_refresh_trade_summary { + uint64_t transact_time; + uint8_t match_event_indicator; + uint8_t __padding__[2]; + struct mdp_group_size md_entries_size; + struct mdp_md_incremental_refresh_trade_summary_md_entries md_entries[]; + // TODO NoOrderIDEntries + } __attribute__((__packed__)); + ]] diff --git a/lua/mdp/proto_mdp3_sbe.lua b/lua/mdp/proto_mdp3_sbe.lua index ab7f5ca..4e2f440 100644 --- a/lua/mdp/proto_mdp3_sbe.lua +++ b/lua/mdp/proto_mdp3_sbe.lua @@ -50,12 +50,21 @@ local kMDP_SECURITY_STATUS_PTR = ffi.typeof("struct mdp_security_status *") -- Instrument Definition: Future local kMDP_INSTRUMENT_DEFINITION_FUTURE = ffi.typeof("struct mdp_instrument_definition_future *") +-- Instrument Definition: Spread +local kMDP_INSTRUMENT_DEFINITION_SPREAD = ffi.typeof("struct mdp_instrument_definition_spread *") + -- char ptr for pointer arithmetic local kUINT8_PTR = ffi.typeof("uint8_t *") local kMDP_MD_INCREMENTAL_REFRESH_BOOK_PTR = ffi.typeof("struct mdp_md_incremental_refresh_book *") +local kMDP_MD_INCREMENTAL_REFRESH_DAILY_STATISTICS_PTR = ffi.sizeof("struct mdp_md_incremental_refresh_daily_statistics *") +local kMDP_MD_INCREMENTAL_REFRESH_LIMIT_BANDING_PTR = ffi.typeof("struct mdp_md_incremental_refresh_limit_banding *") +local kMDP_MD_INCREMENTAL_REFRESH_SESSION_STATISTICS_PTR = ffi.typeof("struct mdp_md_incremental_refresh_session_statistics *") +local kMDP_MD_INCREMENTAL_REFRESH_TRADE_PTR = ffi.typeof("struct mdp_md_incremental_refresh_trade *") +local kMDP_MD_INCREMENTAL_REFRESH_VOLUME_PTR = ffi.typeof("struct mdp_md_incremental_refresh_volume *") +local kMDP_MD_INCREMENTAL_REFRESH_TRADE_SUMMARY_PTR = ffi.typeof("struct mdp_md_incremental_refresh_trade_summary *") -local kMDP_VERSION_SUPPORTED = 5 +local kMDP_VERSION_SUPPORTED = 6 -- Update action constants local kUPDATE_ACTION_NEW = 0 @@ -70,7 +79,7 @@ local kCHANNEL_RESET = 4 local kADMIN_HEARTBEAT = 12 local kADMIN_LOGOUT = 16 local kMD_INSTRUMENT_DEFINITION_FUTURE = 27 -local kMD_INSTURMENT_DEFINITION_SPREAD = 29 +local kMD_INSTRUMENT_DEFINITION_SPREAD = 29 local kSECURITY_STATUS = 30 local kMD_INCREMENTAL_REFRESH_BOOK = 32 local kMD_INCREMENTAL_REFRESH_DAILY_STATS = 33 @@ -105,7 +114,7 @@ local function handle_admin_logout(mdstate, msg_hdr) return true end -local function handle_instrument_definition(mdstate, msg_hdr) +local function handle_instrument_definition_future(mdstate, msg_hdr) local idf = cast_payload(msg_hdr, kMDP_INSTRUMENT_DEFINITION_FUTURE) Logger.info("INSTRUMENT-DEFINITION-FUTURE", "Symbol: %s Group: %s Asset: %s Currency: %s SettlCurrency: %s Units of %d %s", @@ -115,7 +124,19 @@ local function handle_instrument_definition(mdstate, msg_hdr) ffi.string(idf.currency, 3), ffi.string(idf.settl_currency, 3), tonumber(idf.unit_of_measure_quantity), - ffi.strong(idf.unit_of_measure, 30)) + ffi.string(idf.unit_of_measure, 30)) + + return true +end + +local function handle_instrument_definition_spread(mdstate, msg_hdr) + local idf = cast_payload(msg_hdr, kMDP_INSTRUMENT_DEFINITION_SPREAD) + + Logger.info("INSTRUMENT-DEFINITION-SPREAD", "Symbol: %s Group: %s Asset: %s Currency: %s", + ffi.string(idf.symbol, 20), + ffi.string(idf.security_group, 6), + ffi.string(idf.asset, 6), + ffi.string(idf.currency, 3)) return true end @@ -175,9 +196,10 @@ end local function handle_incremental_refresh(mdstate, msg_hdr) local md_inc = cast_payload(msg_hdr, kMDP_MD_INCREMENTAL_REFRESH_BOOK_PTR) + local num_in_group = md_inc.md_entries_size.num_in_group - if md_inc.md_entries_size.num_in_group > 0 then - for i = 0,md_inc.md_entries_size.num_in_group-1 do + if num_in_group > 0 then + for i = 0,num_in_group-1 do local entry = md_inc.md_entries[i] if entry.md_update_action == kUPDATE_ACTION_NEW then mdstate.on_new_price_level(mdstate, entry) @@ -232,7 +254,7 @@ end -- Given the current message header and the remaining bytes, return the next message header local function get_next_message_header(cur_msg_hdr, bytes_remain) - if bytes_remain <= cur_msg_hdr.msg_size then + if bytes_remain <= cur_msg_hdr.msg_size + kMDP_MESSAGE_HDR_LEN then return nil, 0 else return ffi.cast(kMDP_MESSAGE_HDR_PTR, ffi.cast(kUINT8_PTR, cur_msg_hdr) + cur_msg_hdr.msg_size), bytes_remain - cur_msg_hdr.msg_size @@ -302,7 +324,9 @@ function MarketDataHandler.on_message(self, ll_payload) elseif hdr.template_id == kSECURITY_STATUS then handle_security_status(self, hdr) elseif hdr.template_id == kMD_INSTRUMENT_DEFINITION_FUTURE then - handle_instrument_definition(self, hdr) + handle_instrument_definition_future(self, hdr) + elseif hdr.template_id == kMD_INSTRUMENT_DEFINITION_SPREAD then + handle_instrument_definition_spread(self, hdr) elseif hdr.template_id == kADMIN_HEARTBEAT then handle_heartbeat(self, hdr) elseif hdr.template_id == kSNAPSHOT_FULL_REFRESH then @@ -319,6 +343,8 @@ function MarketDataHandler.on_message(self, ll_payload) handle_incremental_refresh_daily_stats(self, hdr) elseif hdr.template_id == kMD_INCREMENTAL_REFRESH_LIMITS_BANDING then handle_incremental_refresh_limits_banding(self, hdr) + elseif hdr.template_id == kMD_INCREMENTAL_REFRESH_TRADE then + handle_incremental_refresh_trade(self, hdr) else handle_unhandled(self, hdr) end