OneFlow源碼解析:Eager模式下的設備管理與併發執行

語言: CN / TW / HK


作者|鄭建華
更新|趙露陽


通過這篇筆記,希望能初步瞭解 OneFlow 在 Eager 模式下對設備的管理方式、設備執行計算的過程以及如何充分利用設備計算能力。
這裏的設備主要指類似 CUDA 這樣的並行計算加速設備。


1

設備、流相關類型及關係


框架通過流(Stream)向設備(Device)提交計算任務。一個 Stream 是一個命令序列,可以類比 CUDA Stream(http://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#streams),或者 CPU Thread 的指令序列。同一個 Stream 中的命令按順序執行;不同 Stream 之間的命令有依賴關係時,需要同步。不同的任務,比如 kernel 計算、host2device、device2host 等都有自己獨立的 Stream,可以併發執行,從而在 Eager 模式下儘可能充分利用設備的異步併發執行能力。


OneFlow 中Device和Stream相關的部分類結構如下所示:


Device相關類型


oneflow::Device


oneflow::Device是用於表示設備的基礎類型,例如:構建tensor時 flow.tensor(shape, device="cuda:1")就會在內部構造出這個基礎的Device類型,其中設備編號為1、設備類型為CUDA。


oneflow/core/framework/device.h:


  
  
  
class Device final { public:  ...
private: Device(const std::string& type, int64_t device_id); Maybe<void> Init();
const std::string type_; DeviceType enum_type_; const int64_t device_id_; const size_t hash_value_; std::shared_ptr<MemoryCase> mem_case_;};


oneflow::Device中最重要的兩個成員變量分別是用於表示設備類型的DeviceType用於表示設備編號的device_id_


DeviceType


DeviceType是一個枚舉類,不同的值代表不同的計算設備類型,其定義位於 oneflow/core/common/device_type.proto


  
  
  
enum DeviceType {  kInvalidDevice = 0;  // 無效設備  kCPU = 1;            // cpu設備  kCUDA = 2;           // cuda設備  kMockDevice = 3;     // pseudo device for test.}


目前在oneflow master分支中,主要有kCPU表示cpu設備;kCUDA表示nvidia cuda設備;在其他多設備支持的分支中,這裏還增加了更多的設備類型。


oneflow::ep::Device


oneflow::Device是oneflow中對設備的基礎封裝類型,而oneflow::ep::Device則是一個抽象類,屬於oneflow的ep模塊(execution provider),是對設備行為的封裝,ep模塊為多硬件設備提供了更高層次的抽象,方便oneflow支持和兼容多硬件設備提供了更高的靈活性和可拓展性


oneflow::ep::Device不僅提供了表示設備類型的device_type()方法、表示設備編號的device_index()方法,還提供了創建/銷燬ep::Stream、創建/銷燬Event、在設備上申請/釋放內存的各種方法。

oneflow/core/ep/include/device.h


class Device { public:  OF_DISALLOW_COPY_AND_MOVE(Device);  Device() = default;  virtual ~Device() = default;
virtual void SetAsActiveDevice() = 0;
virtual DeviceType device_type() const = 0; virtual size_t device_index() const = 0; virtual DeviceManager* device_manager() const = 0;
virtual Stream* CreateStream() = 0; virtual void DestroyStream(Stream* stream) = 0;
virtual Event* CreateEvent(); virtual void DestroyEvent(Event* event); virtual void CreateEvents(Event** events, size_t count) = 0; virtual void DestroyEvents(Event** events, size_t count) = 0;
virtual Maybe<void> Alloc(const AllocationOptions& options, void** ptr, size_t size) = 0; virtual void Free(const AllocationOptions& options, void* ptr) = 0; virtual Maybe<void> AllocPinned(const AllocationOptions& options, void** ptr, size_t size) = 0; virtual void FreePinned(const AllocationOptions& options, void* ptr) = 0; virtual bool IsStreamOrderedMemoryAllocationSupported() const;};


oneflow::ep::Device有如下子類實現:



Stream相關類型


oneflow::Stream和cuda device以及stream的關係類似,oneflow中也存在類似的基礎Stream類型。


oneflow/core/framework/stream.h


  
  
  
class Stream final { .... private:  Stream(Symbol<Device> device, StreamType stream_type, size_t thread_uid);
static Maybe<Symbol<Stream>> RawNew(Symbol<Device> device, StreamType stream_type, size_t thread_uid);
Maybe<void> Init(size_t unique_stream_id);
Symbol<Device> device_; StreamType stream_type_; size_t thread_uid_; size_t unique_stream_id_;};


可以看見Stream類中的成員變量:


  • device_  表示該Stream對象將在何種設備上執行
  • streamtype_  表示該Stream的類型,是用於計算的compute stream還是用於數據搬運的host2device、device2host stream等
  • threaduid_ 表示負責啟動該Stream的線程id
  • unique_streamid_ 表示這個stream自身的unique id


StreamType


DeviceType 分為kCpu和kCuda類似, Stream 也有各種類型之分,具體如下:

oneflow/core/common/stream_type.h

  
  
  
enum class StreamType {    kInvalid = 0,    // 無效    kCompute,        // kernel計算流    kHost2Device,    // 數據搬運(host -> device)流    kDevice2Host,    // 數據搬運(device -> host)流    kCcl,            // 集合通信流    kBarrier,        // 線程屏障流    kCriticalSection,// 臨界區流    kLazyJobLauncher,// job啟動流(lazy mode)    kPinnedCompute   // pinned memory kernel計算流};

oneflow::ep::Stream


oneflow中的ep模塊提供了一個更高層次的對Stream的抽象類,除了可以獲取設備的 device() 、獲取設備類型的 device_type() 方法外,還提供了一系列虛方法如:

  • 同步Sync()
  • 執行Event事件RecordEvent()

oneflow/core/ep/include/stream.h

  
  
  
class Stream { public:  OF_DISALLOW_COPY_AND_MOVE(Stream);  Stream() = default;  virtual ~Stream() = default;
virtual DeviceType device_type() const = 0; virtual Device* device() const = 0; virtual Maybe<void> Sync() = 0; virtual void RecordEvent(Event* event) = 0; virtual Maybe<void> GetAsyncError() { return Maybe<void>::Ok(); }
virtual Maybe<void> AllocAsync(void** ptr, size_t size) { UNIMPLEMENTED_THEN_RETURN(); } virtual Maybe<void> FreeAsync(void* ptr) { UNIMPLEMENTED_THEN_RETURN(); } template<typename T> Maybe<void> AllocAsync(T** ptr, size_t size) { return AllocAsync(reinterpret_cast<void**>(ptr), size); }
virtual Maybe<void> OnExecutionContextSetup() { return Maybe<void>::Ok(); } virtual Maybe<void> OnExecutionContextTeardown() { return Maybe<void>::Ok(); }
template<typename T> T* As() { return static_cast<T*>(this); }};

oneflow::ep::Stream 有如下子類實現:


oneflow::vm::Stream


oneflow vm(virtual machine)中的 oneflow::vm::Stream 類型,用於vm內部維護stream極其依賴關係、StreamPolicy、調度線程等。

oneflow/core/vm/stream.h

  
  
  
class Stream final : public intrusive::Base { public:  ... private:  ...  // fields  ThreadCtx* thread_ctx_;  Symbol<Device> device_;  StreamType stream_type_;  std::shared_ptr<StreamPolicy> stream_policy_;  bool on_scheduler_thread_;  std::unique_ptr<char, std::function<void(char*)>> small_pinned_mem_ptr_;  ...};

StreamPolicy


StreamPolicy 是oneflow vm中獨有的概念,提供了一系列虛方法如:

  • stream() 獲取oneflow::ep::Stream指針

  • mut_allocator() 獲取vm::Allocator指針(用於tensor內存管理)

  • device_type() 獲取device設備類型


除此之外,提供了一系列vm相關的指令狀態初始化、查詢、刪除等方法。

oneflow/core/vm/stream_policy.h

  
  
  
class StreamPolicy { public:  virtual ~StreamPolicy() = default;
virtual ep::Stream* stream() = 0; virtual vm::Allocator* mut_allocator() = 0; virtual DeviceType device_type() const = 0;
virtual void InitInstructionStatus(const Stream& stream, InstructionStatusBuffer* status_buffer) const = 0; virtual void DeleteInstructionStatus(const Stream& stream, InstructionStatusBuffer* status_buffer) const = 0; virtual bool QueryInstructionStatusLaunched( const Stream& stream, const InstructionStatusBuffer& status_buffer) const = 0; virtual bool QueryInstructionStatusDone(const Stream& stream, const InstructionStatusBuffer& status_buffer) const = 0; virtual bool OnSchedulerThread(StreamType stream_type) const; virtual bool SupportingTransportInstructions() const = 0;
void RunIf(Instruction* instruction) const;
protected: StreamPolicy() = default;
private: virtual void Run(Instruction* instruction) const = 0;};

StreamPolicy有如下子類實現:


2

Eager Local模式下的Device和Stream推導


下面,梳理一下普通的eager模式(eager local mode)下,算子執行全過程中device和stream相關的推導流程。

2.1 推導Device


首先,對於一個算子(op)來説,要為其設置一個默認的device用於實際計算,這一步在:

Symbol<Device> default_device = JUST(GetDefaultDevice(inputs, ctx))

這裏 GetDefaultDevice 的邏輯是:

  • 1.如果inputs tensor非空,則根據第一個input tensor的device來設置default的device
  • 2.如果inputs tensor為空,則優先從OpExprInterpContext中獲取device,若OpExprInterpContext中未設置,則會通過
    Device::New("cpu")
    ;默認給一個cpu device

值得説明的是,在1.種情況時,如果input tensor創建時指定了device為cuda設備,則這裏推導出的default device同樣為相同的cuda device;如果未顯示指定,則默認還是cpu device。

2.2 推導Stream


oneflow::Stream的推導主要在:

  • JUST(user_op_expr.mut_local_tensor_infer_cache()->GetOrInfer(infer_args))
    );
    Symbol<Stream> stream = JUST(InferDeviceAndStream(... ));

InferDeviceAndStream
中, Stream 推導的邏輯是會根據user_op_expr是否定義了device_and_stream_infer_fn而有所區別

  • (少數情況)如果該op定義了推導函數,則調用此推導函數來推導Stream,例如 tensor.cuda() 法,inputs 在 CPU 上, output s 在 CUDA,二者的設備類型不同。 這時就不會默認推導而是利用 op 註冊的推導函數獲取 oneflow::Stream(
    例如 CopyOp::InferDeviceA ndStream
  • (多數情況)否則會通過
    stream = JUST(GetDefaultStreamByDevice(default_device))
    ; 來推導。

GetDefaultStreamByDevice 的具體實現:

  
  
  
Maybe<Symbol<Stream>> RawGetDefaultStreamByDevice(Symbol<Device> device) {  return Stream::New(device, StreamType::kCompute);}

可以看見,根據傳入的 device StreamType::kCompute ,new了一個 oneflow::Stream

2.3 InstructionsBuilder::Call和vm::Stream推導


在上述device和stream推導完成後,會通過InstructionsBuilder調用Call方法

  
  
  
JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {    return builder->Call(kernel, std::move(input_eager_blob_objects),                         std::move(output_eager_blob_objects), ctx, result->stream());}));

Call方法中會通過

  • JUST(SoftSyncStream(output_eager_blob_objects, stream)) ;
  • JUST(SoftSyncStream(input_eager_blob_objects, stream)) ;
  • auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream)) ;

完成outputs inputs tensor的流同步(SoftSyncStream)過程以及 vm::Stream 的推導,然後通過構造 OpCallInstructionPolicy 指令派發至vm執行。

SoftSyncStream的同步這裏省略,具體過程見第4節。

2.3.1 構造 ThreadCtx 對象,啟動執行指令的線程


ThreadCtx 對象指針保存在 VirtualMachine 的 HashMap  中。每個 DeviceType (CPU或CUDA)對應一個 ThreadCtx 對象;臨界區和 LazyJob  有自己的 ThreadCtx 對象

首次訪問 HashMap  時得到的是零值(空指針),需要調用 CreateThreadCtx  創建對象。實際通過虛擬機指令創建對象 ,ThreadCtx 對象保存在 VirtualMachineEngine::thread_ctx_list_ 中

ThreadCtx 對象構造後,會創建一個 worker 線程、執行 WorkerLoop 方法 ,並添加到 worker_threads_ 。所以 worker_threads_ 是與 ThreadCtx 對象一一對應的。

這個線程負責其所歸屬的指令 的執行:

  • WorkerLoop 在收到通知 後,會調用 ThreadCtx::TryReceiveAndRun   處理指令。
  • 在這個函數中,將 ThreadCtx 的指令挪到臨時列表 、通過 StreamPolicy 執行每個指令
  • ThreadCtx 的指令,是 VirtualMachineEngine 在 DispatchInstruction 時添加進去 的。

ThreadCtx創建完成後,將持有vm::Stream對象 oneflow::vm::Stream oneflow::Stream 的數量是一一對應的 ,vm::Stream 按照<DeviceType, StreamRole>分組存儲在對應的 ThreadCtx 中

vm::Stream 的推導流程細節如下:

  • auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream)) ;
    • VirtualMachine::GetVmStream()
    • Maybe<vm::Stream*>      VirtualMachine::CreateStream(Symbol<Stream> stream)
    • Stream::__Init__(ThreadCtx* thread_ctx, Symbol<Device> device, StreamType stream_type...)
    • stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device)) ;


2.4 執行OpCall指令和ep::Stream推導


有幾個場景會創建(獲取) ep::Stream 對象。比如 kernel 執行時。

OpCall指令在構造時,指令策略類型是 OpCallInstructionPolicy 。虛擬機在 DispatchInstruction 時,無論哪個分支,後續都會調用 EpStreamType::Run ,最終通過

  • EpStreamPolicyBase::Run()
    • instruction->Compute()
    • OpCallInstructionPolicy::Compute()
    • OpCallInstructionUtil::Compute()
    • OpCallInstructionUtil::OpKernelCompute()
    • op_call_instruction_policy->mut_opkernel()->Compute()執行 kernel 的 Compute 方法

例如 GpuL2NormalizeKernel::Compute ,最終在其kernel的Compute方法中,會通過ctx->stream()創建 (獲取)ep::Stream 對象,launch kernel 執行計算。

2.4.1 獲取/創建ep::Stream


下面,我們重點看一下OpCall指令實際執行時,調用的OpCallInstructionUtil::Compute() 方法:

  
  
  
static inline Maybe<void> Compute(OpCallInstructionPolicy* op_call_instruction_policy,                                    Instruction* instruction) {    Allocator* allocator = instruction->mut_stream()->mut_stream_policy()->mut_allocator();    JUST(AllocateOutputBlobsMemory(op_call_instruction_policy, allocator, instruction));    if (unlikely(op_call_instruction_policy->need_temp_storage())) {      JUST(TryAllocateTempStorage(op_call_instruction_policy, allocator));    }    ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();    user_op::OpKernelState* state = nullptr;    user_op::OpKernelCache* cache = nullptr;    if (op_call_instruction_policy->user_opkernel()->has_state_or_cache()) {      TryInitOpKernelStateAndCache(op_call_instruction_policy, stream, &state, &cache);    }    OpKernelCompute(op_call_instruction_policy, stream, state, cache);    if (unlikely(op_call_instruction_policy->need_temp_storage())) {      DeallocateTempStorage(op_call_instruction_policy, allocator);    }    return Maybe<void>::Ok();  }

其中會通過 ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream() ;完成 ep::Stream 的推導,之後在 OpKernelCompute() 方法中實際完成op/kernel的執行。

ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream() ;
  • ep::Stream* stream() override { return GetOrCreateEpStream(); }
    • GetOrCreateEpStream()
    • ep_stream_ = GetOrCreateEpDevice()->CreateStream() ;

這裏->stream()會調用ep_stream_policy_base.h中的:

ep::Stream* stream() override { return GetOrCreateEpStream(); }

這是一個private方法:

  
  
  
private:  ep::Stream* GetOrCreateEpStream() const {    if (unlikely(ep_stream_ == nullptr)) {      ep_stream_ = GetOrCreateEpDevice()->CreateStream();      CHECK(ep_stream_ != nullptr);    }    return ep_stream_;  }

可以看到,如果成員變量 ep_stream_ 非空,則直接返回;否則,通過 ep_stream_ = GetOrCreateEpDevice()->CreateStream() ; 來創建創建 ep::Stream

2.4.2 獲取/創建ep::Device


而這裏的 GetOrCreateEpDevice 方法如下:

  
  
  
ep::Device* GetOrCreateEpDevice() const {    if (unlikely(ep_device_ == nullptr)) {      ep_device_ = Singleton<ep::DeviceManagerRegistry>::Get()->GetDevice(device_->enum_type(),                                                                          device_->device_id());      CHECK(ep_device_);    }    return ep_device_.get();  }

根據 oneflow::Device 中拿到的device id和device type,去全局單例的 ep::DeviceManagerRegistry 中取出對應的 oneflow::ep::Device

oneflow::vm::StreamPolicy和oneflow::vm::EpStreamPolicy推導

  • stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device)) ;
    • std::shared_ptr<vm::StreamPolicy>(new vm::EpStreamPolicy(device)) ;


3

Eager Global模式下的Device和Stream推導


eager global模式下,device信息隱藏在placement中,placement不僅包括了device type信息還包括其tensor具體分佈在哪些ranks上的信息, placement 在 C++ 中的對應類型是 ParallelDesc

所以device以及stream的部分推導過程和eager local模式下有所區別,但OpCall指令執行;device、vm::Stream和ep::Stream的推導過程都和eager local模式下是類似的。

3.1 推導Device


3.1.1 placement 的 parallel_id


oneflow中的placement表示tensor存放的設備集羣(device group),如:
p = flow.placement(type="cuda", ranks=[0, 1, 2, 3])
表示tensor分佈於1台機器上,cuda device 0、1、2、3四個設備上;
p = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])
則表示tensor分佈於2台機器上,host1的device0、1以及host2的device2、3。

在 oneflow 的分佈式環境下,各個 host 上需要有相同數量的device,每個進程使用一個device。這樣根據環境變量 RANK 可以得出 machine_id, LOCAL_RANK 就是進程在 制定host 上的 rank序號。

如果 input tensor 的 placement 與當前進程無關,可以省掉很多不必要的計算。通過 placement 的 parallel_id 可以判斷計算任務是否與當前進程相關。

placement 在 C++ 中的對應類型是 ParallelDesc ,其中並沒有 parallel_id 字段,這個信息隱含在其它字段中。

ParallelDesc 在構造時會調用 ClearUp 函數,從中可以看到

  • ParallelDesc::parallel_id2machine_id_ 是 placement 分佈的 machine。
  • ParallelDesc::parallel_id2device_id_ 是 placement 分佈的 device_id。
  • parallel_id 是上述 2 個數組的索引,一個 parallel_id 對應一個 machine_id:device_id 組合。這樣,根據parallel_id可以查到對應的 machine_id 和 device_id。
  • 反過來,根據 machine_id:device_id 也可以從 machine_id2device_id2parallel_id_ 查到 parallel_id。



3.1.2 eager 模式下根據 parallel_id 忽略無關計算任務


在 eager 分佈時場景處理計算任務時,會調用 GetTensorDevice4CurrentProcessCtx 推導得到輸出tensor的device,以及獲取當前進程的 machine_id、device_id 在 placement 中的 parallel_id 值。

如果當前進程與該 placement 無關,parallel_id 就是空,後續處理時就可以忽略一些計算:

  • EagerGlobalTensorImpl::New  中只需要用 functional::Empty 構造一個 shape 為 0 的空的 tensor。
  • GetBoxingOutput 計算時,如果parallel_id為空則表示當前rank進程無效,無需計算直接返回。
  • Interpret 可以不給 vm 提交指令、提前返回



3.2 推導Stream


ConsistentTensorInferCache 中推導 SBP Signature 時,也會同時推導出當前的 tensor 計算任務、在當前進程所用的device。推導時,會先確認所有 inputs 的 placement 是一致的 ,都分佈在相同的device上。如前所述,如果計算任務與當前進程無關,會提前返回;而一個進程只使用一個device。

這裏和eager local模式下stream的推導類似,通過 JUST(InferDeviceAndStream(user_op_expr, infer_args)) 推導出 oneflow::Stream 對象,StreamRole 是 kCompute 。區別在於eager global模式下

3.2.1 unique_stream_id


unique_stream_id 表示 oneflow::Stream 對象的創建次序。
所有的 oneflow::Stream 對象都保存在全局的 StreamMgr::stream2unique_stream_id_ 中 unique_stream_id2stream_symbol_ 可看作是引用類型的副本, unique_stream_id 就是 Stream 對象在這個數組中的索引。與 parallel_id 不同,unique_stream_id 是 Stream 對象在進程內的唯一標識。

並不是每次都需要加鎖訪問 StreamMgr oneflow::Stream 包含的都是描述性信息,其引用是以 ThreadLocal  的方式存儲的,可以提升後續讀取的效率。虛擬機在執行指令時,也會用 unique_stream_id 進行邏輯判斷。

4

Eager模式下的Stream同步——SoftSyncStream


設想以下場景:將 CPU 下的 tensor 拷貝到 CUDA 設備,然後在 CUDA 上再進行 tensor add 的計算。這涉及到兩個流,一個是 Host2Device,一個是 CUDA Compute。這兩個流的計算任務是併發執行的。需要有同步措施,才能保證拷貝完再執行 add 計算。

Eager 模式下,在 InstructionsBuilder::Call 中構造指令時,對 SoftSyncStream 的調用會在必要時向指令列表插入同步指令。

SoftSyncStream 中,幾個重要概念:

  • tensor在oneflow內存中的實際承載者是 eager_blob_object
  • last_used_stream 表示一個tensor(blob)上一次使用到的stream,可能是compute stream、h2d stream、d2h stream、集合通信ccl stream等
  • 如果 last_used_stream 與當前計算執行的流 stream 相同,則可以忽略,因為相同stream間天然順序執行所以無需同步,否則就需要進行後續的同步處理


SoftSyncStream 代碼如下:

  
  
  
Maybe<void> InstructionsBuilder::SoftSyncStream(const vm::EagerBlobObjectList& eager_blob_objects,                                                Symbol<Stream> stream) {  JUST(ForEachEagerBlobObjectsNeedingSoftSync(      eager_blob_objects, stream,      [&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {        return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream);      }));  for (const auto& eager_blob_object : eager_blob_objects) {    eager_blob_object->set_last_used_stream(stream);  }  return Maybe<void>::Ok();}

主體邏輯是,會在 ForEachEagerBlobObjectsNeedingSoftSync 方法中遍歷每一個tensor對象(eager blob object),對於每一個需要同步的blob運用lambda方法並最終調用 SoftSyncStreamBetween 完成stream間的同步。

這裏,我們看一下ForEachEagerBlobObjectsNeedingSoftSync 的邏輯:


首先if/else的主體業務邏輯是類似的,主要區別在於,當blob的size <= kOpArgsReservedSize時(默認為4)會使用small vector來存放LocalDepObject變量,效率會更快一些(否則會走到else分支,主體邏輯類似,這裏就不看了)。

  • const auto& opt_last_used_stream = eager_blob_object->last_used_stream()
    ;
  • if (unlikely(!opt_last_used_stream.has_value())) { continue; }


這兩句是查詢該tensor(blob)上一次被使用時用到的stream——last_used_stream,如果為空,則直接continue跳過,因為如果此tensor之前並未被任何stream使用,則無需進行stream間的同步操作,因為在當前stream上不會有關於該tensor的其他依賴關係;

  
  
  
if (last_used_stream != stream) {    small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize> dep_objects{        intrusive::shared_ptr<LocalDepObject>(            JUST(eager_blob_object->compute_local_dep_object()))};    JUST(DoEach(last_used_stream, std::move(dep_objects)));  }

如果 last_used_stream!=stream 則表示需要在兩個stream間進行同步,則會應用傳入的lambda函數DoEach進行處理,在這裏lambda函數即:

  
  
  
[&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {    return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream);  }));

既實際調用的是SoftSyncStreamBetween 來完成實際的stream間同步,這裏主要有3個變量:

  • dep_objects
    存儲了tensor間的依賴關係
  • last_used_stream
    則是該tensor上一次使用的stream
  • stream
    該tensor當前使用的stream


SoftSyncStreamBetween 的代碼如下:

  
  
  
Maybe<void> InstructionsBuilder::SoftSyncStreamBetween(    small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize>&& dependences,    Symbol<Stream> from_stream, Symbol<Stream> to_stream) {  CHECK(from_stream != to_stream) << "synchronization is unnecessary";  if (SupportingStreamWait(from_stream, to_stream)) {    JUST(StreamWait(std::move(dependences), from_stream, to_stream));  } else {    JUST(RecordEvent(std::move(dependences), from_stream));  }  return Maybe<void>::Ok();}

SoftSyncStreamBetween 的主要邏輯如下:

  • 先額外做了一次check,檢測如果待同步的兩個stream相同,則check會報錯並提示"synchronization is unnecessary"
  • 通過SupportingStreamWait判斷from 和 to stream間是否支持stream wait,是則調用StreamWait方法;否則,直接調用RecordEvent 方法

  • SupportingStreamWait 的主要邏輯是,通過stream的device、以及StreamType的Visit方法來判斷。簡單來説,如果from 和 to stream之間是不同的device(譬如cpu stream <-> cuda stream之間的同步),或者from stream的device為cpu,則SupportingStreamWait一定是false;如果是相同的,則繼續通過其他判斷條件進行判斷。

SupportingStreamWait為True


SupportingStreamWait 為True時,即from to stream同為Cuda Stream間的同步情況,在這種情況下會走到StreamWait 的函數,該函數最終會派發一個 StreamWaitEventInstructionPolicy 的指令給vm執行,StreamWaitEventInstructionPolicy 的執行邏輯主要是兩個cuda event:

  • cudaEventRecord
  • cudaStreamWaitEvent



  • 對於from_stream來説,插入一個
    cudaEventRecord
    ,用於標誌from stream是否完成該stream上的event事件;


  • 對於to_stream來説,插入一個
    cudaStreamWaitEvent
    等待from stream上的事件完成後,再繼續執行to_stream。


SupportingStreamWait為False


SupportingStreamWait 為False時,會直接調用 JUST(RecordEvent(std::move(dependences) , from_stream)); 其內部實現會從對象池中獲取可複用的cuda event對象並執行event。

這裏有個細節,由於cuda event的創建和銷燬都會引發cuda kernel的launch由異步轉同步,所以基於對象池的cuda event可以避免這個開銷。

實際上最終調用的還是 cudaEventRecord cudaEventRecord 本身只是起到一個“佔位符”的作用,並不能起到(保證該stream上其他kernel全部執行完)的作用,真正能保證stream同步作用的是oneflow vm(vitual machine)控制下的指令間依賴關係/執行順序。

5

CPU 下的並行計算


CpuStream 只有一個線程。CPU kernel 應該是通過 OpenMP  或者 Intel OneApi 等實現並行計算加速。

參考資料

1.http://github.com/Oneflow-Inc/oneflow/tree/845595e2c0abc3d384ff047e188295afdc41faaa


其他人都在看

試用OneFlow: github.com/Oneflow-Inc/oneflow/


本文分享自微信公眾號 - OneFlow(OneFlowTechnology)。
如有侵權,請聯繫 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閲讀的你也加入,一起分享。