OneFlow Source Code Analysis: The Tensor Type System and Local Tensor


Author: Zheng Jianhua (鄭建華) | Updated by: Zhao Luyang (趙露陽)

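Tensors and ops are the most basic building blocks of a neural network model: ops are the nodes of the model, and tensors are the edges that connect them. Building a tensor, however, involves more than constructing an object. At a minimum, the following must be considered:

  • Support both node-local local tensors and distributed global tensors;
  • Support both eager and lazy execution modes;
  • Support different data types, including float, double, and int;
  • Support different devices.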

1 Ways to create a tensor

As in PyTorch, there are two main ways to create a tensor in OneFlow: Tensor and tensor. Both ultimately create OneFlow's internal C++ Tensor object, which corresponds to the flow.Tensor type in the Python layer.

1.1 Tensor

The Python-level Tensor is imported in tensor.py (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/python/oneflow/framework/tensor.py#L23 ). It is a Tensor type object registered through the Python C API, and it is defined and returned by MakeTensorType (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/framework/tensor.cpp#L623 ).

Within MakeTensorType, Tensor objects are created mainly through PyTensorObject_init:

```cpp
static int PyTensorObject_init(PyObject* self, PyObject* args, PyObject* kwargs) {
  HANDLE_ERRORS
  auto* temp = functional::_legacy_tensor_ctor(NULL, args, kwargs);
  if (PyErr_Occurred()) { throw py::error_already_set(); }
  auto* _self = (PyTensorObject*)self;
  _self->data = PyTensor_Unpack(temp);
  _self->data->set_pyobject(self);

  // reset temp data to prevent clearing the pyobject
  // when the temp is deallocated
  ((PyTensorObject*)temp)->data.reset();
  Py_XDECREF(temp);
  return 0;
  END_HANDLE_ERRORS_RET(-1)
}
```

The functional::_legacy_tensor_ctor function creates OneFlow's internal C++ Tensor object, oneflow::one::Tensor, which is bound to the Python Tensor type as its data field. MakeTensorType also registers many C++ methods for Tensor through PyMethodDef (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/framework/tensor.cpp#L639-L641 ), for example:

    static PyMethodDef PyTensorObject_methods[] = {
        {"storage_offset", PyTensorObject_storage_offset, METH_NOARGS, NULL},
        {"stride", PyTensorObject_stride, METH_NOARGS, NULL},
        {"is_contiguous", PyTensorObject_is_contiguous, METH_NOARGS, NULL},
        {"contiguous", PyTensorObject_contiguous, METH_NOARGS, NULL},
        {"contiguous_", PyTensorObject_contiguous_, METH_NOARGS, NULL},
        {"pin_memory", PyTensorObject_pin_memory, METH_NOARGS, NULL},
        {"is_pinned", PyTensorObject_is_pinned, METH_NOARGS, NULL},
        {"requires_grad_", (PyCFunction)PyTensorObject_requires_grad_, METH_VARARGS | METH_KEYWORDS, NULL},
        {"retain_grad", PyTensorObject_retain_grad, METH_NOARGS, NULL},
        {"detach", PyTensorObject_detach, METH_NOARGS, NULL},
        {"clone", PyTensorObject_clone, METH_NOARGS, NULL},
        {"zero_", PyTensorObject_zero_, METH_NOARGS, NULL},
        {"register_hook", PyTensorObject_register_hook, METH_O, NULL},
        {"_register_post_grad_accumulation_hook", PyTensorObject__register_post_grad_accumulation_hook, METH_O, NULL},
        {"global_id", PyTensorObject_global_id, METH_NOARGS, NULL},
        {"check_meta_consistency", PyTensorObject_check_meta_consistency, METH_NOARGS, NULL},
        {"to_numpy", PyTensorObject_to_numpy, METH_NOARGS, NULL},
        {"type", (PyCFunction)PyTensorObject_type, METH_VARARGS | METH_KEYWORDS, NULL},
        // ...
    };

In addition, the Python layer registers some Python-implemented Tensor methods and attributes (such as tensor.numpy) through RegisterMethods (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/python/oneflow/framework/tensor.py#L502 ). When the OneFlow package is initialized, RegisterMethod4Class (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/python/oneflow/framework/register_class_method_util.py#L23 ) completes the registration of these Python methods and attributes.

[Figure: call flow of RegisterMethod4Class]

Compared with the Python implementations, Tensor methods and attributes implemented in C++ usually perform better.
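As a rough illustration of the Python-side registration described above, the sketch below attaches plain Python functions to an existing class once, at initialization time. The class `FakeTensor` and the helpers `register_methods`, `_to_list`, and `_numel` are hypothetical stand-ins chosen for this example; they are not OneFlow's RegisterMethods or RegisterMethod4Class code.

```python
class FakeTensor:
    """Hypothetical stand-in for a tensor type that is defined elsewhere."""

    def __init__(self, values):
        self._values = list(values)


def _to_list(self):
    # Python-implemented method that will be attached to the class.
    return list(self._values)


def _numel(self):
    # Python-implemented getter that will be exposed as a property.
    return len(self._values)


def register_methods(cls):
    """Attach Python methods and properties to cls (assumed flow, for illustration)."""
    cls.to_list = _to_list
    cls.numel = property(_numel)


# Run once, for example when the package is initialized.
register_methods(FakeTensor)

t = FakeTensor([1, 2, 3])
print(t.to_list(), t.numel)
```

After registration, every instance of the class sees the new method and property, which is the effect the registration step is meant to achieve.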

1.2 The tensor function

Tensor is a type, whereas tensor is a function. The flow.tensor function is defined in oneflow/api/python/functional/tensor_api.yaml:

    - name: "tensor"
      signature: [
          "Tensor (PyObject* data, *, DataType dtype=None, Device device=None,
          Bool requires_grad=False, Bool pin_memory=False) => TensorWithData",
          "Tensor (PyObject* data, *, DataType dtype=None, Placement placement, SbpList sbp,
          Bool requires_grad=False) => GlobalTensorWithData",
        ]
      bind_python: True

Its C++ implementation is in tensor_api.yaml.pybind.cpp, a file generated automatically at build time.

The function signatures show that flow.tensor() has two overloads:

  • TensorWithData
  • GlobalTensorWithData

They construct local tensors and global tensors, respectively. As with Tensor above, flow.tensor also returns OneFlow's internal oneflow::one::Tensor object (bound to a Python Tensor object).
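The two signatures differ in the keyword arguments they accept: device and pin_memory for the local form, placement and sbp for the global form. The sketch below shows one way a dispatcher could choose between them, assuming the choice depends only on which keywords are present. The function `pick_overload` and the two keyword sets are hypothetical; the generated C++ binding performs a fuller signature match.

```python
# Keyword names taken from the two signatures in tensor_api.yaml.
LOCAL_KWARGS = {"dtype", "device", "requires_grad", "pin_memory"}
GLOBAL_KWARGS = {"dtype", "placement", "sbp", "requires_grad"}


def pick_overload(**kwargs):
    """Return the name of the functor whose signature accepts kwargs."""
    keys = set(kwargs)
    # placement and sbp have no default in the global signature,
    # so both must be supplied for that overload to match.
    if {"placement", "sbp"} <= keys and keys <= GLOBAL_KWARGS:
        return "GlobalTensorWithData"
    if keys <= LOCAL_KWARGS:
        return "TensorWithData"
    raise TypeError(f"no overload of tensor() accepts {sorted(keys)}")


print(pick_overload(device="cpu"))
print(pick_overload(placement="p", sbp="s"))
```

Mixing keywords from both forms, such as device together with placement, matches neither signature and is rejected in this sketch.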

1.3 Two ways to build a tensor manually

As in PyTorch, there are two common ways to create a tensor in OneFlow:

  • flow.Tensor
  • flow.tensor

Examples:

```python
import oneflow as flow
import numpy as np

flow.tensor([[1., -1.], [1., -1.]])
# tensor([[ 1., -1.],
#         [ 1., -1.]], dtype=oneflow.float32)

flow.tensor(np.array([[1, 2, 3], [4, 5, 6]]))
# tensor([[ 1, 2, 3],
#         [ 4, 5, 6]], dtype=oneflow.int64)

flow.Tensor([[1, 2, 3], [4, 5, 6]])
```

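In most cases (the PyTorch-like eager mode), an ordinary tensor (local tensor) can be created by specifying parameters such as device, dtype, and shape.

In a few cases (such as OneFlow's eager global and lazy modes) where a global tensor is needed, one can be created directly by specifying sbp and placement, or an ordinary tensor can be converted to a global tensor with tensor.to_global. See: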

  • oneflow.tensor: https://oneflow.readthedocs.io/en/master/generated/oneflow.tensor.html#
  • global tensor: https://docs.oneflow.org/master/parallelism/03_consistent_tensor.html

2 OneFlow's tensor type system

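The internal C++ Tensor object introduced above is defined in oneflow/core/framework/tensor.h as an abstract Tensor type.

Among its subclasses, LocalTensor is the ordinary tensor seen from a single device (similar to PyTorch's Tensor), while GlobalTensor is a OneFlow-specific tensor seen from a global view (typically used in eager global mode or lazy mode). Tensor uses the Bridge pattern: each Tensor subclass holds a TensorImpl field that encapsulates the tensor's actual implementation.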
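The Bridge pattern mentioned above can be sketched in a few lines. This is a simplified illustration, not the layout of tensor.h: the class names `EagerLocalImpl` and `EagerGlobalImpl`, the `impl_` attribute, and the two methods are assumptions made for the example.

```python
from abc import ABC, abstractmethod


class TensorImpl(ABC):
    """Implementation side of the bridge."""

    @abstractmethod
    def shape(self):
        ...

    @abstractmethod
    def is_local(self):
        ...


class EagerLocalImpl(TensorImpl):
    def __init__(self, shape):
        self._shape = tuple(shape)

    def shape(self):
        return self._shape

    def is_local(self):
        return True


class EagerGlobalImpl(TensorImpl):
    def __init__(self, shape, placement):
        self._shape = tuple(shape)
        self.placement = placement

    def shape(self):
        return self._shape

    def is_local(self):
        return False


class Tensor:
    """Abstraction side: every query is forwarded to the impl_ field."""

    def __init__(self, impl):
        self.impl_ = impl

    def shape(self):
        return self.impl_.shape()

    def is_local(self):
        return self.impl_.is_local()


class LocalTensor(Tensor):
    pass


class GlobalTensor(Tensor):
    pass


local = LocalTensor(EagerLocalImpl([2, 3]))
glob = GlobalTensor(EagerGlobalImpl([2, 3], placement="cpu:0-1"))
print(local.shape(), local.is_local(), glob.is_local())
```

Because callers only see the Tensor interface, the implementation object can vary (eager or lazy, local or global) without changing the code that uses the tensor.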

3 Constructing a local tensor

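Taking flow.tensor([[1,2,3],[4,5,6]]) as an example, let us walk through how a tensor is constructed. The main flow is as follows.

In this example, the tensor is created with flow.tensor and is an ordinary local tensor, so the TensorWithData signature defined in oneflow/api/python/functional/tensor_api.yaml is used. It is implemented by TensorWithDataFunctor in oneflow/api/python/functional/tensor_api.cpp: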

```cpp
class TensorWithDataFunctor {
 public:
  Maybe<Tensor> operator()(PyObject* data, const Optional<Symbol<DType>>& dtype,
                           const Optional<Symbol<Device>>& device, const bool requires_grad,
                           const bool pin_memory) const {
    ...
    if (PyTensor_Check(data)) {
      // Throw warnings like pytorch.
      auto ret = PyErr_WarnEx(
          PyExc_UserWarning,
          "To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() "
          "or sourceTensor.clone().detach().requires_grad_(True), rather than "
          "oneflow.tensor(sourceTensor).",
          1);
      if (ret != 0) { return Error::RuntimeError(); }

      const auto& other = PyTensor_Unpack(data);
      return MakeTensorFromOtherTensor(other, dtype, device, requires_grad, pin_memory);
    } else {
      // Make tensor from python sequence or numpy array.
      return MakeLocalTensorFromData(data, dtype, device, requires_grad, pin_memory);
    }
  }
};
```
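The branching in the functor above can be mirrored in plain Python. In this sketch, `ToyTensor` and `tensor_with_data` are hypothetical names and the warning text is shortened; it only illustrates the two paths, copy-constructing from an existing tensor (with a warning) versus building from raw data.

```python
import warnings


class ToyTensor:
    """Hypothetical tensor that holds a flat list of values."""

    def __init__(self, values, requires_grad=False):
        self.values = list(values)
        self.requires_grad = requires_grad


def tensor_with_data(data, requires_grad=False):
    if isinstance(data, ToyTensor):
        # Copy-constructing from a tensor works but is discouraged.
        warnings.warn(
            "To copy construct from a tensor, prefer source.clone().detach()",
            UserWarning,
            stacklevel=2,
        )
        return ToyTensor(data.values, requires_grad)
    # Otherwise treat data as a Python sequence.
    return ToyTensor(data, requires_grad)


a = tensor_with_data([1, 2, 3])
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    b = tensor_with_data(a)
print(b.values, len(caught))
```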

Because the data passed in here is a Python list, MakeLocalTensorFromData is ultimately called, and most of the tensor-creation logic lives in that function. It makes heavy use of the Python and NumPy APIs to check the data type of the PyObject and to obtain the Shape (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L184 ) and DataType (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L185 ). If the user does not specify a device, it defaults to the CPU device (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L191 ).
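To make the shape, dtype, and default-device steps concrete, here is a minimal pure-Python sketch for nested lists. The helpers `infer_shape`, `flatten`, `infer_dtype`, and `describe` are hypothetical, and the promotion order (float over int over bool, with float32 for empty input) is an assumption for illustration; OneFlow itself relies on the Python and NumPy C APIs for this.

```python
def infer_shape(data):
    """Return the shape of a rectangular nested list."""
    if not isinstance(data, (list, tuple)):
        return ()
    if len(data) == 0:
        return (0,)
    inner = infer_shape(data[0])
    for item in data[1:]:
        if infer_shape(item) != inner:
            raise ValueError("ragged nested sequence")
    return (len(data),) + inner


def flatten(data):
    """Return the scalar leaves of a nested list in order."""
    if not isinstance(data, (list, tuple)):
        return [data]
    out = []
    for item in data:
        out.extend(flatten(item))
    return out


def infer_dtype(data):
    """Assumed promotion order: float over int over bool."""
    leaves = flatten(data)
    if not leaves or any(isinstance(v, float) for v in leaves):
        return "float32"
    if any(isinstance(v, int) and not isinstance(v, bool) for v in leaves):
        return "int64"
    return "bool"


def describe(data, dtype=None, device=None):
    return {
        "shape": infer_shape(data),
        "dtype": dtype or infer_dtype(data),
        "device": device or "cpu",  # default device when none is given
    }


print(describe([[1, 2, 3], [4, 5, 6]]))
```

For the running example this yields a 2 x 3 shape, an integer dtype, and the CPU device, which is the metadata needed before any memory is allocated.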

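It then mainly calls EmptyFunctor (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L194 ) and SwitchCopyLocalTensorFromUntypedArray (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L195 ). The former allocates memory for the tensor and the latter copies the data; both steps are carried out through virtual machine instructions. EmptyFunctor goes through the ordinary OpCall instruction, while CopyLocalTensorFromUntypedArray uses the AccessBlobByCallback or SyncAccessBlobByCallback instruction, depending on whether a synchronous copy is required.

Why go through virtual machine instructions? Whether allocating memory or copying data, the operations differ across devices such as CPU and CUDA. As seen in the earlier discussion of Op/Kernel, OneFlow's virtual machine uniformly manages the execution of dynamic-graph and static-graph tasks, op/kernel execution in eager mode, allocation and release of host and device memory, devices, and streams.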

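3.1 Allocating memory: EmptyFunctor

Operations such as matmul and relu (with inplace=false) also create output tensors during execution. The earlier discussion of relu focused on the computation logic of the op and kernel and left out the tensor-related details.

Here, only an empty tensor object needs to be constructed and no other computation is required, so this is an Empty operation. The kernel for the Empty op, EmptyKernel (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/user/kernels/empty_kernel.cpp#L30 ), has no substantive computation logic. It only creates an empty tensor from the shape, dtype, and device information; the actual data is later copied from host memory into this empty tensor to complete the creation.

Like other functors, EmptyFunctor is eventually dispatched to the corresponding interpreter for execution. Because this is a local tensor in eager mode, EmptyFunctor ends up in the eager local interpreter and is handled by the NaiveInterpret (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/op_interpreter/eager_local_op_interpreter.cpp#L74 ) method. The flow is as follows: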

  1. Construct an EagerLocalTensorImpl (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/op_interpreter/eager_local_op_interpreter.cpp#L110 ) object to hold the tensor result. At this point it is only a shell; no storage has been allocated for the tensor's data.

  2. Initialize the EagerBlobObject (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/op_interpreter/eager_local_op_interpreter.cpp#L114 ) and TensorStorage (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/tensor_impl.cpp#L120 ), after which the tensor's main fields are essentially in place.

  3. Construct an OpCall instruction and submit it to the virtual machine via PhysicalRun (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/op_interpreter/eager_local_op_interpreter.cpp#L134-L136 ), then wait for the vm to schedule and execute it.

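The instruction policy for OpCall is in oneflow/core/vm/op_call_instruction_policy.cpp. Its Prepare method performs the actual memory allocation for the TensorStorage through AllocateOutputBlobsMemory, and its Compute method launches the actual kernel (here, the one for the empty op).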
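The shell-first, allocate-later flow can be illustrated with a toy two-phase instruction. Everything in this sketch (`ToyStorage`, `OpCallPolicy`, `ToyVM`, `empty_kernel`) is hypothetical and greatly simplified; it only shows that submitting an instruction does not allocate memory, and that allocation happens in a prepare step just before the kernel runs.

```python
from collections import deque


class ToyStorage:
    """Shell for tensor data; the buffer is allocated lazily."""

    def __init__(self, nbytes):
        self.nbytes = nbytes
        self.buffer = None


class OpCallPolicy:
    """Hypothetical instruction policy with prepare and compute phases."""

    def __init__(self, storage, kernel):
        self.storage = storage
        self.kernel = kernel

    def prepare(self):
        # Allocate output memory only when the instruction is about to run.
        if self.storage.buffer is None:
            self.storage.buffer = bytearray(self.storage.nbytes)

    def compute(self):
        self.kernel(self.storage)


class ToyVM:
    def __init__(self):
        self.pending = deque()

    def submit(self, policy):
        self.pending.append(policy)

    def run(self):
        # Execute queued instructions in submission order.
        while self.pending:
            policy = self.pending.popleft()
            policy.prepare()
            policy.compute()


def empty_kernel(storage):
    # An "empty" op performs no real computation.
    pass


storage = ToyStorage(nbytes=6 * 8)  # six 8-byte elements
vm = ToyVM()
vm.submit(OpCallPolicy(storage, empty_kernel))
allocated_before = storage.buffer is not None
vm.run()
allocated_after = storage.buffer is not None
print(allocated_before, allocated_after)
```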

3.2 Copying data: SwitchCopyLocalTensorFromUntypedArray

SwitchCopyLocalTensorFromUntypedArray is the function name produced by expanding the MAKE_SWITCH_ENTRY (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L150 ) macro. The expanded code is shown below; it ends up calling CopyLocalTensorFromUntypedArray (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L68 ).

    template<typename... Args>
    static Maybe<void> SwitchCopyLocalTensorFromUntypedArray(
        const std::tuple<DataType>& switch_tuple, Args&&... args) {
      static const std::map<std::tuple<DataType>, std::function<Maybe<void>(Args && ...)>>
          case_handlers{
              {SwitchCase(DataType::kFloat),
               [](Args&&... args) {
                 return CopyLocalTensorFromUntypedArray<float>(std::forward<Args>(args)...);
               }},
              // ...
          };
      return case_handlers.at(switch_tuple)(std::forward<Args>(args)...);
    }
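The expanded macro is essentially a lookup table from a runtime data type to a statically typed handler. A minimal Python analogue, assuming a dictionary keyed by a one-element tuple and using the standard `struct` module to stand in for typed copies, might look like this. `make_copier`, `CASE_HANDLERS`, and `switch_copy` are hypothetical names.

```python
import struct


def make_copier(fmt):
    """Build a handler that writes values into dst using one element format."""

    def copy(dst, values):
        struct.pack_into(f"<{len(values)}{fmt}", dst, 0, *values)

    return copy


# One entry per supported data type, keyed by a tuple like the C++ map.
CASE_HANDLERS = {
    ("float32",): make_copier("f"),
    ("float64",): make_copier("d"),
    ("int32",): make_copier("i"),
    ("int64",): make_copier("q"),
}


def switch_copy(switch_tuple, dst, values):
    # A missing key raises KeyError, similar to std::map::at throwing.
    return CASE_HANDLERS[switch_tuple](dst, values)


buf = bytearray(3 * 8)
switch_copy(("int64",), buf, [1, 2, 3])
print(struct.unpack("<3q", buf))
```

The table keeps the runtime branch in one place, so each handler can be written for a single concrete element type.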

CopyLocalTensorFromUntypedArray is defined as follows:

    template<typename T>
    Maybe<void> CopyLocalTensorFromUntypedArray(const std::shared_ptr<Tensor>& tensor,
                                                PyObject* array) {
      return CopyBetweenLocalTensorAndNumpy<T>(tensor, array, CopyFromNumpyArray, "mut",
                                               /*block_host_until_done=*/false);
    }

Internally, it calls CopyBetweenLocalTensorAndNumpy.

CopyBetweenLocalTensorAndNumpy

As the name suggests, this method copies data between a numpy array and a tensor. Its third argument, CopyFromNumpyArray, is a callback that uses SyncAutoMemcpy to copy memory between the array and the tensor (blob):

    void CopyFromNumpyArray(ep::Stream* stream,
                            const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object,
                            const NumPyArrayPtr& array_ptr) {
      SyncAutoMemcpy(stream, eager_blob_object->mut_dptr(), array_ptr.data(),
                     eager_blob_object->ByteSizeOfBlobBody(), eager_blob_object->mem_case(),
                     memory::MakeHostMemCase());
    }

Returning to CopyBetweenLocalTensorAndNumpy (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.h#L93 ), the key part is:

    JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {
      return builder->AccessBlobByCallback(
          tensor,
          [array_ptr, Copy](ep::Stream* stream,
                            const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object) {
            Copy(stream, eager_blob_object, array_ptr);
          },
          modifier);
    }));

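InstructionsBuilder builds an AccessBlobByCallback instruction. Its arguments are the empty tensor created above by EmptyFunctor, the callback together with the data it captures, and the modifier (the string "mut" marks the blob as mutable).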

AccessBlobByCallback

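As with OpCall, when InstructionsBuilder calls AccessBlobByCallback it constructs the corresponding vm instruction policy, AccessBlobArgCbInstructionPolicy, and dispatches it to the vm, where it waits to be scheduled and executed: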

    template<typename T>
    Maybe<void> InstructionsBuilder::AccessBlobByCallback(
        const T tensor,
        const std::function<void(ep::Stream*, const std::shared_ptr<vm::EagerBlobObject>&)>& callback,
        const std::string& modifier) {
      const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object = JUST(tensor->eager_blob_object());
      Symbol<Device> device = JUST(GetDevice(tensor));
      ...
      Symbol<Stream> stream = JUST(GetDefaultStreamByDevice(device));
      JUST(SoftSyncStream({eager_blob_object}, stream));
      auto instruction = intrusive::make_shared<vm::Instruction>(
          // Never replace `stream` with producer_stream or last_used_stream.
          JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream)),
          std::make_shared<vm::AccessBlobArgCbInstructionPolicy>(eager_blob_object, callback,
                                                                 modifier));
      instruction_list_->EmplaceBack(std::move(instruction));
      return Maybe<void>::Ok();
    }

When the AccessBlobArgCbInstructionPolicy instruction is actually executed, its Compute (https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/vm/access_blob_arg_cb_instruction_policy.h#L79 ) method invokes the callback to copy data between the tensor's blob and the numpy ndarray. This completes the copy, and with it the creation of the flow.tensor.
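The deferred-callback mechanism can be sketched as follows. The classes `ToyBlob`, `AccessBlobInstruction`, and `ToyBuilder`, and the helper `copy_from_array`, are hypothetical simplifications; the sketch only shows that building the instruction records the callback, and that the copy happens later when the instruction's compute step runs.

```python
class ToyBlob:
    """Hypothetical blob with already-allocated storage."""

    def __init__(self, nbytes):
        self.data = bytearray(nbytes)


class AccessBlobInstruction:
    """Holds a blob and a callback; nothing is copied until compute() runs."""

    def __init__(self, blob, callback, modifier):
        self.blob = blob
        self.callback = callback
        self.modifier = modifier  # "mut" marks the blob as written to

    def compute(self):
        self.callback(self.blob)


class ToyBuilder:
    def __init__(self):
        self.instruction_list = []

    def access_blob_by_callback(self, blob, callback, modifier):
        self.instruction_list.append(AccessBlobInstruction(blob, callback, modifier))


def copy_from_array(source):
    # Capture the source buffer, like the lambda that captures array_ptr.
    def callback(blob):
        blob.data[: len(source)] = source

    return callback


blob = ToyBlob(4)
builder = ToyBuilder()
builder.access_blob_by_callback(blob, copy_from_array(b"\x01\x02\x03\x04"), "mut")
before = bytes(blob.data)
for instruction in builder.instruction_list:
    instruction.compute()
after = bytes(blob.data)
print(before, after)
```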

(This article is published with authorization. Original: https://segmentfault.com/a/1190000041989895 )


You are welcome to download and try the latest release, OneFlow v0.8.0: https://github.com/Oneflow-Inc/oneflow/