diff --git a/eden/mononoke/Cargo.toml b/eden/mononoke/Cargo.toml
index 1e0bbcffc5d9c..8a30368723917 100644
--- a/eden/mononoke/Cargo.toml
+++ b/eden/mononoke/Cargo.toml
@@ -236,7 +236,7 @@ members = [
   "commit_rewriting/cross_repo_sync",
   "commit_rewriting/cross_repo_sync/test_utils",
   "commit_rewriting/live_commit_sync_config",
-  "commit_rewriting/megarepolib",
+  "commit_rewriting/megarepo",
   "commit_rewriting/movers",
   "commit_rewriting/synced_commit_mapping",
   "common/allocation_tracing",
diff --git a/eden/mononoke/commit_rewriting/cross_repo_sync/test_utils/Cargo.toml b/eden/mononoke/commit_rewriting/cross_repo_sync/test_utils/Cargo.toml
index 6631b48457e24..17433d0b83c1d 100644
--- a/eden/mononoke/commit_rewriting/cross_repo_sync/test_utils/Cargo.toml
+++ b/eden/mononoke/commit_rewriting/cross_repo_sync/test_utils/Cargo.toml
@@ -17,7 +17,7 @@ blobstore = { path = "../../../blobstore" }
 bookmarks = { path = "../../../bookmarks" }
 context = { path = "../../../server/context" }
 cross_repo_sync = { path = ".." }
-megarepolib = { path = "../../megarepolib" }
+megarepolib = { path = "../../megarepo" }
 metaconfig_types = { path = "../../../metaconfig/types" }
 mononoke_types = { path = "../../../mononoke_types" }
 sql_construct = { path = "../../../common/sql_construct" }
diff --git a/eden/mononoke/commit_rewriting/megarepolib/Cargo.toml b/eden/mononoke/commit_rewriting/megarepo/Cargo.toml
similarity index 66%
rename from eden/mononoke/commit_rewriting/megarepolib/Cargo.toml
rename to eden/mononoke/commit_rewriting/megarepo/Cargo.toml
index 3d130a118de3d..1e65f1921971b 100644
--- a/eden/mononoke/commit_rewriting/megarepolib/Cargo.toml
+++ b/eden/mononoke/commit_rewriting/megarepo/Cargo.toml
@@ -4,29 +4,45 @@ edition = "2018"
 version = "0.1.0"
 authors = ['Facebook']
 license = "GPLv2+"
-include = ["src/**/*.rs"]
+include = ["src/**/*.rs", "tool/**/*.rs"]
+
+[lib]
+path = "src/lib.rs"
+
+[[bin]]
+name = "megarepotool"
+path = "tool/main.rs"
 
 [dependencies]
 blobrepo = { path = "../../blobrepo" }
 blobrepo_hg = { path = "../../blobrepo/blobrepo_hg" }
+blobrepo_utils = { path = "../../blobrepo_utils" }
 blobstore = { path = "../../blobstore" }
 bookmarks = { path = "../../bookmarks" }
+cmdlib = { path = "../../cmdlib" }
 context = { path = "../../server/context" }
+cross_repo_sync = { path = "../cross_repo_sync" }
 manifest = { path = "../../manifest" }
 mercurial_types = { path = "../../mercurial/types" }
+metaconfig_types = { path = "../../metaconfig/types" }
 mononoke_types = { path = "../../mononoke_types" }
 movers = { path = "../movers" }
+revset = { path = "../../revset" }
+skiplist = { path = "../../reachabilityindex/skiplist" }
+synced_commit_mapping = { path = "../synced_commit_mapping" }
+cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
+fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
+futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 anyhow = "1.0"
+clap = "2.33"
 futures = { version = "0.3.5", features = ["async-await", "compat"] }
+futures-old = { package = "futures", version = "0.1" }
 itertools = "0.8"
+maplit = "1.0"
 slog = { version = "2.5", features = ["max_level_debug"] }
 
 [dev-dependencies]
 fixtures = { path = "../../tests/fixtures" }
 tests_utils = { path = "../../tests/utils" }
 async_unit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
-cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
-fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
-futures-old = { package = "futures", version = "0.1" }
-maplit = "1.0"
 tokio-compat = "0.1"
diff --git a/eden/mononoke/commit_rewriting/megarepolib/src/chunking.rs b/eden/mononoke/commit_rewriting/megarepo/src/chunking.rs
similarity index 100%
rename from eden/mononoke/commit_rewriting/megarepolib/src/chunking.rs
rename to eden/mononoke/commit_rewriting/megarepo/src/chunking.rs
diff --git a/eden/mononoke/commit_rewriting/megarepolib/src/common.rs b/eden/mononoke/commit_rewriting/megarepo/src/common.rs
similarity index 100%
rename from eden/mononoke/commit_rewriting/megarepolib/src/common.rs
rename to eden/mononoke/commit_rewriting/megarepo/src/common.rs
diff --git a/eden/mononoke/commit_rewriting/megarepolib/src/lib.rs b/eden/mononoke/commit_rewriting/megarepo/src/lib.rs
similarity index 100%
rename from eden/mononoke/commit_rewriting/megarepolib/src/lib.rs
rename to eden/mononoke/commit_rewriting/megarepo/src/lib.rs
diff --git a/eden/mononoke/commit_rewriting/megarepolib/src/pre_merge_deletes.rs b/eden/mononoke/commit_rewriting/megarepo/src/pre_merge_deletes.rs
similarity index 100%
rename from eden/mononoke/commit_rewriting/megarepolib/src/pre_merge_deletes.rs
rename to eden/mononoke/commit_rewriting/megarepo/src/pre_merge_deletes.rs
diff --git a/eden/mononoke/commit_rewriting/megarepo/tool/cli.rs b/eden/mononoke/commit_rewriting/megarepo/tool/cli.rs
new file mode 100644
index 0000000000000..33f4adee7ec66
--- /dev/null
+++ b/eden/mononoke/commit_rewriting/megarepo/tool/cli.rs
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * This software may be used and distributed according to the terms of the
+ * GNU General Public License version 2.
+ */
+
+use anyhow::{format_err, Error};
+use bookmarks::BookmarkName;
+use clap::{App, Arg, ArgMatches, SubCommand};
+use cmdlib::args;
+use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
+use futures_old::future::{err, ok};
+use megarepolib::common::ChangesetArgs;
+use mononoke_types::DateTime;
+
+pub const COMMIT_HASH: &'static str = "commit-hash";
+pub const MOVE: &'static str = "move";
+pub const MERGE: &'static str = "merge";
+pub const MARK_PUBLIC: &'static str = "mark-public";
+pub const ORIGIN_REPO: &'static str = "origin-repo";
+pub const CHANGESET: &'static str = "commit";
+pub const FIRST_PARENT: &'static str = "first-parent";
+pub const SECOND_PARENT: &'static str = "second-parent";
+pub const COMMIT_MESSAGE: &'static str = "commit-message";
+pub const COMMIT_AUTHOR: &'static str = "commit-author";
+pub const COMMIT_DATE_RFC3339: &'static str = "commit-date-rfc3339";
+pub const COMMIT_BOOKMARK: &'static str = "bookmark";
+pub const SYNC_DIAMOND_MERGE: &'static str = "sync-diamond-merge";
+pub const MAX_NUM_OF_MOVES_IN_COMMIT: &'static str = "max-num-of-moves-in-commit";
+
+pub fn cs_args_from_matches<'a>(sub_m: &ArgMatches<'a>) -> BoxFuture<ChangesetArgs, Error> {
+    let message = try_boxfuture!(sub_m
+        .value_of(COMMIT_MESSAGE)
+        .ok_or_else(|| format_err!("missing argument {}", COMMIT_MESSAGE)))
+    .to_string();
+    let author = try_boxfuture!(sub_m
+        .value_of(COMMIT_AUTHOR)
+        .ok_or_else(|| format_err!("missing argument {}", COMMIT_AUTHOR)))
+    .to_string();
+    let datetime = try_boxfuture!(sub_m
+        .value_of(COMMIT_DATE_RFC3339)
+        .map(|datetime_str| DateTime::from_rfc3339(datetime_str))
+        .unwrap_or_else(|| Ok(DateTime::now())));
+    let bookmark = try_boxfuture!(sub_m
+        .value_of(COMMIT_BOOKMARK)
+        .map(|bookmark_str| BookmarkName::new(bookmark_str))
+        .transpose());
+    let mark_public = sub_m.is_present(MARK_PUBLIC);
+    if !mark_public && bookmark.is_some() {
+        return err(format_err!(
+            "--mark-public is required if --bookmark is provided"
+        ))
+        .boxify();
+    }
+
+    ok(ChangesetArgs {
+        author,
+        message,
+        datetime,
+        bookmark,
+        mark_public,
+    })
+    .boxify()
+}
+
+fn add_resulting_commit_args<'a, 'b>(subcommand: App<'a, 'b>) -> App<'a, 'b> {
+    subcommand
+        .arg(
+            Arg::with_name(COMMIT_AUTHOR)
+                .help("commit author to use")
+                .takes_value(true)
+                .required(true),
+        )
+        .arg(
+            Arg::with_name(COMMIT_MESSAGE)
+                .help("commit message to use")
+                .takes_value(true)
+                .required(true),
+        )
+        .arg(
+            Arg::with_name(MARK_PUBLIC)
+                .help("add the resulting commit to the public phase")
+                .long(MARK_PUBLIC),
+        )
+        .arg(
+            Arg::with_name(COMMIT_DATE_RFC3339)
+                .help("commit date to use (default is now)")
+                .long(COMMIT_DATE_RFC3339)
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name(COMMIT_BOOKMARK)
+                .help("bookmark to point to resulting commits (no sanity checks, will move existing bookmark, be careful)")
+                .long(COMMIT_BOOKMARK)
+                .takes_value(true)
+        )
+}
+
+pub fn setup_app<'a, 'b>() -> App<'a, 'b> {
+    let move_subcommand = SubCommand::with_name(MOVE)
+        .about("create a move commit, using a provided spec")
+        .arg(
+            Arg::with_name(MAX_NUM_OF_MOVES_IN_COMMIT)
+                .long(MAX_NUM_OF_MOVES_IN_COMMIT)
+                .help("how many files a single commit moves (note - that might create a stack of move commits instead of just one)")
+                .takes_value(true)
+                .required(false),
+        )
+        .arg(
+            Arg::with_name(ORIGIN_REPO)
+                .help("use predefined mover for part of megarepo, coming from this repo")
+                .takes_value(true)
+                .required(true),
+        )
+        .arg(
+            Arg::with_name(CHANGESET)
+                .help("a changeset hash or bookmark of move commit's parent")
+                .takes_value(true)
+                .required(true),
+        );
+
+    let merge_subcommand = SubCommand::with_name(MERGE)
+        .about("create a merge commit with given parents")
+        .arg(
+            Arg::with_name(FIRST_PARENT)
+                .help("first parent of a produced merge commit")
+                .takes_value(true)
+                .required(true),
+        )
+        .arg(
+            Arg::with_name(SECOND_PARENT)
+                .help("second parent of a produced merge commit")
+                .takes_value(true)
+                .required(true),
+        );
+
+    let sync_diamond_subcommand = SubCommand::with_name(SYNC_DIAMOND_MERGE)
+        .about("sync a diamond merge commit from a small repo into large repo")
+        .arg(
+            Arg::with_name(COMMIT_HASH)
+                .help("diamond merge commit from small repo to sync")
+                .takes_value(true)
+                .required(true),
+        )
+        .arg(
+            Arg::with_name(COMMIT_BOOKMARK)
+                .help("bookmark to point to resulting commits (no sanity checks, will move existing bookmark, be careful)")
+                .long(COMMIT_BOOKMARK)
+                .takes_value(true)
+        );
+
+    args::MononokeApp::new("megarepo preparation tool")
+        .with_advanced_args_hidden()
+        .with_source_and_target_repos()
+        .build()
+        .subcommand(add_resulting_commit_args(move_subcommand))
+        .subcommand(add_resulting_commit_args(merge_subcommand))
+        .subcommand(sync_diamond_subcommand)
+}
diff --git a/eden/mononoke/commit_rewriting/megarepo/tool/main.rs b/eden/mononoke/commit_rewriting/megarepo/tool/main.rs
new file mode 100644
index 0000000000000..75870a16cab54
--- /dev/null
+++ b/eden/mononoke/commit_rewriting/megarepo/tool/main.rs
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * This software may be used and distributed according to the terms of the
+ * GNU General Public License version 2.
+ */
+
+#![deny(warnings)]
+#![feature(process_exitcode_placeholder)]
+
+use anyhow::{bail, format_err, Error, Result};
+use bookmarks::BookmarkName;
+use clap::ArgMatches;
+use cmdlib::{args, helpers};
+use context::CoreContext;
+use fbinit::FacebookInit;
+use futures::{
+    compat::Future01CompatExt,
+    future::{try_join, try_join3},
+};
+use metaconfig_types::RepoConfig;
+use mononoke_types::RepositoryId;
+use movers::get_small_to_large_mover;
+use slog::info;
+use std::num::NonZeroU64;
+use synced_commit_mapping::SqlSyncedCommitMapping;
+
+mod cli;
+mod merging;
+mod sync_diamond_merge;
+
+use crate::cli::{
+    cs_args_from_matches, setup_app, CHANGESET, COMMIT_HASH, FIRST_PARENT,
+    MAX_NUM_OF_MOVES_IN_COMMIT, MERGE, MOVE, ORIGIN_REPO, SECOND_PARENT, SYNC_DIAMOND_MERGE,
+};
+use crate::merging::perform_merge;
+use megarepolib::{common::StackPosition, perform_move, perform_stack_move};
+
+async fn run_move<'a>(
+    ctx: CoreContext,
+    matches: &ArgMatches<'a>,
+    sub_m: &ArgMatches<'a>,
+    repo_config: RepoConfig,
+) -> Result<(), Error> {
+    let origin_repo =
+        RepositoryId::new(args::get_i32_opt(&sub_m, ORIGIN_REPO).expect("Origin repo is missing"));
+    let resulting_changeset_args = cs_args_from_matches(&sub_m);
+    let commit_sync_config = repo_config.commit_sync_config.as_ref().unwrap();
+    let mover = get_small_to_large_mover(commit_sync_config, origin_repo).unwrap();
+    let move_parent = sub_m.value_of(CHANGESET).unwrap().to_owned();
+
+    let max_num_of_moves_in_commit =
+        args::get_and_parse_opt::<NonZeroU64>(sub_m, MAX_NUM_OF_MOVES_IN_COMMIT);
+
+    let (repo, resulting_changeset_args) = try_join(
+        args::open_repo(ctx.fb, &ctx.logger().clone(), &matches).compat(),
+        resulting_changeset_args.compat(),
+    )
+    .await?;
+
+    let parent_bcs_id = helpers::csid_resolve(ctx.clone(), repo.clone(), move_parent)
+        .compat()
+        .await?;
+
+    if let Some(max_num_of_moves_in_commit) = max_num_of_moves_in_commit {
+        perform_stack_move(
+            &ctx,
+            &repo,
+            parent_bcs_id,
+            mover,
+            max_num_of_moves_in_commit,
+            |num: StackPosition| {
+                let mut args = resulting_changeset_args.clone();
+                let message = args.message + &format!(" #{}", num.0);
+                args.message = message;
+                args
+            },
+        )
+        .await
+        .map(|changesets| {
+            info!(
+                ctx.logger(),
+                "created {} commits, with the last commit {:?}",
+                changesets.len(),
+                changesets.last()
+            );
+            ()
+        })
+    } else {
+        perform_move(&ctx, &repo, parent_bcs_id, mover, resulting_changeset_args)
+            .await
+            .map(|_| ())
+    }
+}
+
+async fn run_merge<'a>(
+    ctx: CoreContext,
+    matches: &ArgMatches<'a>,
+    sub_m: &ArgMatches<'a>,
+) -> Result<(), Error> {
+    let first_parent = sub_m.value_of(FIRST_PARENT).unwrap().to_owned();
+    let second_parent = sub_m.value_of(SECOND_PARENT).unwrap().to_owned();
+    let resulting_changeset_args = cs_args_from_matches(&sub_m);
+    let (repo, resulting_changeset_args) = try_join(
+        args::open_repo(ctx.fb, &ctx.logger().clone(), &matches).compat(),
+        resulting_changeset_args.compat(),
+    )
+    .await?;
+
+    let first_parent_fut = helpers::csid_resolve(ctx.clone(), repo.clone(), first_parent);
+    let second_parent_fut = helpers::csid_resolve(ctx.clone(), repo.clone(), second_parent);
+    let (first_parent, second_parent) =
+        try_join(first_parent_fut.compat(), second_parent_fut.compat()).await?;
+
+    info!(ctx.logger(), "Creating a merge commit");
+    perform_merge(
+        ctx.clone(),
+        repo.clone(),
+        first_parent,
+        second_parent,
+        resulting_changeset_args,
+    )
+    .compat()
+    .await
+    .map(|_| ())
+}
+
+async fn run_sync_diamond_merge<'a>(
+    ctx: CoreContext,
+    matches: &ArgMatches<'a>,
+    sub_m: &ArgMatches<'a>,
+) -> Result<(), Error> {
+    let source_repo_id = args::get_source_repo_id(ctx.fb, matches)?;
+    let target_repo_id = args::get_target_repo_id(ctx.fb, matches)?;
+    let maybe_bookmark = sub_m
+        .value_of(cli::COMMIT_BOOKMARK)
+        .map(|bookmark_str| BookmarkName::new(bookmark_str))
+        .transpose()?;
+
+    let bookmark = maybe_bookmark.ok_or(Error::msg("bookmark must be specified"))?;
+
+    let source_repo = args::open_repo_with_repo_id(ctx.fb, ctx.logger(), source_repo_id, matches);
+    let target_repo = args::open_repo_with_repo_id(ctx.fb, ctx.logger(), target_repo_id, matches);
+    let mapping = args::open_source_sql::<SqlSyncedCommitMapping>(ctx.fb, &matches);
+
+    let (_, source_repo_config) = args::get_config_by_repoid(ctx.fb, matches, source_repo_id)?;
+
+    let merge_commit_hash = sub_m.value_of(COMMIT_HASH).unwrap().to_owned();
+    let (source_repo, target_repo, mapping) =
+        try_join3(source_repo.compat(), target_repo.compat(), mapping.compat()).await?;
+
+    let source_merge_cs_id =
+        helpers::csid_resolve(ctx.clone(), source_repo.clone(), merge_commit_hash)
+            .compat()
+            .await?;
+
+    sync_diamond_merge::do_sync_diamond_merge(
+        ctx,
+        source_repo,
+        target_repo,
+        source_merge_cs_id,
+        mapping,
+        source_repo_config,
+        bookmark,
+    )
+    .await
+    .map(|_| ())
+}
+
+fn get_and_verify_repo_config<'a>(
+    fb: FacebookInit,
+    matches: &ArgMatches<'a>,
+) -> Result<RepoConfig> {
+    args::get_config(fb, &matches).and_then(|(repo_name, repo_config)| {
+        let repo_id = repo_config.repoid;
+        repo_config
+            .commit_sync_config
+            .as_ref()
+            .ok_or_else(|| format_err!("no sync config provided for {}", repo_name))
+            .map(|commit_sync_config| commit_sync_config.large_repo_id)
+            .and_then(move |large_repo_id| {
+                if repo_id != large_repo_id {
+                    Err(format_err!(
+                        "repo must be a large repo in commit sync config"
+                    ))
+                } else {
+                    Ok(repo_config)
+                }
+            })
+    })
+}
+
+#[fbinit::main]
+fn main(fb: FacebookInit) -> Result<()> {
+    let app = setup_app();
+    let matches = app.get_matches();
+    args::init_cachelib(fb, &matches, None);
+    let logger = args::init_logging(fb, &matches);
+    let ctx = CoreContext::new_with_logger(fb, logger.clone());
+
+    let subcommand_future = async {
+        match matches.subcommand() {
+            (MOVE, Some(sub_m)) => {
+                let repo_config = get_and_verify_repo_config(fb, &matches)?;
+                run_move(ctx, &matches, sub_m, repo_config).await
+            }
+            (MERGE, Some(sub_m)) => run_merge(ctx, &matches, sub_m).await,
+            (SYNC_DIAMOND_MERGE, Some(sub_m)) => run_sync_diamond_merge(ctx, &matches, sub_m).await,
+            _ => bail!("oh no, wrong arguments provided!"),
+        }
+    };
+
+    helpers::block_execute(
+        subcommand_future,
+        fb,
+        "megarepotool",
+        &logger,
+        &matches,
+        cmdlib::monitoring::AliveService,
+    )
+}
diff --git a/eden/mononoke/commit_rewriting/megarepo/tool/merging.rs b/eden/mononoke/commit_rewriting/megarepo/tool/merging.rs
new file mode 100644
index 0000000000000..318ead3e276d0
--- /dev/null
+++ b/eden/mononoke/commit_rewriting/megarepo/tool/merging.rs
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * This software may be used and distributed according to the terms of the
+ * GNU General Public License version 2.
+ */
+
+use anyhow::{format_err, Error};
+use blobrepo::BlobRepo;
+use blobrepo_hg::BlobRepoHg;
+use blobstore::Loadable;
+use cloned::cloned;
+use context::CoreContext;
+use futures::{FutureExt, TryFutureExt};
+use futures_old::future::{err, ok, Future};
+use futures_old::stream::Stream;
+use manifest::ManifestOps;
+use mercurial_types::{HgChangesetId, MPath};
+use mononoke_types::ChangesetId;
+use slog::info;
+use std::collections::{BTreeMap, HashSet};
+use std::iter::FromIterator;
+
+use megarepolib::common::{create_save_and_generate_hg_changeset, ChangesetArgs};
+
+fn get_all_files_in_working_copy(
+    ctx: CoreContext,
+    repo: BlobRepo,
+    hg_cs_id: HgChangesetId,
+) -> impl Future<Item = Vec<MPath>, Error = Error> {
+    hg_cs_id
+        .load(ctx.clone(), repo.blobstore())
+        .compat()
+        .from_err()
+        .and_then({
+            cloned!(ctx, repo);
+            move |hg_cs| {
+                hg_cs
+                    .manifestid()
+                    .list_leaf_entries(ctx, repo.get_blobstore())
+                    .map(|(mpath, _)| mpath)
+                    .collect()
+            }
+        })
+}
+
+fn fail_on_path_conflicts(
+    ctx: CoreContext,
+    repo: BlobRepo,
+    hg_cs_id_1: HgChangesetId,
+    hg_cs_id_2: HgChangesetId,
+) -> impl Future<Item = (), Error = Error> {
+    info!(ctx.logger(), "Checking if there are any path conflicts");
+    let all_files_1_fut = get_all_files_in_working_copy(ctx.clone(), repo.clone(), hg_cs_id_1);
+    let all_files_2_fut = get_all_files_in_working_copy(ctx.clone(), repo.clone(), hg_cs_id_2);
+    all_files_1_fut
+        .join(all_files_2_fut)
+        .and_then(move |(all_files_1, all_files_2)| {
+            let all_files_1 = HashSet::<_>::from_iter(all_files_1);
+            let all_files_2 = HashSet::from_iter(all_files_2);
+            let intersection: Vec<_> = all_files_1
+                .intersection(&all_files_2)
+                .take(10)
+                .cloned()
+                .collect();
+            if intersection.len() > 0 {
+                err(format_err!(
+                    "There are paths present in both parents: {:?} ...",
+                    intersection
+                ))
+            } else {
+                info!(ctx.logger(), "Done checking path conflicts");
+                ok(())
+            }
+        })
+}
+
+pub fn perform_merge(
+    ctx: CoreContext,
+    repo: BlobRepo,
+    first_bcs_id: ChangesetId,
+    second_bcs_id: ChangesetId,
+    resulting_changeset_args: ChangesetArgs,
+) -> impl Future<Item = HgChangesetId, Error = Error> {
+    let first_hg_cs_id_fut = repo.get_hg_from_bonsai_changeset(ctx.clone(), first_bcs_id.clone());
+    let second_hg_cs_id_fut = repo.get_hg_from_bonsai_changeset(ctx.clone(), second_bcs_id.clone());
+    first_hg_cs_id_fut
+        .join(second_hg_cs_id_fut)
+        .and_then({
+            cloned!(ctx, repo);
+            move |(first_hg_cs_id, second_hg_cs_id)| {
+                fail_on_path_conflicts(ctx, repo, first_hg_cs_id, second_hg_cs_id)
+            }
+        })
+        .and_then({
+            cloned!(ctx, repo, first_bcs_id, second_bcs_id);
+            move |_| {
+                info!(
+                    ctx.logger(),
+                    "Creating a merge bonsai changeset with parents: {:?}, {:?}",
+                    first_bcs_id,
+                    second_bcs_id
+                );
+                async move {
+                    create_save_and_generate_hg_changeset(
+                        &ctx,
+                        &repo,
+                        vec![first_bcs_id, second_bcs_id],
+                        BTreeMap::new(),
+                        resulting_changeset_args,
+                    )
+                    .await
+                }
+                .boxed()
+                .compat()
+            }
+        })
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use fbinit::FacebookInit;
+    use fixtures::merge_even;
+    use futures::compat::Future01CompatExt;
+    use std::str::FromStr;
+
+    #[fbinit::test]
+    fn test_path_conflict_detection(fb: FacebookInit) {
+        async_unit::tokio_unit_test(async move {
+            let repo = merge_even::getrepo(fb).await;
+            let ctx = CoreContext::test_mock(fb);
+            let p1 = HgChangesetId::from_str("4f7f3fd428bec1a48f9314414b063c706d9c1aed").unwrap();
+            let p2 = HgChangesetId::from_str("16839021e338500b3cf7c9b871c8a07351697d68").unwrap();
+            assert!(
+                fail_on_path_conflicts(ctx, repo, p1, p2)
+                    .compat()
+                    .await
+                    .is_err(),
+                "path conflicts should've been detected"
+            );
+        });
+    }
+}
diff --git a/eden/mononoke/commit_rewriting/megarepo/tool/sync_diamond_merge.rs b/eden/mononoke/commit_rewriting/megarepo/tool/sync_diamond_merge.rs
new file mode 100644
index 0000000000000..7914b824b6edb
--- /dev/null
+++ b/eden/mononoke/commit_rewriting/megarepo/tool/sync_diamond_merge.rs
@@ -0,0 +1,429 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * This software may be used and distributed according to the terms of the
+ * GNU General Public License version 2.
+ */
+
+/// This is a very hacky temporary tool that's used with only one purpose -
+/// to half-manually sync a diamond merge commit from a small repo into a large repo.
+/// NOTE - this is not a production quality tool, but rather a best effort attempt to
+/// half-automate a rare case that might occur. The tool most likely doesn't cover all the cases.
+/// USE WITH CARE!
+use anyhow::{format_err, Error};
+use blobrepo::BlobRepo;
+use blobrepo_hg::BlobRepoHg;
+use blobrepo_utils::convert_diff_result_into_file_change_for_diamond_merge;
+use blobstore::Loadable;
+use bookmarks::{BookmarkName, BookmarkUpdateReason};
+use cloned::cloned;
+use context::CoreContext;
+use cross_repo_sync::{
+    create_commit_syncers, rewrite_commit, update_mapping, upload_commits, CommitSyncOutcome,
+    CommitSyncer, Syncers,
+};
+use futures::{
+    compat::Future01CompatExt,
+    future::TryFutureExt,
+    stream::{futures_unordered::FuturesUnordered, TryStreamExt},
+};
+use futures_ext::{BoxStream, StreamExt};
+use futures_old::{Future, IntoFuture, Stream};
+use manifest::{bonsai_diff, BonsaiDiffFileChange};
+use maplit::hashmap;
+use mercurial_types::{HgFileNodeId, HgManifestId};
+use metaconfig_types::RepoConfig;
+use mononoke_types::{BonsaiChangeset, ChangesetId, FileChange, MPath};
+use revset::DifferenceOfUnionsOfAncestorsNodeStream;
+use skiplist::fetch_skiplist_index;
+use slog::{info, warn};
+use std::collections::{BTreeMap, HashMap};
+use synced_commit_mapping::SqlSyncedCommitMapping;
+
+/// The function syncs merge commit M from a small repo into a large repo.
+/// It's designed to handle the case described below:
+///
+/// Small repo state
+///       M
+///      |  \
+///     P1   |   <- P1 must already be synced
+///      |   |
+///      |  P2   <- might not be synced yet
+///     ...  |
+///      |  /
+///      | /
+///     ROOT
+///
+/// Large repo state
+///
+///    O     <- ONTO value (i.e. where onto_bookmark points to)
+///   ...    <- commits from another small repo
+///    |
+///    P1'   <- synced P1 commit from small repo
+///    |
+///   OVR'   <- Potentially there can be commits from another repo between root and P1!
+///    |
+///   ROOT'  <- synced ROOT commit
+///
+///
+/// Most of the complexity stems from two facts:
+/// 1) If parents have different file content, then the merge commit must have a file change entry for them
+/// 2) The large repo might have rewritten commits from another small repo between ROOT' and P1'.
+///
+/// That means that the rewritten M' bonsai object must contain file change entries for files that were changed
+/// in OVR* commits.
+///
+/// So the function works as follows:
+/// 1) Sync all ROOT::P2 commits - nothing difficult here, just rewrite and save to the large repo.
+///    Those commits are expected to be non-merges for simplicity
+/// 2) Create a new merge commit
+///    a) First find all the changed files from another small repo - those need to be in the merge commit.
+///       NOTE - we expect that all changes from this small repo are already in the bonsai changeset
+///    b) Add file changes from the previous step to the merge commit
+///    c) Change parents
+/// 3) Save the merge commit in the large repo
+/// 4) Update the bookmark
+pub async fn do_sync_diamond_merge(
+    ctx: CoreContext,
+    small_repo: BlobRepo,
+    large_repo: BlobRepo,
+    small_merge_cs_id: ChangesetId,
+    mapping: SqlSyncedCommitMapping,
+    small_repo_config: RepoConfig,
+    onto_bookmark: BookmarkName,
+) -> Result<(), Error> {
+    info!(
+        ctx.logger(),
+        "Preparing to sync a merge commit {}...", small_merge_cs_id
+    );
+
+    let parents = small_repo
+        .get_changeset_parents_by_bonsai(ctx.clone(), small_merge_cs_id)
+        .compat()
+        .await?;
+
+    let (p1, p2) = validate_parents(parents)?;
+
+    let new_branch =
+        find_new_branch_oldest_first(ctx.clone(), &small_repo, p1, p2, &small_repo_config).await?;
+
+    let syncers = create_commit_syncers(
+        small_repo.clone(),
+        large_repo.clone(),
+        &small_repo_config
+            .commit_sync_config
+            .ok_or(Error::msg("Commit sync config is not specified"))?,
+        mapping,
+    )?;
+
+    let small_root = find_root(&new_branch)?;
+
+    info!(
+        ctx.logger(),
+        "{} new commits are going to be merged in",
+        new_branch.len()
+    );
+    for bcs in new_branch {
+        let cs_id = bcs.get_changeset_id();
+        let parents = bcs.parents().collect::<Vec<_>>();
+        if parents.len() > 1 {
+            return Err(format_err!(
+                "{} from branch contains more than one parent",
+                cs_id
+            ));
+        }
+        info!(ctx.logger(), "syncing commit from new branch {}", cs_id);
+        syncers
+            .small_to_large
+            .unsafe_sync_commit(ctx.clone(), cs_id)
+            .await?;
+    }
+
+    let maybe_onto_value = large_repo
+        .get_bonsai_bookmark(ctx.clone(), &onto_bookmark)
+        .compat()
+        .await?;
+
+    let onto_value =
+        maybe_onto_value.ok_or(format_err!("cannot find bookmark {}", onto_bookmark))?;
+
+    let rewritten = create_rewritten_merge_commit(
+        ctx.clone(),
+        small_merge_cs_id,
+        &small_repo,
+        &large_repo,
+        &syncers,
+        small_root,
+        onto_value,
+    )
+    .await?;
+
+    let new_merge_cs_id = rewritten.get_changeset_id();
+    info!(ctx.logger(), "uploading merge commit {}", new_merge_cs_id);
+    upload_commits(ctx.clone(), vec![rewritten], small_repo, large_repo.clone()).await?;
+
+    update_mapping(
+        ctx.clone(),
+        hashmap! {small_merge_cs_id => new_merge_cs_id},
+        &syncers.small_to_large,
+    )
+    .await?;
+
+    let mut book_txn = large_repo.update_bookmark_transaction(ctx.clone());
+    book_txn.force_set(
+        &onto_bookmark,
+        new_merge_cs_id,
+        BookmarkUpdateReason::ManualMove,
+        None,
+    )?;
+    book_txn.commit().await?;
+
+    warn!(
+        ctx.logger(),
+        "It is recommended to run 'mononoke_admin crossrepo verify-wc' for {}!", new_merge_cs_id
+    );
+    Ok(())
+}
+
+async fn create_rewritten_merge_commit(
+    ctx: CoreContext,
+    small_merge_cs_id: ChangesetId,
+    small_repo: &BlobRepo,
+    large_repo: &BlobRepo,
+    syncers: &Syncers<SqlSyncedCommitMapping>,
+    small_root: ChangesetId,
+    onto_value: ChangesetId,
+) -> Result<BonsaiChangeset, Error> {
+    let merge_bcs = small_merge_cs_id
+        .load(ctx.clone(), small_repo.blobstore())
+        .await?;
+
+    let parents = merge_bcs.parents().collect();
+    let (p1, p2) = validate_parents(parents)?;
+
+    let merge_bcs = merge_bcs.into_mut();
+
+    let large_root = remap_commit(ctx.clone(), &syncers.small_to_large, small_root).await?;
+    let remapped_p2 = remap_commit(ctx.clone(), &syncers.small_to_large, p2).await?;
+
+    let remapped_parents = hashmap! {
+        p1 => onto_value,
+        p2 => remapped_p2,
+    };
+    let maybe_rewritten = rewrite_commit(
+        ctx.clone(),
+        merge_bcs,
+        &remapped_parents,
+        syncers.small_to_large.get_mover().clone(),
+        syncers.small_to_large.get_source_repo().clone(),
+    )
+    .await?;
+    let mut rewritten =
+        maybe_rewritten.ok_or(Error::msg("merge commit was unexpectedly rewritten out"))?;
+
+    let mut additional_file_changes = generate_additional_file_changes(
+        ctx.clone(),
+        large_root,
+        &large_repo,
+        &syncers.large_to_small,
+        onto_value,
+    )
+    .await?;
+
+    for (path, fc) in rewritten.file_changes {
+        additional_file_changes.insert(path, fc);
+    }
+    rewritten.file_changes = additional_file_changes;
+    rewritten.freeze()
+}
+
+/// This function finds all the changed files between root and onto that are from another small repo.
+/// These files need to be added to the new merge commit to preserve bonsai semantics.
+async fn generate_additional_file_changes(
+    ctx: CoreContext,
+    root: ChangesetId,
+    large_repo: &BlobRepo,
+    large_to_small: &CommitSyncer<SqlSyncedCommitMapping>,
+    onto_value: ChangesetId,
+) -> Result<BTreeMap<MPath, Option<FileChange>>, Error> {
+    let bonsai_diff = find_bonsai_diff(ctx.clone(), &large_repo, root, onto_value)
+        .collect()
+        .compat()
+        .await?;
+
+    let additional_file_changes = FuturesUnordered::new();
+    for diff_res in bonsai_diff {
+        match diff_res {
+            BonsaiDiffFileChange::Changed(ref path, ..)
+            | BonsaiDiffFileChange::ChangedReusedId(ref path, ..)
+            | BonsaiDiffFileChange::Deleted(ref path) => {
+                let maybe_new_path = large_to_small.get_mover()(path)?;
+                if maybe_new_path.is_some() {
+                    continue;
+                }
+            }
+        }
+
+        let fc = convert_diff_result_into_file_change_for_diamond_merge(
+            ctx.clone(),
+            &large_repo,
+            diff_res,
+        )
+        .compat();
+        additional_file_changes.push(fc);
+    }
+
+    additional_file_changes
+        .try_collect::<BTreeMap<_, _>>()
+        .await
+}
+
+async fn remap_commit(
+    ctx: CoreContext,
+    small_to_large_commit_syncer: &CommitSyncer<SqlSyncedCommitMapping>,
+    cs_id: ChangesetId,
+) -> Result<ChangesetId, Error> {
+    let maybe_sync_outcome = small_to_large_commit_syncer
+        .get_commit_sync_outcome(ctx.clone(), cs_id)
+        .await?;
+
+    let sync_outcome = maybe_sync_outcome.ok_or(format_err!(
+        "{} from small repo hasn't been remapped in large repo",
+        cs_id
+    ))?;
+
+    use CommitSyncOutcome::*;
+    match sync_outcome {
+        RewrittenAs(ref cs_id, _) => Ok(*cs_id),
+        Preserved => Ok(cs_id),
+        _ => Err(format_err!(
+            "unexpected commit sync outcome for root, got {:?}",
+            sync_outcome
+        )),
+    }
+}
+
+fn find_root(new_branch: &Vec<BonsaiChangeset>) -> Result<ChangesetId, Error> {
+    let mut cs_to_parents: HashMap<_, Vec<_>> = HashMap::new();
+    for bcs in new_branch {
+        let cs_id = bcs.get_changeset_id();
+        cs_to_parents.insert(cs_id, bcs.parents().collect());
+    }
+
+    let mut roots = vec![];
+    for parents in cs_to_parents.values() {
+        for p in parents {
+            if !cs_to_parents.contains_key(p) {
+                roots.push(p);
+            }
+        }
+    }
+
+    validate_roots(roots).map(|root| *root)
+}
+
+async fn find_new_branch_oldest_first(
+    ctx: CoreContext,
+    small_repo: &BlobRepo,
+    p1: ChangesetId,
+    p2: ChangesetId,
+    small_repo_config: &RepoConfig,
+) -> Result<Vec<BonsaiChangeset>, Error> {
+    let fetcher = small_repo.get_changeset_fetcher();
+    let skiplist_index = fetch_skiplist_index(
+        &ctx,
+        &small_repo_config.skiplist_index_blobstore_key,
+        &small_repo.get_blobstore().boxed(),
+    )
+    .await?;
+
+    let new_branch = DifferenceOfUnionsOfAncestorsNodeStream::new_with_excludes(
+        ctx.clone(),
+        &fetcher,
+        skiplist_index,
+        vec![p2],
+        vec![p1],
+    )
+    .map({
+        cloned!(ctx, small_repo);
+        move |cs| {
+            cs.load(ctx.clone(), small_repo.blobstore())
+                .compat()
+                .from_err()
+        }
+    })
+    .buffered(100)
+    .collect()
+    .compat()
+    .await?;
+
+    Ok(new_branch.into_iter().rev().collect())
+}
+
+fn validate_parents(parents: Vec<ChangesetId>) -> Result<(ChangesetId, ChangesetId), Error> {
+    if parents.len() > 2 {
+        return Err(format_err!(
+            "too many parents, expected only 2: {:?}",
+            parents
+        ));
+    }
+    let p1 = parents.get(0).ok_or(Error::msg("not a merge commit"))?;
+    let p2 = parents.get(1).ok_or(Error::msg("not a merge commit"))?;
+
+    Ok((*p1, *p2))
+}
+
+fn validate_roots(roots: Vec<&ChangesetId>) -> Result<&ChangesetId, Error> {
+    if roots.len() > 1 {
+        return Err(format_err!("too many roots, expected only 1: {:?}", roots));
+    }
+
+    roots
+        .get(0)
+        .cloned()
+        .ok_or(Error::msg("no roots found, this is not a diamond merge"))
+}
+
+fn find_bonsai_diff(
+    ctx: CoreContext,
+    repo: &BlobRepo,
+    ancestor: ChangesetId,
+    descendant: ChangesetId,
+) -> BoxStream<BonsaiDiffFileChange<HgFileNodeId>, Error> {
+    (
+        id_to_manifestid(ctx.clone(), repo.clone(), descendant),
+        id_to_manifestid(ctx.clone(), repo.clone(), ancestor),
+    )
+        .into_future()
+        .map({
+            cloned!(ctx, repo);
+            move |(d_mf, a_mf)| {
+                bonsai_diff(
+                    ctx,
+                    repo.get_blobstore(),
+                    d_mf,
+                    Some(a_mf).into_iter().collect(),
+                )
+            }
+        })
+        .flatten_stream()
+        .boxify()
+}
+
+fn id_to_manifestid(
+    ctx: CoreContext,
+    repo: BlobRepo,
+    bcs_id: ChangesetId,
+) -> impl Future<Item = HgManifestId, Error = Error> {
+    repo.get_hg_from_bonsai_changeset(ctx.clone(), bcs_id)
+        .and_then({
+            cloned!(ctx, repo);
+            move |cs_id| {
+                cs_id
+                    .load(ctx.clone(), repo.blobstore())
+                    .compat()
+                    .from_err()
+            }
+        })
+        .map(|cs| cs.manifestid())
+}
diff --git a/eden/mononoke/tests/integration/manifest_deps b/eden/mononoke/tests/integration/manifest_deps
index 618ef354420fe..fe09f690ef3ac 100644
--- a/eden/mononoke/tests/integration/manifest_deps
+++ b/eden/mononoke/tests/integration/manifest_deps
@@ -7,6 +7,7 @@ MONONOKE_BINS = {
   "BACKSYNCER": "backsyncer_cmd",
   "EDENAPI_SERVER": "edenapi_server",
   "LFS_SERVER": "lfs_server",
+  "MEGAREPO_TOOL": "megarepotool",
   "MONONOKE_ADMIN": "admin",
   "MONONOKE_ALIAS_VERIFY": "aliasverify",
   "MONONOKE_BACKFILL_DERIVED_DATA": "backfill_derived_data",
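
Reviewer note: for anyone trying the new binary locally, the invocation sketch below is assembled purely from the clap definitions in tool/cli.rs above. It is illustrative, not verified: the positional order follows the Arg declarations (origin repo id and parent commit first, then author and message), the repo-selection flags (--repo-id, --source-repo-id, --target-repo-id) are assumed to come from cmdlib's MononokeApp and with_source_and_target_repos(), and all placeholder values are made up.

  # create a move commit (or a stack of them) on top of <parent-commit>
  megarepotool --repo-id <large-repo-id> move <origin-repo-id> <parent-commit> \
      "<author>" "<commit message>" --mark-public --bookmark <bookmark> \
      --max-num-of-moves-in-commit 10000 --commit-date-rfc3339 2020-06-01T00:00:00+00:00

  # create a merge commit with two given parents
  megarepotool --repo-id <large-repo-id> merge <first-parent> <second-parent> \
      "<author>" "<commit message>" --mark-public

  # sync a diamond merge commit from the small repo into the large repo
  megarepotool --source-repo-id <small-repo-id> --target-repo-id <large-repo-id> \
      sync-diamond-merge <small-merge-commit-hash> --bookmark <onto-bookmark>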
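The manifest_deps hunk only registers the binary with the integration test harness. Assuming MONONOKE_BINS keys are exported to .t tests as environment variables of the same name (that convention lives in the test harness, not in this diff), a test could then invoke the tool roughly like:

  $ "$MEGAREPO_TOOL" --repo-id 1 move 0 "$PARENT_COMMIT" author "move commit" --mark-public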