
[RFC] Design of Checkpoint Mechanism in XGBoost-Spark #4786

Open
CodingCat opened this issue Aug 17, 2019 · 10 comments

@CodingCat (Member) commented Aug 17, 2019

Checkpoint Cleanup

The current implementation in XGBoost-Spark does not clean up the checkpoint file after a successful training. As a result, the user may get an incorrect training run without a careful setup. For instance, the following process will lead to a wrongly-formed model in the second run.

  • The user starts training with the checkpoint interval set to 2 iterations and the number of rounds set to 10
  • After a successful training, the user starts a new training job with a new group of parameters, but without changing the checkpoint directory

The second training job will finish “too fast” for two reasons:

  • The checkpoint built at the 8th iteration of the first run was left in the checkpoint directory
  • Without a different checkpoint directory, the second run loads the leftover checkpoint file and only runs for 2 more iterations

Therefore, we propose to always clean up the checkpoint directory after a successful training.

This has been merged in 7b5cbcc.
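
For illustration, here is a minimal sketch of what such a cleanup could look like using the Hadoop FileSystem API. The method name and the assumption that the checkpoint directory is used exclusively by this training job are mine; this is not the merged implementation.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: delete all leftover checkpoint files once training succeeds.
// Assumes `checkpointDir` is owned only by this training job.
def cleanUpCheckpoints(checkpointDir: String): Unit = {
  val dir = new Path(checkpointDir)
  val fs: FileSystem = dir.getFileSystem(new Configuration())
  if (fs.exists(dir)) {
    fs.delete(dir, true) // recursive delete of the whole checkpoint directory
  }
}
```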

Deterministic Partitioning

Based on the definition of gradient boosting, we should ensure that the input dataset to a booster is fixed for each iteration. However, this is hard to achieve in distributed training with Spark.

The current partitioning mechanism in XGBoost-Spark only ensures that the input data is dispatched to the correct number of partitions. The number of partitions is controlled by the user’s setting of the parameter numWorkers.

The design of deterministic partitioning is to guarantee that, given a fixed input dataset, the resulting partitioning is always the same. This goal is interpreted as follows:

  • We rely on the user to ensure that the data fed to XGBoost-Spark is fixed for each checkpoint interval
  • XGBoost-Spark guarantees that the partitioning of the given dataset is deterministic

The current partitioning strategy in XGBoost-Spark cannot achieve the second part of this goal. It is based on the repartition API of Spark RDD, which is implemented to ensure an even distribution. The current partitioning works as follows:

  • We have A partitions and would like to repartition into B partitions
  • Each element in each of the A partitions is dispatched to the B partitions in a round-robin fashion, starting from a randomly chosen partition

Because of the random start, we cannot guarantee a fixed partitioning for each checkpoint interval or each recovery from a failure.

We propose the following mechanism to get a deterministic partitioning while achieving load balancing on a best-effort basis:

partitionId = math.abs(preventOverflow(row.getHashCode + row.getAs[Float](math.abs(row.getHashCode) % number_non_zero_features)) % numWorkers)
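
As a rough sketch only (assuming the row exposes its first number_non_zero_features values as Float columns, and that preventOverflow simply widens the arithmetic to Long), the formula could be implemented along these lines; none of the names below are the final implementation:

```scala
import org.apache.spark.sql.Row

// Sketch of the proposed deterministic partitioner.
// Assumption: the first `numNonZeroFeatures` columns of the row are Float features.
def deterministicPartition(row: Row, numWorkers: Int, numNonZeroFeatures: Int): Int = {
  val hash = row.hashCode
  val featureIdx = math.abs(hash) % numNonZeroFeatures
  // "preventOverflow": do the mixing in Long space so the sum cannot overflow Int.
  val mixed: Long = hash.toLong + row.getAs[Float](featureIdx).toLong
  math.abs(mixed % numWorkers).toInt // always in [0, numWorkers)
}
```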

Avoid Multiple Jobs for Checkpointing

The current checkpointing collects the booster produced at the last iteration of each checkpoint interval to the driver and persists it in HDFS. The major issue with this approach is that it needs to re-perform the data preparation for training if the user didn’t choose to cache the training dataset.

The proposed change is to build the external-memory checkpoint in the XGBoost4J layer as well, so that the XGBoost-Spark layer can instruct XGBoost4J to save the checkpoint content to HDFS after moving forward by a specified number of iterations.

From an engineering perspective, XGBoost-Spark passes three variables to XGBoost4J: buildExternalCache, interval, and an OutputStream. Then, in the XGBoost4J layer, we simply feed the booster to the OutputStream in the partition where buildExternalCache is true once it has moved forward by interval iterations.
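
A minimal sketch of that control flow, with trainOneRound and boosterBytes standing in for the real XGBoost4J training step and booster serialization (the actual API surface is not defined here):

```scala
import java.io.OutputStream

// Sketch of the proposed per-partition training loop.
// Only the partition with buildExternalCache == true writes checkpoints.
def checkpointAwareTraining(
    buildExternalCache: Boolean,
    interval: Int,                   // checkpoint every `interval` iterations
    totalRounds: Int,
    out: OutputStream,               // e.g. a stream opened on an HDFS checkpoint file
    trainOneRound: () => Unit,       // placeholder for one boosting iteration
    boosterBytes: () => Array[Byte]  // placeholder for serializing the current booster
): Unit = {
  for (iter <- 1 to totalRounds) {
    trainOneRound()
    if (buildExternalCache && iter % interval == 0) {
      out.write(boosterBytes())      // persist the booster produced at this interval
      out.flush()
    }
  }
}
```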

Prediction Cache Building

The key factor leading to the performance drop with the checkpoint mechanism is that we lose the prediction cache after a checkpoint is made, so we need to go through every tree to calculate the residuals even though we performed the same computation for evaluation in the previous round.

There are two potential fixes to address this problem:

  • Rebuild the whole prediction cache in the first iteration of each checkpoint interval: this is the simplest fix, as we only need to refill the prediction cache at the beginning of each checkpoint interval. However, it also implicitly pushes the user to set the checkpoint interval to a relatively large value to avoid performance issues.
  • Checkpoint the prediction cache and add an API to load it: the current checkpoint only contains the booster itself. The proposed fix is to add the prediction cache to the content of each checkpoint (not changing the booster format, but wrapping them in the XGBoost-Spark layer). We would also need to expose APIs to load the checkpointed cache. However, the major overhead of this fix is the need to expose new APIs which would likely be used only by this XGBoost-Spark functionality. Additionally, the proposed API would expose an internal concept of XGBoost, the “Prediction Cache”, to end-users.

Based on the above analysis, and because the second approach conflicts with Avoid Multiple Jobs for Checkpointing, we adopt the first approach as the better design.
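
As a rough illustration of the trade-off in the first approach (a back-of-the-envelope estimate, not a measurement): with R total rounds and a checkpoint interval of k, the rebuild at the start of the j-th interval has to run predictions over the (j − 1)·k trees built so far, so the redundant tree traversals per data point add up to roughly k·(1 + 2 + … + (R/k − 1)) ≈ R²/(2k). For R = 100, that is about 2450 traversals per row with k = 2 but only about 450 with k = 10, which is why a larger checkpoint interval keeps the overhead manageable.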

@CodingCat (Member Author) commented

@trams feedback is welcome.

@trams (Contributor) commented Aug 20, 2019

First, I want to thank you for a comprehensive RFC. I like it.

Second, let me quickly reiterate your proposition to make sure I understand correctly.

Reiteration

  1. You propose to make partitioning deterministic by using some kind of "hash" function whose result will be based purely on the data values.

  2. You propose to add to XGBoost4J an ability to save a booster to an OutputStream.
    This will enable us to store them in HDFS.
    This will also permit us to move checkpoint saving to the executors (it can be performed by task 0, or by a task whose number is determined by the checkpoint number, to spread the load).
    This would in turn allow us to launch only one Spark job to train a complete model, and we would not suffer any performance degradation due to cache loss in the case of no failure, because we won't be restarting the training in that case (as you mentioned in [RFC] Redesign xgboost-spark checkpoint mechanism #4785, this is one of the alternative solutions I mentioned).

  3. When we restart the training from a checkpoint we rebuild caches so only the first step will pay a performance penalty.

Tell me if I've got something wrong.

And now I want to share my comments point by point

My comments

  1. "Deterministic partitioning"
    I am all for it! I am just a bit confused regarding the proposed partitioning function. Could you elaborate a bit? More specifically, I am a bit confused regarding the following:
  • the resulting function returns a Float. Should we truncate? Or is it implied?
  • I guess I do not fully understand why the current partitioning, which is done using HashPartitioner (correct me if I am wrong), is bad. Row.hashCode is supposed to be quite good and it is based on Murmur. But if that hash value is not deterministic (your observation implies it), then I would like to understand why and whether it is possible to fix it. Maybe it is because of the Vector class we use... (I would like to hear your opinion)

Either way I would like to see our partitioning deterministic and I would gladly help to do this.

  2. You propose to add to XGBoost4J an ability to save a booster to an OutputStream.

Things we must be very careful about.

  • Spark discourages writing to HDFS directly from executors, but I googled and it is possible (slightly tricky but not complex)
  • I suggest saving a checkpoint to a temp file and then committing it atomically using an atomic move (see the sketch after this list)
  • I suggest exposing enough logging to ease debugging
  • I suggest considering exposing some information about the progress through custom Spark Accumulators, but I need to think about it a bit
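
To make the second point concrete, here is a minimal sketch of the temp-file-plus-atomic-rename idea on HDFS; the helper name, the file naming, and the path layout are illustrative only:

```scala
import java.io.OutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: write the checkpoint to a temporary file, then commit it with an atomic rename,
// so readers never observe a half-written checkpoint.
def commitCheckpoint(checkpointDir: String, iteration: Int)(writeBooster: OutputStream => Unit): Unit = {
  val dir = new Path(checkpointDir)
  val fs: FileSystem = dir.getFileSystem(new Configuration())
  val tmp = new Path(dir, s".checkpoint-$iteration.tmp")
  val out = fs.create(tmp, true) // overwrite any stale temp file
  try writeBooster(out) finally out.close()
  fs.rename(tmp, new Path(dir, s"checkpoint-$iteration")) // rename is atomic on HDFS
}
```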

I had some time to think about your comment here. I guess I just have irrational fears about Spark suddenly using speculative execution or just restarting my tasks, but this has nothing to do with checkpoints, because xgboost-spark does not work with speculative execution or task restarts even without checkpoints.

Overall I think it is definitely much easier than exposing the "state" of the training, and I agree that exposing caches may not be a good idea design- and API-wise (the cache is an implementation detail).

  3. When we restart the training from a checkpoint we rebuild caches so only the first step will pay a performance penalty.

I agree with your analysis here

@chenqin (Contributor) commented Aug 20, 2019

Can we test whether mod-based partitioning offers an approximately uniform data distribution?

partitionId = math.abs(preventOverflow(row.getHashCode + row.getAs[Float](math.abs(row.getHashCode) % number_non_zero_features)) % numWorkers)

@CodingCat (Member Author) commented Aug 25, 2019

Can we test whether mod-based partitioning offers an approximately uniform data distribution?

partitionId = math.abs(preventOverflow(row.getHashCode + row.getAs[Float](math.abs(row.getHashCode) % number_non_zero_features)) % numWorkers)

No, it will not without any injected randomization; mod-based partitioning is known to be subject to data skew.

However, the proposed strategy here is based on:

  • the hash code of the entire row (which is based on murmur3)

plus

  • the value of a particular feature

which creates a certain level of randomness for load balancing.
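
One way to answer the question empirically (a sketch only: the per-row hash below mirrors the proposed formula in simplified form, and the DataFrame is assumed to store its first numNonZeroFeatures columns as Float features) is to histogram the computed partition ids over the real training data:

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Sketch: compute the proposed partition id per row and count how many rows
// land in each, to eyeball the skew.
def partitionHistogram(df: DataFrame, numWorkers: Int, numNonZeroFeatures: Int): Map[Int, Long] = {
  def partitionId(row: Row): Int = {
    val hash = row.hashCode
    val feature = row.getAs[Float](math.abs(hash) % numNonZeroFeatures)
    math.abs((hash.toLong + feature.toLong) % numWorkers).toInt
  }
  df.rdd.map(partitionId).countByValue().toMap
}
```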

@CodingCat (Member Author) commented

First, I want to thank you for a comprehensive RFC. I like it.

Second, let me quickly reiterate your proposition to make sure I understand correctly.

Reiteration

  1. You propose to make partitioning deterministic by using some kind of "hash" function whose result will be based purely on the data values.

yes

  2. You propose to add to XGBoost4J an ability to save a booster to an OutputStream.
    This will enable us to store them in HDFS.
    This will also permit us to move checkpoint saving to the executors (it can be performed by task 0, or by a task whose number is determined by the checkpoint number, to spread the load).

yes

This would in turn allow us to launch only one Spark job to train a complete model, and we would not suffer any performance degradation due to cache loss in the case of no failure, because we won't be restarting the training in that case (as you mentioned here in #4785, this is one of the alternative solutions I mentioned).

yes

  3. When we restart the training from a checkpoint we rebuild caches so only the first step will pay a performance penalty.

yes

Tell me if I've got something wrong.

And now I want to share my comments point by point

My comments

  1. "Deterministic partitioning"
    I am all for it! I am just a bit confused regarding the proposed partitioning function. Could you elaborate a bit? More specifically, I am a bit confused regarding the following:
  • the resulting function returns a Float. Should we truncate? Or is it implied?
  • I guess I do not fully understand why the current partitioning, which is done using HashPartitioner (correct me if I am wrong), is bad. Row.hashCode is supposed to be quite good and it is based on Murmur. But if that hash value is not deterministic (your observation implies it), then I would like to understand why and whether it is possible to fix it. Maybe it is because of the Vector class we use... (I would like to hear your opinion)

The current partitioning triggered by repartition(N) is not based on HashPartitioner; it works more like round-robin with a randomized start, see https://github.com/apache/spark/blob/02a0cdea13a5eebd27649a60d981de35156ba52c/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L469-L487

Either way I would like to see our partitioning deterministic and I would gladly help to do this.

  2. You propose to add to XGBoost4J an ability to save a booster to an OutputStream.

Things we must be very careful about.

  • Spark discourages writing to HDFS directly from executors, but I googled and it is possible (slightly tricky but not complex)

I don't know why we are not supposed to call the HDFS API directly from a task.

  • I suggest saving a checkpoint to a temp file and then committing it atomically using an atomic move.

While adding this is not complicated and would be good to have, I am currently not doing it and haven't seen any issue; it's just a checkpoint file... I will add it when upstreaming this part.

  • I suggest exposing enough logging to ease debugging

yes

  • I suggest considering exposing some information about the progress through custom Spark Accumulators, but I need to think about it a bit

Yes, but I don't think it belongs in this RFC. AnalyticsZoo does use this strategy to report many metrics; however, it relies on the fact that it runs as one job per MicroBatch.

I had some time to think about your comment here. I guess I just have irrational fears about Spark suddenly using speculative execution or just restarting my tasks, but this has nothing to do with checkpoints, because xgboost-spark does not work with speculative execution or task restarts even without checkpoints.

Overall I think it is definitely much easier than exposing the "state" of the training, and I agree that exposing caches may not be a good idea design- and API-wise (the cache is an implementation detail).

  3. When we restart the training from a checkpoint we rebuild caches so only the first step will pay a performance penalty.

I agree with your analysis here

@trams (Contributor) commented Aug 26, 2019

Thank you for your response.
Here are some comments to clarify what I meant

The current partitioning triggered by repartition(N) is not based on HashPartitioner; it works more like round-robin with a randomized start, see https://github.com/apache/spark/blob/02a0cdea13a5eebd27649a60d981de35156ba52c/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L469-L487

I did not know that. Thank you for the pointer

I don't know why we are not supposed to call the HDFS API directly from a task.

Just to make it clear, I am not against writing to HDFS from the task directly. I was merely pointing out that it makes me uneasy because it breaks the Spark computational model (tasks should not have side effects). That being said, xgboost-spark is breaking it by design :)

Also, that is the reason why I suggested "committing" the checkpoint atomically.
My concern is an undetected corrupted checkpoint in the case of a user (or a script) killing the job. My use case is a restart of the machine which launches the job.
I understand that this use case is far-fetched, and we are probably safeguarded by the fact that it is probably impossible to write a file with a booster partially and still get a valid booster.

@hcho3 mentioned this issue Sep 16, 2019
@viirya (Contributor) commented Sep 25, 2019

You propose to make partitioning deterministic by using some kind of "hash" function whose result will be based purely on the data values.

Hmm, is it a good thing for data diversity in learning? Doesn't it introduce some bias in the data distribution?

@CodingCat (Member Author) commented

You propose to make partitioning deterministic by using some kind of "hash" function whose result will be based purely on the data values.

Hmm, is it a good thing for data diversity in learning? Doesn't it introduce some bias in the data distribution?

It will not:

(1) the input to the partitioner is not a single feature but all features, so suppose we have [0, 0, 1] in the first row and [0, 0, 2] in the second row; in most cases they will end up in different partitions

(2) there is a step in XGBoost to compute a global data sketch, so if you have [0,0,1] in worker 1 and [1,1,1] in worker 2, the distribution of each feature will eventually get synced

@merleyc commented Oct 1, 2019

Hi @CodingCat ,
In XGBoost on Spark, after the change made here [1] which (if I understood correctly) enables deterministic data partitioning when checkpointing is enabled, should the model be deterministic after running several times with the same input?
I'm using the same input data, coalescing the data in my Python script (df = df.coalesce(…)), setting the Spark config 'spark.task.cpus' to '1', using a fixed random seed, setting nthread=1 in XGBoostClassifier, and setting the new method needDeterministicRepartitioning to TRUE.
After all of this, I still get different models when using more than 2 workers. Is that expected? Why?
With nworkers <= 2, I get the same models.
Please correct me if I misunderstood anything about the method 'needDeterministicRepartitioning' or the algorithm's behavior.
I asked this question in an xgboost discussion topic but I believe this is the right place.
Thanks!

@trams (Contributor) commented Oct 24, 2019

@merleyc here is my feedback. I am no expert here so I may be wrong (please correct me if I am wrong)

  1. I think .coalesce is not deterministic.
  2. Do you use hist or approx?
