Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * NNStreamer API / Machine Learning Agent Daemon
4 : * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
5 : */
6 :
7 : /**
8 : * @file pipeline-dbus-impl.cc
9 : * @date 20 Jul 2022
10 : * @brief Implementation of pipeline dbus interface.
11 : * @see https://github.com/nnstreamer/deviceMLOps.MLAgent
12 : * @author Yongjoo Ahn <yongjoo1.ahn@samsung.com>
13 : * @bug No known bugs except for NYI items
14 : * @details This implements the pipeline dbus interface.
15 : */
16 :
17 : #include <glib.h>
18 : #include <gst/gst.h>
19 : #include <stdbool.h>
20 : #include <stdio.h>
21 : #include <stdlib.h>
22 :
23 : #include "common.h"
24 : #include "dbus-interface.h"
25 : #include "gdbus-util.h"
26 : #include "log.h"
27 : #include "modules.h"
28 : #include "pipeline-dbus.h"
29 : #include "service-db-util.h"
30 :
31 : static MachinelearningServicePipeline *g_gdbus_instance = NULL;
32 : static GHashTable *pipeline_table = NULL;
33 : G_LOCK_DEFINE_STATIC (pipeline_table_lock);
34 :
35 : /**
36 : * @brief Structure for pipeline.
37 : */
38 : typedef struct _pipeline {
39 : GstElement *element;
40 : gint64 id;
41 : GMutex lock;
42 : gchar *service_name;
43 : gchar *description;
44 : } pipeline_s;
45 :
46 : /**
47 : * @brief Internal function to destroy pipeline instances.
48 : */
49 : static void
50 0 : _pipeline_free (gpointer data)
51 : {
52 : pipeline_s *p;
53 :
54 0 : if (!data) {
55 0 : ml_loge ("internal error, the data should not be NULL");
56 0 : return;
57 : }
58 :
59 0 : p = (pipeline_s *) data;
60 :
61 0 : if (p->element)
62 0 : gst_object_unref (p->element);
63 :
64 0 : g_free (p->service_name);
65 0 : g_free (p->description);
66 0 : g_mutex_clear (&p->lock);
67 :
68 0 : g_free (p);
69 : }
70 :
71 : /**
72 : * @brief Get the skeleton object of the DBus interface.
73 : */
74 : static MachinelearningServicePipeline *
75 0 : gdbus_get_pipeline_instance (void)
76 : {
77 0 : return machinelearning_service_pipeline_skeleton_new ();
78 : }
79 :
80 : /**
81 : * @brief Put the obtained skeleton object and release the resource.
82 : */
83 : static void
84 0 : gdbus_put_pipeline_instance (MachinelearningServicePipeline **instance)
85 : {
86 0 : g_clear_object (instance);
87 0 : }
88 :
89 : /**
90 : * @brief Set the service with given description. Return the call result.
91 : */
92 : static gboolean
93 0 : dbus_cb_core_set_pipeline (MachinelearningServicePipeline *obj, GDBusMethodInvocation *invoc,
94 : const gchar *service_name, const gchar *pipeline_desc, gpointer user_data)
95 : {
96 0 : gint result = 0;
97 :
98 0 : result = svcdb_pipeline_set (service_name, pipeline_desc);
99 0 : machinelearning_service_pipeline_complete_set_pipeline (obj, invoc, result);
100 :
101 0 : return TRUE;
102 : }
103 :
104 : /**
105 : * @brief Get the pipeline description of the given service. Return the call result and the pipeline description.
106 : */
107 : static gboolean
108 0 : dbus_cb_core_get_pipeline (MachinelearningServicePipeline *obj,
109 : GDBusMethodInvocation *invoc, const gchar *service_name, gpointer user_data)
110 : {
111 0 : gint result = 0;
112 0 : g_autofree gchar *desc = NULL;
113 :
114 0 : result = svcdb_pipeline_get (service_name, &desc);
115 0 : machinelearning_service_pipeline_complete_get_pipeline (obj, invoc, result, desc);
116 :
117 0 : return TRUE;
118 : }
119 :
120 : /**
121 : * @brief Delete the pipeline description of the given service. Return the call result.
122 : */
123 : static gboolean
124 0 : dbus_cb_core_delete_pipeline (MachinelearningServicePipeline *obj,
125 : GDBusMethodInvocation *invoc, const gchar *service_name, gpointer user_data)
126 : {
127 0 : gint result = 0;
128 :
129 0 : result = svcdb_pipeline_delete (service_name);
130 0 : machinelearning_service_pipeline_complete_delete_pipeline (obj, invoc, result);
131 :
132 0 : return TRUE;
133 : }
134 :
135 : /**
136 : * @brief Launch the pipeline with given description. Return the call result and its id.
137 : */
138 : static gboolean
139 0 : dbus_cb_core_launch_pipeline (MachinelearningServicePipeline *obj,
140 : GDBusMethodInvocation *invoc, const gchar *service_name, gpointer user_data)
141 : {
142 0 : gint result = 0;
143 0 : gint64 id = -1;
144 0 : GError *err = NULL;
145 : GstStateChangeReturn sc_ret;
146 0 : GstElement *pipeline = NULL;
147 : pipeline_s *p;
148 0 : g_autofree gchar *desc = NULL;
149 :
150 0 : result = svcdb_pipeline_get (service_name, &desc);
151 0 : if (result != 0) {
152 0 : ml_loge ("Failed to launch pipeline of '%s'.", service_name);
153 0 : goto error;
154 : }
155 :
156 0 : pipeline = gst_parse_launch (desc, &err);
157 0 : if (!pipeline || err) {
158 0 : ml_loge ("Failed to launch pipeline '%s' (error msg: %s).",
159 : desc, (err) ? err->message : "unknown reason");
160 0 : g_clear_error (&err);
161 :
162 0 : if (pipeline)
163 0 : gst_object_unref (pipeline);
164 :
165 0 : result = -ESTRPIPE;
166 0 : goto error;
167 : }
168 :
169 : /** now set pipeline as paused state */
170 0 : sc_ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
171 0 : if (sc_ret == GST_STATE_CHANGE_FAILURE) {
172 0 : ml_loge ("Failed to set the state of the pipeline to PAUSED. For the detail, please check the GStreamer log message. The input pipeline was '%s'.",
173 : desc);
174 :
175 0 : gst_object_unref (pipeline);
176 0 : result = -ESTRPIPE;
177 0 : goto error;
178 : }
179 :
180 : /** now fill the struct and store into hash table */
181 0 : p = g_new0 (pipeline_s, 1);
182 0 : p->element = pipeline;
183 0 : p->description = g_strdup (desc);
184 0 : p->service_name = g_strdup (service_name);
185 0 : g_mutex_init (&p->lock);
186 :
187 0 : G_LOCK (pipeline_table_lock);
188 0 : id = p->id = g_get_monotonic_time ();
189 0 : g_hash_table_insert (pipeline_table, GINT_TO_POINTER (p->id), p);
190 0 : G_UNLOCK (pipeline_table_lock);
191 :
192 0 : error:
193 0 : machinelearning_service_pipeline_complete_launch_pipeline (obj, invoc, result, id);
194 :
195 0 : return TRUE;
196 : }
197 :
198 : /**
199 : * @brief Start the pipeline with given id. Return the call result.
200 : */
201 : static gboolean
202 0 : dbus_cb_core_start_pipeline (MachinelearningServicePipeline *obj,
203 : GDBusMethodInvocation *invoc, gint64 id, gpointer user_data)
204 : {
205 0 : gint result = 0;
206 : GstStateChangeReturn sc_ret;
207 0 : pipeline_s *p = NULL;
208 :
209 0 : G_LOCK (pipeline_table_lock);
210 0 : p = (pipeline_s *) g_hash_table_lookup (pipeline_table, GINT_TO_POINTER (id));
211 :
212 0 : if (!p) {
213 0 : ml_loge ("The callback start_pipeline is called, but there is no pipeline matched with ID.");
214 0 : G_UNLOCK (pipeline_table_lock);
215 0 : result = -EINVAL;
216 : } else {
217 0 : g_mutex_lock (&p->lock);
218 0 : G_UNLOCK (pipeline_table_lock);
219 0 : sc_ret = gst_element_set_state (p->element, GST_STATE_PLAYING);
220 0 : g_mutex_unlock (&p->lock);
221 :
222 0 : if (sc_ret == GST_STATE_CHANGE_FAILURE) {
223 0 : ml_loge ("Failed to set the state of the pipeline to PLAYING whose service name is %s.",
224 : p->service_name);
225 0 : result = -ESTRPIPE;
226 : }
227 : }
228 :
229 0 : machinelearning_service_pipeline_complete_start_pipeline (obj, invoc, result);
230 :
231 0 : return TRUE;
232 : }
233 :
234 : /**
235 : * @brief Stop the pipeline with given id. Return the call result.
236 : */
237 : static gboolean
238 0 : dbus_cb_core_stop_pipeline (MachinelearningServicePipeline *obj,
239 : GDBusMethodInvocation *invoc, gint64 id, gpointer user_data)
240 : {
241 0 : gint result = 0;
242 : GstStateChangeReturn sc_ret;
243 0 : pipeline_s *p = NULL;
244 :
245 0 : G_LOCK (pipeline_table_lock);
246 0 : p = (pipeline_s *) g_hash_table_lookup (pipeline_table, GINT_TO_POINTER (id));
247 :
248 0 : if (!p) {
249 0 : ml_loge ("The callback stop_pipeline is called, but there is no pipeline matched with ID.");
250 0 : G_UNLOCK (pipeline_table_lock);
251 0 : result = -EINVAL;
252 : } else {
253 0 : g_mutex_lock (&p->lock);
254 0 : G_UNLOCK (pipeline_table_lock);
255 0 : sc_ret = gst_element_set_state (p->element, GST_STATE_PAUSED);
256 0 : g_mutex_unlock (&p->lock);
257 :
258 0 : if (sc_ret == GST_STATE_CHANGE_FAILURE) {
259 0 : ml_loge ("Failed to set the state of the pipeline to PAUSED whose service name is %s.",
260 : p->service_name);
261 0 : result = -ESTRPIPE;
262 : }
263 : }
264 :
265 0 : machinelearning_service_pipeline_complete_stop_pipeline (obj, invoc, result);
266 :
267 0 : return TRUE;
268 : }
269 :
270 : /**
271 : * @brief Destroy the pipeline with given id. Return the call result.
272 : */
273 : static gboolean
274 0 : dbus_cb_core_destroy_pipeline (MachinelearningServicePipeline *obj,
275 : GDBusMethodInvocation *invoc, gint64 id, gpointer user_data)
276 : {
277 0 : gint result = 0;
278 0 : pipeline_s *p = NULL;
279 :
280 0 : G_LOCK (pipeline_table_lock);
281 0 : p = (pipeline_s *) g_hash_table_lookup (pipeline_table, GINT_TO_POINTER (id));
282 :
283 0 : if (!p) {
284 0 : ml_loge ("The callback destroy_pipeline is called, but there is no pipeline matched with ID.");
285 0 : result = -EINVAL;
286 : } else {
287 : /**
288 : * @todo Fix hanging issue when trying to set GST_STATE_NULL state for pipelines
289 : * containing tensor_query_*. As a workaround, just unref the pipeline instance.
290 : * To fix this issue, tensor_query elements and nnstreamer-edge should well-behavior
291 : * to the state change. And it should properly free socket resources. Revive following code after then.
292 : *
293 : * GstStateChangeReturn sc_ret;
294 : * g_mutex_lock (&p->lock);
295 : * sc_ret = gst_element_set_state (p->element, GST_STATE_NULL);
296 : * g_mutex_unlock (&p->lock);
297 : * if (sc_ret == GST_STATE_CHANGE_FAILURE) {
298 : * ml_loge ("Failed to set the state of the pipeline to NULL whose service name is %s. Destroy it anyway.", p->service_name);
299 : * result = -ESTRPIPE;
300 : * }
301 : */
302 0 : g_hash_table_remove (pipeline_table, GINT_TO_POINTER (id));
303 : }
304 :
305 0 : G_UNLOCK (pipeline_table_lock);
306 0 : machinelearning_service_pipeline_complete_destroy_pipeline (obj, invoc, result);
307 :
308 0 : return TRUE;
309 : }
310 :
311 : /**
312 : * @brief Get the state of pipeline with given id. Return the call result and its state.
313 : */
314 : static gboolean
315 0 : dbus_cb_core_get_state (MachinelearningServicePipeline *obj,
316 : GDBusMethodInvocation *invoc, gint64 id, gpointer user_data)
317 : {
318 0 : gint result = 0;
319 : GstStateChangeReturn sc_ret;
320 0 : GstState state = GST_STATE_NULL;
321 0 : pipeline_s *p = NULL;
322 :
323 0 : G_LOCK (pipeline_table_lock);
324 0 : p = (pipeline_s *) g_hash_table_lookup (pipeline_table, GINT_TO_POINTER (id));
325 :
326 0 : if (!p) {
327 0 : ml_loge ("The callback get_state is called, but there is no pipeline matched with ID.");
328 0 : result = -EINVAL;
329 0 : machinelearning_service_pipeline_complete_get_state (obj, invoc, result, (gint) state);
330 0 : G_UNLOCK (pipeline_table_lock);
331 0 : return TRUE;
332 : }
333 :
334 0 : g_mutex_lock (&p->lock);
335 0 : G_UNLOCK (pipeline_table_lock);
336 0 : sc_ret = gst_element_get_state (p->element, &state, NULL, GST_MSECOND);
337 0 : g_mutex_unlock (&p->lock);
338 :
339 0 : if (sc_ret == GST_STATE_CHANGE_FAILURE) {
340 0 : ml_loge ("Failed to get the state of the pipeline whose service name is %s.",
341 : p->service_name);
342 0 : result = -ESTRPIPE;
343 : }
344 :
345 0 : machinelearning_service_pipeline_complete_get_state (obj, invoc, result, (gint) state);
346 :
347 0 : return TRUE;
348 : }
349 :
350 : static struct gdbus_signal_info handler_infos[] = {
351 : {
352 : .signal_name = DBUS_PIPELINE_I_SET_HANDLER,
353 : .cb = G_CALLBACK (dbus_cb_core_set_pipeline),
354 : .cb_data = NULL,
355 : .handler_id = 0,
356 : },
357 : {
358 : .signal_name = DBUS_PIPELINE_I_GET_HANDLER,
359 : .cb = G_CALLBACK (dbus_cb_core_get_pipeline),
360 : .cb_data = NULL,
361 : .handler_id = 0,
362 : },
363 : {
364 : .signal_name = DBUS_PIPELINE_I_DELETE_HANDLER,
365 : .cb = G_CALLBACK (dbus_cb_core_delete_pipeline),
366 : .cb_data = NULL,
367 : .handler_id = 0,
368 : },
369 : {
370 : .signal_name = DBUS_PIPELINE_I_LAUNCH_HANDLER,
371 : .cb = G_CALLBACK (dbus_cb_core_launch_pipeline),
372 : .cb_data = NULL,
373 : .handler_id = 0,
374 : },
375 : {
376 : .signal_name = DBUS_PIPELINE_I_START_HANDLER,
377 : .cb = G_CALLBACK (dbus_cb_core_start_pipeline),
378 : .cb_data = NULL,
379 : .handler_id = 0,
380 : },
381 : {
382 : .signal_name = DBUS_PIPELINE_I_STOP_HANDLER,
383 : .cb = G_CALLBACK (dbus_cb_core_stop_pipeline),
384 : .cb_data = NULL,
385 : .handler_id = 0,
386 : },
387 : {
388 : .signal_name = DBUS_PIPELINE_I_DESTROY_HANDLER,
389 : .cb = G_CALLBACK (dbus_cb_core_destroy_pipeline),
390 : .cb_data = NULL,
391 : .handler_id = 0,
392 : },
393 : {
394 : .signal_name = DBUS_PIPELINE_I_GET_STATE_HANDLER,
395 : .cb = G_CALLBACK (dbus_cb_core_get_state),
396 : .cb_data = NULL,
397 : .handler_id = 0,
398 : },
399 : };
400 :
401 : /**
402 : * @brief Probe the D-BUS and connect this module.
403 : */
404 : static int
405 0 : probe_pipeline_module (void *data)
406 : {
407 0 : int ret = 0;
408 :
409 0 : g_gdbus_instance = gdbus_get_pipeline_instance ();
410 0 : if (g_gdbus_instance == NULL) {
411 0 : ml_loge ("cannot get a dbus instance for the %s interface\n", DBUS_PIPELINE_INTERFACE);
412 0 : return -ENOSYS;
413 : }
414 :
415 0 : ret = gdbus_connect_signal (g_gdbus_instance, ARRAY_SIZE (handler_infos), handler_infos);
416 0 : if (ret < 0) {
417 0 : ml_loge ("cannot register callbacks as the dbus method invocation handlers\n ret: %d", ret);
418 0 : ret = -ENOSYS;
419 0 : goto out;
420 : }
421 :
422 0 : ret = gdbus_export_interface (g_gdbus_instance, DBUS_PIPELINE_PATH);
423 0 : if (ret < 0) {
424 0 : ml_loge ("cannot export the dbus interface '%s' at the object path '%s'\n",
425 : DBUS_PIPELINE_INTERFACE, DBUS_PIPELINE_PATH);
426 0 : ret = -ENOSYS;
427 0 : goto out_disconnect;
428 : }
429 :
430 0 : return 0;
431 :
432 0 : out_disconnect:
433 0 : gdbus_disconnect_signal (g_gdbus_instance, ARRAY_SIZE (handler_infos), handler_infos);
434 0 : out:
435 0 : gdbus_put_pipeline_instance (&g_gdbus_instance);
436 :
437 0 : return ret;
438 : }
439 :
440 : /**
441 : * @brief Initialize this module.
442 : */
443 : static void
444 0 : init_pipeline_module (void *data)
445 : {
446 0 : gdbus_initialize ();
447 :
448 0 : G_LOCK (pipeline_table_lock);
449 0 : g_assert (NULL == pipeline_table); /** Internal error */
450 0 : pipeline_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, _pipeline_free);
451 0 : G_UNLOCK (pipeline_table_lock);
452 0 : }
453 :
454 : /**
455 : * @brief Finalize this module.
456 : */
457 : static void
458 0 : exit_pipeline_module (void *data)
459 : {
460 0 : G_LOCK (pipeline_table_lock);
461 0 : g_assert (pipeline_table); /** Internal error */
462 0 : g_hash_table_destroy (pipeline_table);
463 0 : pipeline_table = NULL;
464 0 : G_UNLOCK (pipeline_table_lock);
465 :
466 0 : gdbus_disconnect_signal (g_gdbus_instance, ARRAY_SIZE (handler_infos), handler_infos);
467 0 : gdbus_put_pipeline_instance (&g_gdbus_instance);
468 0 : }
469 :
470 : static const struct module_ops pipeline_ops = {
471 : .name = "pipeline",
472 : .probe = probe_pipeline_module,
473 : .init = init_pipeline_module,
474 : .exit = exit_pipeline_module,
475 : };
476 :
477 0 : MODULE_OPS_REGISTER (&pipeline_ops)
|