一個運算元在深度學習框架中的旅程

語言: CN / TW / HK

撰文|趙露陽

運算元即Operator,這裡簡稱op。op是深度學習的基礎操作,任意深度學習框架中都包含了數百個op,這些op用於各種型別的數值、tensor運算。

在深度學習中,通過nn.Module這樣搭積木的方式搭建網路,而op就是更基礎的,用於製作積木的配方和原材料。

譬如如下的一個demo網路:

``` import oneflow as torch
class TinyModel(torch.nn.Module):

def __init__(self):
    super(TinyModel, self).__init__()

    self.linear1 = torch.nn.Linear(100, 200)
    self.activation = torch.nn.ReLU()
    self.linear2 = torch.nn.Linear(200, 10)
    self.softmax = torch.nn.Softmax()

def forward(self, x):
    x = self.linear1(x)
    x = self.activation(x)
    x = self.linear2(x)
    x = self.softmax(x)
    return xtinymodel = TinyModel()print('The model:')print(tinymodel)

```

從結構來看,這個網路是由各種nn.Module如Linear、ReLU、Softmax搭建而成,但從本質上,這些nn.Module則是由一個個基礎op拼接,從而完成功能的。這其中就包含了Matmul、Relu、Softmax等op。 在OneFlow中,對於一個已有op,是如何完成從Python層->C++層的呼叫、流轉和執行過程?本文將以

output = flow.relu(input)

為例,梳理一個op從Python -> C++執行的完整過程。

首先,這裡給出一個流程示意圖:

下面,將分別詳細從原始碼角度跟蹤其各個環節。

1

Binding

這裡,binding是指Python和C++程式碼的繫結。通常,我們用Python搭建網路,訓練模型,呼叫函式完成各種操作。實際上,這些函式通常在Python層只是一層wrapper,底層實現還是通過C++程式碼完成的,那麼Python -> C++是如何呼叫的?這就需要用到Python和C++的繫結。

在深度學習框架的實現中,即可以用Python原生的C API,也可以通過pybind11來完成函式繫結,在OneFlow中,二者均有使用,譬如:

  • oneflow/api/python/framework/tensor.cpp
  • oneflow/api/python/framework/tensor_functions.cpp

中涉及到的 tensor.xxx 方法都是通過Python C API完成了函式繫結;

  • oneflow/core/functional/functional_api.yaml

中定義的諸多 flow.xxx 方法則是通過pybind實現的繫結。這裡關於Python C API和pybind不做過多介紹,具體用法可以參考相應文件:

  • https://docs.python.org/zh-cn/3.8/c-api/index.html
  • https://pybind11.readthedocs.io/en/stable/index.html

下面我們回到flow.relu方法,我們在Python層呼叫的flow.relu實際是呼叫了在

python/oneflow/__init__.py

中定義的oneflow._C.relu。 _C表示其實現位於底層C++。和PyTorch類似,我們也基於.yaml定義了一套介面匯出及code gen的規則,譬如在 functional_api.yaml 中,我們可以看到Relu的匯出介面的函式簽名:

- name: "relu" signature: "Tensor (Tensor x, Bool inplace=False) => Relu" bind_python: True

從yaml定義可以看出,flow._C.relu 接收兩個引數,tensor和一個bool值,其綁定了C++的Relu方法,函式返回值也是tensor。實際上,在OneFlow編譯時,會通過執行

tools/functional/generate_functional_api.py

這個檔案,對 functional_api.yaml 進行解析和程式碼生成,動態生成C++的.h和.cpp檔案。

  • build/oneflow/core/functional/functional_api.yaml.h
  • build/oneflow/core/functional/functional_api.yaml.cpp

並在.cpp檔案中呼叫相應的functor完成C++層面的函式呼叫。這裡,還是以flow._C.relu為例,其對應的functor定義位於oneflow/core/functional/impl/activation_functor.cpp:

``` class ReluFunctor { public: ReluFunctor() { op_ = CHECK_JUST(one::OpBuilder("relu").Input("x", 1).Output("y", 1).Build()); } Maybe operator()(const std::shared_ptr& x, bool inplace) const { ... }

private: std::shared_ptr op_; }; ```

ReluFunctor通過

ONEFLOW_FUNCTION_LIBRARY(m) { m.add_functor<impl::ReluFunctor>("Relu"); ... }

完成functor的註冊,註冊成functional介面後,在Python層flow._C.relu就完成了和“Relu”的繫結。同時,這個函式在C++中也可以通過functional::Relu直接呼叫。

2

Functor

Functor不僅是Python -> C++互動的核心,也是op呼叫、輸入引數推導和檢查的第一站。通常,各種op在functor層需要完成對輸入tensor的shape、dtype、維度、元素個數等各種check,以及對op特有的邏輯進行解析和處理。Relu Functor程式碼如下:

``` class ReluFunctor { public: ReluFunctor() { op_ = CHECK_JUST(one::OpBuilder("relu").Input("x", 1).Output("y", 1).Build()); } Maybe operator()(const std::shared_ptr& x, bool inplace) const { if (inplace) { JUST(CheckInplaceValid(x)); std::shared_ptr outputs = std::make_shared(1); outputs->at(0) = x; JUST(OpInterpUtil::Dispatch(op_, {x}, outputs.get(), AttrMap{})); return outputs->at(0); } else { return OpInterpUtil::Dispatch(op_, {x}); } }

private: std::shared_ptr op_; }; ```

可以看見,ReluFunctor是比較簡單的,其定義了一個私有變數

std::shared_ptr<OpExpr> op_;

這個op_即需要執行的Relu op,通過OpBuilder進行構建;functor的operator()內部,根據是否inplace走到2個不同分支,並最終通過OpInterpUtil::Dispatch()將op、輸入tensor和引數派發至Interpreter處理。

3

Dispatch

各種op在functor中完成check和邏輯處理後,大多需要通過OpInterpUtil::Dispatch() 進行派發,其目的地是Interpreter。在Interpreter中,將會對op進行更進一步的處理。在\ oneflow/core/framework/op_interpreter/op_interpreter_util.h 中,我們可以看見多種過載的Dispatch模板程式碼:

``` class OpInterpUtil { public: template static Maybe Dispatch(const OpExpr& op_expr, const TensorTuple& inputs, const AttrMap& attrs) { return Dispatch(op_expr, inputs, OpExprInterpContext(attrs)); }

template static Maybe Dispatch(const OpExpr& op_expr, const TensorTuple& inputs) { return Dispatch(op_expr, inputs, OpExprInterpContext(AttrMap{})); }

template static Maybe Dispatch(const OpExpr& op_expr, const TensorTuple& inputs, const OpExprInterpContext& ctx);

static Maybe Dispatch(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs, const AttrMap& attrs) { return Dispatch(op_expr, inputs, outputs, OpExprInterpContext(attrs)); }

static Maybe Dispatch(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs) { return Dispatch(op_expr, inputs, outputs, OpExprInterpContext(AttrMap{})); }

static Maybe Dispatch(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs, const OpExprInterpContext& ctx); ```

這些過載,是為了應對不同的輸入、輸出以及OpExprInterpContext的情況。譬如這個OpExprInterpContext是op在Interpreter中所需的上下文,可能攜帶op計算所需要的屬性(如conv2d op所需要的kernel_size、padding等)、device、sbp、parallel等描述資訊。這些過載的Dispatch最終都會走到:

/* static */ Maybe<void> OpInterpUtil::Dispatch( const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs, const OpExprInterpContext& ctx) { return JUST(GetInterpreter(inputs, ctx, op_expr))->Apply(op_expr, inputs, outputs, ctx); }

Dispatch至此,剩下的就要交給Interpreter了。

4

Interpreter

Get Interpreter

這裡先看看GetInterpreter,這裡其實就是獲取所需的Interpreter,來負責op接下來的執行。省略check相關的邏輯,主要程式碼如下:\ oneflow/core/framework/op_interpreter/op_interpreter_util.cpp

Maybe<AutogradInterpreter> GetInterpreter(const TensorTuple& inputs, const OpExprInterpContext& ctx, const OpExpr& op_expr) { static const auto& g_lazy_interpreter = BuildLazyInterpreter(); static const auto& g_eager_consistent_interpreter = BuildEagerInterpreter(/*is_mirrored=*/false); static const auto& g_eager_mirrored_interpreter = BuildEagerInterpreter(/*is_mirrored=*/true); if (!LazyMode::is_enabled()) { if (inputs.empty()) { if (ctx.parallel_desc.has_value()) { JUST(ctx.nd_sbp); CHECK_OR_RETURN(!ctx.device.has_value()); return g_eager_consistent_interpreter; } else { CHECK_OR_RETURN(!ctx.nd_sbp.has_value()); return g_eager_mirrored_interpreter; } } else { if (inputs.at(0)->is_consistent()) { ... return g_eager_consistent_interpreter; } else { ... return g_eager_mirrored_interpreter; } } UNIMPLEMENTED_THEN_RETURN(); } return g_lazy_interpreter; }

通過上面的邏輯可以看出,Interpreter大體上分為Eager Interpteter和Lazy Interpreter;其中Eager Interpteter又根據Eager Mirrored和Eager Consistent有所區別。具體就是以下3種子類實現:

  • EagerMirroredInterpreter
  • EagerConsistentInterpreter
  • LazyInterpreter

普通的Eager mode下(無論是單卡還是DDP的情況)都會走到 EagerMirroredInterpreter 的邏輯;在普通Eager Mode之外,為輸入tensor設定了sbp、placement則會進入到\ EagerConsistentInterpreter的邏輯;在Lazy Mode時(使用nn.Graph),則會進入到LazyInterpreter

下面,我們看下這3種Interpreter的構建:

``` std::shared_ptr BuildEagerInterpreter(const bool& is_mirrored) { std::shared_ptr internal; if (is_mirrored) { internal = std::make_shared(); } else { internal = std::make_shared(); } return std::make_shared(internal); }

std::shared_ptr BuildLazyInterpreter() { auto internal = std::make_shared(); return std::make_shared(internal); } ```

可見,這3種Interpreter構建完成後,都會以私有變數internal的形式,參與AutogradInterpreter的構建,並最終返回AutogradInterpreter

``` class AutogradInterpreter { public: AutogradInterpreter() = delete; AutogradInterpreter(const std::shared_ptr& internal) : internal_(internal) {}

virtual ~AutogradInterpreter() = default;

Maybe Apply(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs, const AttrMap& attrs) const { return Apply(op_expr, inputs, outputs, OpExprInterpContext(attrs)); }

Maybe Apply(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs) const { return Apply(op_expr, inputs, outputs, OpExprInterpContext(AttrMap{})); }

Maybe Apply(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs, const OpExprInterpContext& ctx) const;

private: std::shared_ptr internal_; }; ```

Apply()

通過上面我們知道,EagerMirroredInterpreter\ EagerConsistentInterpreterLazyInterpreter都將為其包裹上AutogradInterpreter的殼,通過AutogradInterpreter觸發Apply的呼叫。顧名思義,AutogradInterpreter的作用主要是和autograd相關,其主要為eager mode下前向的op節點插入對應的用於反向計算grad的節點。

我們看看這部分程式碼,關鍵部分的作用在註釋裡給出:

``` Maybe AutogradInterpreter::Apply(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs, const OpExprInterpContext& ctx) const { // 判斷是否需要計算梯度,如果處於GradMode的作用域切改op註冊時沒有禁用梯度 // 則requires_grad的值根據輸入tensor的requires_grad屬性判斷 // any of input tensors requires_grad==True,則表示需要計算梯度 bool requires_grad = false; if (autograd::GradMode::is_enabled() && !JUST(op_expr.IsGradDisabled())) { requires_grad = std::any_of(inputs.begin(), inputs.end(), { return tensor->requires_grad(); }); } // 這一坨邏輯比較醜陋,是因為近期支援了oneflow系統中支援了stride&&view機制 // 而大部分op尚未註冊stride推導、尚未支援non-contiguous的輸入tensor // 所以需要在這對這部分op的輸入進行強制轉換,將其變為contiguous的 // NOTE: if this op not support stride, then need to tensor->contiguous() #define HANDLE_NON_CONTIGUOUS_INPUT(tensor_tuple_ptr) \ TensorTuple tmp_inputs; \ if (!LazyMode::is_enabled() && !JUST(op_expr.SupportNonContiguous())) { \ tmp_inputs.resize(inputs.size()); \ for (size_t i = 0; i < inputs.size(); i++) { tmp_inputs[i] = inputs[i]->contiguous(); } \ tensor_tuple_ptr = &tmp_inputs; \ }

const TensorTuple* inputs_ptr = &inputs; HANDLE_NON_CONTIGUOUS_INPUT(inputs_ptr);

// 這裡是進行實際Interpreter執行的主要過程 { autograd::AutoGradMode mode(false); JUST(internal_->Apply(op_expr, *inputs_ptr, outputs, ctx)); }

// 這裡主要是為了eager mode下,且requires_grad==True的op, // 插入反向節點(AddNode)用於autograd,該節點包含反向梯度計算的方法(backward_fn) // Lazy mode will construct backward compute graph in passes, so disable autograd if lazy mode. std::shared_ptr grad_closure(nullptr); if (requires_grad && !LazyMode::is_enabled()) { grad_closure = JUST(op_expr.GetOrCreateOpGradClosure()); auto backward_fn = std::make_shared(); backward_fn->body = = -> Maybe { autograd::AutoGradMode mode(create_graph); JUST(grad_closure->Apply(out_grads, in_grads)); return Maybe::Ok(); }; backward_fn->status = = { return grad_closure->state()->SavedTensors().size() > 0; }; JUST(GetThreadLocalAutogradEngine()->AddNode(op_expr.op_type_name() + "_backward", backward_fn, inputs_ptr, outputs)); } // Update outputs autograd meta // Note: if requires_grad is True, we will create a new autograd meta for each output // in AddBackwardFuncPtr to support inplace operation, so the update should after // AddBackwardFuncPtr for (auto& output : outputs) { output->set_is_leaf(inputs_ptr->size() == 0 || !requires_grad); ... if (!output->requires_grad()) { JUST(output->set_requires_grad( requires_grad && IsSupportRequireGradDataType(output->dtype()->data_type()))); } } // 捕獲前向的inputs outputs,反向計算時可能用到 if (requires_grad && !LazyMode::is_enabled()) { // Capture inputs and outputs after AddBackwardFuncPtr because of that grad function // node has been attached to them. JUST(grad_closure->Capture(inputs_ptr, outputs, ctx)); } return Maybe::Ok(); } ```

上面一坨邏輯有點多,讓我們看一下重點,對於簡單的Relu op,我們只需關注這部分程式碼:

// 這裡是進行實際Interpreter執行的主要過程 { autograd::AutoGradMode mode(false); JUST(internal_->Apply(op_expr, *inputs_ptr, outputs, ctx)); }

這裡,還是以上面的flow.relu為例,由於是簡單的Eager Mode,所以實際會走到EagerInterpreter的Apply方法:

``` Maybe EagerInterpreter::Apply(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple outputs, const OpExprInterpContext& ctx) const { #define APPLY_IF(op_type) \ if (const auto op = dynamic_cast(&op_expr)) { \ return ApplyImpl(*op, inputs, outputs, ctx); \ }

APPLY_IF(UserOp); APPLY_IF(VariableOp); APPLY_IF(CastToMirroredOp); APPLY_IF(CastFromMirroredOp); APPLY_IF(ConsistentToConsistentOp); APPLY_IF(CastToConsistentOp); APPLY_IF(CastFromConsistentOp); APPLY_IF(DistributeSplitOp); APPLY_IF(DistributeCloneOp); APPLY_IF(DistributeConcatOp); APPLY_IF(DistributeAddOp); APPLY_IF(FunctionOp); APPLY_IF(SelectTopNOp)

undef APPLY_IF

OF_UNIMPLEMENTED() << "The type " << op_expr.op_type_name() << " has not been supported in EagerInterpreter::Apply."; } ```

這裡,通過巨集定義APPLY_IF,增加了對不同型別op的分支處理。對於大多數使用者來說,用到的op都是UserOp型別,所以這裡實際上會走到這個分支中:

if (const auto* op = dynamic_cast<const UserOpExpr*>(&op_expr)) { return ApplyImpl(*op, inputs, outputs, ctx); }

再看看\ EagerMirroredInterpreter::ApplyImpl,位於

oneflow/core/framework/op_interpreter/eager_mirrored_op_interpreter.cpp

Maybe<void> EagerMirroredInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs, const OpExprInterpContext& ctx) const { return NaiveInterpret(op_expr, inputs, outputs, ctx); }

其最終實現是NaiveInterpret。

NaiveInterpret

NaiveInterpret簡單來說,主要用於做以下幾件事:

  • check input tensor的device是否一致
  • 生成output tensor
  • 為output tensor推導和檢查shape/stride/dtype
  • 構建op執行指令,並派發至vm

簡化版的程式碼如下:

``` Maybe NaiveInterpret(const UserOpExpr& user_op_expr, const TensorTuple& inputs, const Symbol& default_device, TensorTuple outputs, const OpExprInterpContext& ctx) { const auto& attrs = ctx.attrs; std::shared_ptr input_eager_blob_objects = std::make_shared(inputs.size()); // check devices for (int i = 0; i < inputs.size(); i++) { const auto& input_device = JUST(inputs.at(i)->device()); if (i > 0) { CHECK_OR_RETURN(default_device == *input_device) << Error::RuntimeError() << "Expected all tensors to be on the same device, but found at least two devices, " << default_device->ToString() << " (positional 0) and " << input_device->ToString() << " (positional " << i << ")!"; } input_eager_blob_objects->at(i) = JUST(inputs.at(i)->eager_blob_object()); }

// make output tensors std::shared_ptr output_eager_blob_objects = std::make_shared(outputs->size()); auto* output_tensor_metas = ThreadLocalDefaultOutputMutTensorMetas(outputs->size()); for (int i = 0; i < outputs->size(); i++) { if (!outputs->at(i)) { const auto& tensor_impl = std::make_shared(); outputs->at(i) = std::make_shared(tensor_impl); output_tensor_metas->at(i) = tensor_impl->mut_tensor_meta(); } else { bool has_eager_blob_object = JUST(outputs->at(i)->has_eager_blob_object()); CHECK_OR_RETURN(has_eager_blob_object); output_eager_blob_objects->at(i) = JUST(outputs->at(i)->eager_blob_object()); } } Symbol stream; bool need_check_mem_case = true;

// Infer devices ...

// Infer shapes strides dtype ...

// 構建op執行指令,並派發至vm JUST(PhysicalRun(& -> Maybe { return builder->LocalCallOpKernel(kernel, input_eager_blob_objects, output_eager_blob_objects, ctx, stream); })); return Maybe::Ok(); } ```

Interpreter的終點是虛擬機器(vm)。vm部分,是OneFlow比較獨特的設計,內容很多,這裡暫不展開了:) 可以簡單理解,派發至vm後,此op將進入一個任務執行的佇列,將會等待其vm的排程、執行。

5

Compute

在Interpreter將op執行指令派發至vm後,經過排程邏輯處理後,將會在

oneflow/core/eager/opkernel_instruction_type.cpp

被觸發執行,核心程式碼如下:

``` static inline void OpKernelCompute( LocalCallOpKernelPhyInstrOperand operand, DeviceCtx device_ctx, user_op::OpKernelState state, const user_op::OpKernelCache cache) {

auto* opkernel = operand->mut_opkernel();
auto* compute_ctx =
    opkernel->UpdateComputeContext(operand->inputs().get(), operand->outputs().get(),
                                   operand->consistent_tensor_infer_result().get(), device_ctx);
...
operand->user_opkernel()->Compute(compute_ctx, state, cache);
opkernel->UpdateComputeContext(nullptr, nullptr, nullptr, nullptr);

} ```

其中,

operand->user_opkernel()->Compute(compute_ctx, state, cache);

將觸發op kernel的實際執行。通常來說,op的kernel實現根據device的不同,會派發到不同的實現,其一般都位於:

oneflow/user/kernels/xxx_kernel.cpp

oneflow/user/kernels/xxx_kernel.cu

這裡的Relu op相對比較特殊,是用primitive實現的(primitive也是oneflow中一種獨特的設計,有著良好的抽象和可組合性),具體這個UnaryPrimitive就是elementwise unary的模板+UnaryFunctor的組合。其呼叫鏈如下:

UnaryPrimitiveKernel

``` class UnaryPrimitiveKernel final : public user_op::OpKernel, public user_op::CudaGraphSupport { public: OF_DISALLOW_COPY_AND_MOVE(UnaryPrimitiveKernel); UnaryPrimitiveKernel() = default; ~UnaryPrimitiveKernel() = default;

using PrimitiveFactoryFuncType = std::function( user_op::KernelComputeContext*)>;

UnaryPrimitiveKernel(const std::string& output_name, const std::string& input_name, PrimitiveFactoryFuncType fn) : output_name_(output_name), input_name_(input_name), primitive_factory_func_(std::move(fn)) {}

private: using user_op::OpKernel::Compute; void Compute(user_op::KernelComputeContext* ctx) const override { auto primitive = primitive_factory_func_(ctx); CHECK(primitive);

const user_op::Tensor* input_tensor = ctx->Tensor4ArgNameAndIndex(input_name_, 0);
...
const int64_t elem_cnt = input_shape.elem_cnt();


if (elem_cnt != 0) {
  primitive->Launch(ctx->stream(), input_tensor->dptr(), output_tensor->mut_dptr(), elem_cnt);
}

} bool AlwaysComputeWhenAllOutputsEmpty() const override { return false; }

std::string output_name_; std::string input_name_; PrimitiveFactoryFuncType primitive_factory_func_; }; ```

ep::primitive::ElementwiseUnary

``` template class ElementwiseUnaryImpl : public ElementwiseUnary { public: OF_DISALLOW_COPY_AND_MOVE(ElementwiseUnaryImpl); ElementwiseUnaryImpl(Scalar attr0, Scalar attr1) : attr0(attr0), attr1(attr1) {} ~ElementwiseUnaryImpl() override = default;

void Launch(Stream stream, const void src_ptr, void dst_ptr, size_t count) override { CpuStream cpu_stream = stream->As();

Dst* dst = reinterpret_cast<Dst*>(dst_ptr);
const Src* src = reinterpret_cast<const Src*>(src_ptr);
auto functor = UnaryFunctor<DeviceType::kCPU, unary_op, Dst, Src>(attr0, attr1);
cpu_stream->ParallelFor(0, count, [functor, src, dst](int64_t begin, int64_t end) {
  for (int64_t i = begin; i < end; i++) { dst[i] = functor(src[i]); }
});

}

protected: Scalar attr0, attr1; }; ```

UnaryFunctor

這個UnaryFuntor根據不同的Unaray op型別,特化出不同的具體functor實現,具體到Relu op,其實現位於

oneflow/core/ep/common/primitive/unary_functor.h:

``` template struct UnaryFunctor { UnaryFunctor(Scalar attr0, Scalar attr1) {}

OF_DEVICE_FUNC Dst operator()(Src src) const { const Src zero_val = static_cast(0.0); if (src <= zero_val) { return static_cast(zero_val); } else { return static_cast(src); } } }; ```

至此,我們已經完成了一個op的Python -> C++ 之旅。從細節上看,是相對複雜的,但從整體流程上看,其實是比較簡單的,排除了binding,vm排程機制等細節,其主要過程其實就4個環節: Functor -> Dispatch -> Interpreter -> Kernel Compute。

實現/新增一個op,通常也不需要管中間的Dispatch以及Interpreter,我們只需重點關注和該op強相關的部分——Functor層面的引數、op邏輯檢查,以及Kernel Compute部分的實際op運算。

(參考程式碼:\ https://github.com/Oneflow-Inc/oneflow/commit/1dbdf8faed988fa7fd1a9034a4d79d5caf18512d)

歡迎下載體驗 OneFlow v0.7.0 最新版本:\ https://github.com/Oneflow-Inc/oneflow/