From e8ab531d430647785bbf41bd5b29300f42836c14 Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Fri, 16 Jul 2021 08:29:32 +0000 Subject: [PATCH 1/9] add PandasQuote --- qlib/backtest/exchange.py | 272 ++++++++++++++++++++++++-------------- 1 file changed, 171 insertions(+), 101 deletions(-) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 58f57ed731..8d47392514 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -102,10 +102,11 @@ def __init__( # TODO: the quote, trade_dates, codes are not necessray. # It is just for performance consideration. + self.limit_type = BaseQuote._get_limit_type(limit_threshold) if limit_threshold is None: if C.region == REG_CN: self.logger.warning(f"limit_threshold not set. The stocks hit the limit may be bought/sold") - elif self._get_limit_type(limit_threshold) == self.LT_FLT and abs(limit_threshold) > 0.1: + elif self.limit_type == BaseQuote.LT_FLT and abs(limit_threshold) > 0.1: if C.region == REG_CN: self.logger.warning(f"limit_threshold may not be set to a reasonable value") @@ -127,10 +128,9 @@ def __init__( # $change is for calculating the limit of the stock necessary_fields = {self.buy_price, self.sell_price, "$close", "$change", "$factor", "$volume"} - if self._get_limit_type(limit_threshold) == self.LT_TP_EXP: + if self.limit_type == BaseQuote.LT_TP_EXP: for exp in limit_threshold: necessary_fields.add(exp) - subscribe_fields = list(necessary_fields | set(subscribe_fields)) all_fields = list(necessary_fields | set(subscribe_fields)) self.all_fields = all_fields @@ -140,94 +140,22 @@ def __init__( self.limit_threshold: Union[Tuple[str, str], float, None] = limit_threshold self.volume_threshold = volume_threshold self.extra_quote = extra_quote - self.set_quote(codes, start_time, end_time) - def set_quote(self, codes, start_time, end_time): - if len(codes) == 0: - codes = D.instruments() - - self.quote = D.features(codes, self.all_fields, start_time, end_time, freq=self.freq, disk_cache=True).dropna( - subset=["$close"] + # init quote + self.quote = PandasQuote( + start_time = self.start_time, + end_time = self.end_time, + freq = self.freq, + codes = self.codes, + all_fields = self.all_fields, + limit_threshold = self.limit_threshold, + buy_price = self.buy_price, + sell_price = self.sell_price, + extra_quote = self.extra_quote, ) - self.quote.columns = self.all_fields - - for attr in "buy_price", "sell_price": - pstr = getattr(self, attr) # price string - if self.quote[pstr].isna().any(): - self.logger.warning("{} field data contains nan.".format(pstr)) - - if self.quote["$factor"].isna().any(): - # The 'factor.day.bin' file not exists, and `factor` field contains `nan` - # Use adjusted price - self.trade_w_adj_price = True - self.logger.warning("factor.day.bin file not exists or factor contains `nan`. Order using adjusted_price.") - if self.trade_unit is not None: - self.logger.warning(f"trade unit {self.trade_unit} is not supported in adjusted_price mode.") - - else: - # The `factor.day.bin` file exists and all data `close` and `factor` are not `nan` - # Use normal price - self.trade_w_adj_price = False - - # update limit - self._update_limit() - - quote_df = self.quote - if self.extra_quote is not None: - # process extra_quote - if "$close" not in self.extra_quote: - raise ValueError("$close is necessray in extra_quote") - for attr in "buy_price", "sell_price": - pstr = getattr(self, attr) # price string - if pstr not in self.extra_quote.columns: - self.extra_quote[pstr] = self.extra_quote["$close"] - self.logger.warning(f"No {pstr} set for extra_quote. Use $close as {pstr}.") - if "$factor" not in self.extra_quote.columns: - self.extra_quote["$factor"] = 1.0 - self.logger.warning("No $factor set for extra_quote. Use 1.0 as $factor.") - if "limit_sell" not in self.extra_quote.columns: - self.extra_quote["limit_sell"] = False - self.logger.warning("No limit_sell set for extra_quote. All stock will be able to be sold.") - if "limit_buy" not in self.extra_quote.columns: - self.extra_quote["limit_buy"] = False - self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.") - - assert set(self.extra_quote.columns) == set(quote_df.columns) - {"$change"} - quote_df = pd.concat([quote_df, self.extra_quote], sort=False, axis=0) - - quote_dict = {} - for stock_id, stock_val in quote_df.groupby(level="instrument"): - quote_dict[stock_id] = stock_val.droplevel(level="instrument") - - self.quote = quote_dict - - LT_TP_EXP = "(exp)" # Tuple[str, str] - LT_FLT = "float" # float - LT_NONE = "none" # none - - def _get_limit_type(self, limit_threshold): - if isinstance(limit_threshold, Tuple): - return self.LT_TP_EXP - elif isinstance(limit_threshold, float): - return self.LT_FLT - elif limit_threshold is None: - return self.LT_NONE - else: - raise NotImplementedError(f"This type of `limit_threshold` is not supported") - - def _update_limit(self): - # check limit_threshold - lt_type = self._get_limit_type(self.limit_threshold) - if lt_type == self.LT_NONE: - self.quote["limit_buy"] = False - self.quote["limit_sell"] = False - elif lt_type == self.LT_TP_EXP: - # set limit - self.quote["limit_buy"] = self.quote[self.limit_threshold[0]] - self.quote["limit_sell"] = self.quote[self.limit_threshold[1]] - elif lt_type == self.LT_FLT: - self.quote["limit_buy"] = self.quote["$change"].ge(self.limit_threshold) - self.quote["limit_sell"] = self.quote["$change"].le(-self.limit_threshold) # pylint: disable=E1130 + self.trade_w_adj_price = self.quote.get_trade_w_adj_price() + if(self.trade_w_adj_price and (self.trade_unit is not None)): + self.logger.warning(f"trade unit {self.trade_unit} is not supported in adjusted_price mode.") def check_stock_limit(self, stock_id, start_time, end_time, direction=None): """ @@ -241,20 +169,20 @@ def check_stock_limit(self, stock_id, start_time, end_time, direction=None): """ if direction is None: - buy_limit = resam_ts_data(self.quote[stock_id]["limit_buy"], start_time, end_time, method="all") - sell_limit = resam_ts_data(self.quote[stock_id]["limit_sell"], start_time, end_time, method="all") + buy_limit = self.quote.get_data(stock_id, start_time, end_time, fields="limit_buy", method="all") + sell_limit = self.quote.get_data(stock_id, start_time, end_time, fields="limit_sell", method="all") return buy_limit or sell_limit elif direction == Order.BUY: - return resam_ts_data(self.quote[stock_id]["limit_buy"], start_time, end_time, method="all") + return self.quote.get_data(stock_id, start_time, end_time, fields="limit_buy", method="all") elif direction == Order.SELL: - return resam_ts_data(self.quote[stock_id]["limit_sell"], start_time, end_time, method="all") + return self.quote.get_data(stock_id, start_time, end_time, fields="limit_sell", method="all") else: raise ValueError(f"direction {direction} is not supported!") def check_stock_suspended(self, stock_id, start_time, end_time): # is suspended - if stock_id in self.quote: - return resam_ts_data(self.quote[stock_id], start_time, end_time, method=None) is None + if stock_id in self.quote.get_all_stock(): + return self.quote.get_data(stock_id, start_time, end_time) is None else: return True @@ -313,13 +241,13 @@ def deal_order(self, order, trade_account=None, position=None): return trade_val, trade_cost, trade_price def get_quote_info(self, stock_id, start_time, end_time, method=ts_data_last): - return resam_ts_data(self.quote[stock_id], start_time, end_time, method=method) + return self.quote.get_data(stock_id, start_time, end_time, method=method) def get_close(self, stock_id, start_time, end_time, method=ts_data_last): - return resam_ts_data(self.quote[stock_id]["$close"], start_time, end_time, method=method) + return self.quote.get_data(stock_id, start_time, end_time, fields="$close", method=method) def get_volume(self, stock_id, start_time, end_time, method="sum"): - return resam_ts_data(self.quote[stock_id]["$volume"], start_time, end_time, method=method) + return self.quote.get_data(stock_id, start_time, end_time, fields="$volume", method=method) def get_deal_price(self, stock_id, start_time, end_time, direction: OrderDir, method=ts_data_last): if direction == OrderDir.SELL: @@ -328,7 +256,7 @@ def get_deal_price(self, stock_id, start_time, end_time, direction: OrderDir, me pstr = self.buy_price else: raise NotImplementedError(f"This type of input is not supported") - deal_price = resam_ts_data(self.quote[stock_id][pstr], start_time, end_time, method=method) + deal_price = self.quote.get_data(stock_id, start_time, end_time, fields=pstr, method=method) if method is not None and (np.isclose(deal_price, 0.0) or np.isnan(deal_price)): self.logger.warning(f"(stock_id:{stock_id}, trade_time:{(start_time, end_time)}, {pstr}): {deal_price}!!!") self.logger.warning(f"setting deal_price to close price") @@ -343,9 +271,9 @@ def get_factor(self, stock_id, start_time, end_time) -> Union[float, None]: `None`: if the stock is suspended `None` may be returned `float`: return factor if the factor exists """ - if stock_id not in self.quote: + if stock_id not in self.quote.get_all_stock(): return None - return resam_ts_data(self.quote[stock_id]["$factor"], start_time, end_time, method=ts_data_last) + return self.quote.get_data(stock_id, start_time, end_time, fields="$factor", method=ts_data_last) def generate_amount_position_from_weight_position( self, weight_position, cash, start_time, end_time, direction=OrderDir.BUY @@ -596,3 +524,145 @@ def get_order_helper(self) -> OrderHelper: # cache to avoid recreate the same instance self._order_helper = OrderHelper(self) return self._order_helper + + +class BaseQuote: + + def __init__(self): + self.logger = get_module_logger("online operator", level=logging.INFO) + + def _update_limit(self, limit_threshold): + raise NotImplementedError(f"Please implement the `_update_limit` method") + + def get_trade_w_adj_price(self): + raise NotImplementedError(f"Please implement the `get_trade_w_adj_price` method") + + def get_all_stock(self): + raise NotImplementedError(f"Please implement the `get_all_stock` method") + + def get_data(self, stock_id, start_time, end_time, fields, method): + raise NotImplementedError(f"Please implement the `get_data` method") + + LT_TP_EXP = "(exp)" # Tuple[str, str] + LT_FLT = "float" # float + LT_NONE = "none" # none + + @staticmethod + def _get_limit_type(limit_threshold): + if isinstance(limit_threshold, Tuple): + return BaseQuote.LT_TP_EXP + elif isinstance(limit_threshold, float): + return BaseQuote.LT_FLT + elif limit_threshold is None: + return BaseQuote.LT_NONE + else: + raise NotImplementedError(f"This type of `limit_threshold` is not supported") + + +class PandasQuote(BaseQuote): + + def __init__( + self, + start_time, + end_time, + freq, + codes, + all_fields, + limit_threshold, + buy_price, + sell_price, + extra_quote + ): + + super().__init__() + + # get stock data from qlib + if len(codes) == 0: + codes = D.instruments() + self.data = D.features( + codes, + all_fields, + start_time, + end_time, + freq=freq, + disk_cache=True + ).dropna(subset=["$close"]) + self.data.columns = all_fields + + # check buy_price data and sell_price data + self.buy_price = buy_price + self.sell_price = sell_price + for attr in "buy_price", "sell_price": + pstr = getattr(self, attr) # price string + if self.data[pstr].isna().any(): + self.logger.warning("{} field data contains nan.".format(pstr)) + + # update trade_w_adj_price + if self.data["$factor"].isna().any(): + # The 'factor.day.bin' file not exists, and `factor` field contains `nan` + # Use adjusted price + self.logger.warning("factor.day.bin file not exists or factor contains `nan`. Order using adjusted_price.") + self.trade_w_adj_price = True + else: + # The `factor.day.bin` file exists and all data `close` and `factor` are not `nan` + # Use normal price + self.trade_w_adj_price = False + + # update limit + self._update_limit(limit_threshold) + + # concat extra_quote + quote_df = self.data + if extra_quote is not None: + # process extra_quote + if "$close" not in extra_quote: + raise ValueError("$close is necessray in extra_quote") + for attr in "buy_price", "sell_price": + pstr = getattr(self, attr) # price string + if pstr not in extra_quote.columns: + extra_quote[pstr] = extra_quote["$close"] + self.logger.warning(f"No {pstr} set for extra_quote. Use $close as {pstr}.") + if "$factor" not in extra_quote.columns: + extra_quote["$factor"] = 1.0 + self.logger.warning("No $factor set for extra_quote. Use 1.0 as $factor.") + if "limit_sell" not in extra_quote.columns: + extra_quote["limit_sell"] = False + self.logger.warning("No limit_sell set for extra_quote. All stock will be able to be sold.") + if "limit_buy" not in extra_quote.columns: + extra_quote["limit_buy"] = False + self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.") + assert set(extra_quote.columns) == set(quote_df.columns) - {"$change"} + quote_df = pd.concat([quote_df, extra_quote], sort=False, axis=0) + + quote_dict = {} + for stock_id, stock_val in quote_df.groupby(level="instrument"): + quote_dict[stock_id] = stock_val.droplevel(level="instrument") + self.data = quote_dict + + def _update_limit(self, limit_threshold): + # check limit_threshold + limit_type = self._get_limit_type(limit_threshold) + if limit_type == self.LT_NONE: + self.data["limit_buy"] = False + self.data["limit_sell"] = False + elif limit_type == self.LT_TP_EXP: + # set limit + self.data["limit_buy"] = self.data[limit_threshold[0]] + self.data["limit_sell"] = self.data[limit_threshold[1]] + elif limit_type == self.LT_FLT: + self.data["limit_buy"] = self.data["$change"].ge(limit_threshold) + self.data["limit_sell"] = self.data["$change"].le(-limit_threshold) # pylint: disable=E1130 + + def get_all_stock(self): + return self.data.keys() + + def get_data(self, stock_id, start_time, end_time, fields = None, method = None): + if(fields is None): + return resam_ts_data(self.data[stock_id], start_time, end_time, method=method) + elif(isinstance(fields, (str, list))): + return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method) + else: + raise ValueError(f"fields must be None, str or list") + + def get_trade_w_adj_price(self): + return self.trade_w_adj_price \ No newline at end of file From bf2a5a8516f02b335758d48371dd863331d267ad Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Fri, 16 Jul 2021 09:17:29 +0000 Subject: [PATCH 2/9] add doc --- qlib/backtest/exchange.py | 65 +++++++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 8d47392514..2e865d5916 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -532,15 +532,24 @@ def __init__(self): self.logger = get_module_logger("online operator", level=logging.INFO) def _update_limit(self, limit_threshold): + """add limitation information to data based on limit_threshold + """ raise NotImplementedError(f"Please implement the `_update_limit` method") def get_trade_w_adj_price(self): + """return whether use the trade price with adjusted weight + """ raise NotImplementedError(f"Please implement the `get_trade_w_adj_price` method") def get_all_stock(self): + """return all stock codes + """ raise NotImplementedError(f"Please implement the `get_all_stock` method") - def get_data(self, stock_id, start_time, end_time, fields, method): + def get_data(self, stock_id, start_time, end_time, fields=None, method=None): + """get the specific fields of stock data during start time and end_time, + and apply method to the data, please refer to resam_ts_data + """ raise NotImplementedError(f"Please implement the `get_data` method") LT_TP_EXP = "(exp)" # Tuple[str, str] @@ -549,6 +558,8 @@ def get_data(self, stock_id, start_time, end_time, fields, method): @staticmethod def _get_limit_type(limit_threshold): + """get limit type + """ if isinstance(limit_threshold, Tuple): return BaseQuote.LT_TP_EXP elif isinstance(limit_threshold, float): @@ -560,6 +571,8 @@ def _get_limit_type(limit_threshold): class PandasQuote(BaseQuote): + """ + """ def __init__( self, @@ -567,12 +580,52 @@ def __init__( end_time, freq, codes, - all_fields, - limit_threshold, - buy_price, - sell_price, - extra_quote + all_fields: List[str], + limit_threshold: Union[Tuple[str, str], float, None], + buy_price: str, + sell_price: str, + extra_quote: pd.DataFrame, ): + """init stock data based on pandas + + Parameters + ---------- + start_time : pd.Timestamp|str + closed start time for backtest + end_time : pd.Timestamp|str + closed end time for backtest + freq : str + frequency of data + codes : [type] + all stock code + all_fields : List[str] + all subscribe fields in qlib + limit_threshold : Union[Tuple[str, str], float, None] + 1) `None`: no limitation + 2) float, 0.1 for example, default None + 3) Tuple[str, str]: (, + ) + `False` value indicates the stock is tradable + `True` value indicates the stock is limited and not tradable + buy_price : str + the data field for buying stock + sell_price : str + the data field for selling stock + extra_quote : pd.DataFrame + columns: like ['$vwap', '$close', '$volume', '$factor', 'limit_sell', 'limit_buy']. + The limit indicates that the etf is tradable on a specific day. + Necessary fields: + $close is for calculating the total value at end of each day. + Optional fields: + $volume is only necessary when we limit the trade amount or caculate PA(vwap) indicator + $vwap is only necessary when we use the $vwap price as the deal price + $factor is for rounding to the trading unit + limit_sell will be set to False by default(False indicates we can sell this + target on this day). + limit_buy will be set to False by default(False indicates we can buy this + target on this day). + index: MultipleIndex(instrument, pd.Datetime) + """ super().__init__() From 9d053a33f032c8f9f83cb47528212bcfa3a8ab35 Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Fri, 16 Jul 2021 12:56:49 +0000 Subject: [PATCH 3/9] get qlib data in exchange --- qlib/backtest/exchange.py | 318 +++++++++++++++++--------------------- 1 file changed, 143 insertions(+), 175 deletions(-) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 2e865d5916..82f57462e3 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -102,11 +102,11 @@ def __init__( # TODO: the quote, trade_dates, codes are not necessray. # It is just for performance consideration. - self.limit_type = BaseQuote._get_limit_type(limit_threshold) + self.limit_type = self._get_limit_type(limit_threshold) if limit_threshold is None: if C.region == REG_CN: self.logger.warning(f"limit_threshold not set. The stocks hit the limit may be bought/sold") - elif self.limit_type == BaseQuote.LT_FLT and abs(limit_threshold) > 0.1: + elif self.limit_type == self.LT_FLT and abs(limit_threshold) > 0.1: if C.region == REG_CN: self.logger.warning(f"limit_threshold may not be set to a reasonable value") @@ -128,7 +128,7 @@ def __init__( # $change is for calculating the limit of the stock necessary_fields = {self.buy_price, self.sell_price, "$close", "$change", "$factor", "$volume"} - if self.limit_type == BaseQuote.LT_TP_EXP: + if self.limit_type == self.LT_TP_EXP: for exp in limit_threshold: necessary_fields.add(exp) all_fields = list(necessary_fields | set(subscribe_fields)) @@ -140,22 +140,98 @@ def __init__( self.limit_threshold: Union[Tuple[str, str], float, None] = limit_threshold self.volume_threshold = volume_threshold self.extra_quote = extra_quote + self.get_quote_from_qlib() - # init quote - self.quote = PandasQuote( - start_time = self.start_time, - end_time = self.end_time, - freq = self.freq, - codes = self.codes, - all_fields = self.all_fields, - limit_threshold = self.limit_threshold, - buy_price = self.buy_price, - sell_price = self.sell_price, - extra_quote = self.extra_quote, - ) - self.trade_w_adj_price = self.quote.get_trade_w_adj_price() - if(self.trade_w_adj_price and (self.trade_unit is not None)): - self.logger.warning(f"trade unit {self.trade_unit} is not supported in adjusted_price mode.") + # init quote by quote_df + self.quote = PandasQuote(self.quote_df) + + def get_quote_from_qlib(self): + # get stock data from qlib + if len(self.codes) == 0: + self.codes = D.instruments() + self.quote_df = D.features( + self.codes, + self.all_fields, + self.start_time, + self.end_time, + freq=self.freq, + disk_cache=True + ).dropna(subset=["$close"]) + self.quote_df.columns = self.all_fields + + # check buy_price data and sell_price data + for attr in "buy_price", "sell_price": + pstr = getattr(self, attr) # price string + if self.quote_df[pstr].isna().any(): + self.logger.warning("{} field data contains nan.".format(pstr)) + + # update trade_w_adj_price + if self.quote_df["$factor"].isna().any(): + # The 'factor.day.bin' file not exists, and `factor` field contains `nan` + # Use adjusted price + self.trade_w_adj_price = True + self.logger.warning("factor.day.bin file not exists or factor contains `nan`. Order using adjusted_price.") + if self.trade_unit is not None: + self.logger.warning(f"trade unit {self.trade_unit} is not supported in adjusted_price mode.") + else: + # The `factor.day.bin` file exists and all data `close` and `factor` are not `nan` + # Use normal price + self.trade_w_adj_price = False + + # update limit + self._update_limit(self.limit_threshold) + + # concat extra_quote + if self.extra_quote is not None: + # process extra_quote + if "$close" not in self.extra_quote: + raise ValueError("$close is necessray in extra_quote") + for attr in "buy_price", "sell_price": + pstr = getattr(self, attr) # price string + if pstr not in self.extra_quote.columns: + self.extra_quote[pstr] = self.extra_quote["$close"] + self.logger.warning(f"No {pstr} set for extra_quote. Use $close as {pstr}.") + if "$factor" not in self.extra_quote.columns: + self.extra_quote["$factor"] = 1.0 + self.logger.warning("No $factor set for extra_quote. Use 1.0 as $factor.") + if "limit_sell" not in self.extra_quote.columns: + self.extra_quote["limit_sell"] = False + self.logger.warning("No limit_sell set for extra_quote. All stock will be able to be sold.") + if "limit_buy" not in self.extra_quote.columns: + self.extra_quote["limit_buy"] = False + self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.") + assert set(self.extra_quote.columns) == set(self.quote_df.columns) - {"$change"} + self.quote_df = pd.concat([self.quote_df, extra_quote], sort=False, axis=0) + + LT_TP_EXP = "(exp)" # Tuple[str, str] + LT_FLT = "float" # float + LT_NONE = "none" # none + + def _get_limit_type(self, limit_threshold): + """get limit type + """ + if isinstance(limit_threshold, Tuple): + return self.LT_TP_EXP + elif isinstance(limit_threshold, float): + return self.LT_FLT + elif limit_threshold is None: + return self.LT_NONE + else: + raise NotImplementedError(f"This type of `limit_threshold` is not supported") + + def _update_limit(self, limit_threshold): + # check limit_threshold + limit_type = self._get_limit_type(limit_threshold) + if limit_type == self.LT_NONE: + self.quote_df["limit_buy"] = False + self.quote_df["limit_sell"] = False + elif limit_type == self.LT_TP_EXP: + # set limit + self.quote_df["limit_buy"] = self.quote_df[limit_threshold[0]] + self.quote_df["limit_sell"] = self.quote_df[limit_threshold[1]] + elif limit_type == self.LT_FLT: + self.quote_df["limit_buy"] = self.quote_df["$change"].ge(limit_threshold) + self.quote_df["limit_sell"] = self.quote_df["$change"].le(-limit_threshold) # pylint: disable=E1130 def check_stock_limit(self, stock_id, start_time, end_time, direction=None): """ @@ -528,184 +604,79 @@ def get_order_helper(self) -> OrderHelper: class BaseQuote: - def __init__(self): + def __init__(self, quote_df: pd.DataFrame): self.logger = get_module_logger("online operator", level=logging.INFO) - def _update_limit(self, limit_threshold): - """add limitation information to data based on limit_threshold - """ - raise NotImplementedError(f"Please implement the `_update_limit` method") - - def get_trade_w_adj_price(self): - """return whether use the trade price with adjusted weight - """ - raise NotImplementedError(f"Please implement the `get_trade_w_adj_price` method") - def get_all_stock(self): """return all stock codes + + Return + ------ + Union[list, Dict.keys(), set, tuple] + all stock codes """ raise NotImplementedError(f"Please implement the `get_all_stock` method") - def get_data(self, stock_id, start_time, end_time, fields=None, method=None): + def get_data(self, stock_id: str, start_time, end_time, fields: Union[str, list]=None, method=None): """get the specific fields of stock data during start time and end_time, - and apply method to the data, please refer to resam_ts_data - """ - raise NotImplementedError(f"Please implement the `get_data` method") - - LT_TP_EXP = "(exp)" # Tuple[str, str] - LT_FLT = "float" # float - LT_NONE = "none" # none - - @staticmethod - def _get_limit_type(limit_threshold): - """get limit type - """ - if isinstance(limit_threshold, Tuple): - return BaseQuote.LT_TP_EXP - elif isinstance(limit_threshold, float): - return BaseQuote.LT_FLT - elif limit_threshold is None: - return BaseQuote.LT_NONE - else: - raise NotImplementedError(f"This type of `limit_threshold` is not supported") - - -class PandasQuote(BaseQuote): - """ - """ - - def __init__( - self, - start_time, - end_time, - freq, - codes, - all_fields: List[str], - limit_threshold: Union[Tuple[str, str], float, None], - buy_price: str, - sell_price: str, - extra_quote: pd.DataFrame, - ): - """init stock data based on pandas + and apply method to the data. + + Example: + .. code-block:: + $close $volume + instrument datetime + SH600000 2010-01-04 86.778313 16162960.0 + 2010-01-05 87.433578 28117442.0 + 2010-01-06 85.713585 23632884.0 + 2010-01-07 83.788803 20813402.0 + 2010-01-08 84.730675 16044853.0 + + SH600655 2010-01-04 2699.567383 158193.328125 + 2010-01-08 2612.359619 77501.406250 + 2010-01-11 2712.982422 160852.390625 + 2010-01-12 2788.688232 164587.937500 + 2010-01-13 2790.604004 145460.453125 + + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) + + $close 87.433578 + $volume 28117442.0 + + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last")) + + 87.433578 Parameters ---------- + stock_id: Union[str, list] start_time : pd.Timestamp|str closed start time for backtest end_time : pd.Timestamp|str closed end time for backtest - freq : str - frequency of data - codes : [type] - all stock code - all_fields : List[str] - all subscribe fields in qlib - limit_threshold : Union[Tuple[str, str], float, None] - 1) `None`: no limitation - 2) float, 0.1 for example, default None - 3) Tuple[str, str]: (, - ) - `False` value indicates the stock is tradable - `True` value indicates the stock is limited and not tradable - buy_price : str - the data field for buying stock - sell_price : str - the data field for selling stock - extra_quote : pd.DataFrame - columns: like ['$vwap', '$close', '$volume', '$factor', 'limit_sell', 'limit_buy']. - The limit indicates that the etf is tradable on a specific day. - Necessary fields: - $close is for calculating the total value at end of each day. - Optional fields: - $volume is only necessary when we limit the trade amount or caculate PA(vwap) indicator - $vwap is only necessary when we use the $vwap price as the deal price - $factor is for rounding to the trading unit - limit_sell will be set to False by default(False indicates we can sell this - target on this day). - limit_buy will be set to False by default(False indicates we can buy this - target on this day). - index: MultipleIndex(instrument, pd.Datetime) - """ + fields : Union[str, List] + the columns of data to fetch + method : Union[str, Callable] + the method apply to data. + e.g ["None", "last", "all", "sum", "mean", qlib/utils/resam.py/ts_data_last] - super().__init__() + Return + ---------- + Union[None, float, pd.Series] + The resampled Series/value, return None when the resampled data is empty. + """ - # get stock data from qlib - if len(codes) == 0: - codes = D.instruments() - self.data = D.features( - codes, - all_fields, - start_time, - end_time, - freq=freq, - disk_cache=True - ).dropna(subset=["$close"]) - self.data.columns = all_fields + raise NotImplementedError(f"Please implement the `get_data` method") - # check buy_price data and sell_price data - self.buy_price = buy_price - self.sell_price = sell_price - for attr in "buy_price", "sell_price": - pstr = getattr(self, attr) # price string - if self.data[pstr].isna().any(): - self.logger.warning("{} field data contains nan.".format(pstr)) - # update trade_w_adj_price - if self.data["$factor"].isna().any(): - # The 'factor.day.bin' file not exists, and `factor` field contains `nan` - # Use adjusted price - self.logger.warning("factor.day.bin file not exists or factor contains `nan`. Order using adjusted_price.") - self.trade_w_adj_price = True - else: - # The `factor.day.bin` file exists and all data `close` and `factor` are not `nan` - # Use normal price - self.trade_w_adj_price = False - - # update limit - self._update_limit(limit_threshold) - - # concat extra_quote - quote_df = self.data - if extra_quote is not None: - # process extra_quote - if "$close" not in extra_quote: - raise ValueError("$close is necessray in extra_quote") - for attr in "buy_price", "sell_price": - pstr = getattr(self, attr) # price string - if pstr not in extra_quote.columns: - extra_quote[pstr] = extra_quote["$close"] - self.logger.warning(f"No {pstr} set for extra_quote. Use $close as {pstr}.") - if "$factor" not in extra_quote.columns: - extra_quote["$factor"] = 1.0 - self.logger.warning("No $factor set for extra_quote. Use 1.0 as $factor.") - if "limit_sell" not in extra_quote.columns: - extra_quote["limit_sell"] = False - self.logger.warning("No limit_sell set for extra_quote. All stock will be able to be sold.") - if "limit_buy" not in extra_quote.columns: - extra_quote["limit_buy"] = False - self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.") - assert set(extra_quote.columns) == set(quote_df.columns) - {"$change"} - quote_df = pd.concat([quote_df, extra_quote], sort=False, axis=0) +class PandasQuote(BaseQuote): + def __init__(self, quote_df: pd.DataFrame): + super().__init__(quote_df=quote_df) quote_dict = {} for stock_id, stock_val in quote_df.groupby(level="instrument"): quote_dict[stock_id] = stock_val.droplevel(level="instrument") self.data = quote_dict - def _update_limit(self, limit_threshold): - # check limit_threshold - limit_type = self._get_limit_type(limit_threshold) - if limit_type == self.LT_NONE: - self.data["limit_buy"] = False - self.data["limit_sell"] = False - elif limit_type == self.LT_TP_EXP: - # set limit - self.data["limit_buy"] = self.data[limit_threshold[0]] - self.data["limit_sell"] = self.data[limit_threshold[1]] - elif limit_type == self.LT_FLT: - self.data["limit_buy"] = self.data["$change"].ge(limit_threshold) - self.data["limit_sell"] = self.data["$change"].le(-limit_threshold) # pylint: disable=E1130 - def get_all_stock(self): return self.data.keys() @@ -715,7 +686,4 @@ def get_data(self, stock_id, start_time, end_time, fields = None, method = None) elif(isinstance(fields, (str, list))): return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method) else: - raise ValueError(f"fields must be None, str or list") - - def get_trade_w_adj_price(self): - return self.trade_w_adj_price \ No newline at end of file + raise ValueError(f"fields must be None, str or list") \ No newline at end of file From 527e94f8012ea9b3ab345904436ac4f781eb52d9 Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Fri, 16 Jul 2021 13:55:49 +0000 Subject: [PATCH 4/9] black and doc --- qlib/backtest/exchange.py | 59 ++++++++++++++------------ qlib/contrib/strategy/rule_strategy.py | 4 +- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 82f57462e3..7733891fed 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -150,12 +150,7 @@ def get_quote_from_qlib(self): if len(self.codes) == 0: self.codes = D.instruments() self.quote_df = D.features( - self.codes, - self.all_fields, - self.start_time, - self.end_time, - freq=self.freq, - disk_cache=True + self.codes, self.all_fields, self.start_time, self.end_time, freq=self.freq, disk_cache=True ).dropna(subset=["$close"]) self.quote_df.columns = self.all_fields @@ -177,10 +172,9 @@ def get_quote_from_qlib(self): # The `factor.day.bin` file exists and all data `close` and `factor` are not `nan` # Use normal price self.trade_w_adj_price = False - # update limit self._update_limit(self.limit_threshold) - + # concat extra_quote if self.extra_quote is not None: # process extra_quote @@ -199,7 +193,7 @@ def get_quote_from_qlib(self): self.logger.warning("No limit_sell set for extra_quote. All stock will be able to be sold.") if "limit_buy" not in self.extra_quote.columns: self.extra_quote["limit_buy"] = False - self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.") + self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.") assert set(self.extra_quote.columns) == set(self.quote_df.columns) - {"$change"} self.quote_df = pd.concat([self.quote_df, extra_quote], sort=False, axis=0) @@ -208,8 +202,7 @@ def get_quote_from_qlib(self): LT_NONE = "none" # none def _get_limit_type(self, limit_threshold): - """get limit type - """ + """get limit type""" if isinstance(limit_threshold, Tuple): return self.LT_TP_EXP elif isinstance(limit_threshold, float): @@ -603,7 +596,6 @@ def get_order_helper(self) -> OrderHelper: class BaseQuote: - def __init__(self, quote_df: pd.DataFrame): self.logger = get_module_logger("online operator", level=logging.INFO) @@ -617,10 +609,17 @@ def get_all_stock(self): """ raise NotImplementedError(f"Please implement the `get_all_stock` method") - def get_data(self, stock_id: str, start_time, end_time, fields: Union[str, list]=None, method=None): + def get_data( + self, + stock_id: Union[str, list], + start_time: Union[pd.Timestamp, str], + end_time: Union[pd.Timestamp, str], + fields: Union[str, list] = None, + method: Union[str, Callable] = None, + ): """get the specific fields of stock data during start time and end_time, and apply method to the data. - + Example: .. code-block:: $close $volume @@ -637,8 +636,15 @@ def get_data(self, stock_id: str, start_time, end_time, fields: Union[str, list] 2010-01-12 2788.688232 164587.937500 2010-01-13 2790.604004 145460.453125 + print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) + + $close $volume + instrument + SH600000 87.433578 28117442.0 + SH600655 2699.567383 158193.328125 + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) - + $close 87.433578 $volume 28117442.0 @@ -649,27 +655,26 @@ def get_data(self, stock_id: str, start_time, end_time, fields: Union[str, list] Parameters ---------- stock_id: Union[str, list] - start_time : pd.Timestamp|str + start_time : Union[pd.Timestamp, str] closed start time for backtest - end_time : pd.Timestamp|str + end_time : Union[pd.Timestamp, str] closed end time for backtest fields : Union[str, List] the columns of data to fetch method : Union[str, Callable] - the method apply to data. - e.g ["None", "last", "all", "sum", "mean", qlib/utils/resam.py/ts_data_last] + the method apply to data. + e.g ["None", "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last] Return ---------- - Union[None, float, pd.Series] - The resampled Series/value, return None when the resampled data is empty. + Union[None, float, pd.Series, pd.DataFrame] + The resampled DataFrame/Series/value, return None when the resampled data is empty. """ - raise NotImplementedError(f"Please implement the `get_data` method") + raise NotImplementedError(f"Please implement the `get_data` method") class PandasQuote(BaseQuote): - def __init__(self, quote_df: pd.DataFrame): super().__init__(quote_df=quote_df) quote_dict = {} @@ -680,10 +685,10 @@ def __init__(self, quote_df: pd.DataFrame): def get_all_stock(self): return self.data.keys() - def get_data(self, stock_id, start_time, end_time, fields = None, method = None): - if(fields is None): + def get_data(self, stock_id, start_time, end_time, fields=None, method=None): + if fields is None: return resam_ts_data(self.data[stock_id], start_time, end_time, method=method) - elif(isinstance(fields, (str, list))): + elif isinstance(fields, (str, list)): return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method) else: - raise ValueError(f"fields must be None, str or list") \ No newline at end of file + raise ValueError(f"fields must be None, str or list") diff --git a/qlib/contrib/strategy/rule_strategy.py b/qlib/contrib/strategy/rule_strategy.py index 56884cd488..970734df56 100644 --- a/qlib/contrib/strategy/rule_strategy.py +++ b/qlib/contrib/strategy/rule_strategy.py @@ -687,7 +687,9 @@ class FileOrderStrategy(BaseStrategy): - This class provides an interface for user to read orders from csv files. """ - def __init__(self, file: Union[IO, str, Path], trade_range: Union[Tuple[int, int], TradeRange]= None, *args, **kwargs): + def __init__( + self, file: Union[IO, str, Path], trade_range: Union[Tuple[int, int], TradeRange] = None, *args, **kwargs + ): """ Parameters From b23a8523f7c403cc46278785a43cf8ccc91d01dc Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Fri, 16 Jul 2021 14:09:36 +0000 Subject: [PATCH 5/9] callable --- qlib/backtest/exchange.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 7733891fed..8d02e78934 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -615,7 +615,7 @@ def get_data( start_time: Union[pd.Timestamp, str], end_time: Union[pd.Timestamp, str], fields: Union[str, list] = None, - method: Union[str, Callable] = None, + method: Union[str, "Callable"] = None, ): """get the specific fields of stock data during start time and end_time, and apply method to the data. From 3548226484ff29833f21186333bc6f8d5a9c5376 Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Wed, 21 Jul 2021 12:47:31 +0000 Subject: [PATCH 6/9] pandas_order_indicator --- qlib/backtest/report.py | 330 ++++++++++++++++++++++++++++++---------- 1 file changed, 251 insertions(+), 79 deletions(-) diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index 308decd12e..8e093e0a66 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -5,8 +5,9 @@ from collections import OrderedDict from logging import warning import pathlib -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, Union import warnings +import inspect import numpy as np import pandas as pd @@ -62,6 +63,7 @@ def __init__(self, freq: str = "day", benchmark_config: dict = {}): - Else, it represent end time of benchmark, by default None """ + self.init_vars() self.init_bench(freq=freq, benchmark_config=benchmark_config) @@ -255,7 +257,7 @@ class Indicator: def __init__(self): # order indicator is metrics for a single order for a specific step self.order_indicator_his = OrderedDict() - self.order_indicator: Dict[str, pd.Series] = OrderedDict() + self.order_indicator = PandasOrderIndicator() # trade indicator is metrics for all orders for a specific step self.trade_indicator_his = OrderedDict() @@ -265,12 +267,12 @@ def __init__(self): # def reset(self, trade_calendar: TradeCalendarManager): def reset(self): - self.order_indicator = OrderedDict() + self.order_indicator = PandasOrderIndicator() self.trade_indicator = OrderedDict() # self._trade_calendar = trade_calendar def record(self, trade_start_time): - self.order_indicator_his[trade_start_time] = self.order_indicator + self.order_indicator_his[trade_start_time] = self.order_indicator.data self.trade_indicator_his[trade_start_time] = self.trade_indicator def _update_order_trade_info(self, trade_info: list): @@ -280,6 +282,7 @@ def _update_order_trade_info(self, trade_info: list): trade_value = dict() trade_cost = dict() trade_dir = dict() + pa = dict() for order, _trade_val, _trade_cost, _trade_price in trade_info: amount[order.stock_id] = order.amount_delta @@ -288,66 +291,58 @@ def _update_order_trade_info(self, trade_info: list): trade_value[order.stock_id] = _trade_val * order.sign trade_cost[order.stock_id] = _trade_cost trade_dir[order.stock_id] = order.direction + pa[order.stock_id] = 0 - self.order_indicator["amount"] = self.order_indicator["inner_amount"] = pd.Series(amount) - self.order_indicator["deal_amount"] = pd.Series(deal_amount) + self.order_indicator.assign("amount", amount) + self.order_indicator.assign("inner_amount", amount) + self.order_indicator.assign("deal_amount", deal_amount) # NOTE: trade_price and baseline price will be same on the lowest-level - self.order_indicator["trade_price"] = pd.Series(trade_price) - self.order_indicator["trade_value"] = pd.Series(trade_value) - self.order_indicator["trade_cost"] = pd.Series(trade_cost) - self.order_indicator["trade_dir"] = pd.Series(trade_dir) + self.order_indicator.assign("trade_price", trade_price) + self.order_indicator.assign("trade_value", trade_value) + self.order_indicator.assign("trade_cost", trade_cost) + self.order_indicator.assign("trade_dir", trade_dir) + self.order_indicator.assign("pa", pa) def _update_order_fulfill_rate(self): - self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"] + def func(deal_amount, amount): + return deal_amount / amount + self.order_indicator.transfer(func, "ffr") + """ def _update_order_price_advantage(self): # NOTE: # trade_price and baseline price will be same on the lowest-level # So Pa should be 0 or do nothing - self.order_indicator["pa"] = 0 + self.order_indicator.assign("pa", 0) + """ def update_order_indicators(self, trade_info: list): self._update_order_trade_info(trade_info=trade_info) self._update_order_fulfill_rate() - self._update_order_price_advantage() + # self._update_order_price_advantage() def _agg_order_trade_info(self, inner_order_indicators: List[Dict[str, pd.Series]]): - inner_amount = pd.Series() - deal_amount = pd.Series() - trade_price = pd.Series() - trade_value = pd.Series() - trade_cost = pd.Series() - trade_dir = pd.Series() - for _order_indicator in inner_order_indicators: - inner_amount = inner_amount.add(_order_indicator["inner_amount"], fill_value=0) - deal_amount = deal_amount.add(_order_indicator["deal_amount"], fill_value=0) - trade_price = trade_price.add( - _order_indicator["trade_price"] * _order_indicator["deal_amount"], fill_value=0 - ) - trade_value = trade_value.add(_order_indicator["trade_value"], fill_value=0) - trade_cost = trade_cost.add(_order_indicator["trade_cost"], fill_value=0) - trade_dir = trade_dir.add(_order_indicator["trade_dir"], fill_value=0) + all_metric = ["inner_amount", "deal_amount", "trade_price", + "trade_value", "trade_cost", "trade_dir"] + metric_dict = PandasOrderIndicator.agg_all_indicators(inner_order_indicators, all_metric, fill_value=0) + for metric in metric_dict: + self.order_indicator.assign(metric, metric_dict[metric]) - trade_dir = trade_dir.apply(Order.parse_dir) + def func(trade_price, deal_amount): + return trade_price / deal_amount + self.order_indicator.transfer(func, "trade_price") - self.order_indicator["inner_amount"] = inner_amount - self.order_indicator["deal_amount"] = deal_amount - trade_price /= self.order_indicator["deal_amount"] - self.order_indicator["trade_price"] = trade_price - self.order_indicator["trade_value"] = trade_value - self.order_indicator["trade_cost"] = trade_cost - self.order_indicator["trade_dir"] = trade_dir + def func_apply(trade_dir): + return trade_dir.apply(Order.parse_dir) + self.order_indicator.transfer(func_apply, "trade_dir") def _update_trade_amount(self, outer_trade_decision: BaseTradeDecision): # NOTE: these indicator is designed for order execution, so the decision: List[Order] = outer_trade_decision.get_decision() if decision is None: - self.order_indicator["amount"] = pd.Series() + self.order_indicator.assign("amount", {}) else: - self.order_indicator["amount"] = pd.Series({order.stock_id: order.amount_delta for order in decision}) - - def _agg_order_fulfill_rate(self): - self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"] + self.order_indicator.assign("amount", {order.stock_id: order.amount_delta for order in decision}) def _get_base_vol_pri( self, @@ -423,17 +418,16 @@ def _agg_base_price( "price": "$close", # TODO: this is not supported now!!!!! # default to use deal price of the exchange } - """ # TODO: I think there are potentials to be optimized - trade_dir = self.order_indicator["trade_dir"] + trade_dir = self.order_indicator.get_metric_series("trade_dir") if len(trade_dir) > 0: bp_all, bv_all = [], [] # for oi, (dec, start, end) in zip(inner_order_indicators, decision_list): - bp_s = oi.get("base_price", pd.Series()).reindex(trade_dir.index) - bv_s = oi.get("base_volume", pd.Series()).reindex(trade_dir.index) + bp_s = oi.get_metric_series("base_price").reindex(trade_dir.index) + bv_s = oi.get_metric_series("base_volume").reindex(trade_dir.index) bp_new, bv_new = {}, {} for pr, v, (inst, direction) in zip(bp_s.values, bv_s.values, trade_dir.items()): if np.isnan(pr): @@ -457,17 +451,21 @@ def _agg_base_price( bp_all = pd.concat(bp_all, axis=1) bv_all = pd.concat(bv_all, axis=1) - self.order_indicator["base_volume"] = bv_all.sum(axis=1) - self.order_indicator["base_price"] = (bp_all * bv_all).sum(axis=1) / self.order_indicator["base_volume"] + base_volume = bv_all.sum(axis=1) + self.order_indicator.assign("base_volume", base_volume) + self.order_indicator.assign("base_price", (bp_all * bv_all).sum(axis=1) / base_volume) def _agg_order_price_advantage(self): - if not self.order_indicator["trade_price"].empty: - sign = 1 - self.order_indicator["trade_dir"] * 2 - self.order_indicator["pa"] = sign * ( - self.order_indicator["trade_price"] / self.order_indicator["base_price"] - 1 - ) + def if_empty_func(trade_price): + return trade_price.empty + if_empty = self.order_indicator.transfer(if_empty_func) + if not if_empty: + def func(trade_dir, trade_price, base_price): + sign = 1 - trade_dir * 2 + return sign * (trade_price / base_price - 1) + self.order_indicator.transfer(func, "pa") else: - self.order_indicator["pa"] = pd.Series() + self.order_indicator.assign("pa", {}) def agg_order_indicators( self, @@ -477,57 +475,60 @@ def agg_order_indicators( trade_exchange: Exchange, indicator_config={}, ): - self._agg_order_trade_info(inner_order_indicators) + self._agg_order_trade_info(inner_order_indicators) # TODO self._update_trade_amount(outer_trade_decision) - self._agg_order_fulfill_rate() + self._update_order_fulfill_rate() pa_config = indicator_config.get("pa_config", {}) - self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) + self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO self._agg_order_price_advantage() def _cal_trade_fulfill_rate(self, method="mean"): if method == "mean": - return self.order_indicator["ffr"].mean() + def func(ffr): + return ffr.mean() elif method == "amount_weighted": - weights = self.order_indicator["deal_amount"].abs() - return (self.order_indicator["ffr"] * weights).sum() / weights.sum() + def func(ffr, deal_amount): + return (ffr * deal_amount.abs()).sum() / (deal_amount.abs().sum()) elif method == "value_weighted": - weights = self.order_indicator["trade_value"].abs() - return (self.order_indicator["ffr"] * weights).sum() / weights.sum() + def func(ffr, trade_value): + return (ffr * trade_value.abs()).sum() / (trade_value.abs().sum()) else: raise ValueError(f"method {method} is not supported!") + return self.order_indicator.transfer(func) def _cal_trade_price_advantage(self, method="mean"): - pa_order = self.order_indicator["pa"] - if isinstance(pa_order, (int, float)): - # pa from atomic executor - return pa_order - if method == "mean": - return pa_order.mean() + def func(pa): + return pa.mean() elif method == "amount_weighted": - weights = self.order_indicator["deal_amount"].abs() - return (pa_order * weights).sum() / weights.sum() + def func(pa, deal_amount): + return (pa * deal_amount.abs()).sum() / (deal_amount.abs().sum()) elif method == "value_weighted": - weights = self.order_indicator["trade_value"].abs() - return (pa_order * weights).sum() / weights.sum() + def func(pa, trade_value): + return (pa * trade_value.abs()).sum() / (trade_value.abs().sum()) else: raise ValueError(f"method {method} is not supported!") + return self.order_indicator.transfer(func) def _cal_trade_positive_rate(self): - pa_order = self.order_indicator["pa"] - if isinstance(pa_order, (int, float)): - # pa from atomic executor - return pa_order - return (pa_order > 0).astype(int).sum() / pa_order.count() + def func(pa): + return (pa > 0).astype(int).sum() / pa.count() + return self.order_indicator.transfer(func) def _cal_deal_amount(self): - return self.order_indicator["deal_amount"].abs().sum() + def func(deal_amount): + return deal_amount.abs().sum() + return self.order_indicator.transfer(func) def _cal_trade_value(self): - return self.order_indicator["trade_value"].abs().sum() + def func(trade_value): + return trade_value.abs().sum() + return self.order_indicator.transfer(func) def _cal_trade_order_count(self): - return self.order_indicator["amount"].count() + def func(amount): + return amount.count() + return self.order_indicator.transfer(func) def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}): show_indicator = indicator_config.get("show_indicator", False) @@ -560,3 +561,174 @@ def get_trade_indicator(self): def generate_trade_indicators_dataframe(self): return pd.DataFrame.from_dict(self.trade_indicator_his, orient="index") + + +class BaseOrderIndicator: + + def __init__(self): + pass + + def assign(self, col: str, metric: Union[dict, pd.Series]): + pass + + def transfer(self, func: "Callable", new_col = None): + pass + + def get_metric_series(self, metric: str): + pass + + @classmethod + def agg_all_indicators(indicators, metrics: Union[str, List[str]], fill_value = None): + pass + + +class PandasOrderIndicator(BaseOrderIndicator): + + class SingleMetric: + def __init__(self, metric: Union[dict, pd.Series]): + if isinstance(metric, dict): + self.metric = pd.Series(metric) + elif isinstance(metric, pd.Series): + self.metric = metric + else: + raise ValueError(f"metric must be dict or pd.Series") + + def __add__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(self.metric + other) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(self.metric + other.metric) + else: + return NotImplemented + + def __radd__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(other + self.metric) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(other.metric + self.metric) + else: + return NotImplemented + + def __sub__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(self.metric - other) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(self.metric - other.metric) + else: + return NotImplemented + + def __rsub__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(other - self.metric) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(other.metric - self.metric) + else: + return NotImplemented + + def __mul__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(self.metric * other) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(self.metric * other.metric) + else: + return NotImplemented + + def __truediv__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(self.metric / other) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(self.metric / other.metric) + else: + return NotImplemented + + def __eq__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(self.metric == other) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(self.metric == other.metric) + else: + return NotImplemented + + def __gt__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(self.metric < other) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(self.metric < other.metric) + else: + return NotImplemented + + def __lt__(self, other): + if isinstance(other, (int, float)): + return PandasOrderIndicator.SingleMetric(self.metric > other) + elif isinstance(other, PandasOrderIndicator.SingleMetric): + return PandasOrderIndicator.SingleMetric(self.metric > other.metric) + else: + return NotImplemented + + def __len__(self): + return len(self.metric) + + def sum(self): + return self.metric.sum() + + def mean(self): + return self.metric.mean() + + def count(self): + return self.metric.count() + + def abs(self): + return PandasOrderIndicator.SingleMetric(self.metric.abs()) + + def astype(self, type): + return PandasOrderIndicator.SingleMetric(self.metric.astype(type)) + + @property + def empty(self): + return self.metric.empty + + """ + @property + def index(self): + return self.metric.index + """ + + def add(self, other, fill_value: None): + return PandasOrderIndicator.SingleMetric(self.metric.add(other.metric, fill_value = fill_value)) + + def apply(self, map_dict: dict): + return PandasOrderIndicator.SingleMetric(self.metric.apply(map_dict)) + + def __init__(self): + self.data: Dict[str, self.SingleMetric] = OrderedDict() + + def assign(self, col: str, metric: Union[dict, pd.Series]): + self.data[col] = self.SingleMetric(metric) + + def transfer(self, func: "Callable", new_col = None): + func_sig = inspect.signature(func).parameters.keys() + func_kwargs = {sig: self.data[sig] for sig in func_sig} + tmp_metric = func(**func_kwargs) + if(new_col is not None): + self.data[new_col] = tmp_metric + return tmp_metric + + def get_metric_series(self, metric: str): + if(metric in self.data): + return self.data[metric].metric + else: + return pd.Series() + + @staticmethod + def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value = None): + """add all order indicators with same metric""" + + metric_dict = {} + if isinstance(metrics, str): + metrics = [metrics] + for metric in metrics: + tmp_metric = PandasOrderIndicator.SingleMetric({}) + for indicator in indicators: + tmp_metric.add(indicator.data[metric], fill_value) + metric_dict[metric] = tmp_metric.metric + return metric_dict \ No newline at end of file From d88334c9d4418640f907b32867bb93c6384a4e90 Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Wed, 21 Jul 2021 14:09:12 +0000 Subject: [PATCH 7/9] add order_indicator doc --- qlib/backtest/report.py | 96 ++++++++++++++++++++++++++++++++--------- 1 file changed, 75 insertions(+), 21 deletions(-) diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index 8e093e0a66..1ae50f5e2a 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -308,14 +308,6 @@ def func(deal_amount, amount): return deal_amount / amount self.order_indicator.transfer(func, "ffr") - """ - def _update_order_price_advantage(self): - # NOTE: - # trade_price and baseline price will be same on the lowest-level - # So Pa should be 0 or do nothing - self.order_indicator.assign("pa", 0) - """ - def update_order_indicators(self, trade_info: list): self._update_order_trade_info(trade_info=trade_info) self._update_order_fulfill_rate() @@ -475,7 +467,7 @@ def agg_order_indicators( trade_exchange: Exchange, indicator_config={}, ): - self._agg_order_trade_info(inner_order_indicators) # TODO + self._agg_order_trade_info(inner_order_indicators) self._update_trade_amount(outer_trade_decision) self._update_order_fulfill_rate() pa_config = indicator_config.get("pa_config", {}) @@ -564,27 +556,97 @@ def generate_trade_indicators_dataframe(self): class BaseOrderIndicator: + """The data structure of order indicator. + """ def __init__(self): pass def assign(self, col: str, metric: Union[dict, pd.Series]): + """assign one metric. + + Parameters + ---------- + col : str + the metric name of one metric. + metric : Union[dict, pd.Series] + the metric data. + """ + pass - def transfer(self, func: "Callable", new_col = None): + def transfer(self, func: "Callable", new_col: str = None): + """compute new metric with existing. + + Parameters + ---------- + func : Callable + the func of computing new metric. + the kwargs of func will be replaced with metric data by name in this function. + e.g. + def func(pa): + return (pa > 0).astype(int).sum() / pa.count() + new_col : str, optional + New metric will be assigned in the data if new_col is not None, by default None. + + Return + ---------- + SingleMetric + new metric. + """ + pass def get_metric_series(self, metric: str): + """return the single metric with pd.Series format + + Parameters + ---------- + metric : str + the metric name. + + Return + ---------- + pd.Series + the single metric. + If there is no metric name in the data, return pd.Series(). + """ + pass @classmethod - def agg_all_indicators(indicators, metrics: Union[str, List[str]], fill_value = None): + def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value: float = None): + """sum indicators with the same metrics. + + Parameters + ---------- + indicators : List[BaseOrderIndicator] + the list of all inner indicators. + metrics : Union[str, List[str]] + all metrics needs ot be sumed. + fill_value : float, optional + fill np.NaN with value. By default None. + + Return + ---------- + Dict[str: SingleMetric] + a dict of metric name and data. + """ + pass class PandasOrderIndicator(BaseOrderIndicator): + """The data structure is OrderedDict(str: SingleMetric). + Each SingleMetric based on pd.Series is one metric. + Str is the name of metric. + """ class SingleMetric: + """The data structure of the single metric. + The following methods are used for computing metrics in one indicator. + """ + def __init__(self, metric: Union[dict, pd.Series]): if isinstance(metric, dict): self.metric = pd.Series(metric) @@ -687,12 +749,6 @@ def astype(self, type): def empty(self): return self.metric.empty - """ - @property - def index(self): - return self.metric.index - """ - def add(self, other, fill_value: None): return PandasOrderIndicator.SingleMetric(self.metric.add(other.metric, fill_value = fill_value)) @@ -705,7 +761,7 @@ def __init__(self): def assign(self, col: str, metric: Union[dict, pd.Series]): self.data[col] = self.SingleMetric(metric) - def transfer(self, func: "Callable", new_col = None): + def transfer(self, func: "Callable", new_col: str = None): func_sig = inspect.signature(func).parameters.keys() func_kwargs = {sig: self.data[sig] for sig in func_sig} tmp_metric = func(**func_kwargs) @@ -721,14 +777,12 @@ def get_metric_series(self, metric: str): @staticmethod def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value = None): - """add all order indicators with same metric""" - metric_dict = {} if isinstance(metrics, str): metrics = [metrics] for metric in metrics: tmp_metric = PandasOrderIndicator.SingleMetric({}) for indicator in indicators: - tmp_metric.add(indicator.data[metric], fill_value) + tmp_metric = tmp_metric.add(indicator.data[metric], fill_value) metric_dict[metric] = tmp_metric.metric return metric_dict \ No newline at end of file From fb336ca31703367d16162d4a4d7f4facc7593009 Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Thu, 22 Jul 2021 15:20:03 +0000 Subject: [PATCH 8/9] high_performance_data_structure --- qlib/backtest/exchange.py | 106 +------ qlib/backtest/high_performance_ds.py | 414 +++++++++++++++++++++++++++ qlib/backtest/report.py | 277 +++--------------- 3 files changed, 453 insertions(+), 344 deletions(-) create mode 100644 qlib/backtest/high_performance_ds.py diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 8d02e78934..edcd7baaf6 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -4,7 +4,7 @@ import random import logging -from typing import List, Tuple, Union +from typing import List, Tuple, Union, Callable, Iterable import numpy as np import pandas as pd @@ -15,6 +15,7 @@ from ..utils.resam import resam_ts_data, ts_data_last from ..log import get_module_logger from .order import Order, OrderDir, OrderHelper +from .high_performane_ds import PandasQuote class Exchange: @@ -32,6 +33,7 @@ def __init__( close_cost=0.0025, min_cost=5, extra_quote=None, + quote_cls=PandasQuote, **kwargs, ): """__init__ @@ -143,7 +145,8 @@ def __init__( self.get_quote_from_qlib() # init quote by quote_df - self.quote = PandasQuote(self.quote_df) + self.quote_cls = quote_cls + self.quote = self.quote_cls(self.quote_df) def get_quote_from_qlib(self): # get stock data from qlib @@ -593,102 +596,3 @@ def get_order_helper(self) -> OrderHelper: # cache to avoid recreate the same instance self._order_helper = OrderHelper(self) return self._order_helper - - -class BaseQuote: - def __init__(self, quote_df: pd.DataFrame): - self.logger = get_module_logger("online operator", level=logging.INFO) - - def get_all_stock(self): - """return all stock codes - - Return - ------ - Union[list, Dict.keys(), set, tuple] - all stock codes - """ - raise NotImplementedError(f"Please implement the `get_all_stock` method") - - def get_data( - self, - stock_id: Union[str, list], - start_time: Union[pd.Timestamp, str], - end_time: Union[pd.Timestamp, str], - fields: Union[str, list] = None, - method: Union[str, "Callable"] = None, - ): - """get the specific fields of stock data during start time and end_time, - and apply method to the data. - - Example: - .. code-block:: - $close $volume - instrument datetime - SH600000 2010-01-04 86.778313 16162960.0 - 2010-01-05 87.433578 28117442.0 - 2010-01-06 85.713585 23632884.0 - 2010-01-07 83.788803 20813402.0 - 2010-01-08 84.730675 16044853.0 - - SH600655 2010-01-04 2699.567383 158193.328125 - 2010-01-08 2612.359619 77501.406250 - 2010-01-11 2712.982422 160852.390625 - 2010-01-12 2788.688232 164587.937500 - 2010-01-13 2790.604004 145460.453125 - - print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) - - $close $volume - instrument - SH600000 87.433578 28117442.0 - SH600655 2699.567383 158193.328125 - - print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) - - $close 87.433578 - $volume 28117442.0 - - print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last")) - - 87.433578 - - Parameters - ---------- - stock_id: Union[str, list] - start_time : Union[pd.Timestamp, str] - closed start time for backtest - end_time : Union[pd.Timestamp, str] - closed end time for backtest - fields : Union[str, List] - the columns of data to fetch - method : Union[str, Callable] - the method apply to data. - e.g ["None", "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last] - - Return - ---------- - Union[None, float, pd.Series, pd.DataFrame] - The resampled DataFrame/Series/value, return None when the resampled data is empty. - """ - - raise NotImplementedError(f"Please implement the `get_data` method") - - -class PandasQuote(BaseQuote): - def __init__(self, quote_df: pd.DataFrame): - super().__init__(quote_df=quote_df) - quote_dict = {} - for stock_id, stock_val in quote_df.groupby(level="instrument"): - quote_dict[stock_id] = stock_val.droplevel(level="instrument") - self.data = quote_dict - - def get_all_stock(self): - return self.data.keys() - - def get_data(self, stock_id, start_time, end_time, fields=None, method=None): - if fields is None: - return resam_ts_data(self.data[stock_id], start_time, end_time, method=method) - elif isinstance(fields, (str, list)): - return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method) - else: - raise ValueError(f"fields must be None, str or list") diff --git a/qlib/backtest/high_performance_ds.py b/qlib/backtest/high_performance_ds.py new file mode 100644 index 0000000000..3e5a9d8e28 --- /dev/null +++ b/qlib/backtest/high_performance_ds.py @@ -0,0 +1,414 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +import logging +from typing import List, Tuple, Union, Callable, Iterable, Dict +from collections import OrderedDict + +import inspect +import pandas as pd + +from ..utils.resam import resam_ts_data +from ..log import get_module_logger + + +class BaseQuote: + def __init__(self, quote_df: pd.DataFrame): + self.logger = get_module_logger("online operator", level=logging.INFO) + + def get_all_stock(self) -> Iterable: + """return all stock codes + + Return + ------ + Iterable + all stock codes + """ + raise NotImplementedError(f"Please implement the `get_all_stock` method") + + def get_data( + self, + stock_id: Union[str, list], + start_time: Union[pd.Timestamp, str], + end_time: Union[pd.Timestamp, str], + fields: Union[str, list] = None, + method: Union[str, Callable] = None, + ) -> Union[None, float, pd.Series, pd.DataFrame]: + """get the specific fields of stock data during start time and end_time, + and apply method to the data. + + Example: + .. code-block:: + $close $volume + instrument datetime + SH600000 2010-01-04 86.778313 16162960.0 + 2010-01-05 87.433578 28117442.0 + 2010-01-06 85.713585 23632884.0 + 2010-01-07 83.788803 20813402.0 + 2010-01-08 84.730675 16044853.0 + + SH600655 2010-01-04 2699.567383 158193.328125 + 2010-01-08 2612.359619 77501.406250 + 2010-01-11 2712.982422 160852.390625 + 2010-01-12 2788.688232 164587.937500 + 2010-01-13 2790.604004 145460.453125 + + print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) + + $close $volume + instrument + SH600000 87.433578 28117442.0 + SH600655 2699.567383 158193.328125 + + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) + + $close 87.433578 + $volume 28117442.0 + + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last")) + + 87.433578 + + Parameters + ---------- + stock_id: Union[str, list] + start_time : Union[pd.Timestamp, str] + closed start time for backtest + end_time : Union[pd.Timestamp, str] + closed end time for backtest + fields : Union[str, List] + the columns of data to fetch + method : Union[str, Callable] + the method apply to data. + e.g ["None", "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last] + + Return + ---------- + Union[None, float, pd.Series, pd.DataFrame] + The resampled DataFrame/Series/value, return None when the resampled data is empty. + """ + + raise NotImplementedError(f"Please implement the `get_data` method") + + +class PandasQuote(BaseQuote): + def __init__(self, quote_df: pd.DataFrame): + super().__init__(quote_df=quote_df) + quote_dict = {} + for stock_id, stock_val in quote_df.groupby(level="instrument"): + quote_dict[stock_id] = stock_val.droplevel(level="instrument") + self.data = quote_dict + + def get_all_stock(self): + return self.data.keys() + + def get_data(self, stock_id, start_time, end_time, fields=None, method=None): + if fields is None: + return resam_ts_data(self.data[stock_id], start_time, end_time, method=method) + elif isinstance(fields, (str, list)): + return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method) + else: + raise ValueError(f"fields must be None, str or list") + + +class BaseSingleMetric: + """ + The data structure of the single metric. + The following methods are used for computing metrics in one indicator. + """ + + def __init__(self, metric: Union[dict, pd.Series]): + pass + + def __add__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __radd__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + return self + other + + def __sub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __rsub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __mul__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __truediv__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __eq__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __gt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __lt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __len__(self) -> int: + pass + + def sum(self) -> float: + pass + + def mean(self) -> float: + pass + + def count(self) -> int: + pass + + def abs(self) -> "BaseSingleMetric": + pass + + def astype(self, type: type) -> "BaseSingleMetric": + pass + + @property + def empty(self) -> bool: + """If metric is empyt, return True.""" + pass + + def add(self, other: "BaseSingleMetric", fill_value: float = None) -> "BaseSingleMetric": + """Replace np.NaN with fill_value in two metrics and add them.""" + pass + + def apply(self, map_dict: dict) -> "BaseSingleMetric": + """Replace the value of metric according to map_dict.""" + pass + + +class BaseOrderIndicator: + """ + The data structure of order indicator. + !!!NOTE: There are two ways to organize the data structure. Please choose a better way. + 1. one way is use BaseSingleMetric to represent each metric. For example, the data + structure of PandasOrderIndicator is Dict[str: PandasSingleMetric]. It uses + PandasSingleMetric based on pd.Series to represent each metric. + 2. the another way doesn't BaseSingleMetric to represent each metric. The data + structure of PandasOrderIndicator is a whole matrix. + """ + + def assign(self, col: str, metric: Union[dict, pd.Series]): + """assign one metric. + + Parameters + ---------- + col : str + the metric name of one metric. + metric : Union[dict, pd.Series] + the metric data. + """ + + pass + + def transfer(self, func: Callable, new_col: str = None) -> Union[None, BaseSingleMetric]: + """compute new metric with existing metrics. + + Parameters + ---------- + func : Callable + the func of computing new metric. + the kwargs of func will be replaced with metric data by name in this function. + e.g. + def func(pa): + return (pa > 0).astype(int).sum() / pa.count() + new_col : str, optional + New metric will be assigned in the data if new_col is not None, by default None. + + Return + ---------- + BaseSingleMetric + new metric. + """ + + pass + + def get_metric_series(self, metric: str) -> pd.Series: + """return the single metric with pd.Series format. + + Parameters + ---------- + metric : str + the metric name. + + Return + ---------- + pd.Series + the single metric. + If there is no metric name in the data, return pd.Series(). + """ + + pass + + @staticmethod + def sum_all_indicators( + indicators: list, metrics: Union[str, List[str]], fill_value: float = None + ) -> Dict[str, BaseSingleMetric]: + """sum indicators with the same metrics. + + Parameters + ---------- + indicators : List[BaseOrderIndicator] + the list of all inner indicators. + metrics : Union[str, List[str]] + all metrics needs ot be sumed. + fill_value : float, optional + fill np.NaN with value. By default None. + + Return + ---------- + Dict[str: PandasSingleMetric] + a dict of metric name and data. + """ + + pass + + +class PandasSingleMetric: + """Each SingleMetric is based on pd.Series.""" + + def __init__(self, metric: Union[dict, pd.Series]): + if isinstance(metric, dict): + self.metric = pd.Series(metric) + elif isinstance(metric, pd.Series): + self.metric = metric + else: + raise ValueError(f"metric must be dict or pd.Series") + + def __add__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric + other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric + other.metric) + else: + return NotImplemented + + def __sub__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric - other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric - other.metric) + else: + return NotImplemented + + def __rsub__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(other - self.metric) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(other.metric - self.metric) + else: + return NotImplemented + + def __mul__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric * other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric * other.metric) + else: + return NotImplemented + + def __truediv__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric / other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric / other.metric) + else: + return NotImplemented + + def __eq__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric == other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric == other.metric) + else: + return NotImplemented + + def __gt__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric < other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric < other.metric) + else: + return NotImplemented + + def __lt__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric > other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric > other.metric) + else: + return NotImplemented + + def __len__(self): + return len(self.metric) + + def sum(self): + return self.metric.sum() + + def mean(self): + return self.metric.mean() + + def count(self): + return self.metric.count() + + def abs(self): + return PandasSingleMetric(self.metric.abs()) + + def astype(self, type): + return PandasSingleMetric(self.metric.astype(type)) + + @property + def empty(self): + return self.metric.empty + + def add(self, other, fill_value=None): + return PandasSingleMetric(self.metric.add(other.metric, fill_value=fill_value)) + + def apply(self, map_dict: dict): + return PandasSingleMetric(self.metric.apply(map_dict)) + + +class PandasOrderIndicator(BaseOrderIndicator): + """ + The data structure is OrderedDict(str: PandasSingleMetric). + Each PandasSingleMetric based on pd.Series is one metric. + Str is the name of metric. + """ + + def __init__(self): + self.data: Dict[str, PandasSingleMetric] = OrderedDict() + + def assign(self, col: str, metric: Union[dict, pd.Series]): + self.data[col] = PandasSingleMetric(metric) + + def transfer(self, func: Callable, new_col: str = None) -> Union[None, PandasSingleMetric]: + func_sig = inspect.signature(func).parameters.keys() + func_kwargs = {sig: self.data[sig] for sig in func_sig} + tmp_metric = func(**func_kwargs) + if new_col is not None: + self.data[new_col] = tmp_metric + else: + return tmp_metric + + def get_metric_series(self, metric: str) -> Union[pd.Series]: + if metric in self.data: + return self.data[metric].metric + else: + return pd.Series() + + @staticmethod + def sum_all_indicators( + indicators: list, metrics: Union[str, List[str]], fill_value=None + ) -> Dict[str, PandasSingleMetric]: + metric_dict = {} + if isinstance(metrics, str): + metrics = [metrics] + for metric in metrics: + tmp_metric = PandasSingleMetric({}) + for indicator in indicators: + tmp_metric = tmp_metric.add(indicator.data[metric], fill_value) + metric_dict[metric] = tmp_metric.metric + return metric_dict diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index 1ae50f5e2a..98d8b4f635 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -5,7 +5,7 @@ from collections import OrderedDict from logging import warning import pathlib -from typing import Dict, List, Tuple, Union +from typing import Dict, List, Tuple, Union, Callable import warnings import inspect @@ -18,6 +18,7 @@ from qlib.backtest.order import BaseTradeDecision, Order, OrderDir from qlib.backtest.utils import TradeCalendarManager +from .high_performane_ds import PandasOrderIndicator from ..data import D from ..tests.config import CSI300_BENCH from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data @@ -254,10 +255,12 @@ class Indicator: """ - def __init__(self): + def __init__(self, order_indicator_cls=PandasOrderIndicator): + self.order_indicator_cls = order_indicator_cls + # order indicator is metrics for a single order for a specific step self.order_indicator_his = OrderedDict() - self.order_indicator = PandasOrderIndicator() + self.order_indicator = self.order_indicator_cls() # trade indicator is metrics for all orders for a specific step self.trade_indicator_his = OrderedDict() @@ -267,7 +270,7 @@ def __init__(self): # def reset(self, trade_calendar: TradeCalendarManager): def reset(self): - self.order_indicator = PandasOrderIndicator() + self.order_indicator = self.order_indicator_cls() self.trade_indicator = OrderedDict() # self._trade_calendar = trade_calendar @@ -291,6 +294,7 @@ def _update_order_trade_info(self, trade_info: list): trade_value[order.stock_id] = _trade_val * order.sign trade_cost[order.stock_id] = _trade_cost trade_dir[order.stock_id] = order.direction + # The PA in the innermost layer is meanless pa[order.stock_id] = 0 self.order_indicator.assign("amount", amount) @@ -306,32 +310,33 @@ def _update_order_trade_info(self, trade_info: list): def _update_order_fulfill_rate(self): def func(deal_amount, amount): return deal_amount / amount + self.order_indicator.transfer(func, "ffr") def update_order_indicators(self, trade_info: list): self._update_order_trade_info(trade_info=trade_info) self._update_order_fulfill_rate() - # self._update_order_price_advantage() def _agg_order_trade_info(self, inner_order_indicators: List[Dict[str, pd.Series]]): - all_metric = ["inner_amount", "deal_amount", "trade_price", - "trade_value", "trade_cost", "trade_dir"] - metric_dict = PandasOrderIndicator.agg_all_indicators(inner_order_indicators, all_metric, fill_value=0) + all_metric = ["inner_amount", "deal_amount", "trade_price", "trade_value", "trade_cost", "trade_dir"] + metric_dict = self.order_indicator_cls.sum_all_indicators(inner_order_indicators, all_metric, fill_value=0) for metric in metric_dict: self.order_indicator.assign(metric, metric_dict[metric]) def func(trade_price, deal_amount): return trade_price / deal_amount + self.order_indicator.transfer(func, "trade_price") def func_apply(trade_dir): return trade_dir.apply(Order.parse_dir) + self.order_indicator.transfer(func_apply, "trade_dir") def _update_trade_amount(self, outer_trade_decision: BaseTradeDecision): # NOTE: these indicator is designed for order execution, so the decision: List[Order] = outer_trade_decision.get_decision() - if decision is None: + if len(decision) == 0: self.order_indicator.assign("amount", {}) else: self.order_indicator.assign("amount", {order.stock_id: order.amount_delta for order in decision}) @@ -450,11 +455,14 @@ def _agg_base_price( def _agg_order_price_advantage(self): def if_empty_func(trade_price): return trade_price.empty + if_empty = self.order_indicator.transfer(if_empty_func) if not if_empty: + def func(trade_dir, trade_price, base_price): sign = 1 - trade_dir * 2 return sign * (trade_price / base_price - 1) + self.order_indicator.transfer(func, "pa") else: self.order_indicator.assign("pa", {}) @@ -471,33 +479,45 @@ def agg_order_indicators( self._update_trade_amount(outer_trade_decision) self._update_order_fulfill_rate() pa_config = indicator_config.get("pa_config", {}) - self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO + self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO self._agg_order_price_advantage() def _cal_trade_fulfill_rate(self, method="mean"): if method == "mean": + def func(ffr): return ffr.mean() + elif method == "amount_weighted": + def func(ffr, deal_amount): return (ffr * deal_amount.abs()).sum() / (deal_amount.abs().sum()) + elif method == "value_weighted": + def func(ffr, trade_value): return (ffr * trade_value.abs()).sum() / (trade_value.abs().sum()) + else: raise ValueError(f"method {method} is not supported!") return self.order_indicator.transfer(func) def _cal_trade_price_advantage(self, method="mean"): if method == "mean": + def func(pa): return pa.mean() + elif method == "amount_weighted": + def func(pa, deal_amount): return (pa * deal_amount.abs()).sum() / (deal_amount.abs().sum()) + elif method == "value_weighted": + def func(pa, trade_value): return (pa * trade_value.abs()).sum() / (trade_value.abs().sum()) + else: raise ValueError(f"method {method} is not supported!") return self.order_indicator.transfer(func) @@ -505,21 +525,25 @@ def func(pa, trade_value): def _cal_trade_positive_rate(self): def func(pa): return (pa > 0).astype(int).sum() / pa.count() + return self.order_indicator.transfer(func) def _cal_deal_amount(self): def func(deal_amount): return deal_amount.abs().sum() + return self.order_indicator.transfer(func) def _cal_trade_value(self): def func(trade_value): return trade_value.abs().sum() + return self.order_indicator.transfer(func) def _cal_trade_order_count(self): def func(amount): return amount.count() + return self.order_indicator.transfer(func) def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}): @@ -553,236 +577,3 @@ def get_trade_indicator(self): def generate_trade_indicators_dataframe(self): return pd.DataFrame.from_dict(self.trade_indicator_his, orient="index") - - -class BaseOrderIndicator: - """The data structure of order indicator. - """ - - def __init__(self): - pass - - def assign(self, col: str, metric: Union[dict, pd.Series]): - """assign one metric. - - Parameters - ---------- - col : str - the metric name of one metric. - metric : Union[dict, pd.Series] - the metric data. - """ - - pass - - def transfer(self, func: "Callable", new_col: str = None): - """compute new metric with existing. - - Parameters - ---------- - func : Callable - the func of computing new metric. - the kwargs of func will be replaced with metric data by name in this function. - e.g. - def func(pa): - return (pa > 0).astype(int).sum() / pa.count() - new_col : str, optional - New metric will be assigned in the data if new_col is not None, by default None. - - Return - ---------- - SingleMetric - new metric. - """ - - pass - - def get_metric_series(self, metric: str): - """return the single metric with pd.Series format - - Parameters - ---------- - metric : str - the metric name. - - Return - ---------- - pd.Series - the single metric. - If there is no metric name in the data, return pd.Series(). - """ - - pass - - @classmethod - def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value: float = None): - """sum indicators with the same metrics. - - Parameters - ---------- - indicators : List[BaseOrderIndicator] - the list of all inner indicators. - metrics : Union[str, List[str]] - all metrics needs ot be sumed. - fill_value : float, optional - fill np.NaN with value. By default None. - - Return - ---------- - Dict[str: SingleMetric] - a dict of metric name and data. - """ - - pass - - -class PandasOrderIndicator(BaseOrderIndicator): - """The data structure is OrderedDict(str: SingleMetric). - Each SingleMetric based on pd.Series is one metric. - Str is the name of metric. - """ - - class SingleMetric: - """The data structure of the single metric. - The following methods are used for computing metrics in one indicator. - """ - - def __init__(self, metric: Union[dict, pd.Series]): - if isinstance(metric, dict): - self.metric = pd.Series(metric) - elif isinstance(metric, pd.Series): - self.metric = metric - else: - raise ValueError(f"metric must be dict or pd.Series") - - def __add__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric + other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric + other.metric) - else: - return NotImplemented - - def __radd__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(other + self.metric) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(other.metric + self.metric) - else: - return NotImplemented - - def __sub__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric - other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric - other.metric) - else: - return NotImplemented - - def __rsub__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(other - self.metric) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(other.metric - self.metric) - else: - return NotImplemented - - def __mul__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric * other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric * other.metric) - else: - return NotImplemented - - def __truediv__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric / other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric / other.metric) - else: - return NotImplemented - - def __eq__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric == other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric == other.metric) - else: - return NotImplemented - - def __gt__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric < other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric < other.metric) - else: - return NotImplemented - - def __lt__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric > other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric > other.metric) - else: - return NotImplemented - - def __len__(self): - return len(self.metric) - - def sum(self): - return self.metric.sum() - - def mean(self): - return self.metric.mean() - - def count(self): - return self.metric.count() - - def abs(self): - return PandasOrderIndicator.SingleMetric(self.metric.abs()) - - def astype(self, type): - return PandasOrderIndicator.SingleMetric(self.metric.astype(type)) - - @property - def empty(self): - return self.metric.empty - - def add(self, other, fill_value: None): - return PandasOrderIndicator.SingleMetric(self.metric.add(other.metric, fill_value = fill_value)) - - def apply(self, map_dict: dict): - return PandasOrderIndicator.SingleMetric(self.metric.apply(map_dict)) - - def __init__(self): - self.data: Dict[str, self.SingleMetric] = OrderedDict() - - def assign(self, col: str, metric: Union[dict, pd.Series]): - self.data[col] = self.SingleMetric(metric) - - def transfer(self, func: "Callable", new_col: str = None): - func_sig = inspect.signature(func).parameters.keys() - func_kwargs = {sig: self.data[sig] for sig in func_sig} - tmp_metric = func(**func_kwargs) - if(new_col is not None): - self.data[new_col] = tmp_metric - return tmp_metric - - def get_metric_series(self, metric: str): - if(metric in self.data): - return self.data[metric].metric - else: - return pd.Series() - - @staticmethod - def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value = None): - metric_dict = {} - if isinstance(metrics, str): - metrics = [metrics] - for metric in metrics: - tmp_metric = PandasOrderIndicator.SingleMetric({}) - for indicator in indicators: - tmp_metric = tmp_metric.add(indicator.data[metric], fill_value) - metric_dict[metric] = tmp_metric.metric - return metric_dict \ No newline at end of file From be70d228517d2b078a001d4991fd048da3fb3171 Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Fri, 23 Jul 2021 05:50:41 +0000 Subject: [PATCH 9/9] fix little bug --- qlib/backtest/exchange.py | 2 +- qlib/backtest/high_performance_ds.py | 53 +++++++++++++++------------- qlib/backtest/report.py | 6 ++-- 3 files changed, 32 insertions(+), 29 deletions(-) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index edcd7baaf6..5677e855d9 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -15,7 +15,7 @@ from ..utils.resam import resam_ts_data, ts_data_last from ..log import get_module_logger from .order import Order, OrderDir, OrderHelper -from .high_performane_ds import PandasQuote +from .high_performance_ds import PandasQuote class Exchange: diff --git a/qlib/backtest/high_performance_ds.py b/qlib/backtest/high_performance_ds.py index 3e5a9d8e28..8a908fbf0b 100644 --- a/qlib/backtest/high_performance_ds.py +++ b/qlib/backtest/high_performance_ds.py @@ -25,6 +25,7 @@ def get_all_stock(self) -> Iterable: Iterable all stock codes """ + raise NotImplementedError(f"Please implement the `get_all_stock` method") def get_data( @@ -119,76 +120,80 @@ class BaseSingleMetric: """ def __init__(self, metric: Union[dict, pd.Series]): - pass + raise NotImplementedError(f"Please implement the `__init__` method") def __add__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `__add__` method") def __radd__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": return self + other def __sub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `__sub__` method") def __rsub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `__rsub__` method") def __mul__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `__mul__` method") def __truediv__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `__truediv__` method") def __eq__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `__eq__` method") def __gt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `__gt__` method") def __lt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `__lt__` method") def __len__(self) -> int: - pass + raise NotImplementedError(f"Please implement the `__len__` method") def sum(self) -> float: - pass + raise NotImplementedError(f"Please implement the `sum` method") def mean(self) -> float: - pass + raise NotImplementedError(f"Please implement the `mean` method") def count(self) -> int: - pass + """Return the count of the single metric, NaN is not included. + """ + + raise NotImplementedError(f"Please implement the `count` method") def abs(self) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `abs` method") def astype(self, type: type) -> "BaseSingleMetric": - pass + raise NotImplementedError(f"Please implement the `astype` method") @property def empty(self) -> bool: """If metric is empyt, return True.""" - pass + raise NotImplementedError(f"Please implement the `empty` method") def add(self, other: "BaseSingleMetric", fill_value: float = None) -> "BaseSingleMetric": """Replace np.NaN with fill_value in two metrics and add them.""" - pass + raise NotImplementedError(f"Please implement the `add` method") - def apply(self, map_dict: dict) -> "BaseSingleMetric": + def map(self, map_dict: dict) -> "BaseSingleMetric": """Replace the value of metric according to map_dict.""" - pass + raise NotImplementedError(f"Please implement the `map` method") class BaseOrderIndicator: """ The data structure of order indicator. !!!NOTE: There are two ways to organize the data structure. Please choose a better way. - 1. one way is use BaseSingleMetric to represent each metric. For example, the data - structure of PandasOrderIndicator is Dict[str: PandasSingleMetric]. It uses + 1. One way is using BaseSingleMetric to represent each metric. For example, the data + structure of PandasOrderIndicator is Dict[str, PandasSingleMetric]. It uses PandasSingleMetric based on pd.Series to represent each metric. - 2. the another way doesn't BaseSingleMetric to represent each metric. The data - structure of PandasOrderIndicator is a whole matrix. + 2. The another way doesn't use BaseSingleMetric to represent each metric. The data + structure of PandasOrderIndicator is a whole matrix. It means you are not neccesary + to inherit the BaseSingleMetric. """ def assign(self, col: str, metric: Union[dict, pd.Series]): @@ -367,7 +372,7 @@ def empty(self): def add(self, other, fill_value=None): return PandasSingleMetric(self.metric.add(other.metric, fill_value=fill_value)) - def apply(self, map_dict: dict): + def map(self, map_dict: dict): return PandasSingleMetric(self.metric.apply(map_dict)) diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index 98d8b4f635..375100cbad 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -6,8 +6,6 @@ from logging import warning import pathlib from typing import Dict, List, Tuple, Union, Callable -import warnings -import inspect import numpy as np import pandas as pd @@ -18,7 +16,7 @@ from qlib.backtest.order import BaseTradeDecision, Order, OrderDir from qlib.backtest.utils import TradeCalendarManager -from .high_performane_ds import PandasOrderIndicator +from .high_performance_ds import PandasOrderIndicator from ..data import D from ..tests.config import CSI300_BENCH from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data @@ -329,7 +327,7 @@ def func(trade_price, deal_amount): self.order_indicator.transfer(func, "trade_price") def func_apply(trade_dir): - return trade_dir.apply(Order.parse_dir) + return trade_dir.map(Order.parse_dir) self.order_indicator.transfer(func_apply, "trade_dir")