[Source code analysis] PyTorch Distributed (10) ------ The static architecture of DistributedDataParallel's Reducer


0x00 Abstract

From the previous posts we already know DDP's basic architecture and how it is initialized. This post looks at the static architecture of its core component, the Reducer, which provides the core implementation of gradient synchronization during the backward pass.

Other articles in this series:

[Source code analysis] PyTorch Distributed (1) ------ History and overview

[Source code analysis] How PyTorch uses GPUs

[Source code analysis] PyTorch Distributed (2) ----- DataParallel (part 1)

[Source code analysis] PyTorch Distributed (3) ----- DataParallel (part 2)

[Source code analysis] PyTorch Distributed (4) ------ Basic concepts of distributed applications

[Source code analysis] PyTorch Distributed (5) ------ DistributedDataParallel: overview & how to use it

[Source code analysis] PyTorch Distributed (6) --- DistributedDataParallel -- initialization & store

[Source code analysis] PyTorch Distributed (7) ----- DistributedDataParallel: process groups

[Source code analysis] PyTorch Distributed (8) -------- DistributedDataParallel: the paper

[Source code analysis] PyTorch Distributed (9) ----- DistributedDataParallel: initialization

0x01 Introduction

1.1 Invocation

The Reducer is created inside `_ddp_init_helper`, as follows:

```python
# Note: reverse list of buckets because we want to approximate the
# order in which their gradients are produced, and assume they
# are used in the forward pass in the order they are defined.
self.reducer = dist.Reducer(
    parameters,                      # parameters[0] is the list of tensors
    list(reversed(bucket_indices)),  # bucket information
    self.process_group,
    expect_sparse_gradient,
    self.bucket_bytes_cap,
    self.find_unused_parameters,
    self.gradient_as_bucket_view,
    param_to_name_mapping,
)
```
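For orientation, here is a minimal sketch of how the user-facing DistributedDataParallel arguments feed the `dist.Reducer` call above: `bucket_cap_mb` becomes `bucket_bytes_cap`, while `find_unused_parameters` and `gradient_as_bucket_view` are forwarded as-is. The `model` and `local_rank` names below are assumed to come from the surrounding training script.

```python
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

# model and local_rank are assumed to exist in the training script;
# the keyword arguments below are the ones that end up in dist.Reducer.
ddp_model = DDP(
    model,
    device_ids=[local_rank],
    bucket_cap_mb=25,               # -> self.bucket_bytes_cap (default 25 MB)
    find_unused_parameters=False,   # -> self.find_unused_parameters
    gradient_as_bucket_view=False,  # -> self.gradient_as_bucket_view
)
```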

An example of the `parameters` argument: `parameters[0]` is the list of parameters of the model on rank 0. Only element [0] is meaningful, and that element itself contains 20 entries:

```cpp
parameters = {list: 1}
 0 = {list: 4}
  0 = {Parameter: 10} Parameter containing:\ntensor([[-4.0381e-02,  3.8828e-02, 1 )
  1 = {Parameter: 10} Parameter containing:\ntensor([-0.0438, -0.2033,  0.2771,  0.0721, )
  2 = {Parameter: 5}  Parameter containing:\ntensor([[-0.0094, -0.1319,  0.0713,  0.3155, )
  3 = {Parameter: 5}  Parameter containing:\ntensor([-0.0008,  0.0582, -0.1245, -0.2538, )
  ...
  20 = {Parameter: 5} Parameter containing:\ntensor([-0.0008,  0.0582, -0.1245, -0.2538, )
  __len__ = {int} 20
 __len__ = {int} 1
```

An example of `bucket_indices`:

Regarding tensor indices: every tensor is given an index, increasing from 0 up to `tensors.size() - 1`. If the model's parameters consist of 20 tensors, the tensor indices run from 0 to 19 and are partitioned into 6 buckets; across these 6 buckets, every tensor index is unique and appears exactly once.

(Figure: bucket_indices -- six buckets, each holding a disjoint subset of the tensor indices 0..19.)
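As a purely hypothetical illustration, `bucket_indices` for such a model could look like this; the only invariant is that every tensor index appears in exactly one bucket:

```python
# Hypothetical bucket assignment for 20 parameter tensors split into 6 buckets.
bucket_indices = [
    [0, 1, 2, 3],       # bucket 0
    [4, 5, 6],          # bucket 1
    [7, 8, 9, 10],      # bucket 2
    [11, 12],           # bucket 3
    [13, 14, 15, 16],   # bucket 4
    [17, 18, 19],       # bucket 5
]
# Every index 0..19 appears exactly once across all buckets.
assert sorted(i for bucket in bucket_indices for i in bucket) == list(range(20))
```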

On the Python side there is only a pybind11 stub, so we have to look at the C++ implementation.

```python
class Reducer(__pybind11_builtins.pybind11_object):
    def __init__(self, replicas, *args, **kwargs):
        """
        __init__(self: torch._C._distributed_c10d.Reducer,
                 replicas: List[List[at::Tensor]],
                 bucket_indices: List[List[int]],
                 process_group: c10d::ProcessGroup,
                 expect_sparse_gradients: List[List[bool]] = [],
                 bucket_bytes_cap: int = 26214400,
                 find_unused_parameters: bool = False,
                 gradient_as_bucket_view: bool = False,
                 param_to_name_mapping: Dict[int, str] = {}) -> None
        """
        pass
```

This brings us to torch/lib/c10d/reducer.h and torch/lib/c10d/reducer.cpp.

0x02 Reducer definition

Reducer provides the core implementation of gradient synchronization in the backward pass. Its definition is quite involved; some less important member variables are even omitted below to keep the listing readable:

```c++
class Reducer {
 public:
  // The constructor takes a list of variables for every model replica.
  // The bucket assignment for this reducer is specified as a list of
  // buckets, each of which is specified as a list of indices into the
  // variables list for **a single replica** (i.e. `variables[0]`).
  explicit Reducer(
      std::vector<std::vector<at::Tensor>> replicas,
      std::vector<std::vector<size_t>> bucket_indices,
      c10::intrusive_ptr<c10d::ProcessGroup> process_group,
      std::vector<std::vector<bool>> expect_sparse_gradients,
      int64_t bucket_bytes_cap,
      bool find_unused_parameters,
      bool gradient_as_bucket_view,
      std::unordered_map<size_t, std::string> paramNames);

 protected:
  // Forward declaration.
  struct Bucket;

void push_rebuilt_params(const VariableIndex& index);

  mutable std::mutex mutex_;
  const std::vector<std::vector<at::Tensor>> replicas_;
  const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_;
  std::vector<std::vector<bool>> expect_sparse_gradients_;

  std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
      grad_accumulators_;
  std::unordered_map<torch::autograd::Node*, VariableIndex>
      gradAccToVariableMap_;
  std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
      hooks_;

  bool expect_autograd_hooks_;
  bool require_finalize_;
  size_t next_bucket_;

  bool has_marked_unused_parameters_;
  const bool find_unused_parameters_;
  const bool gradient_as_bucket_view_;
  // Parameters that are never used are marked ready right away;
  // this list does not change after the first iteration.
  std::vector<VariableIndex> unused_parameters_;
  // Locally used parameter maps indicating if parameters are used locally
  // during the current iteration or no_sync session if no_sync is on. One
  // tensor for each model replica and each tensor is one-dim int32 tensor of
  // number of parameters. These tensors are marked in autograd_hook to indicate
  // the corresponding param has been used, and get allreduced in the end of
  // backward of current iteration or no_sync session for figuring out the
  // globally unused parameters.
  //
  // local_used_maps_:     CPU tensors for bookkeeping locally used params
  // local_used_maps_dev_: dev tensors for reducing globally unused params
  std::vector<at::Tensor> local_used_maps_;
  std::vector<at::Tensor> local_used_maps_dev_;
  // Indicate that reduction is done and D2H copy is done as well.
  bool local_used_maps_reduced_;

  using GradCallback =
      torch::distributed::autograd::DistAutogradContext::GradCallback;

  // A bucket replica represents [1..N] gradients to be reduced,
  // with the same dtype, on the same device.
  //
  // Batching gradients together before reducing them can result in lower
  // overhead and/or faster time to completion. Only gradients of the same type
  // and on the same device can be batched. The tensor that represents the
  // flattened gradient uses the same type and is placed on the same device.
  // Buckets are filled as the gradients they hold are computed (triggered by
  // autograd hooks). Buckets are reduced in a predetermined order that is
  // identical across processes.
  struct BucketReplica {
    // Flattened (1 dimensional) contents of bucket.
    at::Tensor contents;

// Views into contents for each grad.  Each view will be created with
// layout (sizes + strides) matching the grad's expected layout
// ("Gradient Layout Contract" in torch/csrc/autograd/AccumulateGrad.h).
// `bucket_views_in[i].copy_(grad)` and
// `grad.copy_(bucket_views_out[i])`
// provide convenient ways to move grad data in/out of contents.
// The reason we keep two states for bucket_views is that if DDP
// communication hook was registered, `bucket_views_out` could be
// re-initialized with the value of hook's `future_work`. We still need to
// keep a separate view reference to replica's original contents for
// `bucket_views_in[i].copy_(grad)` call.
std::vector<at::Tensor> bucket_views_in;
std::vector<at::Tensor> bucket_views_out;

// Variables that contribute to this bucket replica. Use refcounted value
// here so that we can easily unflatten the bucket contents into the
// participating variables after reduction has completed.
std::vector<at::Tensor> variables;

// Per-variable offset/length into the flat bucket contents tensor and grad
// bucket.
std::vector<size_t> offsets;
std::vector<size_t> lengths;

// Per-variable sizes into the grad bucket.
std::vector<c10::IntArrayRef> sizes_vec;

// Number of tensors to be added before this bucket is complete.
// This is reset to `variables.size()` every iteration.
size_t pending;

// TODO(@pietern)
// Memory copies from gradient tensors into the bucket are potentially
// done on different CUDA streams. We record an event for every copy
// so that we can synchronize with them prior to kicking off the reduction.
// std::vector<at::cuda::CUDAEvent> events;

  };

  // A bucket holds N bucket replicas (1 per model replica).
  //
  // If every bucket in this struct is ready, the reduction can be kicked off.
  // One bucket per replica. Reduction is kicked off when every bucket is ready.
  //
  struct Bucket {
    std::vector<BucketReplica> replicas;

// Global indices of participating variables in the bucket
std::vector<size_t> variable_indices;

// Number of replicas to be marked done before this bucket is ready.
size_t pending;

// Keep work handle around when this set of buckets is being reduced.
c10::intrusive_ptr<c10d::ProcessGroup::Work> work;

// Keep future work handle around if DDP comm hook is registered.
c10::intrusive_ptr<torch::jit::Future> future_work;

// If this bucket should expect a single sparse gradient.
// Implies: replicas[i].variables.size() == 1.
bool expect_sparse_gradient = false;

};

  std::vector<Bucket> buckets_;

  // A variable locator locates a particular variable in the bucket
  // structure. The `bucket_index` field points to the bucket in the `buckets_`
  // vector. The `intra_bucket_index` field points to the index of the variable
  // in any of the vector fields in the bucket replica.
  struct VariableLocator {
    // Index into the `buckets_` variable.
    size_t bucket_index;
    // Index of parameter in single bucket replica.
    size_t intra_bucket_index;

VariableLocator() = default;

VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
  bucket_index = bucket_index_;
  intra_bucket_index = intra_bucket_index_;
}

};

  // Map the index of a variable to its location in the bucket structure.
  std::vector<VariableLocator> variable_locators_;

  // track the number of iterations to synchronize grads in training so far.
  long num_iterations_;
  // track the number of buckets that have been ready for
  // communication calls like allReduce or communication hooks.
  int num_buckets_ready_;

  // We collect the relative timestamp of every gradient being ready
  // when executing autograd. This can be used to derive a timeline of
  // the point in time buckets were ready, or ideal bucket assignment/ordering.
  std::vector<std::vector<int64_t>> backward_stats_;

int ddp_runtime_logging_sample_rate_ = kDDPRuntimeLoggingSampleRate;

bool is_multi_device_module_ = false;

  // Following variables are to help build dynamic bucket order
  bool has_rebuilt_bucket_;
  std::vector<at::Tensor> rebuilt_params_;
  std::vector<int64_t> rebuilt_param_indices_;
  const int64_t bucket_bytes_cap_;

  struct RpcContext {
    using ContextPtr = torch::distributed::autograd::ContextPtr;
    // The shared_ptr is to hold the context instance.
    ContextPtr context_ptr_holder;
    std::atomic<ContextPtr::element_type*> context_ptr{nullptr};

void set(ContextPtr&& new_context_ptr);

  };
  RpcContext rpc_context_;

  // A struct containing work handle and tensor for allreduce scheduled in
  // forward pass, if applicable.
  struct ForwardPassAllreduceWork {
    c10::intrusive_ptr<c10d::ProcessGroup::Work> workHandle;
    at::Tensor resultTensor;
    // whether we should divide by the initial world_size or the no. of
    // remaining DDP ranks.
    bool useStaticWorldSize;
  };

  // Handle for the currently scheduled allreduce in the forward pass, if
  // applicable.
  ForwardPassAllreduceWork forwardPassWorkHandle_;

  // Division factor for reduction of gradients.
  int divFactor_;

bool static_graph_;

  // Key: VariableIndex, Value: the number of times that a variable's
  // autograd_hook() should be triggered before marking this variable's grad
  // as ready for communication. Map will not change after 1st iteration.
  std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>>
      numGradHooksTriggeredMap_;
  // Key: VariableIndex, Value: the number of times that a variable's
  // autograd_hook() are left to be triggered before marking this variable's
  // grad as ready for communication. Map will change after 1st iteration to
  // track a grad is ready for communication or not.
  std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>>
      numGradHooksTriggeredMapPerIteration_;

 private:
  // comm_hook_ is used to access the DDP communication hook if registered.
  std::unique_ptr<CommHookInterface> comm_hook_;
  // Current thread local state
  at::ThreadLocalState thread_local_state_;
  // Debug level setting. It is parsed once when Reducer is constructed, and
  // remains the same across a single invocation of DDP training.
  DistributedDebugLevel ddp_debug_level_;
  // Mapping of variable index to fully qualified name of model to notify users
  // about errors when certain parameters do not get gradient.
  std::unordered_map<size_t, std::string> param_names_;
  // Per iteration set of parameter indices that have been marked ready.
  std::unordered_set<size_t> perIterationReadyParams_;
  // Retrieves parameter names that have not been marked as ready as part of
  // previous iteration.
  std::vector<std::string> getUnmarkedParamsForIteration();
  // Retrieves parameter indices that have not been marked as ready as part of
  // previous iteration.
  std::vector<size_t> getUnmarkedParamIndicesForIteration();
  // Raises appropriate error if mark_variable_ready is called on the same
  // variable twice, which is unexpected.
  void checkAndRaiseMarkedTwiceError(size_t curVariableIndex);

  friend class Logger;
};
```

The key member variables of Reducer are:

```c++
// For each index, stores the corresponding grad_accumulator, i.e. the
// AccumulateGrad node of the tensor with that index.
std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
    grad_accumulators_;
// Maps each grad_accumulator back to its index, which makes it convenient to
// search the autograd graph for unused parameters later on.
std::unordered_map<torch::autograd::Node*, VariableIndex> gradAccToVariableMap_;
std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
    hooks_;

std::vector<Bucket> buckets_;

const std::vector<std::vector<at::Tensor>> replicas_;          // the tensors passed in
const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_; // the process group
```

We analyze these member variables one by one below.

0x03 Bucket

3.1 Design

Batching gradients before reducing them can lower overhead and/or speed up completion, but only gradients of the same type on the same device can be batched.

A bucket is a collection of gradients: gradients of the same type on the same device are placed into the same bucket. In the code, the struct Bucket is exactly this concept.

In every backward pass, the tensors of all parameter gradients are copied into buckets, and after AllReduce the averaged gradients are copied back out of the buckets. To speed up these copies, buckets are always created on the same device as the parameters. If the model spans multiple devices, DDP takes device affinity into account so that all parameters in the same bucket live on the same device. The order of AllReduce also matters, because it determines how much communication can overlap with computation: DDP launches AllReduce in the reverse order of model.parameters().
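To make the bucketing idea concrete, here is a toy sketch (not DDP's actual assignment algorithm, which also groups by dtype/device and uses a smaller first bucket) that walks the parameters in reverse definition order and starts a new bucket whenever a byte cap is exceeded:

```python
import torch

def greedy_buckets(params, cap_bytes):
    """Toy bucket assignment: reverse parameter order, cap each bucket by size."""
    buckets, current, current_bytes = [], [], 0
    for idx in reversed(range(len(params))):
        nbytes = params[idx].numel() * params[idx].element_size()
        if current and current_bytes + nbytes > cap_bytes:
            buckets.append(current)
            current, current_bytes = [], 0
        current.append(idx)
        current_bytes += nbytes
    if current:
        buckets.append(current)
    return buckets

params = [torch.randn(1024, 1024) for _ in range(5)]      # ~4 MB each
print(greedy_buckets(params, cap_bytes=8 * 1024 * 1024))  # [[4, 3], [2, 1], [0]]
```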

3.2 Definition

3.2.1 How many BucketReplicas are there?

To explain this better, we first need to figure out what a BucketReplica is. Let's start from the comments.

First, a Bucket holds multiple BucketReplicas, one per model replica.

```cpp
// A bucket holds N bucket replicas (1 per model replica).
```

However, only element [0] is actually used: since single-process multi-device mode is not supported, we can assume each bucket contains exactly one replica.

```cpp
GradBucket grad_bucket(
    next_bucket_,
    tensors[0],
    // The comment here states that SPMD is not supported:
    // Since currently we do not support single-process multiple-device
    // mode, we can assume only one replica in the bucket.
    bucket.replicas[0].offsets,
    bucket.replicas[0].lengths,
    bucket.replicas[0].sizes_vec);
bucket.future_work = comm_hook_->runHook(grad_bucket);
```

Combined with the code we saw earlier, SPMD will not be supported in the future either. parameters is the parameter collection of the model list [ToyModel], and parameters[0] holds the parameters of ToyModel.

```python
# The comment below states that SPMD will not be supported in the future either.
# TODO([email protected]): Remove this field since SPMD is no longer supported,
# and also remove all the relevant unnecessary loops.
# Module replication within process (single-process multi device)
self._module_copies = [self.module]  # builds a list such as [ToyModel]
# Build parameters for reducer.
parameters, expect_sparse_gradient = self._build_params_for_reducer()
```

Putting all of this together, we know:

  • DDP originally intended to support SPMD the way DP does, so each process would need to maintain the parameters of multiple model replicas on multiple GPUs; that is, parameters would be an array in which every element holds the parameters of one model replica.
  • parameters is assigned to Reducer.replicas_, and Reducer.replicas_ is used to populate bucket.replicas.
  • Because SPMD will not be supported in the future, only parameters[0] is meaningful.

So we conclude:

  • A BucketReplica is (part of) the set of gradient-requiring parameters of one model. A replica corresponds to (part of) the parameters of a model replica on one device (GPU); that is, a replica represents [1..N] gradients that need to be reduced, all with the same dtype and on the same device.
  • In practice, only bucket.replicas[0] is meaningful; it corresponds to a subset of the gradient-requiring tensors of [self.module] above, i.e. of parameters[0].

3.2.2 Key points

Let's summarize the key points of Bucket:

  • The replicas member holds this bucket's BucketReplicas. A BucketReplica represents [1..N] gradients to be reduced, with the same dtype, on the same device.

  • Only bucket.replicas[0] is meaningful; it corresponds to the tensors of this bucket within the model's gradient-requiring parameters.

  • How is it assigned? From Reducer.replicas_, which is the parameters argument; we will see this below.

  • The variable_indices member records which variable indices belong to this bucket.

How is it assigned? Using the bucket_indices introduced earlier.

```cpp
bucket.variable_indices = std::move(bucket_indices[bucket_index]);
```

How is it used? intra_bucket_index is a position within bucket.variable_indices; the real variable index is obtained through it. This will be explained again with code later.

```cpp
size_t variable_index = bucket.variable_indices[intra_bucket_index];
```

3.2.3 Full definition

Finally, the full definition of Bucket is:

```c++
// A bucket holds N bucket replicas (1 per model replica).
//
// If every bucket in this struct is ready, the reduction can be kicked off.
// One bucket per replica. Reduction is kicked off when every bucket is ready.
//
struct Bucket {
  std::vector<BucketReplica> replicas; // one BucketReplica per model replica

// Global indices of participating variables in the bucket
  std::vector<size_t> variable_indices; // which variables are in this bucket

// Number of replicas to be marked done before this bucket is ready.
  size_t pending; // countdown of replicas not yet marked done

// Keep work handle around when this set of buckets is being reduced.
c10::intrusive_ptr<c10d::ProcessGroup::Work> work;

// Keep future work handle around if DDP comm hook is registered.
c10::intrusive_ptr<torch::jit::Future> future_work;

// If this bucket should expect a single sparse gradient.
// Implies: replicas[i].variables.size() == 1.
bool expect_sparse_gradient = false;

};
```

3.3 Setup

The key member variable of Reducer is buckets_, which holds all of the Reducer's buckets.

```cpp
std::vector<Bucket> buckets_;
```

The initialization function shows how buckets_ is set up; the core steps are (a Python sketch of these steps follows the C++ listing below):

  • Find this bucket's entry in bucket_indices.
  • Find the tensors corresponding to those indices in parameters.
  • Configure those tensors in the BucketReplica; they are the tensors this bucket should reduce.

```cpp
void Reducer::initialize_buckets(
    std::vector<std::vector<size_t>> bucket_indices) {

buckets_.reserve(bucket_count);

for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) {
  Bucket bucket;

// Variables that expect sparse gradients must have their own bucket.
if (bucket_indices[bucket_index].size() == 1) {
  const auto variable_index = bucket_indices[bucket_index].front();
  bucket.expect_sparse_gradient =  // set up the bucket
      expect_sparse_gradients_[0][variable_index];
}     
// Iterate over model replicas.
for (size_t replica_index = 0; replica_index < replica_count;
     replica_index++) {

  BucketReplica replica; // set up the replica

  if (bucket.expect_sparse_gradient) {
    const auto variable_index = bucket_indices[bucket_index].front();
    // find the tensor corresponding to this index
    const auto& variable = replicas_[replica_index][variable_index];
    replica.variables = {variable};
  } else {

    // Iterate over bucket variables.
    for (const auto variable_index : bucket_indices[bucket_index]) {
      // find the tensor corresponding to this index
      const auto& variable = replicas_[replica_index][variable_index];
      if (!options.has_device()) {
        options = options.device(variable.device());
      } 
      if (!options.has_dtype()) {
        options = options.dtype(variable.dtype());
      }

      const auto length = variable.numel();
      replica.variables.push_back(variable); // append the tensor
      replica.offsets.push_back(offset);
      replica.lengths.push_back(length);
      replica.sizes_vec.push_back(variable.sizes());
      offset += length;
    }

    // Allocate bucket contents tensor.
    replica.contents = at::empty({static_cast<long>(offset)}, options);
    initialize_bucket_views(replica, replica.contents);
  }

  // Add bucket replica to enclosing bucket.
  bucket.replicas.push_back(std::move(replica)); // add the replica to the bucket
}

bucket.variable_indices = std::move(bucket_indices[bucket_index]);
buckets_.push_back(std::move(bucket)); // append to the list of buckets

}
}
```
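The bookkeeping part of this loop can be mirrored in a few lines of Python. This is only an illustrative sketch of how offsets, lengths, sizes_vec and the flat contents tensor relate to each other; the tensors below are stand-ins.

```python
import torch

# Stand-ins for the variables of one bucket (replica.variables).
variables = [torch.randn(3, 4), torch.randn(4), torch.randn(2, 2)]

offsets, lengths, sizes_vec, offset = [], [], [], 0
for v in variables:
    length = v.numel()
    offsets.append(offset)       # replica.offsets
    lengths.append(length)       # replica.lengths
    sizes_vec.append(v.size())   # replica.sizes_vec
    offset += length

# "Allocate bucket contents tensor": one flat buffer for the whole bucket.
contents = torch.empty(offset, dtype=variables[0].dtype)
print(offsets, lengths)          # [0, 12, 16] [12, 4, 4]
```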

This can be illustrated with the figure below. Here we assume bucket index 1, i.e. the second bucket, so variable_indices corresponds to that slice of bucket_indices. For example, BucketReplica[0] contains Tensor 4, 5, 6, and variable_indices holds the indices of Tensor 4, 5, 6.

The bucket_indices in the figure is one of the arguments of the Reducer constructor.

(Figure: Reducer.buckets_ points to bucket 0 .. bucket n. Inside bucket 1, vector<BucketReplica> replicas[0] holds Tensor 4, Tensor 5, Tensor 6, while vector<size_t> variable_indices holds the indices of Tensor 4, 5, 6, taken from the corresponding slice of the bucket_indices constructor argument.)

0x03 BucketReplica

As discussed above, a BucketReplica represents [1..N] gradients to be reduced, all with the same dtype and on the same device. It is a portion of the model's gradient-requiring parameters; which portion is determined by the bucket's variable_indices.

Its key member variables are:

  • std::vector<at::Tensor> variables are the variables that contribute to this bucket replica. Refcounted values are used here so that the bucket contents can easily be unflattened back into the participating variables after the reduction has completed.
  • at::Tensor contents is the flattened (one-dimensional) contents of the bucket.
  • std::vector<at::Tensor> bucket_views_in provides views into contents from the input side, i.e. a way to look at each gradient inside contents.
  • std::vector<at::Tensor> bucket_views_out provides views into contents from the output side.

See the following comment:

Views serve as entry points to copy_ each grad's data in/out of the flat contents tensor.

3.1 Views

More on std::vector<at::Tensor> bucket_views_in and std::vector<at::Tensor> bucket_views_out:

  • In PyTorch, a view is a tensor that shares memory with the original data; it merely presents part of that data, or presents it reordered, without copying it.
  • Each view is created with a layout (sizes + strides) matching the gradient's expected layout.
  • bucket_views_in and bucket_views_out provide ways to manipulate the individual gradients inside contents; in other words, they are the views through which each tensor's gradient data is moved into and out of contents.
  • The reason two states are kept is that, if a DDP communication hook is registered, bucket_views_out can be re-initialized with the value of the hook's future_work; a separate view reference to the replica's original contents is still needed for the bucket_views_in[i].copy_(grad) call.
  • bucket_views_in[i].copy_(grad) and grad.copy_(bucket_views_out[i]) provide convenient ways to move gradient data into and out of contents.

In addition, the following three member variables store per-variable information about the flat bucket; for example, offsets stores each tensor's offset within the flat bucket contents.

```cpp
// Per-variable offset/length into the flat bucket contents tensor and grad
// bucket.
std::vector<size_t> offsets;
std::vector<size_t> lengths;

// Per-variable sizes into the grad bucket.
std::vector<c10::IntArrayRef> sizes_vec;
```

3.2 Definition

BucketReplica is defined as:

```cpp
// A bucket replica represents [1..N] gradients to be reduced,
// with the same dtype, on the same device.
//
// Batching gradients together before reducing them can result in lower
// overhead and/or faster time to completion. Only gradients of the same type
// and on the same device can be batched. The tensor that represents the
// flattened gradient uses the same type and is placed on the same device.
// Buckets are filled as the gradients they hold are computed (triggered by
// autograd hooks). Buckets are reduced in a predetermined order that is
// identical across processes.
struct BucketReplica {
  // Flattened (1 dimensional) contents of bucket.
  at::Tensor contents; // the flattened buffer

  // Views into contents for each grad.  Each view will be created with
  // layout (sizes + strides) matching the grad's expected layout
  // ("Gradient Layout Contract" in torch/csrc/autograd/AccumulateGrad.h).
  // `bucket_views_in[i].copy_(grad)` and
  // `grad.copy_(bucket_views_out[i])`
  // provide convenient ways to move grad data in/out of contents.
  // The reason we keep two states for bucket_views is that if DDP
  // communication hook was registered, `bucket_views_out` could be
  // re-initialized with the value of hook's `future_work`. We still need to
  // keep a separate view reference to replica's original contents for
  // `bucket_views_in[i].copy_(grad)` call.
  std::vector<at::Tensor> bucket_views_in;  // how to look into contents
  std::vector<at::Tensor> bucket_views_out; // the output views

  // Variables that contribute to this bucket replica. Use refcounted value
  // here so that we can easily unflatten the bucket contents into the
  // participating variables after reduction has completed.
  std::vector<at::Tensor> variables;

  // Per-variable offset/length into the flat bucket contents tensor and grad
  // bucket.
  std::vector<size_t> offsets;
  std::vector<size_t> lengths;

  // Per-variable sizes into the grad bucket.
  std::vector<c10::IntArrayRef> sizes_vec;

  // Number of tensors to be added before this bucket is complete.
  // This is reset to `variables.size()` every iteration.
  size_t pending;

  // TODO(@pietern)
  // Memory copies from gradient tensors into the bucket are potentially
  // done on different CUDA streams. We record an event for every copy
  // so that we can synchronize with them prior to kicking off the reduction.
  // std::vector<at::cuda::CUDAEvent> events;
};
```

The logic so far is as follows; as mentioned earlier, only replicas[0] of each bucket is meaningful.

(Figure: Reducer.buckets_ points to a list of Buckets; each Bucket holds future_work, work, variable_indices and vector<BucketReplica> replicas; each BucketReplica holds bucket_views_in, bucket_views_out, contents and variables.)

3.3 Initialization

Part of the initialization code lives in Reducer::initialize_buckets.

```cpp
// Allocate bucket contents tensor (i.e. allocate the memory).
replica.contents = at::empty({static_cast<long>(offset)}, options);

initialize_bucket_views(replica, replica.contents);
```

The code of initialize_bucket_views is shown below; two PyTorch functions need explaining first (see the sketch right after this list):

  • as_strided: creates a view (still a tensor) of an existing tensor with the given sizes and strides. It shares memory with the original data and stores no data of its own, so neither view is real storage, only a view.
  • narrow: returns a new tensor that is a narrowed slice of the original tensor.
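The following small experiment (plain PyTorch, independent of DDP) shows how such views behave: both views share storage with the flat buffer, so copying a gradient into a view writes straight into contents.

```python
import torch

contents = torch.zeros(6)                    # flat "contents" buffer
g1, g2 = torch.randn(2, 2), torch.randn(2)   # two fake gradients

# Dense case: match the gradient's sizes/strides at the right offset.
view1 = contents.as_strided(g1.size(), g1.stride(), storage_offset=0)
# Fallback case: a contiguous slice reshaped to the gradient's sizes.
view2 = contents.narrow(0, 4, 2).view(g2.size())

view1.copy_(g1)   # like bucket_views_in[i].copy_(grad)
view2.copy_(g2)
print(contents)   # the flat buffer now holds both gradients, no extra storage
```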

The main logic of initialize_bucket_views is:

  • Iterate over the replica's tensors; handle each one according to whether it is dense or sparse, and append the resulting view to replica.bucket_views_in.

  • Set replica.bucket_views_out to replica.bucket_views_in; by default they should be identical.

  • If gradient_as_bucket_view_ is set to true, two cases need handling:

  • initialize_bucket_views can be called inside initialize_buckets from rebuild_buckets; if the grad was already defined/computed in a previous iteration, the old grad must be copied into the new bucket_view and the grad made to point at the new bucket_view.

  • initialize_bucket_views can also be called inside initialize_buckets during construction. Grads are not defined at construction time;

    in that case, do not let the grad point to the bucket_view, because for globally unused parameters the grad should remain undefined.

The code is as follows:

```cpp
// (see Note:  "Gradient Layout Contract" in initialize_buckets).
void Reducer::initialize_bucket_views(
    Reducer::BucketReplica& replica,
    at::Tensor& contents) {
  for (size_t i = 0; i < replica.variables.size(); i++) { // iterate over the replica's tensors
    auto& v = replica.variables[i];
    const auto offset = replica.offsets[i];
    const auto length = replica.lengths[i];
    if (v.is_non_overlapping_and_dense()) {
      // If the param's memory is dense, match its layout, anticipating
      // the autograd engine (AccumulateGrad) will also create gradients
      // matching its layout.
      replica.bucket_views_in.push_back( // dense case
          contents.as_strided(v.sizes(), v.strides(), offset));
    } else {
      // Fall back to a C-style contiguous view, again anticipating
      // AccumulateGrad will do the same when stashing grads for non-dense
      // params.
      replica.bucket_views_in.push_back( // sparse case
          contents.narrow(0, offset, length).view(v.sizes()));
    }
    // By default `bucket_views_out` and `bucket_views_in` are
    // essentially the same thing.
    replica.bucket_views_out = replica.bucket_views_in;

// If gradient_as_bucket_view_ is set as true, then there are two cases to
// handle: initialize_bucket_views could be called inside initialize_buckets
// when rebuild_buckets, if grad has already been defined/calculated in
// previous iteration, old grad needs to be copied into new bucket_view and
// let grad point to the new bucket_view, initialize_bucket_views could also
// be called inside initialize_buckets during construction. Grads are not
// defined during construction time, in this case, do not let grad point to
// bucket_view, because grads should be kept as being undefined for globally
// unused parameters.
if (gradient_as_bucket_view_) {
  auto& bucket_view = replica.bucket_views_in.back();
  runGradCallbackForVariable(v, [&](auto& grad) {
    if (grad.defined() && !grad.is_alias_of(bucket_view)) {
      bucket_view.copy_(grad);
      grad = bucket_view;
          // The grad is modified and needs to be written back.
      return true;
    }
        // The grad is not modified and does not need to be written back.
    return false;
  });
}

  }
}
```

As shown below:

(Figure: inside a BucketReplica, bucket_views_in and bucket_views_out both point into the flat Tensor contents, which holds Flattened(Tensor1, Tensor2, Tensor3), while vector variables holds [Tensor1, Tensor2, Tensor3].)

In addition, mark_variable_ready_sparse, mark_variable_ready_dense, and finalize_backward all assign to contents.

0x04 Lookup classes

The following two classes let the autograd hook functions determine which bucket a tensor belongs to.

4.1 VariableIndex

VariableIndex pins down which tensor we are dealing with. This is useful for the autograd hook: the hook callback only knows its own gradient tensor, but it needs to know which replica that tensor belongs to and where it sits before any further reduction can happen.

4.1.1 Member variables

In instances of classes such as Reducer, there is only one member variable that stores VariableIndex directly, namely:

```cpp
std::vector<VariableIndex> unused_parameters_;
```

VariableIndex mostly appears as part of other member variables or as a function argument; for example, Reducer's gradAccToVariableMap_ uses VariableIndex:

```cpp
// Maps a grad_accumulator to its VariableIndex, which makes it convenient to
// search the autograd graph for unused parameters later on.
std::unordered_map<torch::autograd::Node*, VariableIndex> gradAccToVariableMap_;
```

4.1.2 Definition

VariableIndex is defined as follows:

```cpp
// Locates a specific variable by replica index and variable index.
struct VariableIndex {
  size_t replica_index;  // which replica the variable belongs to
  // Note: variable_index is NOT "the position inside the replica" but the
  // global variable index; e.g. with 10 parameters in total, variable_index
  // ranges from 0 to 9. The position inside a bucket replica is determined
  // by VariableLocator, described below.
  size_t variable_index;

  VariableIndex() = default;

  VariableIndex(size_t replica_index_, size_t variable_index_) {
    replica_index = replica_index_;
    variable_index = variable_index_;
  }

  static size_t hash(const VariableIndex& key) {
    return c10::get_hash(key.replica_index, key.variable_index);
  }
};
```

In the Reducer constructor, the following code sets up the autograd hook: a hook is installed for every tensor of every replica. If the autograd hook did not know which bucket a gradient corresponds to, it could not tell DDP that the bucket as a whole has become ready.

How is the bucket found? With the VariableLocator described below.

```cpp
auto& variable = replicas_[replica_index][variable_index];
const auto index = VariableIndex(replica_index, variable_index); // build the VariableIndex

hooks_.emplace_back(
    grad_accumulator->add_post_hook(
        torch::make_unique<torch::autograd::utils::LambdaPostHook>(
            [=](const torch::autograd::variable_list& outputs,
                const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
              this->rpc_context_.set(
                  ThreadLocalDistAutogradContext::getContextPtr());
#endif
              // The hook takes the VariableIndex as argument so that it can
              // locate the tensor later on.
              this->autograd_hook(index);
              return outputs;
            })),
    grad_accumulator);
```

4.2 VariableLocator

4.2.1 Definition

VariableLocator is used to locate a variable within the bucket structure. To find a tensor we need to know which bucket it is in, and where it sits among the bucket's tensors.

  • Which bucket: bucket_index is the position within the Reducer.buckets_ list, identifying one bucket in buckets_.
  • Which position inside the bucket replica: intra_bucket_index is the variable's index within the vector fields of bucket.replica.

```cpp
// A variable locator locates a particular variable in the bucket
// structure. The `bucket_index` field points to the bucket in the `buckets_`
// vector. The `intra_bucket_index` field points to the index of the variable
// in any of the vector fields in the bucket replica.
struct VariableLocator {
  // Index into the `buckets_` variable.
  size_t bucket_index;       // which bucket
  // Index of parameter in single bucket replica.
  size_t intra_bucket_index; // which position inside the bucket replica

  VariableLocator() = default;

  VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
    bucket_index = bucket_index_;
    intra_bucket_index = intra_bucket_index_;
  }
};
```

4.2.2 Member variables

The corresponding Reducer member variable is:

```cpp
// Map the index of a variable to its location in the bucket structure.
std::vector<VariableLocator> variable_locators_;
```

4.2.2.1 Initialization

How is it initialized?

```cpp
void Reducer::initialize_buckets(
    std::vector<std::vector<size_t>> bucket_indices) {
  // Clear current bucket assignment.
  buckets_.clear();
  variable_locators_.clear();

  // Ensure we have a bucket index for every variable.
  variable_locators_.resize(replicas_[0].size());

  // Iterate over buckets.
  const auto bucket_count = bucket_indices.size();
  const auto replica_count = replicas_.size();
  buckets_.reserve(bucket_count);

  for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) { // iterate over buckets
    // Map participating variables to this bucket.
    // This is identical across replicas so we only need to do this once.
    size_t intra_bucket_index = 0;
    // Iterate over the tensors of this bucket; every tensor index is unique
    // across all buckets.
    for (const auto variable_index : bucket_indices[bucket_index]) {
      variable_locators_[variable_index] =
          VariableLocator(bucket_index, intra_bucket_index++); // intra_bucket_index simply counts up
    }
  }
}
```

Question: can variable_locators_[variable_index] be written more than once across different buckets? No: every variable_index appears in exactly one bucket, so each slot is assigned exactly once, and each combination of bucket_index and intra_bucket_index is unique.

Here is an example. Regarding tensor indices: every tensor gets an index, increasing from 0 up to tensors.size() - 1. Suppose the model's parameters contain 12 tensors, so the tensor indices run from 0 to 11 and are split into 6 buckets; within these 6 buckets, every tensor index is unique.

(Figure: the 12 tensor indices distributed over 6 buckets, each index appearing exactly once.)

The corresponding variable_locators_ is then:

```python
variable_locators_[tensor index 0] = VariableLocator(bucket 0, 0)  # tensor index 0 is the 1st variable of bucket 0
variable_locators_[tensor index 1] = VariableLocator(bucket 0, 1)  # tensor index 1 is the 2nd variable of bucket 0
variable_locators_[tensor index 2] = VariableLocator(bucket 0, 2)  # tensor index 2 is the 3rd variable of bucket 0
variable_locators_[tensor index 3] = VariableLocator(bucket 0, 3)  # tensor index 3 is the 4th variable of bucket 0
```
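The same mapping can be rebuilt in a few lines of Python (the bucket layout below is hypothetical):

```python
# variable_locators[variable_index] == (bucket_index, intra_bucket_index)
bucket_indices = [[0, 1, 2, 3], [4, 5], [6, 7, 8], [9], [10], [11]]

variable_locators = [None] * 12
for bucket_index, indices in enumerate(bucket_indices):
    for intra_bucket_index, variable_index in enumerate(indices):
        variable_locators[variable_index] = (bucket_index, intra_bucket_index)

print(variable_locators[5])   # (1, 1): tensor 5 is the 2nd variable of bucket 1
```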

4.2.2.2 Usage

How is it used? Take the following as an example.

When the autograd hook fires, it is called back with a VariableIndex index:

```cpp
this->autograd_hook(index)
```

autograd_hook eventually calls mark_variable_ready_dense, which uses variable_locators_ to determine the bucket and then proceeds from there.

```cpp
void Reducer::mark_variable_ready_dense(VariableIndex index) {
  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;
  const auto& bucket_index = variable_locators_[variable_index]; // find the bucket index of this tensor
  auto& bucket = buckets_[bucket_index.bucket_index];            // find the bucket
  auto& replica = bucket.replicas[replica_index];                // find the replica through the bucket
  auto& variable = replica.variables[bucket_index.intra_bucket_index];  // found the tensor
  const auto offset = replica.offsets[bucket_index.intra_bucket_index]; // and its bookkeeping info
  const auto length = replica.lengths[bucket_index.intra_bucket_index];
  auto& bucket_view = replica.bucket_views_in[bucket_index.intra_bucket_index];

  // Now the actual work can proceed.

  // Copy contents of gradient tensor to bucket tensor.
  // If the gradient is not set, we assume it wasn't computed
  // as part of the current backwards pass, and zero the part
  // of the bucket it would otherwise hold.
  runGradCallbackForVariable(variable, [&](auto& grad) {
    if (grad.defined()) {
      this->check_grad_layout(grad, bucket_view);
      // When gradient_as_bucket_view_ is false, or even when
      // gradient_as_bucket_view_ is true, in rare cases users may set grad to
      // be None after every iteration. In these cases, grad and bucket_view
      // are pointing to different storages and thus need to copy grads to
      // bucket_view. If gradient_as_bucket_view_ is set as true, let grad
      // point to bucket_view. If grad has already been set as views of
      // buckets in previous iterations, no copy is needed.
      if (!grad.is_alias_of(bucket_view)) {
        this->copy_grad_to_bucket(grad, bucket_view);
        if (gradient_as_bucket_view_) {
          // Let grad point to bucket_view buffer.
          grad = bucket_view;
          // The grad is modified and need to be written back.
          return true;
        }
      } else {
        // If grad and bucket view point to the same storage, no need to copy
        if (comm_hook_ == nullptr) {
          bucket_view.div_(divFactor_);
        }
      }
    } else {
      bucket_view.zero_();
    }
    // The grad is not modified and doesn't need to be written back.
    return false;
  });
}
```

0x05 Accumulation-related classes

The following classes are related to gradient accumulation.

5.1 grad_accumulators_

grad_accumulators_ can be thought of as a matrix whose entries are AccumulateGrad nodes (of type Node), i.e. the nodes that accumulate gradients. At this point it seems to serve only a bookkeeping purpose.

```cpp
std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
    grad_accumulators_;
```

As shown below, variable1 is an actual tensor, and one entry of grad_accumulators_ points to variable1's AccumulateGrad.

(Figure: grad_accumulators_[replica_index][variable_index] points to the AccumulateGrad node of variable1; that node's post_hooks_ invoke autograd_hook(index).)
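The C++ Reducer attaches a post hook to every parameter's AccumulateGrad node. As a rough Python analogue (a sketch, not DDP's actual mechanism: Tensor.register_hook fires when the gradient is produced, slightly before AccumulateGrad's post hooks), one can watch every parameter's gradient become ready like this:

```python
import torch

model = torch.nn.Linear(4, 2)
pending = {id(p): True for p in model.parameters()}

def make_hook(param):
    def hook(grad):
        pending[id(param)] = False   # roughly what mark_variable_ready does
        if not any(pending.values()):
            print("all grads ready; a real Reducer would launch allreduce now")
    return hook

for p in model.parameters():
    p.register_hook(make_hook(p))

model(torch.randn(3, 4)).sum().backward()
```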

5.1.1 Initialization

How is it initialized? The Reducer constructor contains:

```cpp
{
  const auto replica_count = replicas_.size();

// The two nested for loops below iterate over all tensors
for (size_t replica_index = 0; replica_index < replica_count;
     replica_index++) {

  for (size_t variable_index = 0; variable_index < variable_count;
       variable_index++) {

    auto& variable = replicas_[replica_index][variable_index];
    const auto index = VariableIndex(replica_index, variable_index);

    // The gradient accumulator function is lazily initialized once.
    // Therefore we can use its presence in the autograd graph as
    // evidence that the parameter has participated in an iteration.

    auto grad_accumulator = // get the grad_accumulator of one tensor
        torch::autograd::impl::grad_accumulator(variable);

    // Hook to execute after the gradient accumulator has executed.
    hooks_.emplace_back(
        grad_accumulator->add_post_hook(
            torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                [=](const torch::autograd::variable_list& outputs,
                    const torch::autograd::variable_list& /* unused */) {

#ifndef _WIN32

                  this->rpc_context_.set(
                      ThreadLocalDistAutogradContext::getContextPtr());

#endif

                  this->autograd_hook(index);
                  return outputs;
                })),
        grad_accumulator);

    // Map raw function pointer to replica index and parameter index.
    // This is used later on when the autograd graph is traversed
    // to check for parameters for which no gradient is computed, if
    // find_unused_parameters=True.
    // Note that the mapping of gradient accumulator to variable should be
    // one to one as we deduplicate shared parameters before constructing
    // Reducer.
    if (find_unused_parameters_) {
      gradAccToVariableMap_[grad_accumulator.get()] = index;
    }

    numGradHooksTriggeredMap_[index] = 0;

    // The gradient accumulator is stored as weak_ptr in the autograd
    // metadata of the variable, so we have to keep it alive here for
    // the raw pointer to be valid.
    grad_accumulators_[replica_index][variable_index] =
        std::move(grad_accumulator); // move this tensor's grad_accumulator into grad_accumulators_
  }
}

}
```

5.1.2 Usage

grad_accumulator returns a Node, namely an AccumulateGrad; the checking and validation code has been stripped from the excerpt below.

```cpp
std::shared_ptr<Node> grad_accumulator(const Variable& self) {
  auto autograd_meta = get_autograd_meta(self);

  std::lock_guard<std::mutex> lock(autograd_meta->mutex_);

  auto result = autograd_meta->grad_accumulator_.lock();
  if (result)
    return result;

  c10::raw::intrusive_ptr::incref(self.unsafeGetTensorImpl());
  auto intrusive_from_this =
      c10::intrusive_ptr<at::TensorImpl>::reclaim(self.unsafeGetTensorImpl());
  result = std::make_shared<AccumulateGrad>(
      Variable(std::move(intrusive_from_this)));
  autograd_meta->grad_accumulator_ = result;
  return result;
}
```

5.2 gradAccToVariableMap_

gradAccToVariableMap_ is defined as:

```cpp
std::unordered_map<torch::autograd::Node*, VariableIndex> gradAccToVariableMap_;
```

Its role is to associate each Node with a VariableIndex; in the figure below, variable 1 is given index 1:

(Figure: gradAccToVariableMap_ maps Node* to VariableIndex, e.g. variable1 -> index1, variable2 -> index2; a VariableIndex stores the replica_index and variable_index of the variable.)

5.2.1 Initialization

How is it initialized? The Reducer constructor contains the following, which gives every gradient-requiring Variable a VariableIndex.

```cpp
auto& variable = replicas_[replica_index][variable_index];
const auto index = VariableIndex(replica_index, variable_index);
auto grad_accumulator = torch::autograd::impl::grad_accumulator(variable);

if (find_unused_parameters_) {
  gradAccToVariableMap_[grad_accumulator.get()] = index;
}
```

5.2.2 Usage

gradAccToVariableMap_ is used as follows: search_unused_parameters traverses the autograd graph and then iterates over gradAccToVariableMap_; any accumulator function in the map that did not show up in the graph belongs to a parameter whose gradient will not be computed.

```c++
// Traverse the autograd graph starting at the specified output.
// All parameters for which we have a pointer to their gradient accumulation
// functions, but don't show up in the autograd graph will be marked ready for
// for reduction as soon as the first autograd hook is called. This is not
// done immediately because the model output may be ignored, and we only
// want to start performing reductions on `torch.autograd.backward()`.
void Reducer::search_unused_parameters(
    const std::vector<torch::autograd::Variable>& outputs) {
  std::unordered_set<torch::autograd::Node*> seen;
  std::vector<torch::autograd::Node*> queue;

  // Seed queue with the grad functions of all outputs.
  for (const auto& output : outputs) {
    const auto& grad_fn = output.grad_fn();
    if (grad_fn) {
      queue.push_back(grad_fn.get());
    }
  }

  // Traverse the autograd graph starting at the specified output.
  while (!queue.empty()) {
    auto fn = queue.back();
    queue.pop_back();
    for (const auto& edge : fn->next_edges()) {
      if (auto next_ptr = edge.function.get()) {
        const bool was_inserted = seen.insert(next_ptr).second;
        if (was_inserted) {
          queue.push_back(next_ptr);
        }
      }
    }
  }

  // Find accumulator functions that don't show up in this graph;
  // their parameters do not need a gradient in this iteration.
  for (const auto& it : gradAccToVariableMap_) {
    // If the accumulator function is present in the graph, we know
    // a gradient will be computed for the corresponding parameter.
    if (seen.count(it.first) == 0) {
      unused_parameters_.push_back(it.second);
    }
  }
}
```
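The same traversal can be sketched in Python to see which parameters will receive a gradient: walk grad_fn.next_functions from the output and collect the leaf tensors exposed by the AccumulateGrad nodes. This is a simplified sketch; the real check compares accumulator pointers stored in gradAccToVariableMap_.

```python
import torch

def find_unused_parameters(model, output):
    seen_params, visited, stack = set(), set(), [output.grad_fn]
    while stack:
        fn = stack.pop()
        if fn is None or fn in visited:
            continue
        visited.add(fn)
        if hasattr(fn, "variable"):        # AccumulateGrad exposes its leaf tensor
            seen_params.add(id(fn.variable))
        for next_fn, _ in fn.next_functions:
            stack.append(next_fn)
    return [name for name, p in model.named_parameters()
            if p.requires_grad and id(p) not in seen_params]

model = torch.nn.ModuleDict({"used": torch.nn.Linear(4, 2),
                             "unused": torch.nn.Linear(4, 2)})
out = model["used"](torch.randn(3, 4)).sum()
print(find_unused_parameters(model, out))   # ['unused.weight', 'unused.bias']
```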

5.3 numGradHooksTriggeredMap_

Records how many times a tensor's autograd_hook should be triggered before the tensor's gradient is considered ready. The map does not change after the first iteration, so each value ends up being 1 or 0. It is used to populate unused_parameters_ and to reset numGradHooksTriggeredMapPerIteration_.

```cpp
// Key: VariableIndex, Value: the number of times that a variable's
// autograd_hook() should be triggered before marking this variable's grad as
// ready for communication. Map will not change after 1st iteration.
std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>>
    numGradHooksTriggeredMap_;
```

5.3.1 Initialization

How is it initialized? In the constructor:

```cpp
numGradHooksTriggeredMap_[index] = 0;
```

During the first iteration, every call of autograd_hook increments the counter by one.

```cpp
// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {

  // ... part of the code omitted ...

  if (static_graph_first_iteration()) {
    numGradHooksTriggeredMap_[index] += 1; // in the first iteration of a static graph, increment by 1
    return; // and return right away -- note this!
  }

  // If `find_unused_parameters_` is true there may be model parameters that
  // went unused when computing the model output, they won't be part of the
  // autograd graph, and won't receive gradients. These parameters are
  // discovered in the `prepare_for_backward` function and their indexes stored
  // in the `unused_parameters_` vector.
  if (!has_marked_unused_parameters_) {
    has_marked_unused_parameters_ = true;
    for (const auto& unused_index : unused_parameters_) {
      mark_variable_ready(unused_index);
    }
  }

  // If it is static graph, after 1st iteration, check if a variable
  // is ready for communication based on numGradHooksTriggeredMap_.
  if (static_graph_after_first_iteration()) {
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
      // Finally mark variable for which this function was originally called.
      mark_variable_ready(index);
    }
  } else {
    // Finally mark variable for which this function was originally called.
    mark_variable_ready(index);
  }
}
```

5.3.2 Usage

How is it used? Here it is used to reset the per-iteration map:

```cpp
void Reducer::reset_bucket_counting() {
  next_bucket_ = 0;
  // Reset num_buckets_ready_ at the beginning of backward computation
  // in each iteration.
  num_buckets_ready_ = 0;

  for (auto& bucket : buckets_) {
    for (auto& replica : bucket.replicas) {
      replica.pending = replica.variables.size();
    }
    bucket.pending = bucket.replicas.size();
  }

  if (static_graph_) {
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
  }
}
```

It is also consulted here: if a counter is 0, the variable is added to unused_parameters_.

```cpp
// Right now delay_all_reduce is only called when static_graph_=true and
// num_iterations_==1.
void Reducer::delay_all_reduce() {

  // ... part of the code omitted ...

  // copy all gradients to buckets
  for (size_t replica_index = 0; replica_index < replicas_.size();
       replica_index++) {
    for (size_t variable_index = 0;
         variable_index < replicas_[replica_index].size();
         variable_index++) {
      const auto index = VariableIndex(replica_index, variable_index);
      // set unused_parameters_
      if (numGradHooksTriggeredMap_[index] == 0) { // if 0, record it as unused
        unused_parameters_.push_back(index);
      }
      require_finalize_ = true;
      set_divide_factor();
      if (expect_sparse_gradients_[replica_index][variable_index]) {
        mark_variable_ready_sparse(index);
      } else {
        mark_variable_ready_dense(index);
      }
    }
  }

  // launch all reduces for all buckets
  for (auto& bucket : buckets_) {
    all_reduce_bucket(bucket);
  }

  finalize_backward();
}
```

5.4 numGradHooksTriggeredMapPerIteration_

Records how many more times this tensor's autograd_hook must still be triggered before its gradient is ready for communication; when it reaches 0, the variable is marked ready (and once every variable of a bucket is ready, the bucket can be reduced).

This member is reset from numGradHooksTriggeredMap_.

```cpp
// Key: VariableIndex, Value: the number of times that a variable's
// autograd_hook() are left to be triggered before marking this variable's grad
// as ready for communication. Map will change after 1st iteration to track a
// grad is ready for communication or not.
std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>>
    numGradHooksTriggeredMapPerIteration_;
```

5.4.1 Usage

How is it used? With a static graph, after the first iteration (i.e. once gradients have been produced), numGradHooksTriggeredMapPerIteration_[index] is decremented; when it reaches 0, the variable is ready and its gradient can take part in the collective reduction.

```cpp
// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {

  // ... other code omitted ...

  // If it is static graph, after 1st iteration, check if a variable
  // is ready for communication based on numGradHooksTriggeredMap_.
  if (static_graph_after_first_iteration()) {
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
      // Finally mark variable for which this function was originally called.
      mark_variable_ready(index);
    }
  } else {
    // Finally mark variable for which this function was originally called.
    mark_variable_ready(index);
  }
}
```

At the start of a new iteration this value is reset: prepare_for_backward calls reset_bucket_counting,

and the reset uses numGradHooksTriggeredMap_:

```cpp
void Reducer::reset_bucket_counting() {
  next_bucket_ = 0;
  // Reset num_buckets_ready_ at the beginning of backward computation
  // in each iteration.
  num_buckets_ready_ = 0;

  for (auto& bucket : buckets_) {
    for (auto& replica : bucket.replicas) {
      replica.pending = replica.variables.size();
    }
    bucket.pending = bucket.replicas.size();
  }

  if (static_graph_) {
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
  }
}
```

The logic is illustrated below:

  • Tensor 2 is never used, so delay_all_reduce puts it directly into the unused parameters.
  • For Tensor 1:
  • numGradHooksTriggeredMap_ starts at 0.
  • After the first iteration it becomes 1.
  • During the backward pass, prepare_for_backward calls reset_bucket_counting, which copies numGradHooksTriggeredMap_ into numGradHooksTriggeredMapPerIteration_.
  • autograd_hook then decrements the counter; when it reaches 0, the variable is marked ready and can be reduced.

```cpp
Variable 2

  numGradHooksTriggeredMap_[2] = 0  +--delay_all_reduce-->  unused_parameters_.push_back(index)

+--------------------------------------------------------------------------------------+

Variable 1

  numGradHooksTriggeredMap_[1] = 0
                 +
                 |
                 |  first iteration
                 v
  numGradHooksTriggeredMap_[1] = 1
                 +
                 |  prepare_for_backward
                 |
                 |  reset_bucket_counting
                 v
  numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_
                 +
                 |  backward
                 |
                 |  autograd_hook
                 v
                                                              YES
  if (--numGradHooksTriggeredMapPerIteration_[index] == 0) ? +------->  mark_variable_ready(1)
                 +
                 |  NO
                 v
```

5.5 perIterationReadyParams_

Within each iteration, perIterationReadyParams_ holds the parameters that have been marked ready.

```cpp
// Per iteration set of parameter indices that have been marked ready.
std::unordered_set<size_t> perIterationReadyParams_;
```

5.5.1 Setting

Whenever a variable becomes ready, it is inserted into perIterationReadyParams_:

```cpp
void Reducer::mark_variable_ready(VariableIndex index) {

  if (should_rebuild_buckets()) {
    push_rebuilt_params(index);
  }

  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;

  if (replica_index == 0) {
    checkAndRaiseMarkedTwiceError(variable_index);
    perIterationReadyParams_.insert(variable_index);
  }
}
```

5.5.2 Reset

Before the backward pass, this set is cleared.

```cpp
void Reducer::prepare_for_backward(
    const std::vector<torch::autograd::Variable>& outputs) {

  // Reset per iteration marked ready parameters.
  perIterationReadyParams_.clear();

}
```

5.5.3 Usage

These helpers iterate over the parameters and return those that are not found in perIterationReadyParams_.

rebuild_buckets calls ensure_prior_reduction_finished, which uses these two methods for validation.

```cpp
std::vector<std::string> Reducer::getUnmarkedParamsForIteration() {
  std::vector<std::string> unMarkedParamNames;
  for (const auto& it : param_names_) {
    if (perIterationReadyParams_.find(it.first) ==
        perIterationReadyParams_.end()) {
      unMarkedParamNames.push_back(it.second);
    }
  }
  return unMarkedParamNames;
}

std::vector<size_t> Reducer::getUnmarkedParamIndicesForIteration() {
  std::vector<size_t> unmarked_param_indices;
  const auto variable_count = replicas_[0].size();
  for (size_t variable_index = 0; variable_index < variable_count;
       variable_index++) {
    if (perIterationReadyParams_.find(variable_index) ==
        perIterationReadyParams_.end()) {
      unmarked_param_indices.push_back(variable_index);
    }
  }
  return unmarked_param_indices;
}
```

5.6 Used parameters

The following two variables record which parameters have been used locally, i.e. whether a parameter was used on this rank during the current iteration, or during the current no_sync session when no_sync is on.

Each model replica corresponds to one tensor in the map, and each tensor is a one-dimensional int32 tensor whose length equals the number of parameters.

These tensors are marked in autograd_hook to indicate that the corresponding parameter has been used; at the end of the backward pass of the current iteration (or of the no_sync session) they are allreduced to figure out the globally unused parameters.

```cpp
// Locally used parameter maps indicating if parameters are used locally
// during the current iteration or no_sync session if no_sync is on. One
// tensor for each model replica and each tensor is one-dim int32 tensor of
// number of parameters. These tensors are marked in autograd_hook to indicate
// the corresponding param has been used, and get allreduced in the end of
// backward of current iteration or no_sync session for figuring out the
// globally unused parameters.
//
// local_used_maps_:     CPU tensors for bookkeeping locally used params
// local_used_maps_dev_: dev tensors for reducing globally unused params
std::vector<at::Tensor> local_used_maps_;     // set in autograd_hook; the bitmap described in the paper
std::vector<at::Tensor> local_used_maps_dev_; // GPU copy
```
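For context, no_sync is the user-facing API mentioned in the comment. A typical usage pattern is sketched below (ddp_model, batch1 and batch2 are assumed to exist in the training script); the used-parameter bookkeeping covers the whole no_sync session:

```python
# ddp_model, batch1 and batch2 are assumed to come from the training script.
with ddp_model.no_sync():
    ddp_model(batch1).sum().backward()   # no allreduce; local_used_maps_ keeps its marks
ddp_model(batch2).sum().backward()       # gradients and the used-map are allreduced here
```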

5.6.1 The paper

This is worth reading alongside the paper.

The gradients of globally unused parameters should stay untouched during the forward and backward passes. Detecting unused parameters requires global information, because a parameter may be absent from one DDP process in an iteration yet still participate in training in the same iteration of another process. DDP therefore keeps the locally-unused-parameter information in a bitmap and launches an extra AllReduce to gather a global bitmap. Since the bitmap is much smaller than tensor sizes, all parameters in the model share one bitmap instead of creating per-bucket bitmaps. The bitmap lives on the CPU to avoid launching a dedicated CUDA kernel for every update. However, some ProcessGroup backends may not be able to run AllReduce on CPU tensors; for example, ProcessGroupNCCL only supports CUDA tensors. Moreover, because DDP is supposed to work with any custom ProcessGroup backend, it cannot assume that every backend supports CPU tensors. To solve this, DDP maintains another bitmap on the same device as the first model parameter and issues a non-blocking copy to move the CPU bitmap to the device bitmap for the collective communication.
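The bitmap trick itself is easy to reproduce in isolation (a sketch with made-up indices, assuming an already initialized process group; the real code lives in all_reduce_local_used_map shown later):

```python
import torch
import torch.distributed as dist   # assumes dist.init_process_group was called

num_params = 20
local_used = torch.zeros(num_params, dtype=torch.int32)  # like local_used_maps_[0]
local_used[[0, 3, 7]] = 1          # parameters that produced gradients on this rank

# Move the CPU bitmap to the parameters' device, then SUM-allreduce it so that
# every rank learns which parameters were used by *any* rank.
local_used_dev = local_used.to("cuda", non_blocking=True)  # like local_used_maps_dev_[0]
dist.all_reduce(local_used_dev)
globally_unused = (local_used_dev == 0).nonzero().flatten()
```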

5.6.2 Initialization

The initialization function is:

```cpp
void Reducer::initialize_local_used_map() {
  const auto replica_count = replicas_.size();
  const auto variable_count = replicas_[0].size();
  local_used_maps_.resize(replica_count);
  local_used_maps_dev_.resize(replica_count);

  for (size_t i = 0; i < replica_count; i++) {
    at::TensorOptions options;
    options = options.dtype(at::kInt);

// Deliberately don't pin the memory even if local_used_maps_dev_ will
// be cuda. See Note [local_used_maps_ -> local_used_maps_dev copying]
local_used_maps_[i] =
    at::zeros({static_cast<long>(variable_count)}, options);

// This tensor needs to be on the same device as replica because backend
// such as NCCL may not support CPU tensors, and hence it might not work
// if we always put it on CPU.
options = options.device(replicas_[i][0].device());
local_used_maps_dev_[i] =
    at::empty({static_cast<long>(variable_count)}, options);

  }
}
```

5.6.3 Reset

Both finalize_bucket_dense and finalize_backward reset it.

```cpp
void Reducer::finalize_backward() {
  if (dynamic_graph_find_unused()) {
    // Reset unused parameter accounting.
    // See Note [local_used_maps_ -> local_used_maps_dev copying]
    for (auto& local_used : local_used_maps_) {
      local_used.fill_(0);
    }
    local_used_maps_reduced_ = false;
  }
```

5.6.4 Setting

If a parameter is used, autograd_hook sets its slot to 1:

```cpp
void Reducer::autograd_hook(VariableIndex index) {

  // This is where we record that the parameter has been used.
  // See Note [Skip allreducing local_used_maps_dev]
  if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
    // Since it gets here, this param has been used for this iteration. We want
    // to mark it in local_used_maps_. During no_sync session, the same var can
    // be set multiple times, which is OK as does not affect correctness. As
    // long as it is used once during no_sync session, it is marked as used.
    local_used_maps_[index.replica_index][index.variable_index] = 1;
  }
```

5.6.5 Usage

mark_variable_ready calls all_reduce_local_used_map, which performs the synchronization when needed. The comments in the code are worth spelling out:

  • DDP uses an asynchronous H2D copy to avoid blocking overhead. The async copy and the allreduce respect the current stream, so they will be sequenced correctly.

  • Correct sequencing with respect to host operations is also essential: the H2D copy_ is stream-ordered, while the host's changes to local_used_maps_ are host-ordered.

  • If a large backlog of CUDA-stream work pushes the copy_ far into the future, and no blocking call happens between now and finalize_backward, then finalize_backward may re-zero local_used_maps_ on the host before the stream executes the copy; in that case copy_ would read those zeros instead of the values we told it to read here.

  • Copying local_used_maps_[i] into a pinned temporary (which the pinned caching allocator should supply asynchronously) avoids this nasty, rare race condition.

  • In the hoped-for case where all parameters are used, DDP itself does no blocking work between now and the re-zeroing, so the danger is real.

  • Therefore the Reducer defensively ensures that local_used_maps_tmp is distinct from local_used_maps_[i].

```cpp
void Reducer::all_reduce_local_used_map() {
  // See Note [Skip allreducing local_used_maps_dev]
  // H2D from local_used_maps_ to local_used_maps_dev_
  for (size_t i = 0; i < local_used_maps_.size(); i++) {
    if (local_used_maps_dev_[i].is_cuda()) {
      // Note [local_used_maps_ -> local_used_maps_dev copying]
      // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      // We do async H2D to avoid the blocking overhead. The async copy and
      // allreduce respect the current stream, so will be sequenced correctly.
      //
      // Correct sequencing with respect to host operations is also essential.
      // The H2D copy_ is stream ordered, while the host's changes to
      // local_used_maps_ are host ordered. If a large backlog of cuda-stream
      // work pushes the copy_ far into the future, and if no blocking calls
      // occur between now and finalize_backward()** such that
      // finalize_backward() re-zeroes local_used_maps_ on the host before the
      // stream executes the copy_, copy_ will read those zeros instead of the
      // values we thought we told it to read here. Copying local_used_maps_[i]
      // to a pinned temporary (which the pinned caching allocator should
      // supply asynchronously) avoids this nasty, rare race condition.
      //
      // ** In the hoped-for case where all params are used, DDP itself won't
      // do any blocking work between now and the re-zeroing, so the danger is
      // real.
      //
      // Defensively ensures local_used_maps_tmp is distinct from
      // local_used_maps_[i]
      auto local_used_maps_tmp = at::native::empty_like(
          local_used_maps_[i],
          optTypeMetaToScalarType(local_used_maps_[i].options().dtype_opt()),
          local_used_maps_[i].options().layout_opt(),
          local_used_maps_[i].options().device_opt(),
          true /* pinned_memory */);
      // Paranoid asserts here because in some workloads, the pinned
      // allocator behaves in a way we don't understand, and may be bugged.
      // See https://github.com/pytorch/pytorch/pull/54474
      TORCH_INTERNAL_ASSERT(local_used_maps_tmp.is_pinned());
      TORCH_INTERNAL_ASSERT(
          local_used_maps_tmp.data_ptr() != local_used_maps_[i].data_ptr());
      local_used_maps_tmp.copy_(local_used_maps_[i]);
      local_used_maps_dev_[i].copy_(local_used_maps_tmp, true);
    } else {
      local_used_maps_dev_[i].copy_(local_used_maps_[i], true);
    }
  }
  local_used_work_ = process_group_->allreduce(local_used_maps_dev_);
}
```

5.7 Supporting classes for gradient computation

Next we look at some basic functions and supporting classes involved in computing gradients.

5.7.1 RpcContext

This struct wraps a distributed::autograd::ContextPtr.

```cpp
struct RpcContext {
  using ContextPtr = torch::distributed::autograd::ContextPtr;
  // The shared_ptr is to hold the context instance.
  ContextPtr context_ptr_holder;
  std::atomic<ContextPtr::element_type*> context_ptr{nullptr};

  void set(ContextPtr&& new_context_ptr);
};
RpcContext rpc_context_;
```

5.7.2 hooks_

Its purpose is to keep the autograd hooks alive; this too is bookkeeping.

```cpp
std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
    hooks_;
```

It is initialized as follows:

```cpp
// Hook to execute after the gradient accumulator has executed.
hooks_.emplace_back(
    grad_accumulator->add_post_hook(
        torch::make_unique<torch::autograd::utils::LambdaPostHook>(
            [=](const torch::autograd::variable_list& outputs,
                const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
              this->rpc_context_.set(
                  ThreadLocalDistAutogradContext::getContextPtr());
#endif
              this->autograd_hook(index);
              return outputs;
            })),
    grad_accumulator);
```

5.7.3 comm_hook_

5.7.3.1 Concept

Let's look at the concept via the Note [DDP Communication Hook].

A DDP communication hook is an enhancement that provides a hook which can be used to override how DDP communicates gradients across ranks; this can be used for algorithms such as gradient compression or GossipGrad. A hook function can be registered with the Python API register_comm_hook.

If no DDP communication hook is registered, the reducer simply calls allreduce to reduce the buckets. If one is registered, the hook is called and its future work handle is used instead; in that case the reducer also skips dividing the gradients by the world size. The rationale is that the communication hook is expected to completely override how communication is performed, so the user has full control over how gradients are handled.

PythonCommHook is a subclass of CommHookInterface and enables registering a Python hook. In addition, there are some built-in C++ hook implementations that can be selected by calling the Python API register_builtin_comm_hook.

```cpp
// Note [DDP Communication Hook]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// If DDP communication hook is not registered, the reducer reduces the buckets
// by just calling allreduce. If registered, it calls the hook and uses future
// work handle. If registered, reducer also skips dividing grads by world size.
// The reason for this is that the communication hook is expected to completely
// override how we perform communication and the user should have complete
// control over how the grads are handled.
//
// DDP communication hook is an enhancement that provides a hook which can be
// used to override how DDP communicates gradients across ranks, this can be
// used for algorithms like Gradient Compression/GossipGrad. This hook can be
// registered from Python API using `register_comm_hook`. `PythonCommHook`
// enables registering a Python hook and is a subclass of `CommHookInterface`.
// Additionally, there are also some built-in C++ hook implementations that can
// be specified by calling `register_builtin_comm_hook` from Python API.
```
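A hook has the signature hook(state, bucket) -> Future. The minimal sketch below simply reproduces DDP's default behavior (allreduce, then divide by world size), using the same GradBucket accessor as the built-in fp16 hook shown in the next subsection:

```python
import torch
import torch.distributed as dist

def allreduce_avg_hook(
    process_group: dist.ProcessGroup, bucket: dist.GradBucket
) -> torch.futures.Future:
    group = process_group if process_group is not None else dist.group.WORLD
    world_size = group.size()

    # Asynchronously allreduce the flattened bucket tensor.
    fut = dist.all_reduce(
        bucket.get_tensor(), group=group, async_op=True
    ).get_future()

    def average(fut):
        # Divide by world size once the allreduce has finished.
        return [fut.value()[0].div_(world_size)]

    return fut.then(average)

# ddp_model.register_comm_hook(state=None, hook=allreduce_avg_hook)
```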

5.7.3.2 Usage

Let's look at torch/distributed/algorithms/ddp_comm_hooks/default_hooks.py.

The hook below does its own processing before and after the allreduce. To use it, call ddp_model.register_comm_hook(process_group, fp16_compress_hook).

```python
def fp16_compress_hook(
    process_group: dist.ProcessGroup, bucket: dist.GradBucket
) -> torch.futures.Future:
    """
    This DDP communication hook implements a simple gradient compression
    approach that casts ``GradBucket`` tensors to half-precision floating-point
    format (``torch.float16``) and then divides it by the process group size.
    It allreduces those ``float16`` gradient tensors. Once compressed gradient
    tensors are allreduced, the chained callback ``decompress`` casts it back
    to the input data type (such as ``float32``).

Example::
    >>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
"""
group_to_use = process_group if process_group is not None else dist.group.WORLD
world_size = group_to_use.size()

compressed_tensor = bucket.get_tensor().to(torch.float16).div_(world_size)

fut = dist.all_reduce(
    compressed_tensor, group=group_to_use, async_op=True
).get_future()

def decompress(fut):
    decompressed_tensor = bucket.get_tensor()
    # Decompress in place to reduce the peak memory.
    # See: https://github.com/pytorch/pytorch/issues/45968
    decompressed_tensor.copy_(fut.value()[0])
    return [decompressed_tensor]

return fut.then(decompress)

```

5.7.4 runGradCallbackForVariable

The mark_variable_ready_dense function calls runGradCallbackForVariable.

5.7.4.1 Reducer

Reducer's runGradCallbackForVariable is shown below; it delegates to distributed::autograd::ContextPtr->runGradCallbackForVariable.

```cpp
void Reducer::runGradCallbackForVariable(
    at::Tensor& variable,
    GradCallback&& cb) {
  // Load the rpc context.
  auto context_ptr = rpc_context_.context_ptr.load();
  if (context_ptr == nullptr) {
    cb(variable.mutable_grad());
  } else {
    // Under distributed autograd
#ifndef _WIN32
    // analyzed below
    context_ptr->runGradCallbackForVariable(variable, std::move(cb));
#endif
  }
}
```

5.7.4.2 DistAutogradContext

Following the call, we arrive at DistAutogradContext.

It looks up the gradient grad corresponding to the tensor in accumulatedGrads_, processes the gradient with the callback that was passed in, and finally copies the processed gradient back into accumulatedGrads_. This closes the loop that starts with the hook obtaining the gradient and ends with the reduced gradient being written back.

```cpp
void DistAutogradContext::runGradCallbackForVariable(
    const torch::autograd::Variable& variable,
    GradCallback&& cb) {
  torch::Tensor grad;
  {
    std::lock_guard<std::mutex> guard(lock_);
    auto it = accumulatedGrads_.find(variable); // find the gradient of this tensor
    TORCH_INTERNAL_ASSERT(
        it != accumulatedGrads_.end(),
        "The grad for the variable should exist in dist_autograd context.");
    grad = it->value();
  }
  if (cb(grad)) { // process the gradient with the callback that was passed in
    std::lock_guard<std::mutex> guard(lock_);
    auto device = grad.device();
    // Needs to update the grad in the map.
    accumulatedGrads_.insert_or_assign(variable, std::move(grad)); // copy the processed gradient back
    recordGradEvent(device);
  }
}
```

DistAutogradContext's accumulatedGrads_ records the current gradient of each tensor.

```cpp
// DistAutogradContext which stores information for a single distributed
// autograd pass on a worker.
class TORCH_API DistAutogradContext {
 public:
  // Gradients accumulated in this context so far. The key is the variable on
  // which the gradient needs to be accumulated and the value is the gradient
  // that needs to be accumulated on that variable.
  c10::Dict<torch::Tensor, torch::Tensor> accumulatedGrads_;
};
```

This concludes our first pass over these basic classes; the next post will continue (there are simply too many of them...).

0xEE Personal information

★★★★★★ Thoughts on life and technology ★★★★★★

WeChat public account: 羅西的思考 (Rossi's Thoughts)

