Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * NNStreamer API / Machine Learning Agent Daemon
4 : * Copyright (C) 2025 Samsung Electronics Co., Ltd. All Rights Reserved.
5 : */
6 :
7 : /**
8 : * @file mlops-agent-node.c
9 : * @date 20 january 2025
10 : * @brief Implementation of mlops node.
11 : * @see https://github.com/nnstreamer/deviceMLOps.MLAgent
12 : * @author Jaeyun Jung <jy1210.jung@samsung.com>
13 : * @bug No known bugs except for NYI items
14 : * @details This implements the node information to run a pipeline.
15 : */
16 :
17 : #include "log.h"
18 : #include "mlops-agent-node.h"
19 : #include "service-db-util.h"
20 :
21 : static GHashTable *g_mlops_node_table = NULL;
22 : G_LOCK_DEFINE_STATIC (mlops_node_table);
23 :
24 : /**
25 : * @brief Structure for mlops node.
26 : */
27 : typedef struct
28 : {
29 : mlops_node_type_e type;
30 : gint64 id;
31 : GMutex lock;
32 : GstElement *element;
33 : gchar *service_name;
34 : gchar *description;
35 : } mlops_node_s;
36 :
37 : /**
38 : * @brief Internal function to get node info.
39 : */
40 : static mlops_node_s *
41 0 : _mlops_node_get (const int64_t id)
42 : {
43 0 : mlops_node_s *node = NULL;
44 :
45 0 : G_LOCK (mlops_node_table);
46 0 : node = (mlops_node_s *) g_hash_table_lookup (g_mlops_node_table,
47 0 : GINT_TO_POINTER (id));
48 0 : G_UNLOCK (mlops_node_table);
49 :
50 0 : if (!node) {
51 0 : ml_loge ("There is no pipeline matched with ID %" G_GINT64_FORMAT, id);
52 : }
53 :
54 0 : return node;
55 : }
56 :
57 : /**
58 : * @brief Internal function to change pipeline state.
59 : */
60 : static int
61 0 : _mlops_node_set_pipeline_state (mlops_node_s * node, GstState state)
62 : {
63 : GstStateChangeReturn ret;
64 : gint64 nid;
65 :
66 0 : g_return_val_if_fail (node != NULL, -EINVAL);
67 :
68 0 : g_mutex_lock (&node->lock);
69 0 : nid = node->id;
70 0 : ret = gst_element_set_state (node->element, state);
71 0 : g_mutex_unlock (&node->lock);
72 :
73 0 : if (ret == GST_STATE_CHANGE_FAILURE) {
74 0 : ml_loge ("Failed to set the state of the pipeline to %s with ID %"
75 : G_GINT64_FORMAT, gst_element_state_get_name (state), nid);
76 0 : return -ESTRPIPE;
77 : }
78 :
79 0 : return 0;
80 : }
81 :
82 : /**
83 : * @brief Internal function to release mlops node.
84 : */
85 : static void
86 0 : _mlops_node_free (gpointer data)
87 : {
88 0 : mlops_node_s *node = (mlops_node_s *) data;
89 :
90 0 : if (!node) {
91 0 : ml_logw ("The data pointer is null, internal error?");
92 0 : return;
93 : }
94 :
95 0 : _mlops_node_set_pipeline_state (node, GST_STATE_NULL);
96 :
97 0 : g_mutex_lock (&node->lock);
98 :
99 0 : node->type = MLOPS_NODE_TYPE_NONE;
100 0 : node->id = 0;
101 0 : if (node->element) {
102 0 : gst_object_unref (node->element);
103 0 : node->element = NULL;
104 : }
105 0 : g_free (node->service_name);
106 0 : node->service_name = NULL;
107 0 : g_free (node->description);
108 0 : node->description = NULL;
109 :
110 0 : g_mutex_unlock (&node->lock);
111 :
112 0 : g_mutex_clear (&node->lock);
113 0 : g_free (node);
114 : }
115 :
116 : /**
117 : * @brief Initialize mlops node info.
118 : */
119 : void
120 0 : mlops_node_initialize (void)
121 : {
122 0 : G_LOCK (mlops_node_table);
123 0 : if (!g_mlops_node_table) {
124 0 : g_mlops_node_table = g_hash_table_new_full (g_direct_hash, g_direct_equal,
125 : NULL, _mlops_node_free);
126 : }
127 0 : g_assert (g_mlops_node_table != NULL);
128 0 : G_UNLOCK (mlops_node_table);
129 0 : }
130 :
131 : /**
132 : * @brief Finalize mlops node info.
133 : */
134 : void
135 0 : mlops_node_finalize (void)
136 : {
137 0 : G_LOCK (mlops_node_table);
138 0 : g_assert (g_mlops_node_table != NULL);
139 0 : g_hash_table_destroy (g_mlops_node_table);
140 0 : g_mlops_node_table = NULL;
141 0 : G_UNLOCK (mlops_node_table);
142 0 : }
143 :
144 : /**
145 : * @brief Check service name and launch the pipeline.
146 : */
147 : int
148 0 : mlops_node_create (const gchar * name, const mlops_node_type_e type,
149 : int64_t * id)
150 : {
151 0 : mlops_node_s *node = NULL;
152 0 : gint result = -EIO;
153 0 : gchar *desc = NULL;
154 0 : GstElement *pipeline = NULL;
155 0 : GError *err = NULL;
156 : GstStateChangeReturn ret;
157 :
158 0 : g_return_val_if_fail (id != NULL, -EINVAL);
159 :
160 0 : switch (type) {
161 0 : case MLOPS_NODE_TYPE_PIPELINE:
162 : {
163 0 : result = svcdb_pipeline_get (name, &desc);
164 0 : if (result != 0) {
165 0 : ml_loge ("Failed to launch pipeline of '%s'.", name);
166 0 : goto error;
167 : }
168 0 : break;
169 : }
170 0 : default:
171 0 : return -EINVAL;
172 : }
173 :
174 0 : pipeline = gst_parse_launch (desc, &err);
175 0 : if (!pipeline || err) {
176 0 : ml_loge ("Failed to launch pipeline '%s' (error msg: %s).",
177 : desc, (err) ? err->message : "unknown reason");
178 0 : g_clear_error (&err);
179 :
180 0 : result = -ESTRPIPE;
181 0 : goto error;
182 : }
183 :
184 : /* Set pipeline as paused state. */
185 0 : ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
186 0 : if (ret == GST_STATE_CHANGE_FAILURE) {
187 0 : ml_loge
188 : ("Failed to set the state of the pipeline to PAUSED. For the detail, please check the GStreamer log message.");
189 :
190 0 : result = -ESTRPIPE;
191 0 : goto error;
192 : }
193 :
194 : /* Final step, add node info into hash table. */
195 0 : node = g_new0 (mlops_node_s, 1);
196 0 : node->type = type;
197 0 : node->id = g_get_monotonic_time ();
198 0 : node->element = pipeline;
199 0 : node->service_name = g_strdup (name);
200 0 : node->description = g_strdup (desc);
201 0 : g_mutex_init (&node->lock);
202 :
203 0 : G_LOCK (mlops_node_table);
204 0 : g_hash_table_insert (g_mlops_node_table, GINT_TO_POINTER (node->id), node);
205 0 : G_UNLOCK (mlops_node_table);
206 :
207 0 : *id = node->id;
208 :
209 0 : error:
210 0 : if (result != 0) {
211 0 : if (pipeline)
212 0 : gst_object_unref (pipeline);
213 : }
214 :
215 0 : g_free (desc);
216 0 : return result;
217 : }
218 :
219 : /**
220 : * @brief Start the pipeline with given id.
221 : */
222 : int
223 0 : mlops_node_start (const int64_t id)
224 : {
225 0 : mlops_node_s *node = NULL;
226 :
227 0 : node = _mlops_node_get (id);
228 0 : return _mlops_node_set_pipeline_state (node, GST_STATE_PLAYING);
229 : }
230 :
231 : /**
232 : * @brief Stop the pipeline with given id.
233 : */
234 : int
235 0 : mlops_node_stop (const int64_t id)
236 : {
237 0 : mlops_node_s *node = NULL;
238 :
239 0 : node = _mlops_node_get (id);
240 0 : return _mlops_node_set_pipeline_state (node, GST_STATE_PAUSED);
241 : }
242 :
243 : /**
244 : * @brief Destroy the pipeline with given id.
245 : */
246 : int
247 0 : mlops_node_destroy (const int64_t id)
248 : {
249 0 : mlops_node_s *node = NULL;
250 :
251 0 : node = _mlops_node_get (id);
252 0 : if (node) {
253 0 : G_LOCK (mlops_node_table);
254 0 : g_hash_table_remove (g_mlops_node_table, GINT_TO_POINTER (id));
255 0 : G_UNLOCK (mlops_node_table);
256 : }
257 :
258 0 : return node ? 0 : -EINVAL;
259 : }
260 :
261 : /**
262 : * @brief Get the state of pipeline with given id.
263 : */
264 : int
265 0 : mlops_node_get_state (const int64_t id, GstState * state)
266 : {
267 0 : mlops_node_s *node = NULL;
268 : GstStateChangeReturn ret;
269 :
270 0 : node = _mlops_node_get (id);
271 0 : g_return_val_if_fail (node != NULL, -EINVAL);
272 :
273 0 : g_mutex_lock (&node->lock);
274 0 : ret = gst_element_get_state (node->element, state, NULL, GST_MSECOND);
275 0 : g_mutex_unlock (&node->lock);
276 :
277 0 : if (ret == GST_STATE_CHANGE_FAILURE) {
278 0 : ml_loge ("Failed to get the state of the pipeline with ID %"
279 : G_GINT64_FORMAT, id);
280 0 : return -ESTRPIPE;
281 : }
282 :
283 0 : return 0;
284 : }
|