DeepStream整理

語言: CN / TW / HK

GStreamer

DeepStream是基於GStreamer開發的。它們主要都是做視訊流處理的。現在我們來看一個GStreamer的HelloWorld。

在/opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps目錄下建立一個新的資料夾basic-tutorial-1,進入該資料夾,建立一個basic-tutorial-1.c檔案,內容如下

#include <gst/gst.h>
 
int
main (int argc, char *argv[])
{
  GstElement *pipeline; //構建一個媒體管道的基本塊,包含了各種element,如source,sink
  GstBus *bus;  //匯流排
  GstMessage *msg;  //匯流排訊息
 
  /* Initialize GStreamer */
  gst_init (&argc, &argv); //初始化GStreamer
 
  /* Build the pipeline */
  /* playbin是一個element,它既是source,也是sink,能同時處理整個管道(pipeline)事務
     gst_parse_launch建立一個管道 */
  pipeline =
      gst_parse_launch
      ("playbin uri=http://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm",
      NULL);
 
  /* Start playing */
  /* 每一個element都有一個狀態,這裡是播放這個element的流媒體
  gst_element_set_state (pipeline, GST_STATE_PLAYING);
 
  /* Wait until error or EOS */
  /* 獲取通道element匯流排 */
  bus = gst_element_get_bus (pipeline);
  /* gst_bus_timed_pop_filtered會阻塞到遇到錯誤或者流媒體播放結束,並且得到一個訊息 */
  msg =
      gst_bus_timed_pop_filtered (bus, GST_CLOCK_TIME_NONE,
      GST_MESSAGE_ERROR | GST_MESSAGE_EOS);
 
  /* Free resources */
  /* 釋放資源 */
  if (msg != NULL)
    gst_message_unref (msg);
  gst_object_unref (bus);
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (pipeline);
  return 0;
}

建立一個Makefile檔案,內容如下(具體含義可以參考C++基礎整理 中的Linux 下的 C++ 環境)

# Name of the executable to build
APP:= basic-tutorial-1

# First field of the compiler's target triple (e.g. aarch64)
TARGET_DEVICE = $(shell gcc -dumpmachine | cut -f1 -d -)

NVDS_VERSION:=5.0

# Install locations; may be overridden from the environment or command line
LIB_INSTALL_DIR?=/opt/nvidia/deepstream/deepstream-$(NVDS_VERSION)/lib/
APP_INSTALL_DIR?=/opt/nvidia/deepstream/deepstream-$(NVDS_VERSION)/bin/

ifeq ($(TARGET_DEVICE),aarch64)
  CFLAGS:= -DPLATFORM_TEGRA
endif

# Every .c / .h file in this directory belongs to the application
SRCS:= $(wildcard *.c)

INCS:= $(wildcard *.h)

PKGS:= gstreamer-1.0

OBJS:= $(SRCS:.c=.o)

CFLAGS+= -I../../../includes

CFLAGS+= `pkg-config --cflags $(PKGS)`

LIBS:= `pkg-config --libs $(PKGS)`

LIBS+= -L$(LIB_INSTALL_DIR) -lnvdsgst_meta -lnvds_meta \
       -Wl,-rpath,$(LIB_INSTALL_DIR)

# These targets do not produce files of the same name
.PHONY: all install clean

all: $(APP)

# Rebuild objects when any header or the Makefile itself changes
%.o: %.c $(INCS) Makefile
	$(CC) -c -o $@ $(CFLAGS) $<

$(APP): $(OBJS) Makefile
	$(CC) -o $(APP) $(OBJS) $(LIBS)

install: $(APP)
	cp -rv $(APP) $(APP_INSTALL_DIR)

clean:
	rm -rf $(OBJS) $(APP)

執行命令

make

編譯後就有了可執行程式basic-tutorial-1。

動態管道

在GStreamer裡面有這樣的介面——pad(GstPad)。pad分為sink pad:資料從pad進入一個element;source pad:資料從pad流出element。source element僅包含source pad,sink element僅包含sink pad,filter element兩種pad都包含。

一個demuxer包含一個sink pad和多個source pad,資料從sink pad輸入,然後每個流都有一個source pad。

這是一個整體的通道流程圖,有一個source,demuxer,兩個filter,兩個sink。

在/opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps目錄下建立一個新的資料夾basic-tutorial-2,進入該資料夾,建立一個basic-tutorial-2.c檔案,內容如下(下面的程式碼跟上面的流程圖沒有關係)

#include <gst/gst.h>
 
/* Structure to contain all our information, so we can pass it to callbacks */
/* 建立一個我們需要的所有element的結構體
typedef struct _CustomData {
  GstElement *pipeline;
  GstElement *source;
  GstElement *convert;
  GstElement *resample;
  GstElement *sink;
} CustomData;
 
/* Handler for the pad-added signal */
/* Forward declaration of the callback; it is defined after main. */
static void pad_added_handler (GstElement *src, GstPad *pad, CustomData *data);
 
int main(int argc, char *argv[]) {
  CustomData data;    //結構體資料
  GstBus *bus;        //匯流排
  GstMessage *msg;    //匯流排訊息
  GstStateChangeReturn ret; //播放狀態
  gboolean terminate = FALSE; //是否終止
 
  /* Initialize GStreamer */
  gst_init (&argc, &argv); //GStreamer初始化
 
  /* Create the elements */
  /* 建立elements */
  /* uridecodebin自己會在內部初始化必要的element,然後把一個URI變成一個原始的音視訊流輸出,差不多做了playbin2一半的工作,自帶demuxer,source pad沒有初始化 */
  data.source = gst_element_factory_make ("uridecodebin", "source");
  /* audioconvert在不同的音訊格式轉換時很有用,具有平臺無關性 */
  data.convert = gst_element_factory_make ("audioconvert", "convert");
  /* audioresample音訊重取樣 */
  data.resample = gst_element_factory_make ("audioresample", "resample");
  /* autoaudiosink自動選擇音訊裝置並輸出 */
  data.sink = gst_element_factory_make ("autoaudiosink", "sink");
 
  /* Create the empty pipeline */
  /* 建立一個空的管道 */
  data.pipeline = gst_pipeline_new ("test-pipeline");
 
  if (!data.pipeline || !data.source || !data.convert || !data.resample || !data.sink) {
    g_printerr ("Not all elements could be created.\n");
    return -1;
  }
 
  /* Build the pipeline. Note that we are NOT linking the source at this
   * point. We will do it later. */
  /* 一個pipeline就是一個特定型別的可以包含其他element的bin,可以用在bin上的方法也都可以用在pipeline上,
     gst_bin_add_many()方法在pipeline中加入element.增加單個element的方法是gst_bin_add() */
  gst_bin_add_many (GST_BIN (data.pipeline), data.source, data.convert, data.resample, data.sink, NULL);
  /* 這些剛增加的elements還沒有互相連線起來,這裡沒有把source連線起來,因為source還沒有source pad,
     只有在同一個bin裡面的element才能連線起來,所以一定要把element在連線之前加入到pipeline中 */
  if (!gst_element_link_many (data.convert, data.resample, data.sink, NULL)) {
    g_printerr ("Elements could not be linked.\n");
    gst_object_unref (data.pipeline);
    return -1;
  }
 
  /* Set the URI to play */
  /* 給source設定為通過屬性播放 */
  g_object_set (data.source, "uri", "http://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm", NULL);
 
  /* Connect to the pad-added signal */
  /* GSignal是GStreamer的一個重要部分。它會讓你在你感興趣的事情發生時收到通知。訊號是通過名字來區分的,每個GObject都有它自己的訊號。
     在這段程式碼裡面,我們使用g_signal_connect()方法把“pad-added”訊號和我們的源(uridecodebin)聯絡了起來,並且註冊了一個回撥函式。GStreamer把&data這個指標的內容傳給回撥函式,
     這樣CustomData這個資料結構中的資料也就傳遞了過去。 
     當source element最後獲得足夠的資料時,它就自動生成source pad,並且觸發"pad-added"訊號,這樣回撥函式就會被呼叫 */
  g_signal_connect (data.source, "pad-added", G_CALLBACK (pad_added_handler), &data);
 
  /* Start playing */
  /* 開始播放 */
  ret = gst_element_set_state (data.pipeline, GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE) {
    g_printerr ("Unable to set the pipeline to the playing state.\n");
    gst_object_unref (data.pipeline);
    return -1;
  }
 
  /* Listen to the bus */
  /* 獲取管道的匯流排 */
  bus = gst_element_get_bus (data.pipeline);
  do {
    /* 獲取匯流排訊息,阻塞狀態 */
    msg = gst_bus_timed_pop_filtered (bus, GST_CLOCK_TIME_NONE,
        GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_ERROR | GST_MESSAGE_EOS);
 
    /* Parse message */
    if (msg != NULL) {
      GError *err;
      gchar *debug_info;
 
      switch (GST_MESSAGE_TYPE (msg)) {
        case GST_MESSAGE_ERROR:
          gst_message_parse_error (msg, &err, &debug_info);
          g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message);
          g_printerr ("Debugging information: %s\n", debug_info ? debug_info : "none");
          g_clear_error (&err);
          g_free (debug_info);
          terminate = TRUE;
          break;
        case GST_MESSAGE_EOS:
          g_print ("End-Of-Stream reached.\n");
          terminate = TRUE;
          break;
        case GST_MESSAGE_STATE_CHANGED:
          /* We are only interested in state-changed messages from the pipeline */
          if (GST_MESSAGE_SRC (msg) == GST_OBJECT (data.pipeline)) {
            GstState old_state, new_state, pending_state;
            gst_message_parse_state_changed (msg, &old_state, &new_state, &pending_state);
            g_print ("Pipeline state changed from %s to %s:\n",
                gst_element_state_get_name (old_state), gst_element_state_get_name (new_state));
          }
          break;
        default:
          /* We should not reach here */
          g_printerr ("Unexpected message received.\n");
          break;
      }
      gst_message_unref (msg);
    }
  } while (!terminate);
 
  /* Free resources */
  /* 釋放資源 */
  gst_object_unref (bus);
  gst_element_set_state (data.pipeline, GST_STATE_NULL);
  gst_object_unref (data.pipeline);
  return 0;
}
 
/* This function will be called by the pad-added signal */
/* Callback that links a newly created source pad to the audio converter.
   src:     the element that emitted the signal (here the uridecodebin)
   new_pad: the pad that was just added to src
   data:    the CustomData pointer registered with the signal
 */
static void pad_added_handler (GstElement *src, GstPad *new_pad, CustomData *data) {
  /* Sink (input) pad of the audioconvert element */
  GstPad *sink_pad = gst_element_get_static_pad (data->convert, "sink");
  GstPadLinkReturn ret;
  GstCaps *new_pad_caps = NULL;
  GstStructure *new_pad_struct = NULL;
  const gchar *new_pad_type = NULL;

  g_print ("Received new pad '%s' from '%s':\n", GST_PAD_NAME (new_pad), GST_ELEMENT_NAME (src));

  /* If our converter is already linked, we have nothing to do here */
  if (gst_pad_is_linked (sink_pad)) {
    g_print ("We are already linked. Ignoring.\n");
    goto exit;
  }

  /* Check the new pad's type */
  new_pad_caps = gst_pad_get_current_caps (new_pad);
  if (new_pad_caps == NULL) {
    /* The pad may not have caps set yet; without them the type is unknown
       and gst_caps_get_structure must not be called. */
    g_print ("It has no caps yet. Ignoring.\n");
    goto exit;
  }
  new_pad_struct = gst_caps_get_structure (new_pad_caps, 0);
  new_pad_type = gst_structure_get_name (new_pad_struct);
  if (!g_str_has_prefix (new_pad_type, "audio/x-raw")) {
    g_print ("It has type '%s' which is not raw audio. Ignoring.\n", new_pad_type);
    goto exit;
  }

  /* Attempt the link between the new source pad and the converter's sink pad */
  ret = gst_pad_link (new_pad, sink_pad);
  if (GST_PAD_LINK_FAILED (ret)) {
    g_print ("Type is '%s' but link failed.\n", new_pad_type);
  } else {
    g_print ("Link succeeded (type '%s').\n", new_pad_type);
  }

exit:
  /* Unreference the new pad's caps, if we got them */
  if (new_pad_caps != NULL)
    gst_caps_unref (new_pad_caps);

  /* Unreference the sink pad */
  gst_object_unref (sink_pad);
}

同樣建立一個Makefile檔案,將APP:= basic-tutorial-1改成APP:= basic-tutorial-2即可。

這裡管道pipeline除了PLAYING狀態外還有三種狀態

狀態 描述
NULL element的NULL狀態或初始狀態
READY element已準備好,可以進入PAUSED(暫停)狀態
PAUSED element已暫停,可以接受和處理資料了。但是,sink element僅接受一個緩衝區,然後阻塞。
PLAYING element正在播放,時鐘在執行,資料在流動。

DeepStream

deepstream-app原始碼分析

如果我們要修改和重新編譯deepstream-app的話,需要新增依賴

apt-get install libgstreamer-plugins-base1.0-dev libgstreamer1.0-dev \
   libgstrtspserver-1.0-dev libx11-dev libjson-glib-dev

在/opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps/deepstream-app目錄中有著我們呼叫的主要程式deepstream-app的原始碼,其中有4個程式碼檔案——deepstream_app.c,deepstream_app_config_parser.c,deepstream_app.h,deepstream_app_main.c。

  1. deepstream_app.h是deepstream_app.c的標頭檔案,它包含deepstream_app.c中各函式的宣告以及相關結構體的定義。
  2. deepstream_app.c 實作deepstream的主要操作(如呼叫TensorRT等),pipeline的建立以及pipeline的相關操作都在這裡。
  3. deepstream_app_config_parser.c 是對配置檔案的解析。
  4. deepstream_app_main.c為deepstream主函式

我們先來看一下deepstream_app_main.c中的主函式

/*
 * Entry point of deepstream-app.
 *
 * Parses the command line, creates one AppCtx per config file given with -c,
 * parses each config file, builds one pipeline per instance, sets up X
 * windows for video-overlay sinks, runs the GLib main loop and finally tears
 * everything down.
 *
 * Returns the global return_value (0 on success, -1 on failure); early
 * command-line parse failure returns -1 directly.
 */
int
main (int argc, char *argv[])
{
  GOptionContext *ctx = NULL;  /* command-line option parser (e.g. for "-c ...") */
  GOptionGroup *group = NULL;  /* option group holding this application's entries */
  GError *error = NULL;
  guint i;

  /* Create the command-line option parser */
  ctx = g_option_context_new ("Nvidia DeepStream Demo");
  /* Create an option group */
  group = g_option_group_new ("abc", NULL, NULL, NULL, NULL);
  /* Add this application's option entries to the group */
  g_option_group_add_entries (group, entries);
  /* Install the group as the parser's main group */
  g_option_context_set_main_group (ctx, group);
  /* Also accept GStreamer's own command-line options */
  g_option_context_add_group (ctx, gst_init_get_option_group ());
  /* Register the debug category */
  GST_DEBUG_CATEGORY_INIT (NVDS_APP, "NVDS_APP", 0, NULL);
  /* Bail out if the command line cannot be parsed */
  if (!g_option_context_parse (ctx, &argc, &argv, &error)) {
    NVGSTDS_ERR_MSG_V ("%s", error->message);
    return -1;
  }
  /* Print the version and exit */
  if (print_version) {
    g_print ("deepstream-app version %d.%d.%d\n",
        NVDS_APP_VERSION_MAJOR, NVDS_APP_VERSION_MINOR, NVDS_APP_VERSION_MICRO);
    nvds_version_print ();
    return 0;
  }
  /* Print the version of the app and of its dependencies and exit */
  if (print_dependencies_version) {
    g_print ("deepstream-app version %d.%d.%d\n",
        NVDS_APP_VERSION_MAJOR, NVDS_APP_VERSION_MINOR, NVDS_APP_VERSION_MICRO);
    nvds_version_print ();
    nvds_dependencies_version_print ();
    return 0;
  }
  /* cfg_files is the NULL-terminated array of config files given with -c
     (e.g. source1_csi_dec_infer_yolov5.txt); one application instance is
     created per config file. */
  if (cfg_files) {
    num_instances = g_strv_length (cfg_files);
  }
  if (input_files) {
    num_input_files = g_strv_length (input_files);
  }
  /* Without at least one config file there is nothing to run */
  if (!cfg_files || num_instances == 0) {
    NVGSTDS_ERR_MSG_V ("Specify config file with -c option");
    return_value = -1;
    goto done;
  }

  for (i = 0; i < num_instances; i++) {
    /* appCtx is declared as: AppCtx *appCtx[MAX_INSTANCES]; */
    appCtx[i] = g_malloc0 (sizeof (AppCtx));
    appCtx[i]->person_class_id = -1;
    appCtx[i]->car_class_id = -1;
    appCtx[i]->index = i;
    appCtx[i]->active_source_index = -1;
    if (show_bbox_text) {
      appCtx[i]->show_bbox_text = TRUE;
    }

    if (input_files && input_files[i]) {
      appCtx[i]->config.multi_source_config[0].uri =
          g_strdup_printf ("file://%s", input_files[i]);
      g_free (input_files[i]);
    }
    /* Parse the config file into appCtx[i]->config (an NvDsConfig) */
    if (!parse_config_file (&appCtx[i]->config, cfg_files[i])) {
      NVGSTDS_ERR_MSG_V ("Failed to parse config file '%s'", cfg_files[i]);
      appCtx[i]->return_value = -1;
      goto done;
    }
  }

  for (i = 0; i < num_instances; i++) {
    /* Build the GStreamer pipeline from the parsed configuration */
    if (!create_pipeline (appCtx[i], NULL,
            all_bbox_generated, perf_cb, overlay_graphics)) {
      NVGSTDS_ERR_MSG_V ("Failed to create pipeline");
      return_value = -1;
      goto done;
    }
  }
  /* Create the GMainLoop */
  main_loop = g_main_loop_new (NULL, FALSE);

  _intr_setup ();
  /* Call check_for_interrupt every 400 ms */
  g_timeout_add (400, check_for_interrupt, NULL);

  /* Initialize the display mutex */
  g_mutex_init (&disp_lock);
  /* Open the X display used for the output windows */
  display = XOpenDisplay (NULL);
  for (i = 0; i < num_instances; i++) {
    guint j;
    /* Bring the pipeline to PAUSED first */
    if (gst_element_set_state (appCtx[i]->pipeline.pipeline,
            GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE) {
      NVGSTDS_ERR_MSG_V ("Failed to set pipeline to PAUSED");
      return_value = -1;
      goto done;
    }
    /* No window setup for instances whose tiled display is disabled */
    if (!appCtx[i]->config.tiled_display_config.enable)
      continue;
    /* Create an X window for every sink that is a video overlay */
    for (j = 0; j < appCtx[i]->config.num_sink_sub_bins; j++) {
      XTextProperty xproperty;
      gchar *title;
      guint width, height;
      XSizeHints hints = {0};

      if (!GST_IS_VIDEO_OVERLAY (appCtx[i]->pipeline.instance_bins[0].
              sink_bin.sub_bins[j].sink)) {
        continue;
      }

      if (!display) {
        NVGSTDS_ERR_MSG_V ("Could not open X Display");
        return_value = -1;
        goto done;
      }

      /* Window size: sink render config first, then tiled-display config,
         then the built-in defaults */
      if (appCtx[i]->config.sink_bin_sub_bin_config[j].render_config.width)
        width =
            appCtx[i]->config.sink_bin_sub_bin_config[j].render_config.width;
      else
        width = appCtx[i]->config.tiled_display_config.width;

      if (appCtx[i]->config.sink_bin_sub_bin_config[j].render_config.height)
        height =
            appCtx[i]->config.sink_bin_sub_bin_config[j].render_config.height;
      else
        height = appCtx[i]->config.tiled_display_config.height;

      width = (width) ? width : DEFAULT_X_WINDOW_WIDTH;
      height = (height) ? height : DEFAULT_X_WINDOW_HEIGHT;

      hints.flags = PPosition | PSize;
      hints.x = appCtx[i]->config.sink_bin_sub_bin_config[j].render_config.offset_x;
      hints.y = appCtx[i]->config.sink_bin_sub_bin_config[j].render_config.offset_y;
      hints.width = width;
      hints.height = height;

      windows[i] =
          XCreateSimpleWindow (display, RootWindow (display,
              DefaultScreen (display)), hints.x, hints.y, width, height, 2,
              0x00000000, 0x00000000);

      XSetNormalHints(display, windows[i], &hints);

      if (num_instances > 1)
        title = g_strdup_printf (APP_TITLE "-%d", i);
      else
        title = g_strdup (APP_TITLE);
      if (XStringListToTextProperty ((char **) &title, 1, &xproperty) != 0) {
        XSetWMName (display, windows[i], &xproperty);
        XFree (xproperty.value);
      }
      /* The text property holds its own copy, so the title can be released */
      g_free (title);

      XSetWindowAttributes attr = { 0 };
      if ((appCtx[i]->config.tiled_display_config.enable &&
              appCtx[i]->config.tiled_display_config.rows *
              appCtx[i]->config.tiled_display_config.columns == 1) ||
          (appCtx[i]->config.tiled_display_config.enable == 0 &&
              appCtx[i]->config.num_source_sub_bins == 1)) {
        attr.event_mask = KeyPress;
      } else {
        attr.event_mask = ButtonPress | KeyRelease;
      }
      XChangeWindowAttributes (display, windows[i], CWEventMask, &attr);

      Atom wmDeleteMessage = XInternAtom (display, "WM_DELETE_WINDOW", False);
      if (wmDeleteMessage != None) {
        XSetWMProtocols (display, windows[i], &wmDeleteMessage, 1);
      }
      XMapRaised (display, windows[i]);
      XSync (display, 1);       //discard the events for now
      gst_video_overlay_set_window_handle (GST_VIDEO_OVERLAY (appCtx
              [i]->pipeline.instance_bins[0].sink_bin.sub_bins[j].sink),
          (gulong) windows[i]);
      gst_video_overlay_expose (GST_VIDEO_OVERLAY (appCtx[i]->
              pipeline.instance_bins[0].sink_bin.sub_bins[j].sink));
      if (!x_event_thread)
        x_event_thread = g_thread_new ("nvds-window-event-thread",
            nvds_x_event_thread, NULL);
    }
  }

  /* Dont try to set playing state if error is observed */
  if (return_value != -1) {
    for (i = 0; i < num_instances; i++) {
      if (gst_element_set_state (appCtx[i]->pipeline.pipeline,
              GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {

        g_print ("\ncan't set pipeline to playing state.\n");
        return_value = -1;
        goto done;
      }
    }
  }

  print_runtime_commands ();

  changemode (1);

  g_timeout_add (40, event_thread_func, NULL);
  g_main_loop_run (main_loop);

  changemode (0);

done:

  g_print ("Quitting\n");
  for (i = 0; i < num_instances; i++) {
    /* A failure part-way through the allocation/parse loop jumps here with
       the remaining slots never allocated; skip them instead of
       dereferencing NULL (assumes appCtx[] has static storage and therefore
       starts out zeroed). */
    if (!appCtx[i])
      continue;

    if (appCtx[i]->return_value == -1)
      return_value = -1;
    destroy_pipeline (appCtx[i]);

    g_mutex_lock (&disp_lock);
    if (windows[i])
      XDestroyWindow (display, windows[i]);
    windows[i] = 0;
    g_mutex_unlock (&disp_lock);

    g_free (appCtx[i]);
  }

  g_mutex_lock (&disp_lock);
  if (display)
    XCloseDisplay (display);
  display = NULL;
  g_mutex_unlock (&disp_lock);
  g_mutex_clear (&disp_lock);

  if (main_loop) {
    g_main_loop_unref (main_loop);
  }

  if (ctx) {
    g_option_context_free (ctx);
  }

  if (return_value == 0) {
    g_print ("App run successful\n");
  } else {
    g_print ("App run failed\n");
  }

  gst_deinit ();

  return return_value;
}

可接受的所有的命令列引數

/* Command-line options understood by deepstream-app.
   Fields per entry: long name, short name, flags, argument type,
   destination variable, help text, argument description.
   The list is terminated by a {NULL} entry. */
GOptionEntry entries[] = {
  {"version", 'v', 0, G_OPTION_ARG_NONE, &print_version,
   "Print DeepStreamSDK version", NULL},
  {"tiledtext", 't', 0, G_OPTION_ARG_NONE, &show_bbox_text,
   "Display Bounding box labels in tiled mode", NULL},
  {"version-all", 0, 0, G_OPTION_ARG_NONE, &print_dependencies_version,
   "Print DeepStreamSDK and dependencies version", NULL},
  {"cfg-file", 'c', 0, G_OPTION_ARG_FILENAME_ARRAY, &cfg_files,
   "Set the config file", NULL},
  {"input-file", 'i', 0, G_OPTION_ARG_FILENAME_ARRAY, &input_files,
   "Set the input file", NULL},
  {NULL},
};

每個條目的第一個欄位是長選項名,使用時以--為字首;第二個欄位是單字母的短選項名,使用時以-為字首,如

/deepstream-app$deepstream-app --version
deepstream-app version 5.0.0
DeepStreamSDK 5.0.0
/deepstream-app$deepstream-app -v
deepstream-app version 5.0.0
DeepStreamSDK 5.0.0

AppCtx是一個結構體,定義在deepstream_app.h的標頭檔案中,結構如下

/* Per-instance application context. The deepstream-app main() allocates one
   zero-filled AppCtx for every config file passed on the command line. */
typedef struct _AppCtx AppCtx;
struct _AppCtx
{
  gboolean version;
  gboolean cintr;
  /* Set to TRUE by main() when the show_bbox_text command-line flag is set. */
  gboolean show_bbox_text;
  gboolean seeking;
  gboolean quit;
  /* Both class ids are initialised to -1 by main(). */
  gint person_class_id;
  gint car_class_id;
  /* Set to -1 by main() when parsing the config file fails; main() checks it
     during cleanup to derive the process exit status. */
  gint return_value;
  /* Index of this instance in main()'s appCtx[] array. */
  guint index;
  /* Initialised to -1 by main(). */
  gint active_source_index;

  GMutex app_lock;
  GCond app_cond;

  /* Pipeline state; create_pipeline() stores the GstPipeline in
     pipeline.pipeline and main() drives its state changes. */
  NvDsPipeline pipeline;
  /* Parsed configuration, filled in by parse_config_file(). */
  NvDsConfig config;
  NvDsConfig override_config;
  NvDsInstanceData instance_data[MAX_SOURCE_BINS];
  NvDsC2DContext *c2d_ctx[MAX_MESSAGE_CONSUMERS];
  NvDsAppPerfStructInt perf_struct;
  /* The three callbacks below are stored by create_pipeline() from its
     arguments. */
  bbox_generated_callback bbox_generated_post_analytics_cb;
  bbox_generated_callback all_bbox_generated_cb;
  overlay_graphics_callback overlay_graphics_cb;
  NvDsFrameLatencyInfo *latency_info;
  GMutex latency_lock;
  GThread *ota_handler_thread;
  guint ota_inotify_fd;
  guint ota_watch_desc;
};

現在來看一下parse_config_file解析配置檔案這個方法,該方法位於deepstream_app_config_parser.c 中

/*
 * Parses the key-file at cfg_file_path into *config.
 *
 * The CONFIG_GROUP_SOURCE_LIST and CONFIG_GROUP_SOURCE_ALL groups are handled
 * first and removed from the in-memory key file; every remaining group is
 * then dispatched by name to its dedicated parser. Afterwards the GIE unique
 * ids are checked for duplicates and NV_DS_SOURCE_URI_MULTIPLE sources are
 * expanded into individual NV_DS_SOURCE_URI entries.
 *
 * Returns TRUE on success, FALSE on any load, parse or validation error.
 */
gboolean
parse_config_file (NvDsConfig *config, gchar *cfg_file_path)
{
  GKeyFile *cfg_file = g_key_file_new ();  /* in-memory copy of the config file */
  GError *error = NULL;
  gboolean ret = FALSE;
  gchar **groups = NULL;
  gchar **group;
  guint i, j;

  config->source_list_enabled = FALSE;

  if (!APP_CFG_PARSER_CAT) {
    GST_DEBUG_CATEGORY_INIT (APP_CFG_PARSER_CAT, "NVDS_CFG_PARSER", 0, NULL);
  }
  /* Load the config file from disk into cfg_file */
  if (!g_key_file_load_from_file (cfg_file, cfg_file_path, G_KEY_FILE_NONE,
          &error)) {
    GST_CAT_ERROR (APP_CFG_PARSER_CAT, "Failed to load uri file: %s",
        error->message);
    goto done;
  }
  /* Optional source-list group */
  if (g_key_file_has_group (cfg_file, CONFIG_GROUP_SOURCE_LIST)) {
    if (!parse_source_list (config, cfg_file, cfg_file_path)) {
      GST_CAT_ERROR (APP_CFG_PARSER_CAT, "Failed to parse '%s' group",
          CONFIG_GROUP_SOURCE_LIST);
      goto done;
    }
    /* With a source list, the number of sources comes from the list itself */
    config->num_source_sub_bins = config->total_num_sources;
    config->source_list_enabled = TRUE;
    /* A source list requires the source-attr-all group as well */
    if (!g_key_file_has_group (cfg_file, CONFIG_GROUP_SOURCE_ALL)) {
      NVGSTDS_ERR_MSG_V ("[source-attr-all] group not present.");
      ret = FALSE;
      goto done;
    }
    g_key_file_remove_group (cfg_file, CONFIG_GROUP_SOURCE_LIST, &error);
  }
  /* Optional source-attr-all group */
  if (g_key_file_has_group (cfg_file, CONFIG_GROUP_SOURCE_ALL)) {
    /* Parse it into global_source_config (an NvDsSourceConfig) */
    if (!parse_source (&global_source_config,
            cfg_file, CONFIG_GROUP_SOURCE_ALL, cfg_file_path)) {
      /* Report the group that actually failed */
      GST_CAT_ERROR (APP_CFG_PARSER_CAT, "Failed to parse '%s' group",
          CONFIG_GROUP_SOURCE_ALL);
      goto done;
    }
    /* Apply the parsed global source settings to config */
    if (!set_source_all_configs (config, cfg_file_path)) {
      ret = FALSE;
      goto done;
    }
    /* Drop the group so the generic loop below does not see it again */
    g_key_file_remove_group (cfg_file, CONFIG_GROUP_SOURCE_ALL, &error);
  }
  /* Fetch the remaining groups and dispatch each one by name */
  groups = g_key_file_get_groups (cfg_file, NULL);
  for (group = groups; *group; group++) {
    gboolean parse_err = FALSE;
    GST_CAT_DEBUG (APP_CFG_PARSER_CAT, "Parsing group: %s", *group);
    /* CONFIG_GROUP_APP: parsed straight into config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_APP)) {
      parse_err = !parse_app (config, cfg_file, cfg_file_path);
    }
    /* Groups whose name starts with CONFIG_GROUP_SOURCE */
    if (!strncmp (*group, CONFIG_GROUP_SOURCE,
            sizeof (CONFIG_GROUP_SOURCE) - 1)) {
      if (config->num_source_sub_bins == MAX_SOURCE_BINS) {
        NVGSTDS_ERR_MSG_V ("App supports max %d sources", MAX_SOURCE_BINS);
        ret = FALSE;
        goto done;
      }
      /* Extract the numeric suffix of the group name (source0, source1, ...) */
      gchar *source_id_start_ptr = *group + strlen (CONFIG_GROUP_SOURCE);
      gchar *source_id_end_ptr = NULL;
      guint index =
          g_ascii_strtoull (source_id_start_ptr, &source_id_end_ptr, 10);
      if (source_id_start_ptr == source_id_end_ptr
          || *source_id_end_ptr != '\0') {
        NVGSTDS_ERR_MSG_V
            ("Source group \"[%s]\" is not in the form \"[source<%%d>]\"",
            *group);
        ret = FALSE;
        goto done;
      }
      guint source_id = 0;
      if (config->source_list_enabled) {
        /* With a source list the suffix selects which listed source to
           override, so it must be within the list */
        if (index >= config->total_num_sources) {
          NVGSTDS_ERR_MSG_V
              ("Invalid source group index %d, index cannot exceed %d", index,
              config->total_num_sources);
          ret = FALSE;
          goto done;
        }
        source_id = index;
        NVGSTDS_INFO_MSG_V ("Some parameters to be overwritten for group [%s]",
            *group);
      } else {
        /* Without a source list, sources are stored in order of appearance */
        source_id = config->num_source_sub_bins;
      }
      /* Parse the group into config->multi_source_config[source_id] */
      parse_err = !parse_source (&config->multi_source_config[source_id],
          cfg_file, *group, cfg_file_path);
      if (config->source_list_enabled
          && config->multi_source_config[source_id].type ==
          NV_DS_SOURCE_URI_MULTIPLE) {
        NVGSTDS_ERR_MSG_V
            ("MultiURI support not available if [source-list] is provided");
        ret = FALSE;
        goto done;
      }
      /* Only enabled sources consume a slot */
      if (config->multi_source_config[source_id].enable
          && !config->source_list_enabled) {
        config->num_source_sub_bins++;
      }
    }
    /* CONFIG_GROUP_STREAMMUX -> config->streammux_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_STREAMMUX)) {
      parse_err = !parse_streammux (&config->streammux_config, cfg_file, cfg_file_path);
    }
    /* CONFIG_GROUP_OSD -> config->osd_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_OSD)) {
      parse_err = !parse_osd (&config->osd_config, cfg_file);
    }
    /* CONFIG_GROUP_PRIMARY_GIE -> config->primary_gie_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_PRIMARY_GIE)) {
      parse_err =
          !parse_gie (&config->primary_gie_config, cfg_file,
          CONFIG_GROUP_PRIMARY_GIE, cfg_file_path);
    }
    /* CONFIG_GROUP_TRACKER -> config->tracker_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_TRACKER)) {
      parse_err = !parse_tracker (&config->tracker_config, cfg_file, cfg_file_path);
    }
    /* Groups whose name starts with CONFIG_GROUP_SECONDARY_GIE */
    if (!strncmp (*group, CONFIG_GROUP_SECONDARY_GIE,
                  sizeof (CONFIG_GROUP_SECONDARY_GIE) - 1)) {
      if (config->num_secondary_gie_sub_bins == MAX_SECONDARY_GIE_BINS) {
        NVGSTDS_ERR_MSG_V ("App supports max %d secondary GIEs", MAX_SECONDARY_GIE_BINS);
        ret = FALSE;
        goto done;
      }
      /* Parse into the next free secondary_gie_sub_bin_config slot; the slot
         is only kept (counter advanced) when the GIE is enabled */
      parse_err =
          !parse_gie (&config->secondary_gie_sub_bin_config[config->
                                  num_secondary_gie_sub_bins],
                                  cfg_file, *group, cfg_file_path);
      if (config->secondary_gie_sub_bin_config[config->num_secondary_gie_sub_bins].enable){
        config->num_secondary_gie_sub_bins++;
      }
    }
    /* Groups whose name starts with CONFIG_GROUP_SINK */
    if (!strncmp (*group, CONFIG_GROUP_SINK, sizeof (CONFIG_GROUP_SINK) - 1)) {
      if (config->num_sink_sub_bins == MAX_SINK_BINS) {
        NVGSTDS_ERR_MSG_V ("App supports max %d sinks", MAX_SINK_BINS);
        ret = FALSE;
        goto done;
      }
      /* Parse into the next free sink_bin_sub_bin_config slot; kept only
         when the sink is enabled */
      parse_err =
          !parse_sink (&config->
          sink_bin_sub_bin_config[config->num_sink_sub_bins], cfg_file, *group,
          cfg_file_path);
      if (config->
          sink_bin_sub_bin_config[config->num_sink_sub_bins].enable){
        config->num_sink_sub_bins++;
      }
    }
    /* Groups whose name starts with CONFIG_GROUP_MSG_CONSUMER */
    if (!strncmp (*group, CONFIG_GROUP_MSG_CONSUMER,
        sizeof (CONFIG_GROUP_MSG_CONSUMER) - 1)) {
      if (config->num_message_consumers == MAX_MESSAGE_CONSUMERS) {
        NVGSTDS_ERR_MSG_V ("App supports max %d consumers", MAX_MESSAGE_CONSUMERS);
        ret = FALSE;
        goto done;
      }
      /* Parse into the next free message_consumer_config slot; kept only
         when the consumer is enabled */
      parse_err = !parse_msgconsumer (
                    &config->message_consumer_config[config->num_message_consumers],
                    cfg_file, *group, cfg_file_path);

      if (config->message_consumer_config[config->num_message_consumers].enable) {
        config->num_message_consumers++;
      }
    }
    /* CONFIG_GROUP_TILED_DISPLAY -> config->tiled_display_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_TILED_DISPLAY)) {
      parse_err = !parse_tiled_display (&config->tiled_display_config, cfg_file);
    }
    /* CONFIG_GROUP_IMG_SAVE -> config->image_save_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_IMG_SAVE)) {
      parse_err = !parse_image_save (&config->image_save_config , cfg_file, *group, cfg_file_path);
    }
    /* CONFIG_GROUP_DSANALYTICS -> config->dsanalytics_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_DSANALYTICS)) {
      parse_err = !parse_dsanalytics (&config->dsanalytics_config, cfg_file, cfg_file_path);
    }
    /* CONFIG_GROUP_DSEXAMPLE -> config->dsexample_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_DSEXAMPLE)) {
      parse_err = !parse_dsexample (&config->dsexample_config, cfg_file);
    }
    /* CONFIG_GROUP_MSG_CONVERTER -> config->msg_conv_config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_MSG_CONVERTER)) {
      parse_err = !parse_msgconv (&config->msg_conv_config, cfg_file, *group, cfg_file_path);
    }
    /* CONFIG_GROUP_TESTS: parsed straight into config */
    if (!g_strcmp0 (*group, CONFIG_GROUP_TESTS)) {
      parse_err = !parse_tests (config, cfg_file);
    }

    if (parse_err) {
      GST_CAT_ERROR (APP_CFG_PARSER_CAT, "Failed to parse '%s' group", *group);
      goto done;
    }
  }
  /* Post-processing and validation of the parsed groups */

  /* No secondary GIE may share its unique id with the primary GIE */
  for (i = 0; i < config->num_secondary_gie_sub_bins; i++) {
    if (config->secondary_gie_sub_bin_config[i].unique_id ==
        config->primary_gie_config.unique_id) {
      NVGSTDS_ERR_MSG_V ("Non unique gie ids found");
      ret = FALSE;
      goto done;
    }
  }

  /* Secondary GIE unique ids must also differ from each other */
  for (i = 0; i < config->num_secondary_gie_sub_bins; i++) {
    for (j = i + 1; j < config->num_secondary_gie_sub_bins; j++) {
      if (config->secondary_gie_sub_bin_config[i].unique_id ==
          config->secondary_gie_sub_bin_config[j].unique_id) {
        NVGSTDS_ERR_MSG_V ("Non unique gie id %d found",
                            config->secondary_gie_sub_bin_config[i].unique_id);
        ret = FALSE;
        goto done;
      }
    }
  }

  /* Expand every NV_DS_SOURCE_URI_MULTIPLE source into num_sources plain
     NV_DS_SOURCE_URI sources. The configured uri is used as a printf-style
     format that receives the source number (0 for the original entry,
     1..num_sources-1 for the appended copies). */
  for (i = 0; i < config->num_source_sub_bins; i++) {
    if (config->multi_source_config[i].type == NV_DS_SOURCE_URI_MULTIPLE) {
      if (config->multi_source_config[i].num_sources < 1) {
        config->multi_source_config[i].num_sources = 1;
      }
      for (j = 1; j < config->multi_source_config[i].num_sources; j++) {
        if (config->num_source_sub_bins == MAX_SOURCE_BINS) {
          NVGSTDS_ERR_MSG_V ("App supports max %d sources", MAX_SOURCE_BINS);
          ret = FALSE;
          goto done;
        }
        /* Append a copy of entry i at the end of the source array */
        memcpy (&config->multi_source_config[config->num_source_sub_bins],
            &config->multi_source_config[i],
            sizeof (config->multi_source_config[i]));
        config->multi_source_config[config->num_source_sub_bins].type =
            NV_DS_SOURCE_URI;
        config->multi_source_config[config->num_source_sub_bins].uri =
            g_strdup_printf (config->multi_source_config[config->
                num_source_sub_bins].uri, j);
        config->num_source_sub_bins++;
      }
      config->multi_source_config[i].type = NV_DS_SOURCE_URI;
      config->multi_source_config[i].uri =
          g_strdup_printf (config->multi_source_config[i].uri, 0);
    }
  }
  ret = TRUE;
/* Release resources */
done:
  if (cfg_file) {
    g_key_file_free (cfg_file);
  }

  if (groups) {
    g_strfreev (groups);
  }

  if (error) {
    g_error_free (error);
  }
  if (!ret) {
    NVGSTDS_ERR_MSG_V ("%s failed", __func__);
  }
  return ret;
}

然後看一下create_pipeline建立管道這個方法,該方法位於deepstream_app.c中

/**
 * Build the application pipeline described by appCtx->config.
 *
 * The function creates the top-level GstPipeline, installs a bus watch,
 * creates the multi-source bin, a tee ("tiler_tee"), and then either the
 * tiled-display path or the per-source demuxer path, followed by the optional
 * dsexample bin and the common elements. Elements are linked starting from
 * the downstream end: `last_elem` always names the most-upstream element
 * built so far, and the multi-source bin is linked to it at the very end.
 *
 * NOTE(review): NVGSTDS_LINK_ELEMENT, NVGSTDS_LINK_ELEMENT_FULL and
 * NVGSTDS_ELEM_ADD_PROBE are macros defined outside this block; they are
 * presumed to `goto done` on failure and NVGSTDS_ELEM_ADD_PROBE is presumed
 * to assign its first argument -- confirm against their definitions.
 *
 * @param appCtx                            Application context; its `pipeline`
 *                                          and `config` members are read and
 *                                          updated in place.
 * @param bbox_generated_post_analytics_cb  Stored in appCtx and forwarded to
 *                                          create_common_elements().
 * @param all_bbox_generated_cb             Stored in appCtx.
 * @param perf_cb                           Forwarded to
 *                                          enable_perf_measurement() when
 *                                          config->enable_perf_measurement is
 *                                          set.
 * @param overlay_graphics_cb               Stored in appCtx.
 *
 * @return TRUE when the whole pipeline was assembled, FALSE otherwise (an
 *         error message naming this function is logged on failure).
 */
gboolean
create_pipeline (AppCtx * appCtx,
    bbox_generated_callback bbox_generated_post_analytics_cb,
    bbox_generated_callback all_bbox_generated_cb, perf_callback perf_cb,
    overlay_graphics_callback overlay_graphics_cb)
{
  gboolean ret = FALSE;
  NvDsPipeline *pipeline = &appCtx->pipeline;   /* pipeline state inside the context */
  NvDsConfig *config = &appCtx->config;         /* parsed configuration inside the context */
  GstBus *bus;                                  /* message bus of the pipeline */
  /* Scratch element pointers used while chaining elements together. They are
   * NULL-initialized so that no path can read an indeterminate pointer. */
  GstElement *last_elem = NULL;
  GstElement *tmp_elem1 = NULL;
  GstElement *tmp_elem2 = NULL;
  guint i;
  GstPad *fps_pad = NULL;
  /* Initialized because on the non-tiled path no probe is added when no sink
   * is configured for any source. */
  gulong latency_probe_id = 0;

  _dsmeta_quark = g_quark_from_static_string (NVDS_META_STRING);

  appCtx->all_bbox_generated_cb = all_bbox_generated_cb;
  appCtx->bbox_generated_post_analytics_cb = bbox_generated_post_analytics_cb;
  appCtx->overlay_graphics_cb = overlay_graphics_cb;

  /* Enforce a floor of 8 OSD output buffers. */
  if (config->osd_config.num_out_buffers < 8) {
    config->osd_config.num_out_buffers = 8;
  }

  pipeline->pipeline = gst_pipeline_new ("pipeline");
  if (!pipeline->pipeline) {
    NVGSTDS_ERR_MSG_V ("Failed to create pipeline");
    goto done;
  }

  /* Watch the pipeline bus; the watch id is kept in the pipeline struct. */
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline->pipeline));
  pipeline->bus_id = gst_bus_add_watch (bus, bus_callback, appCtx);
  gst_object_unref (bus);

  if (config->file_loop) {
    /* Let each source bin know it needs to loop. */
    for (i = 0; i < config->num_source_sub_bins; i++) {
      config->multi_source_config[i].loop = TRUE;
    }
  }

  for (i = 0; i < config->num_sink_sub_bins; i++) {
    NvDsSinkSubBinConfig *sink_config = &config->sink_bin_sub_bin_config[i];
    switch (sink_config->type) {
      case NV_DS_SINK_FAKE:
      case NV_DS_SINK_RENDER_EGL:
      case NV_DS_SINK_RENDER_OVERLAY:
        /* Set the "qos" property of sink, if not explicitly specified in the
           config. */
        if (!sink_config->render_config.qos_value_specified) {
          /* QoS events should be generated by sink always in case of live sources
             or with synchronous playback for non-live sources. */
          if (config->streammux_config.live_source || sink_config->render_config.sync) {
            sink_config->render_config.qos = TRUE;
          } else {
            sink_config->render_config.qos = FALSE;
          }
        }
        break;
      default:
        break;
    }
  }
  /*
   * Add muxer and < N > source components to the pipeline based
   * on the settings in configuration file.
   */
  if (!create_multi_source_bin (config->num_source_sub_bins,
          config->multi_source_config, &pipeline->multi_src_bin))
    goto done;
  gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->multi_src_bin.bin);


  if (config->streammux_config.is_parsed)
    set_streammux_properties (&config->streammux_config,
        pipeline->multi_src_bin.streammux);

  if (appCtx->latency_info == NULL) {
    /* One zeroed latency record per batch slot. calloc(n, size) lets the
     * allocator detect a size overflow. */
    appCtx->latency_info = (NvDsFrameLatencyInfo *)
        calloc (config->streammux_config.batch_size,
        sizeof (NvDsFrameLatencyInfo));
    /* A NULL result for a zero-sized request is permitted by the C standard,
     * so only a positive batch size is treated as an allocation failure. */
    if (appCtx->latency_info == NULL
        && config->streammux_config.batch_size > 0) {
      NVGSTDS_ERR_MSG_V ("Failed to allocate latency info");
      goto done;
    }
  }

  /** a tee after the tiler which shall be connected to sink(s) */
  pipeline->tiler_tee = gst_element_factory_make (NVDS_ELEM_TEE, "tiler_tee");
  if (!pipeline->tiler_tee) {
    NVGSTDS_ERR_MSG_V ("Failed to create element 'tiler_tee'");
    goto done;
  }
  gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->tiler_tee);

  /** Tiler + Demux in Parallel Use-Case */
  if (config->tiled_display_config.enable == NV_DS_TILED_DISPLAY_ENABLE_WITH_PARALLEL_DEMUX)
  {
    pipeline->demuxer =
        gst_element_factory_make (NVDS_ELEM_STREAM_DEMUX, "demuxer");
    if (!pipeline->demuxer) {
      NVGSTDS_ERR_MSG_V ("Failed to create element 'demuxer'");
      goto done;
    }
    gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->demuxer);

    /** NOTE:
     * demux output is supported for only one source
     * If multiple [sink] groups are configured with
     * link_to_demux=1, only the first [sink]
     * shall be constructed for all occurences of
     * [sink] groups with link_to_demux=1
     */
    {
      gchar pad_name[16];
      GstPad *demux_src_pad;

      i = 0;
      if (!create_demux_pipeline (appCtx, i)) {
        goto done;
      }

      /* Find the first sink group that asks to be linked to the demuxer and
       * derive the demuxer source pad name from its source id. */
      for (i = 0; i < config->num_sink_sub_bins; i++)
      {
        if (config->sink_bin_sub_bin_config[i].link_to_demux == TRUE)
        {
          g_snprintf (pad_name, sizeof (pad_name), "src_%02d", config->sink_bin_sub_bin_config[i].source_id);
          break;
        }
      }

      /* Loop ran to completion: no sink group requested the demux link. */
      if (i >= config->num_sink_sub_bins)
      {
        g_print ("\n\nError : sink for demux (use link-to-demux-only property) is not provided in the config file\n\n");
        goto done;
      }

      /* Only demux instance 0 is ever built (see NOTE above). */
      i = 0;

      gst_bin_add (GST_BIN (pipeline->pipeline),
          pipeline->demux_instance_bins[i].bin);

      demux_src_pad = gst_element_get_request_pad (pipeline->demuxer, pad_name);
      NVGSTDS_LINK_ELEMENT_FULL (pipeline->demuxer, pad_name,
          pipeline->demux_instance_bins[i].bin, "sink");
      gst_object_unref (demux_src_pad);

      NVGSTDS_ELEM_ADD_PROBE(latency_probe_id,
          appCtx->pipeline.demux_instance_bins[i].demux_sink_bin.bin,
          "sink",
          demux_latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
          appCtx);
    }

    last_elem = pipeline->demuxer;
    link_element_to_tee_src_pad (pipeline->tiler_tee, last_elem);
    last_elem = pipeline->tiler_tee;
  }

  if (config->tiled_display_config.enable) {

    /* Tiler will generate a single composited buffer for all sources. So need
     * to create only one processing instance. */
    if (!create_processing_instance (appCtx, 0)) {
      goto done;
    }
    // create and add tiling component to pipeline.
    if (config->tiled_display_config.columns *
        config->tiled_display_config.rows < config->num_source_sub_bins) {
      /* The configured grid cannot hold every source: pick a roughly square
       * column count if none was given, then enough rows to fit all sources. */
      if (config->tiled_display_config.columns == 0) {
        config->tiled_display_config.columns =
            (guint) (sqrt (config->num_source_sub_bins) + 0.5);
      }
      config->tiled_display_config.rows =
          (guint) ceil (1.0 * config->num_source_sub_bins /
          config->tiled_display_config.columns);
      NVGSTDS_WARN_MSG_V
          ("Num of Tiles less than number of sources, readjusting to "
          "%u rows, %u columns", config->tiled_display_config.rows,
          config->tiled_display_config.columns);
    }

    gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->instance_bins[0].bin);
    last_elem = pipeline->instance_bins[0].bin;

    if (!create_tiled_display_bin (&config->tiled_display_config,
            &pipeline->tiled_display_bin)) {
      goto done;
    }
    gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->tiled_display_bin.bin);
    NVGSTDS_LINK_ELEMENT (pipeline->tiled_display_bin.bin, last_elem);
    last_elem = pipeline->tiled_display_bin.bin;

    link_element_to_tee_src_pad (pipeline->tiler_tee, pipeline->tiled_display_bin.bin);
    last_elem = pipeline->tiler_tee;

    NVGSTDS_ELEM_ADD_PROBE (latency_probe_id,
      pipeline->instance_bins->sink_bin.sub_bins[0].sink, "sink",
      latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
      appCtx);
  }
  else
  {
    /*
     * Create demuxer only if tiled display is disabled.
     */
    pipeline->demuxer =
        gst_element_factory_make (NVDS_ELEM_STREAM_DEMUX, "demuxer");
    if (!pipeline->demuxer) {
      NVGSTDS_ERR_MSG_V ("Failed to create element 'demuxer'");
      goto done;
    }
    gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->demuxer);

    for (i = 0; i < config->num_source_sub_bins; i++)
    {
      gchar pad_name[16];
      GstPad *demux_src_pad;

      /* Check if any sink has been configured to render/encode output for
       * source index `i`. The processing instance for that source will be
       * created only if atleast one sink has been configured as such.
       */
      if (!is_sink_available_for_source_id(config, i))
        continue;

      if (!create_processing_instance(appCtx, i))
      {
        goto done;
      }
      gst_bin_add(GST_BIN(pipeline->pipeline),
                  pipeline->instance_bins[i].bin);

      g_snprintf(pad_name, sizeof (pad_name), "src_%02d", i);
      demux_src_pad = gst_element_get_request_pad(pipeline->demuxer, pad_name);
      NVGSTDS_LINK_ELEMENT_FULL(pipeline->demuxer, pad_name,
                                pipeline->instance_bins[i].bin, "sink");
      gst_object_unref(demux_src_pad);

      /* Attach the latency probe to the first populated sink of this
       * instance only. */
      for (int k = 0; k < MAX_SINK_BINS;k++) {
        if(pipeline->instance_bins[i].sink_bin.sub_bins[k].sink){
          NVGSTDS_ELEM_ADD_PROBE(latency_probe_id,
              pipeline->instance_bins[i].sink_bin.sub_bins[k].sink, "sink",
              latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
              appCtx);
          break;
        }
      }
    }
    last_elem = pipeline->demuxer;
  }

  /* Pick the pad that is handed to the performance measurement below.
   * NOTE(review): the reference returned by gst_element_get_static_pad() is
   * not released in this function; whether enable_perf_measurement() keeps
   * the pad is not visible here -- confirm ownership before adding an unref. */
  if (config->tiled_display_config.enable == NV_DS_TILED_DISPLAY_DISABLE) {
    fps_pad = gst_element_get_static_pad (pipeline->demuxer, "sink");
  }
  else {
    fps_pad = gst_element_get_static_pad (pipeline->tiled_display_bin.bin, "sink");
  }

  pipeline->common_elements.appCtx = appCtx;
  // Decide where in the pipeline the element should be added and add only if
  // enabled
  if (config->dsexample_config.enable) {
    // Create dsexample element bin and set properties
    if (!create_dsexample_bin (&config->dsexample_config,
            &pipeline->dsexample_bin)) {
      goto done;
    }
    // Add dsexample bin to instance bin
    gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->dsexample_bin.bin);

    // Link this bin to the last element in the bin
    NVGSTDS_LINK_ELEMENT (pipeline->dsexample_bin.bin, last_elem);

    // Set this bin as the last element
    last_elem = pipeline->dsexample_bin.bin;
  }
  // create and add common components to pipeline.
  if (!create_common_elements (config, pipeline, &tmp_elem1, &tmp_elem2,
          bbox_generated_post_analytics_cb)) {
    goto done;
  }

  if(!add_and_link_broker_sink(appCtx)) {
    goto done;
  }

  /* If common elements were created, splice them in ahead of what has been
   * built so far: tmp_elem2 is linked to last_elem and tmp_elem1 becomes the
   * new link target for the source bin. */
  if (tmp_elem2) {
    NVGSTDS_LINK_ELEMENT (tmp_elem2, last_elem);
    last_elem = tmp_elem1;
  }

  NVGSTDS_LINK_ELEMENT (pipeline->multi_src_bin.bin, last_elem);

  // enable performance measurement and add call back function to receive
  // performance data.
  if (config->enable_perf_measurement) {
    appCtx->perf_struct.context = appCtx;
    enable_perf_measurement (&appCtx->perf_struct, fps_pad,
        pipeline->multi_src_bin.num_bins,
        config->perf_measurement_interval_sec,
        config->multi_source_config[0].dewarper_config.num_surfaces_per_frame,
        perf_cb);
  }

  /* The probe id is not needed afterwards; mark it used to silence
   * "set but not used" warnings. */
  (void) latency_probe_id;

  if (config->num_message_consumers) {
    for (i = 0; i < config->num_message_consumers; i++) {
      appCtx->c2d_ctx[i] = start_cloud_to_device_messaging (
                                &config->message_consumer_config[i], NULL,
                                &appCtx->pipeline.multi_src_bin);
      if (appCtx->c2d_ctx[i] == NULL) {
        NVGSTDS_ERR_MSG_V ("Failed to create message consumer");
        goto done;
      }
    }
  }

  GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (appCtx->pipeline.pipeline),
      GST_DEBUG_GRAPH_SHOW_ALL, "ds-app-null");

  /* Synchronization primitives are initialized only once the pipeline has
   * been assembled successfully. */
  g_mutex_init (&appCtx->app_lock);
  g_cond_init (&appCtx->app_cond);
  g_mutex_init (&appCtx->latency_lock);

  ret = TRUE;
done:
  if (!ret) {
    NVGSTDS_ERR_MSG_V ("%s failed", __func__);
  }
  return ret;
}