LCOV - code coverage report
Current view: top level - mlops-agent-1.8.6/daemon - pipeline-dbus-impl.cc (source / functions) Coverage Total Hit
Test: ML-Agent 1.8.6-0 platform/core/ml/mlops-agent#8fd86a236e41cede78a27ac52f5de10227f5b4bc Lines: 0.0 % 176 0
Test Date: 2026-03-19 20:27:18 Functions: 0.0 % 16 0

            Line data    Source code
       1              : /* SPDX-License-Identifier: Apache-2.0 */
       2              : /**
       3              :  * NNStreamer API / Machine Learning Agent Daemon
       4              :  * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
       5              :  */
       6              : 
       7              : /**
       8              :  * @file      pipeline-dbus-impl.cc
       9              :  * @date      20 Jul 2022
      10              :  * @brief     Implementation of pipeline dbus interface.
      11              :  * @see       https://github.com/nnstreamer/deviceMLOps.MLAgent
      12              :  * @author    Yongjoo Ahn <yongjoo1.ahn@samsung.com>
      13              :  * @bug       No known bugs except for NYI items
      14              :  * @details   This implements the pipeline dbus interface.
      15              :  */
      16              : 
      17              : #include <glib.h>
      18              : #include <gst/gst.h>
      19              : #include <stdbool.h>
      20              : #include <stdio.h>
      21              : #include <stdlib.h>
      22              : 
      23              : #include "common.h"
      24              : #include "dbus-interface.h"
      25              : #include "gdbus-util.h"
      26              : #include "log.h"
      27              : #include "modules.h"
      28              : #include "pipeline-dbus.h"
      29              : #include "service-db-util.h"
      30              : 
      31              : static MachinelearningServicePipeline *g_gdbus_instance = NULL; /**< D-Bus skeleton of the pipeline interface; assigned in probe_pipeline_module() and cleared through gdbus_put_pipeline_instance(). */
      32              : static GHashTable *pipeline_table = NULL; /**< Launched pipelines: the key is the pipeline id, the value is a pipeline_s released by _pipeline_free(). Created in init_pipeline_module(), destroyed in exit_pipeline_module(). */
      33              : G_LOCK_DEFINE_STATIC (pipeline_table_lock); /**< Taken around every access to pipeline_table in this file. */
      34              : 
      35              : /**
      36              :  * @brief Structure for pipeline.
                      :  * @details One instance is allocated per successful launch in dbus_cb_core_launch_pipeline(),
                      :  *          stored in pipeline_table and released by _pipeline_free().
      37              :  */
      38              : typedef struct _pipeline {
      39              :   GstElement *element; /**< Pipeline element returned by gst_parse_launch(); unreffed in _pipeline_free(). */
      40              :   gint64 id; /**< Identifier handed back to the D-Bus caller; taken from g_get_monotonic_time() at launch and used as the hash table key. */
      41              :   GMutex lock; /**< Held while the state of @a element is changed or queried. */
      42              :   gchar *service_name; /**< Copy of the service name the pipeline was launched for. */
      43              :   gchar *description; /**< Copy of the pipeline description string that was parsed. */
      44              : } pipeline_s;
      45              : 
      46              : /**
      47              :  * @brief Internal function to destroy pipeline instances.
                      :  * @details Passed to g_hash_table_new_full() in init_pipeline_module() as the value-destroy
                      :  *          function of pipeline_table. Releases the element reference, both strings,
                      :  *          the mutex and finally the structure itself.
                      :  * @param data Pointer to a pipeline_s. NULL is logged as an internal error and ignored.
      48              :  */
      49              : static void
      50            0 : _pipeline_free (gpointer data)
      51              : {
      52              :   pipeline_s *p;
      53              : 
      54            0 :   if (!data) {
      55            0 :     ml_loge ("internal error, the data should not be NULL");
      56            0 :     return;
      57              :   }
      58              : 
      59            0 :   p = (pipeline_s *) data;
      60              : 
      61            0 :   if (p->element)
      62            0 :     gst_object_unref (p->element); /** the element is only unreffed, its state is not changed first; see the @todo in dbus_cb_core_destroy_pipeline() */
      63              : 
      64            0 :   g_free (p->service_name);
      65            0 :   g_free (p->description);
      66            0 :   g_mutex_clear (&p->lock); /** NOTE(review): presumably no other thread holds p->lock at this point - confirm */
      67              : 
      68            0 :   g_free (p);
      69              : }
      70              : 
      71              : /**
      72              :  * @brief Get the skeleton object of the DBus interface.
                      :  * @return The object returned by machinelearning_service_pipeline_skeleton_new().
                      :  *         The caller releases it with gdbus_put_pipeline_instance().
      73              :  */
      74              : static MachinelearningServicePipeline *
      75            0 : gdbus_get_pipeline_instance (void)
      76              : {
      77            0 :   return machinelearning_service_pipeline_skeleton_new ();
      78              : }
      79              : 
      80              : /**
      81              :  * @brief Put the obtained skeleton object and release the resource.
                      :  * @param instance Address of the pointer holding the skeleton object; it is handed to
                      :  *                 g_clear_object(), which drops the reference and resets the pointer.
      82              :  */
      83              : static void
      84            0 : gdbus_put_pipeline_instance (MachinelearningServicePipeline **instance)
      85              : {
      86            0 :   g_clear_object (instance);
      87            0 : }
      88              : 
      89              : /**
      90              :  * @brief Set the service with given description. Return the call result.
                      :  * @details Forwards the request to svcdb_pipeline_set() and completes the D-Bus
                      :  *          invocation with the value it returned.
                      :  * @param obj Skeleton object the method was invoked on.
                      :  * @param invoc Method invocation to complete.
                      :  * @param service_name Name of the service to register.
                      :  * @param pipeline_desc Pipeline description to store for the service.
                      :  * @param user_data Unused.
                      :  * @return Always TRUE; the outcome is reported through the completed invocation.
      91              :  */
      92              : static gboolean
      93            0 : dbus_cb_core_set_pipeline (MachinelearningServicePipeline *obj, GDBusMethodInvocation *invoc,
      94              :     const gchar *service_name, const gchar *pipeline_desc, gpointer user_data)
      95              : {
      96            0 :   gint result = 0;
      97              : 
      98            0 :   result = svcdb_pipeline_set (service_name, pipeline_desc);
      99            0 :   machinelearning_service_pipeline_complete_set_pipeline (obj, invoc, result);
     100              : 
     101            0 :   return TRUE;
     102              : }
     103              : 
     104              : /**
     105              :  * @brief Get the pipeline description of the given service. Return the call result and the pipeline description.
                      :  * @details Forwards the request to svcdb_pipeline_get() and completes the D-Bus
                      :  *          invocation with its return value and the description it filled in.
                      :  * @param obj Skeleton object the method was invoked on.
                      :  * @param invoc Method invocation to complete.
                      :  * @param service_name Name of the service to look up.
                      :  * @param user_data Unused.
                      :  * @return Always TRUE; the outcome is reported through the completed invocation.
     106              :  */
     107              : static gboolean
     108            0 : dbus_cb_core_get_pipeline (MachinelearningServicePipeline *obj,
     109              :     GDBusMethodInvocation *invoc, const gchar *service_name, gpointer user_data)
     110              : {
     111            0 :   gint result = 0;
     112            0 :   g_autofree gchar *desc = NULL; /** released automatically when the function returns */
     113              : 
     114            0 :   result = svcdb_pipeline_get (service_name, &desc);
     115            0 :   machinelearning_service_pipeline_complete_get_pipeline (obj, invoc, result, desc); /** NOTE(review): desc stays NULL if svcdb_pipeline_get() does not set it - confirm the completion call accepts NULL */
     116              : 
     117            0 :   return TRUE;
     118              : }
     119              : 
     120              : /**
     121              :  * @brief Delete the pipeline description of the given service. Return the call result.
                      :  * @details Forwards the request to svcdb_pipeline_delete() and completes the D-Bus
                      :  *          invocation with the value it returned. Entries of pipeline_table are not touched here.
                      :  * @param obj Skeleton object the method was invoked on.
                      :  * @param invoc Method invocation to complete.
                      :  * @param service_name Name of the service whose description is deleted.
                      :  * @param user_data Unused.
                      :  * @return Always TRUE; the outcome is reported through the completed invocation.
     122              :  */
     123              : static gboolean
     124            0 : dbus_cb_core_delete_pipeline (MachinelearningServicePipeline *obj,
     125              :     GDBusMethodInvocation *invoc, const gchar *service_name, gpointer user_data)
     126              : {
     127            0 :   gint result = 0;
     128              : 
     129            0 :   result = svcdb_pipeline_delete (service_name);
     130            0 :   machinelearning_service_pipeline_complete_delete_pipeline (obj, invoc, result);
     131              : 
     132            0 :   return TRUE;
     133              : }
     134              : 
     135              : /**
     136              :  * @brief Launch the pipeline with given description. Return the call result and its id.
                      :  * @details Steps, in order:
                      :  *          1. fetch the description stored for @a service_name with svcdb_pipeline_get();
                      :  *          2. build the pipeline with gst_parse_launch();
                      :  *          3. set the pipeline to GST_STATE_PAUSED;
                      :  *          4. wrap it in a pipeline_s and insert it into pipeline_table under a new id.
                      :  *          A failure in step 1 reports the value returned by svcdb_pipeline_get(); a failure
                      :  *          in step 2 or 3 reports -ESTRPIPE. In every failure case the reported id is -1.
                      :  * @param obj Skeleton object the method was invoked on.
                      :  * @param invoc Method invocation to complete.
                      :  * @param service_name Name of the service whose stored description is launched.
                      :  * @param user_data Unused.
                      :  * @return Always TRUE; the outcome is reported through the completed invocation.
     137              :  */
     138              : static gboolean
     139            0 : dbus_cb_core_launch_pipeline (MachinelearningServicePipeline *obj,
     140              :     GDBusMethodInvocation *invoc, const gchar *service_name, gpointer user_data)
     141              : {
     142            0 :   gint result = 0;
     143            0 :   gint64 id = -1; /** value reported to the caller when the launch fails */
     144            0 :   GError *err = NULL;
     145              :   GstStateChangeReturn sc_ret;
     146            0 :   GstElement *pipeline = NULL;
     147              :   pipeline_s *p;
     148            0 :   g_autofree gchar *desc = NULL;
     149              : 
     150            0 :   result = svcdb_pipeline_get (service_name, &desc);
     151            0 :   if (result != 0) {
     152            0 :     ml_loge ("Failed to launch pipeline of '%s'.", service_name);
     153            0 :     goto error;
     154              :   }
     155              : 
     156            0 :   pipeline = gst_parse_launch (desc, &err);
     157            0 :   if (!pipeline || err) { /** a returned element together with a set error is also treated as a failure */
     158            0 :     ml_loge ("Failed to launch pipeline '%s' (error msg: %s).",
     159              :         desc, (err) ? err->message : "unknown reason");
     160            0 :     g_clear_error (&err);
     161              : 
     162            0 :     if (pipeline)
     163            0 :       gst_object_unref (pipeline);
     164              : 
     165            0 :     result = -ESTRPIPE;
     166            0 :     goto error;
     167              :   }
     168              : 
     169              :   /** now set pipeline as paused state */
     170            0 :   sc_ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
     171            0 :   if (sc_ret == GST_STATE_CHANGE_FAILURE) {
     172            0 :     ml_loge ("Failed to set the state of the pipeline to PAUSED. For the detail, please check the GStreamer log message. The input pipeline was '%s'.",
     173              :         desc);
     174              : 
     175            0 :     gst_object_unref (pipeline);
     176            0 :     result = -ESTRPIPE;
     177            0 :     goto error;
     178              :   }
     179              : 
     180              :   /** now fill the struct and store into hash table */
     181            0 :   p = g_new0 (pipeline_s, 1);
     182            0 :   p->element = pipeline; /** the reference obtained from gst_parse_launch() now belongs to p */
     183            0 :   p->description = g_strdup (desc);
     184            0 :   p->service_name = g_strdup (service_name);
     185            0 :   g_mutex_init (&p->lock);
     186              : 
     187            0 :   G_LOCK (pipeline_table_lock);
     188            0 :   id = p->id = g_get_monotonic_time (); /** the id is the monotonic clock value read while the table lock is held */
     189            0 :   g_hash_table_insert (pipeline_table, GINT_TO_POINTER (p->id), p); /** NOTE(review): p->id is a gint64 passed through GINT_TO_POINTER; the key may lose high bits where pointers are narrower than 64 bits, and an equal key would presumably replace (and free) the existing entry - confirm */
     190            0 :   G_UNLOCK (pipeline_table_lock);
     191              : 
     192            0 : error: /** also reached on success, with result 0 and the new id */
     193            0 :   machinelearning_service_pipeline_complete_launch_pipeline (obj, invoc, result, id);
     194              : 
     195            0 :   return TRUE;
     196              : }
     197              : 
     198              : /**
     199              :  * @brief Start the pipeline with given id. Return the call result.
                      :  * @details Looks the id up in pipeline_table and sets the pipeline to GST_STATE_PLAYING.
                      :  *          Reports -EINVAL when the id is unknown, -ESTRPIPE when the state change
                      :  *          returns GST_STATE_CHANGE_FAILURE, and 0 otherwise.
                      :  * @param obj Skeleton object the method was invoked on.
                      :  * @param invoc Method invocation to complete.
                      :  * @param id Pipeline id previously returned by the launch method.
                      :  * @param user_data Unused.
                      :  * @return Always TRUE; the outcome is reported through the completed invocation.
     200              :  */
     201              : static gboolean
     202            0 : dbus_cb_core_start_pipeline (MachinelearningServicePipeline *obj,
     203              :     GDBusMethodInvocation *invoc, gint64 id, gpointer user_data)
     204              : {
     205            0 :   gint result = 0;
     206              :   GstStateChangeReturn sc_ret;
     207            0 :   pipeline_s *p = NULL;
     208              : 
     209            0 :   G_LOCK (pipeline_table_lock);
     210            0 :   p = (pipeline_s *) g_hash_table_lookup (pipeline_table, GINT_TO_POINTER (id));
     211              : 
     212            0 :   if (!p) {
     213            0 :     ml_loge ("The callback start_pipeline is called, but there is no pipeline matched with ID.");
     214            0 :     G_UNLOCK (pipeline_table_lock);
     215            0 :     result = -EINVAL;
     216              :   } else {
     217            0 :     g_mutex_lock (&p->lock); /** the pipeline lock is taken before the table lock is released */
     218            0 :     G_UNLOCK (pipeline_table_lock);
     219            0 :     sc_ret = gst_element_set_state (p->element, GST_STATE_PLAYING);
     220            0 :     g_mutex_unlock (&p->lock);
     221              : 
     222            0 :     if (sc_ret == GST_STATE_CHANGE_FAILURE) {
     223            0 :       ml_loge ("Failed to set the state of the pipeline to PLAYING whose service name is %s.",
     224              :           p->service_name); /** NOTE(review): p is read here after both locks were released - confirm it cannot be destroyed concurrently */
     225            0 :       result = -ESTRPIPE;
     226              :     }
     227              :   }
     228              : 
     229            0 :   machinelearning_service_pipeline_complete_start_pipeline (obj, invoc, result);
     230              : 
     231            0 :   return TRUE;
     232              : }
     233              : 
     234              : /**
     235              :  * @brief Stop the pipeline with given id. Return the call result.
                      :  * @details Looks the id up in pipeline_table and sets the pipeline to GST_STATE_PAUSED;
                      :  *          the pipeline stays in the table. Reports -EINVAL when the id is unknown,
                      :  *          -ESTRPIPE when the state change returns GST_STATE_CHANGE_FAILURE, and 0 otherwise.
                      :  * @param obj Skeleton object the method was invoked on.
                      :  * @param invoc Method invocation to complete.
                      :  * @param id Pipeline id previously returned by the launch method.
                      :  * @param user_data Unused.
                      :  * @return Always TRUE; the outcome is reported through the completed invocation.
     236              :  */
     237              : static gboolean
     238            0 : dbus_cb_core_stop_pipeline (MachinelearningServicePipeline *obj,
     239              :     GDBusMethodInvocation *invoc, gint64 id, gpointer user_data)
     240              : {
     241            0 :   gint result = 0;
     242              :   GstStateChangeReturn sc_ret;
     243            0 :   pipeline_s *p = NULL;
     244              : 
     245            0 :   G_LOCK (pipeline_table_lock);
     246            0 :   p = (pipeline_s *) g_hash_table_lookup (pipeline_table, GINT_TO_POINTER (id));
     247              : 
     248            0 :   if (!p) {
     249            0 :     ml_loge ("The callback stop_pipeline is called, but there is no pipeline matched with ID.");
     250            0 :     G_UNLOCK (pipeline_table_lock);
     251            0 :     result = -EINVAL;
     252              :   } else {
     253            0 :     g_mutex_lock (&p->lock); /** the pipeline lock is taken before the table lock is released */
     254            0 :     G_UNLOCK (pipeline_table_lock);
     255            0 :     sc_ret = gst_element_set_state (p->element, GST_STATE_PAUSED);
     256            0 :     g_mutex_unlock (&p->lock);
     257              : 
     258            0 :     if (sc_ret == GST_STATE_CHANGE_FAILURE) {
     259            0 :       ml_loge ("Failed to set the state of the pipeline to PAUSED whose service name is %s.",
     260              :           p->service_name); /** NOTE(review): p is read here after both locks were released - confirm it cannot be destroyed concurrently */
     261            0 :       result = -ESTRPIPE;
     262              :     }
     263              :   }
     264              : 
     265            0 :   machinelearning_service_pipeline_complete_stop_pipeline (obj, invoc, result);
     266              : 
     267            0 :   return TRUE;
     268              : }
     269              : 
     270              : /**
     271              :  * @brief Destroy the pipeline with given id. Return the call result.
                      :  * @details Removes the entry from pipeline_table while holding the table lock. The table
                      :  *          was created with _pipeline_free() as its value-destroy function, so removing the
                      :  *          entry releases the pipeline_s. Reports -EINVAL when the id is unknown, 0 otherwise.
                      :  * @param obj Skeleton object the method was invoked on.
                      :  * @param invoc Method invocation to complete.
                      :  * @param id Pipeline id previously returned by the launch method.
                      :  * @param user_data Unused.
                      :  * @return Always TRUE; the outcome is reported through the completed invocation.
     272              :  */
     273              : static gboolean
     274            0 : dbus_cb_core_destroy_pipeline (MachinelearningServicePipeline *obj,
     275              :     GDBusMethodInvocation *invoc, gint64 id, gpointer user_data)
     276              : {
     277            0 :   gint result = 0;
     278            0 :   pipeline_s *p = NULL;
     279              : 
     280            0 :   G_LOCK (pipeline_table_lock);
     281            0 :   p = (pipeline_s *) g_hash_table_lookup (pipeline_table, GINT_TO_POINTER (id));
     282              : 
     283            0 :   if (!p) {
     284            0 :     ml_loge ("The callback destroy_pipeline is called, but there is no pipeline matched with ID.");
     285            0 :     result = -EINVAL;
     286              :   } else {
     287              :     /**
     288              :      * @todo Fix the hanging issue when trying to set the GST_STATE_NULL state for pipelines
     289              :      * containing tensor_query_*. As a workaround, just unref the pipeline instance.
     290              :      * To fix this issue, tensor_query elements and nnstreamer-edge should behave well
     291              :      * on state changes and properly free socket resources. Revive the following code after that.
     292              :      *
     293              :      *   GstStateChangeReturn sc_ret;
     294              :      *   g_mutex_lock (&p->lock);
     295              :      *   sc_ret = gst_element_set_state (p->element, GST_STATE_NULL);
     296              :      *   g_mutex_unlock (&p->lock);
     297              :      *   if (sc_ret == GST_STATE_CHANGE_FAILURE) {
     298              :      *     ml_loge ("Failed to set the state of the pipeline to NULL whose service name is %s. Destroy it anyway.", p->service_name);
     299              :      *     result = -ESTRPIPE;
     300              :      *   }
     301              :      */
     302            0 :     g_hash_table_remove (pipeline_table, GINT_TO_POINTER (id)); /** NOTE(review): p->lock is not taken before the entry is freed - confirm no other callback can still hold it */
     303              :   }
     304              : 
     305            0 :   G_UNLOCK (pipeline_table_lock);
     306            0 :   machinelearning_service_pipeline_complete_destroy_pipeline (obj, invoc, result);
     307              : 
     308            0 :   return TRUE;
     309              : }
     310              : 
     311              : /**
     312              :  * @brief Get the state of pipeline with given id. Return the call result and its state.
                      :  * @details Looks the id up in pipeline_table and queries the element with
                      :  *          gst_element_get_state(), passing GST_MSECOND as the timeout. Reports -EINVAL
                      :  *          together with GST_STATE_NULL when the id is unknown, -ESTRPIPE when the query
                      :  *          returns GST_STATE_CHANGE_FAILURE, and 0 otherwise. The state is sent as a gint.
                      :  * @param obj Skeleton object the method was invoked on.
                      :  * @param invoc Method invocation to complete.
                      :  * @param id Pipeline id previously returned by the launch method.
                      :  * @param user_data Unused.
                      :  * @return Always TRUE; the outcome is reported through the completed invocation.
     313              :  */
     314              : static gboolean
     315            0 : dbus_cb_core_get_state (MachinelearningServicePipeline *obj,
     316              :     GDBusMethodInvocation *invoc, gint64 id, gpointer user_data)
     317              : {
     318            0 :   gint result = 0;
     319              :   GstStateChangeReturn sc_ret;
     320            0 :   GstState state = GST_STATE_NULL; /** reported unchanged when the id is unknown */
     321            0 :   pipeline_s *p = NULL;
     322              : 
     323            0 :   G_LOCK (pipeline_table_lock);
     324            0 :   p = (pipeline_s *) g_hash_table_lookup (pipeline_table, GINT_TO_POINTER (id));
     325              : 
     326            0 :   if (!p) {
     327            0 :     ml_loge ("The callback get_state is called, but there is no pipeline matched with ID.");
     328            0 :     result = -EINVAL;
     329            0 :     machinelearning_service_pipeline_complete_get_state (obj, invoc, result, (gint) state); /** unlike the other callbacks, the invocation is completed before the table lock is released */
     330            0 :     G_UNLOCK (pipeline_table_lock);
     331            0 :     return TRUE;
     332              :   }
     333              : 
     334            0 :   g_mutex_lock (&p->lock); /** the pipeline lock is taken before the table lock is released */
     335            0 :   G_UNLOCK (pipeline_table_lock);
     336            0 :   sc_ret = gst_element_get_state (p->element, &state, NULL, GST_MSECOND);
     337            0 :   g_mutex_unlock (&p->lock);
     338              : 
     339            0 :   if (sc_ret == GST_STATE_CHANGE_FAILURE) {
     340            0 :     ml_loge ("Failed to get the state of the pipeline whose service name is %s.",
     341              :         p->service_name); /** NOTE(review): p is read here after both locks were released - confirm it cannot be destroyed concurrently */
     342            0 :     result = -ESTRPIPE;
     343              :   }
     344              : 
     345            0 :   machinelearning_service_pipeline_complete_get_state (obj, invoc, result, (gint) state);
     346              : 
     347            0 :   return TRUE;
     348              : }
     349              : 
     350              : static struct gdbus_signal_info handler_infos[] = { /** One entry per D-Bus method of the pipeline interface; passed to gdbus_connect_signal() and gdbus_disconnect_signal(). */
     351              :   {
     352              :       .signal_name = DBUS_PIPELINE_I_SET_HANDLER,
     353              :       .cb = G_CALLBACK (dbus_cb_core_set_pipeline),
     354              :       .cb_data = NULL,
     355              :       .handler_id = 0,
     356              :   },
     357              :   {
     358              :       .signal_name = DBUS_PIPELINE_I_GET_HANDLER,
     359              :       .cb = G_CALLBACK (dbus_cb_core_get_pipeline),
     360              :       .cb_data = NULL,
     361              :       .handler_id = 0,
     362              :   },
     363              :   {
     364              :       .signal_name = DBUS_PIPELINE_I_DELETE_HANDLER,
     365              :       .cb = G_CALLBACK (dbus_cb_core_delete_pipeline),
     366              :       .cb_data = NULL,
     367              :       .handler_id = 0,
     368              :   },
     369              :   {
     370              :       .signal_name = DBUS_PIPELINE_I_LAUNCH_HANDLER,
     371              :       .cb = G_CALLBACK (dbus_cb_core_launch_pipeline),
     372              :       .cb_data = NULL,
     373              :       .handler_id = 0,
     374              :   },
     375              :   {
     376              :       .signal_name = DBUS_PIPELINE_I_START_HANDLER,
     377              :       .cb = G_CALLBACK (dbus_cb_core_start_pipeline),
     378              :       .cb_data = NULL,
     379              :       .handler_id = 0,
     380              :   },
     381              :   {
     382              :       .signal_name = DBUS_PIPELINE_I_STOP_HANDLER,
     383              :       .cb = G_CALLBACK (dbus_cb_core_stop_pipeline),
     384              :       .cb_data = NULL,
     385              :       .handler_id = 0,
     386              :   },
     387              :   {
     388              :       .signal_name = DBUS_PIPELINE_I_DESTROY_HANDLER,
     389              :       .cb = G_CALLBACK (dbus_cb_core_destroy_pipeline),
     390              :       .cb_data = NULL,
     391              :       .handler_id = 0,
     392              :   },
     393              :   {
     394              :       .signal_name = DBUS_PIPELINE_I_GET_STATE_HANDLER,
     395              :       .cb = G_CALLBACK (dbus_cb_core_get_state),
     396              :       .cb_data = NULL,
     397              :       .handler_id = 0,
     398              :   },
     399              : };
     400              : 
     401              : /**
     402              :  * @brief Probe the D-BUS and connect this module.
                      :  * @details Creates the interface skeleton, connects the callbacks listed in handler_infos
                      :  *          and exports the interface at DBUS_PIPELINE_PATH. When a later step fails, the
                      :  *          earlier ones are undone in reverse order before returning.
                      :  * @param data Unused.
                      :  * @return 0 on success, -ENOSYS when any of the three steps fails.
     403              :  */
     404              : static int
     405            0 : probe_pipeline_module (void *data)
     406              : {
     407            0 :   int ret = 0;
     408              : 
     409            0 :   g_gdbus_instance = gdbus_get_pipeline_instance ();
     410            0 :   if (g_gdbus_instance == NULL) {
     411            0 :     ml_loge ("cannot get a dbus instance for the %s interface\n", DBUS_PIPELINE_INTERFACE);
     412            0 :     return -ENOSYS;
     413              :   }
     414              : 
     415            0 :   ret = gdbus_connect_signal (g_gdbus_instance, ARRAY_SIZE (handler_infos), handler_infos);
     416            0 :   if (ret < 0) {
     417            0 :     ml_loge ("cannot register callbacks as the dbus method invocation handlers\n ret: %d", ret);
     418            0 :     ret = -ENOSYS; /** the original error code is only logged; callers always see -ENOSYS */
     419            0 :     goto out;
     420              :   }
     421              : 
     422            0 :   ret = gdbus_export_interface (g_gdbus_instance, DBUS_PIPELINE_PATH);
     423            0 :   if (ret < 0) {
     424            0 :     ml_loge ("cannot export the dbus interface '%s' at the object path '%s'\n",
     425              :         DBUS_PIPELINE_INTERFACE, DBUS_PIPELINE_PATH);
     426            0 :     ret = -ENOSYS;
     427            0 :     goto out_disconnect;
     428              :   }
     429              : 
     430            0 :   return 0;
     431              : 
     432            0 : out_disconnect: /** export failed: undo the signal connection first */
     433            0 :   gdbus_disconnect_signal (g_gdbus_instance, ARRAY_SIZE (handler_infos), handler_infos);
     434            0 : out: /** release the skeleton object; g_gdbus_instance is cleared */
     435            0 :   gdbus_put_pipeline_instance (&g_gdbus_instance);
     436              : 
     437            0 :   return ret;
     438              : }
     439              : 
     440              : /**
     441              :  * @brief Initialize this module.
                      :  * @details Calls gdbus_initialize() and creates pipeline_table. Keys are hashed and compared
                      :  *          as raw pointer values (g_direct_hash / g_direct_equal) with no key-destroy
                      :  *          function; values are released with _pipeline_free().
                      :  * @param data Unused.
     442              :  */
     443              : static void
     444            0 : init_pipeline_module (void *data)
     445              : {
     446            0 :   gdbus_initialize ();
     447              : 
     448            0 :   G_LOCK (pipeline_table_lock);
     449            0 :   g_assert (NULL == pipeline_table); /** Internal error */
     450            0 :   pipeline_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, _pipeline_free);
     451            0 :   G_UNLOCK (pipeline_table_lock);
     452            0 : }
     453              : 
     454              : /**
     455              :  * @brief Finalize this module.
                      :  * @details Destroys pipeline_table (its value-destroy function _pipeline_free() releases
                      :  *          the remaining entries), then disconnects the callbacks listed in handler_infos
                      :  *          and releases the interface skeleton.
                      :  * @param data Unused.
     456              :  */
     457              : static void
     458            0 : exit_pipeline_module (void *data)
     459              : {
     460            0 :   G_LOCK (pipeline_table_lock);
     461            0 :   g_assert (pipeline_table); /** Internal error */
     462            0 :   g_hash_table_destroy (pipeline_table);
     463            0 :   pipeline_table = NULL;
     464            0 :   G_UNLOCK (pipeline_table_lock);
     465              : 
     466            0 :   gdbus_disconnect_signal (g_gdbus_instance, ARRAY_SIZE (handler_infos), handler_infos); /** NOTE(review): g_gdbus_instance is NULL here if probe_pipeline_module() failed - confirm gdbus_disconnect_signal() tolerates that */
     467            0 :   gdbus_put_pipeline_instance (&g_gdbus_instance);
     468            0 : }
     469              : 
     470              : static const struct module_ops pipeline_ops = { /** Entry points of the "pipeline" module: probe, init and exit callbacks defined in this file. */
     471              :   .name = "pipeline",
     472              :   .probe = probe_pipeline_module,
     473              :   .init = init_pipeline_module,
     474              :   .exit = exit_pipeline_module,
     475              : };
     476              : 
     477            0 : MODULE_OPS_REGISTER (&pipeline_ops) /** registers pipeline_ops through the MODULE_OPS_REGISTER macro (defined outside this file) */
        

Generated by: LCOV version 2.0-1