Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (c) 2023 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file ml-api-service-extension.c
6 : * @date 1 September 2023
7 : * @brief ML service extension C-API.
8 : * @see https://github.com/nnstreamer/api
9 : * @author Jaeyun Jung <jy1210.jung@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : */
12 :
13 : #include "ml-api-service-extension.h"
14 :
/**
 * @brief Default wait time, in milliseconds, of the message thread when it polls the message queue for new input data.
 */
#define DEFAULT_TIMEOUT 200
19 :
/**
 * @brief Default limit of pending input data in the message queue (0 means no limit).
 */
#define DEFAULT_MAX_INPUT 5
24 :
/**
 * @brief Internal enumeration of the ml-service extension types.
 */
typedef enum
{
  ML_EXTENSION_TYPE_UNKNOWN = 0,  /**< Type is not decided yet. */
  ML_EXTENSION_TYPE_SINGLE,       /**< Single-shot model inference. */
  ML_EXTENSION_TYPE_PIPELINE,     /**< Pipeline constructed from a description. */

  ML_EXTENSION_TYPE_MAX           /**< Sentinel, number of types. */
} ml_extension_type_e;
36 :
/**
 * @brief Internal enumeration of the node types in a pipeline.
 */
typedef enum
{
  ML_EXTENSION_NODE_TYPE_UNKNOWN = 0,  /**< Type is not decided. */
  ML_EXTENSION_NODE_TYPE_INPUT,        /**< Node which receives input data. */
  ML_EXTENSION_NODE_TYPE_OUTPUT,       /**< Node which emits output data. */

  ML_EXTENSION_NODE_TYPE_MAX           /**< Sentinel, number of node types. */
} ml_extension_node_type_e;
48 :
49 : /**
50 : * @brief Internal structure of the node info in pipeline.
51 : */
52 : typedef struct
53 : {
54 : gchar *name;
55 : ml_extension_node_type_e type;
56 : ml_tensors_info_h info;
57 : void *handle;
58 : void *mls;
59 : } ml_extension_node_info_s;
60 :
61 : /**
62 : * @brief Internal structure of the message in ml-service extension handle.
63 : */
64 : typedef struct
65 : {
66 : gchar *name;
67 : ml_tensors_data_h input;
68 : ml_tensors_data_h output;
69 : } ml_extension_msg_s;
70 :
71 : /**
72 : * @brief Internal structure for ml-service extension handle.
73 : */
74 : typedef struct
75 : {
76 : ml_extension_type_e type;
77 : gboolean running;
78 : guint timeout; /**< The time to wait for new input data in message thread, in millisecond (see DEFAULT_TIMEOUT). */
79 : guint max_input; /**< The max number of input data in message queue (see DEFAULT_MAX_INPUT). */
80 : GThread *msg_thread;
81 : GAsyncQueue *msg_queue;
82 :
83 : /**
84 : * Handles for each ml-service extension type.
85 : * - single : Default. Open model file and prepare invoke. The configuration should include model information.
86 : * - pipeline : Construct a pipeline from configuration. The configuration should include pipeline description.
87 : */
88 : ml_single_h single;
89 :
90 : ml_pipeline_h pipeline;
91 : GHashTable *node_table;
92 : } ml_extension_s;
93 :
94 : /**
95 : * @brief Internal function to create node info in pipeline.
96 : */
97 : static ml_extension_node_info_s *
98 0 : _ml_extension_node_info_new (ml_service_s * mls, const gchar * name,
99 : ml_extension_node_type_e type)
100 : {
101 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
102 : ml_extension_node_info_s *node_info;
103 :
104 0 : if (!STR_IS_VALID (name)) {
105 0 : _ml_error_report_return (NULL,
106 : "Cannot add new node info, invalid node name '%s'.", name);
107 : }
108 :
109 0 : if (g_hash_table_lookup (ext->node_table, name)) {
110 0 : _ml_error_report_return (NULL,
111 : "Cannot add duplicated node '%s' in ml-service pipeline.", name);
112 : }
113 :
114 0 : node_info = g_try_new0 (ml_extension_node_info_s, 1);
115 0 : if (!node_info) {
116 0 : _ml_error_report_return (NULL,
117 : "Failed to allocate new memory for node info in ml-service pipeline. Out of memory?");
118 : }
119 :
120 0 : node_info->name = g_strdup (name);
121 0 : node_info->type = type;
122 0 : node_info->mls = mls;
123 :
124 0 : g_hash_table_insert (ext->node_table, g_strdup (name), node_info);
125 :
126 0 : return node_info;
127 : }
128 :
129 : /**
130 : * @brief Internal function to release pipeline node info.
131 : */
132 : static void
133 0 : _ml_extension_node_info_free (gpointer data)
134 : {
135 0 : ml_extension_node_info_s *node_info = (ml_extension_node_info_s *) data;
136 :
137 0 : if (!node_info)
138 0 : return;
139 :
140 0 : if (node_info->info)
141 0 : ml_tensors_info_destroy (node_info->info);
142 :
143 0 : g_free (node_info->name);
144 0 : g_free (node_info);
145 : }
146 :
147 : /**
148 : * @brief Internal function to get the node info in ml-service extension.
149 : */
150 : static ml_extension_node_info_s *
151 0 : _ml_extension_node_info_get (ml_extension_s * ext, const gchar * name)
152 : {
153 0 : if (!STR_IS_VALID (name))
154 0 : return NULL;
155 :
156 0 : return g_hash_table_lookup (ext->node_table, name);
157 : }
158 :
159 : /**
160 : * @brief Internal function to invoke ml-service event for new data.
161 : */
162 : static int
163 0 : _ml_extension_invoke_event_new_data (ml_service_s * mls, const char *name,
164 : const ml_tensors_data_h data)
165 : {
166 0 : ml_service_event_cb_info_s cb_info = { 0 };
167 0 : ml_information_h info = NULL;
168 0 : int status = ML_ERROR_NONE;
169 :
170 0 : if (!mls || !data) {
171 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
172 : "Failed to create ml-service event data, invalid parameter.");
173 : }
174 :
175 0 : _ml_service_get_event_cb_info (mls, &cb_info);
176 :
177 0 : if (cb_info.cb) {
178 : /* Create information handle for ml-service event. */
179 0 : status = _ml_information_create (&info);
180 0 : if (status != ML_ERROR_NONE)
181 0 : goto done;
182 :
183 0 : if (name) {
184 0 : status = _ml_information_set (info, "name", (void *) name, NULL);
185 0 : if (status != ML_ERROR_NONE)
186 0 : goto done;
187 : }
188 :
189 0 : status = _ml_information_set (info, "data", (void *) data, NULL);
190 0 : if (status == ML_ERROR_NONE)
191 0 : cb_info.cb (ML_SERVICE_EVENT_NEW_DATA, info, cb_info.pdata);
192 : }
193 :
194 0 : done:
195 0 : if (info)
196 0 : ml_information_destroy (info);
197 :
198 0 : if (status != ML_ERROR_NONE) {
199 0 : _ml_error_report ("Failed to invoke 'new data' event.");
200 : }
201 :
202 0 : return status;
203 : }
204 :
205 : /**
206 : * @brief Internal callback for sink node in pipeline description.
207 : */
208 : static void
209 0 : _ml_extension_pipeline_sink_cb (const ml_tensors_data_h data,
210 : const ml_tensors_info_h info, void *user_data)
211 : {
212 0 : ml_extension_node_info_s *node_info = (ml_extension_node_info_s *) user_data;
213 0 : ml_service_s *mls = (ml_service_s *) node_info->mls;
214 :
215 0 : _ml_extension_invoke_event_new_data (mls, node_info->name, data);
216 0 : }
217 :
218 : /**
219 : * @brief Internal function to release ml-service extension message.
220 : */
221 : static void
222 0 : _ml_extension_msg_free (gpointer data)
223 : {
224 0 : ml_extension_msg_s *msg = (ml_extension_msg_s *) data;
225 :
226 0 : if (!msg)
227 0 : return;
228 :
229 0 : if (msg->input)
230 0 : ml_tensors_data_destroy (msg->input);
231 0 : if (msg->output)
232 0 : ml_tensors_data_destroy (msg->output);
233 :
234 0 : g_free (msg->name);
235 0 : g_free (msg);
236 : }
237 :
238 : /**
239 : * @brief Internal function to process ml-service extension message.
240 : */
241 : static gpointer
242 0 : _ml_extension_msg_thread (gpointer data)
243 : {
244 0 : ml_service_s *mls = (ml_service_s *) data;
245 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
246 : int status;
247 :
248 0 : g_mutex_lock (&mls->lock);
249 0 : ext->running = TRUE;
250 0 : g_cond_signal (&mls->cond);
251 0 : g_mutex_unlock (&mls->lock);
252 :
253 0 : while (ext->running) {
254 : ml_extension_msg_s *msg;
255 :
256 0 : msg = g_async_queue_timeout_pop (ext->msg_queue,
257 0 : ext->timeout * G_TIME_SPAN_MILLISECOND);
258 :
259 0 : if (msg) {
260 0 : switch (ext->type) {
261 0 : case ML_EXTENSION_TYPE_SINGLE:
262 : {
263 0 : status = ml_single_invoke (ext->single, msg->input, &msg->output);
264 :
265 0 : if (status == ML_ERROR_NONE) {
266 0 : _ml_extension_invoke_event_new_data (mls, NULL, msg->output);
267 : } else {
268 0 : _ml_error_report
269 : ("Failed to invoke the model in ml-service extension thread.");
270 : }
271 0 : break;
272 : }
273 0 : case ML_EXTENSION_TYPE_PIPELINE:
274 : {
275 : ml_extension_node_info_s *node_info;
276 :
277 0 : node_info = _ml_extension_node_info_get (ext, msg->name);
278 :
279 0 : if (node_info && node_info->type == ML_EXTENSION_NODE_TYPE_INPUT) {
280 : /* The input data will be released in the pipeline. */
281 0 : status = ml_pipeline_src_input_data (node_info->handle, msg->input,
282 : ML_PIPELINE_BUF_POLICY_AUTO_FREE);
283 0 : msg->input = NULL;
284 :
285 0 : if (status != ML_ERROR_NONE) {
286 0 : _ml_error_report
287 : ("Failed to push input data into the pipeline in ml-service extension thread.");
288 : }
289 : } else {
290 0 : _ml_error_report
291 : ("Failed to push input data into the pipeline, cannot find input node '%s'.",
292 : msg->name);
293 : }
294 0 : break;
295 : }
296 0 : default:
297 : /* Unknown ml-service extension type, skip this. */
298 0 : break;
299 : }
300 :
301 0 : _ml_extension_msg_free (msg);
302 : }
303 : }
304 :
305 0 : return NULL;
306 : }
307 :
308 : /**
309 : * @brief Wrapper to release tensors-info handle.
310 : */
311 : static void
312 0 : _ml_extension_destroy_tensors_info (void *data)
313 : {
314 0 : ml_tensors_info_h info = (ml_tensors_info_h) data;
315 :
316 0 : if (info)
317 0 : ml_tensors_info_destroy (info);
318 0 : }
319 :
320 : /**
321 : * @brief Internal function to parse single-shot info from json.
322 : */
323 : static int
324 0 : _ml_extension_conf_parse_single (ml_service_s * mls, JsonObject * single)
325 : {
326 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
327 : ml_option_h option;
328 : int status;
329 :
330 0 : status = ml_option_create (&option);
331 0 : if (status != ML_ERROR_NONE) {
332 0 : _ml_error_report_return (status,
333 : "Failed to parse configuration file, cannot create ml-option handle.");
334 : }
335 :
336 : /**
337 : * 1. "key" : load model info from ml-service agent.
338 : * 2. "model" : configuration file includes model path.
339 : */
340 0 : if (json_object_has_member (single, "key")) {
341 0 : const gchar *key = json_object_get_string_member (single, "key");
342 :
343 0 : if (STR_IS_VALID (key)) {
344 : ml_information_h model_info;
345 :
346 0 : status = ml_service_model_get_activated (key, &model_info);
347 0 : if (status == ML_ERROR_NONE) {
348 0 : gchar *paths = NULL;
349 :
350 : /** @todo parse desc and other information if necessary. */
351 0 : ml_information_get (model_info, "path", (void **) (&paths));
352 0 : ml_option_set (option, "models", g_strdup (paths), g_free);
353 :
354 0 : ml_information_destroy (model_info);
355 : } else {
356 0 : _ml_error_report
357 : ("Failed to parse configuration file, cannot get the model of '%s'.",
358 : key);
359 0 : goto error;
360 : }
361 : }
362 0 : } else if (json_object_has_member (single, "model")) {
363 0 : JsonNode *file_node = json_object_get_member (single, "model");
364 0 : gchar *paths = NULL;
365 :
366 0 : status = _ml_service_conf_parse_string (file_node, ",", &paths);
367 0 : if (status != ML_ERROR_NONE) {
368 0 : _ml_error_report
369 : ("Failed to parse configuration file, it should have valid model path.");
370 0 : goto error;
371 : }
372 :
373 0 : ml_option_set (option, "models", paths, g_free);
374 : } else {
375 0 : status = ML_ERROR_INVALID_PARAMETER;
376 0 : _ml_error_report
377 : ("Failed to parse configuration file, cannot get the model path.");
378 0 : goto error;
379 : }
380 :
381 0 : if (json_object_has_member (single, "framework")) {
382 0 : const gchar *fw = json_object_get_string_member (single, "framework");
383 :
384 0 : if (STR_IS_VALID (fw))
385 0 : ml_option_set (option, "framework_name", g_strdup (fw), g_free);
386 : }
387 :
388 0 : if (json_object_has_member (single, "input_info")) {
389 0 : JsonNode *info_node = json_object_get_member (single, "input_info");
390 : ml_tensors_info_h in_info;
391 :
392 0 : status = _ml_service_conf_parse_tensors_info (info_node, &in_info);
393 0 : if (status != ML_ERROR_NONE) {
394 0 : _ml_error_report
395 : ("Failed to parse configuration file, cannot parse input information.");
396 0 : goto error;
397 : }
398 :
399 0 : ml_option_set (option, "input_info", in_info,
400 : _ml_extension_destroy_tensors_info);
401 : }
402 :
403 0 : if (json_object_has_member (single, "output_info")) {
404 0 : JsonNode *info_node = json_object_get_member (single, "output_info");
405 : ml_tensors_info_h out_info;
406 :
407 0 : status = _ml_service_conf_parse_tensors_info (info_node, &out_info);
408 0 : if (status != ML_ERROR_NONE) {
409 0 : _ml_error_report
410 : ("Failed to parse configuration file, cannot parse output information.");
411 0 : goto error;
412 : }
413 :
414 0 : ml_option_set (option, "output_info", out_info,
415 : _ml_extension_destroy_tensors_info);
416 : }
417 :
418 0 : if (json_object_has_member (single, "custom")) {
419 0 : const gchar *custom = json_object_get_string_member (single, "custom");
420 :
421 0 : if (STR_IS_VALID (custom))
422 0 : ml_option_set (option, "custom", g_strdup (custom), g_free);
423 : }
424 :
425 0 : error:
426 0 : if (status == ML_ERROR_NONE)
427 0 : status = ml_single_open_with_option (&ext->single, option);
428 :
429 0 : ml_option_destroy (option);
430 0 : return status;
431 : }
432 :
433 : /**
434 : * @brief Internal function to parse the node info in pipeline.
435 : */
436 : static int
437 0 : _ml_extension_conf_parse_pipeline_node (ml_service_s * mls, JsonNode * node,
438 : ml_extension_node_type_e type)
439 : {
440 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
441 0 : JsonArray *array = NULL;
442 : JsonObject *object;
443 : guint i, n;
444 : int status;
445 :
446 0 : n = 1;
447 0 : if (JSON_NODE_HOLDS_ARRAY (node)) {
448 0 : array = json_node_get_array (node);
449 0 : n = json_array_get_length (array);
450 : }
451 :
452 0 : for (i = 0; i < n; i++) {
453 0 : const gchar *name = NULL;
454 : ml_extension_node_info_s *node_info;
455 :
456 0 : if (array)
457 0 : object = json_array_get_object_element (array, i);
458 : else
459 0 : object = json_node_get_object (node);
460 :
461 0 : name = _ml_service_get_json_string_member (object, "name");
462 :
463 0 : node_info = _ml_extension_node_info_new (mls, name, type);
464 0 : if (!node_info) {
465 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
466 : "Failed to parse configuration file, cannot add new node information.");
467 : }
468 :
469 0 : if (json_object_has_member (object, "info")) {
470 0 : JsonNode *info_node = json_object_get_member (object, "info");
471 :
472 0 : status = _ml_service_conf_parse_tensors_info (info_node,
473 : &node_info->info);
474 0 : if (status != ML_ERROR_NONE) {
475 0 : _ml_error_report_return (status,
476 : "Failed to parse configuration file, cannot parse the information.");
477 : }
478 : } else {
479 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
480 : "Failed to parse configuration file, cannot find node information.");
481 : }
482 :
483 0 : switch (type) {
484 0 : case ML_EXTENSION_NODE_TYPE_INPUT:
485 0 : status = ml_pipeline_src_get_handle (ext->pipeline, name,
486 0 : &node_info->handle);
487 0 : break;
488 0 : case ML_EXTENSION_NODE_TYPE_OUTPUT:
489 0 : status = ml_pipeline_sink_register (ext->pipeline, name,
490 0 : _ml_extension_pipeline_sink_cb, node_info, &node_info->handle);
491 0 : break;
492 0 : default:
493 0 : status = ML_ERROR_INVALID_PARAMETER;
494 0 : break;
495 : }
496 :
497 0 : if (status != ML_ERROR_NONE) {
498 0 : _ml_error_report_return (status,
499 : "Failed to parse configuration file, cannot get the handle for pipeline node.");
500 : }
501 : }
502 :
503 0 : return ML_ERROR_NONE;
504 : }
505 :
506 : /**
507 : * @brief Internal function to parse pipeline info from json.
508 : */
509 : static int
510 0 : _ml_extension_conf_parse_pipeline (ml_service_s * mls, JsonObject * pipe)
511 : {
512 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
513 0 : g_autofree gchar *desc = NULL;
514 : int status;
515 :
516 : /**
517 : * 1. "key" : load pipeline from ml-service agent.
518 : * 2. "description" : configuration file includes pipeline description.
519 : */
520 0 : if (json_object_has_member (pipe, "key")) {
521 0 : const gchar *key = json_object_get_string_member (pipe, "key");
522 :
523 0 : if (STR_IS_VALID (key)) {
524 0 : status = ml_service_pipeline_get (key, &desc);
525 0 : if (status != ML_ERROR_NONE) {
526 0 : _ml_error_report_return (status,
527 : "Failed to parse configuration file, cannot get the pipeline of '%s'.",
528 : key);
529 : }
530 : }
531 0 : } else if (json_object_has_member (pipe, "description")) {
532 0 : desc = g_strdup (json_object_get_string_member (pipe, "description"));
533 : } else {
534 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
535 : "Failed to parse configuration file, cannot get the pipeline description.");
536 : }
537 :
538 0 : status = ml_pipeline_construct (desc, NULL, NULL, &ext->pipeline);
539 0 : if (status != ML_ERROR_NONE) {
540 0 : _ml_error_report_return (status,
541 : "Failed to parse configuration file, cannot construct the pipeline.");
542 : }
543 :
544 0 : if (json_object_has_member (pipe, "input_node")) {
545 0 : JsonNode *node = json_object_get_member (pipe, "input_node");
546 :
547 0 : status = _ml_extension_conf_parse_pipeline_node (mls, node,
548 : ML_EXTENSION_NODE_TYPE_INPUT);
549 0 : if (status != ML_ERROR_NONE) {
550 0 : _ml_error_report_return (status,
551 : "Failed to parse configuration file, cannot get the input node.");
552 : }
553 : } else {
554 0 : _ml_logw
555 : ("No input node is defined in the pipeline. Might Non-appsrc be used?");
556 : }
557 :
558 0 : if (json_object_has_member (pipe, "output_node")) {
559 0 : JsonNode *node = json_object_get_member (pipe, "output_node");
560 :
561 0 : status = _ml_extension_conf_parse_pipeline_node (mls, node,
562 : ML_EXTENSION_NODE_TYPE_OUTPUT);
563 0 : if (status != ML_ERROR_NONE) {
564 0 : _ml_error_report_return (status,
565 : "Failed to parse configuration file, cannot get the output node.");
566 : }
567 : } else {
568 0 : _ml_logw ("No output node is defined in the pipeline.");
569 : }
570 :
571 : /* Start pipeline when creating ml-service handle to check pipeline description. */
572 0 : status = ml_pipeline_start (ext->pipeline);
573 0 : if (status != ML_ERROR_NONE) {
574 0 : _ml_error_report_return (status,
575 : "Failed to parse configuration file, cannot start the pipeline.");
576 : }
577 :
578 0 : return ML_ERROR_NONE;
579 : }
580 :
581 : /**
582 : * @brief Internal function to parse configuration file.
583 : */
584 : static int
585 0 : _ml_extension_conf_parse_json (ml_service_s * mls, JsonObject * object)
586 : {
587 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
588 : int status;
589 :
590 0 : if (json_object_has_member (object, "single")) {
591 0 : JsonObject *single = json_object_get_object_member (object, "single");
592 :
593 0 : status = _ml_extension_conf_parse_single (mls, single);
594 0 : if (status != ML_ERROR_NONE)
595 0 : return status;
596 :
597 0 : ext->type = ML_EXTENSION_TYPE_SINGLE;
598 0 : } else if (json_object_has_member (object, "pipeline")) {
599 0 : JsonObject *pipe = json_object_get_object_member (object, "pipeline");
600 :
601 0 : status = _ml_extension_conf_parse_pipeline (mls, pipe);
602 0 : if (status != ML_ERROR_NONE)
603 0 : return status;
604 :
605 0 : ext->type = ML_EXTENSION_TYPE_PIPELINE;
606 : } else {
607 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
608 : "Failed to parse configuration file, cannot get the valid type from configuration.");
609 : }
610 :
611 0 : return ML_ERROR_NONE;
612 : }
613 :
614 : /**
615 : * @brief Internal function to create ml-service extension.
616 : */
617 : int
618 0 : _ml_service_extension_create (ml_service_s * mls, JsonObject * object)
619 : {
620 : ml_extension_s *ext;
621 0 : g_autofree gchar *thread_name = g_strdup_printf ("ml-ext-msg-%d", getpid ());
622 : int status;
623 :
624 0 : mls->priv = ext = g_try_new0 (ml_extension_s, 1);
625 0 : if (ext == NULL) {
626 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
627 : "Failed to allocate memory for ml-service extension. Out of memory?");
628 : }
629 :
630 0 : ext->type = ML_EXTENSION_TYPE_UNKNOWN;
631 0 : ext->running = FALSE;
632 0 : ext->timeout = DEFAULT_TIMEOUT;
633 0 : ext->max_input = DEFAULT_MAX_INPUT;
634 0 : ext->node_table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free,
635 : _ml_extension_node_info_free);
636 :
637 0 : status = _ml_extension_conf_parse_json (mls, object);
638 0 : if (status != ML_ERROR_NONE) {
639 0 : _ml_error_report_return (status,
640 : "Failed to parse the ml-service extension configuration.");
641 : }
642 :
643 0 : g_mutex_lock (&mls->lock);
644 :
645 0 : ext->msg_queue = g_async_queue_new_full (_ml_extension_msg_free);
646 0 : ext->msg_thread = g_thread_new (thread_name, _ml_extension_msg_thread, mls);
647 :
648 : /* Wait until the message thread has been initialized. */
649 0 : g_cond_wait (&mls->cond, &mls->lock);
650 0 : g_mutex_unlock (&mls->lock);
651 :
652 0 : return ML_ERROR_NONE;
653 : }
654 :
655 : /**
656 : * @brief Internal function to release ml-service extension.
657 : */
658 : int
659 0 : _ml_service_extension_destroy (ml_service_s * mls)
660 : {
661 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
662 :
663 : /* Supposed internal function call to release handle. */
664 0 : if (!ext)
665 0 : return ML_ERROR_NONE;
666 :
667 : /**
668 : * Close message thread.
669 : * If model inference is running, it may wait for the result in message thread.
670 : * This takes time, so do not call join with extension lock.
671 : */
672 0 : ext->running = FALSE;
673 0 : if (ext->msg_thread) {
674 0 : g_thread_join (ext->msg_thread);
675 0 : ext->msg_thread = NULL;
676 : }
677 :
678 0 : if (ext->msg_queue) {
679 0 : g_async_queue_unref (ext->msg_queue);
680 0 : ext->msg_queue = NULL;
681 : }
682 :
683 0 : if (ext->single) {
684 0 : ml_single_close (ext->single);
685 0 : ext->single = NULL;
686 : }
687 :
688 0 : if (ext->pipeline) {
689 0 : ml_pipeline_stop (ext->pipeline);
690 0 : ml_pipeline_destroy (ext->pipeline);
691 0 : ext->pipeline = NULL;
692 : }
693 :
694 0 : if (ext->node_table) {
695 0 : g_hash_table_destroy (ext->node_table);
696 0 : ext->node_table = NULL;
697 : }
698 :
699 0 : g_free (ext);
700 0 : mls->priv = NULL;
701 :
702 0 : return ML_ERROR_NONE;
703 : }
704 :
705 : /**
706 : * @brief Internal function to start ml-service extension.
707 : */
708 : int
709 0 : _ml_service_extension_start (ml_service_s * mls)
710 : {
711 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
712 0 : int status = ML_ERROR_NONE;
713 :
714 0 : switch (ext->type) {
715 0 : case ML_EXTENSION_TYPE_PIPELINE:
716 0 : status = ml_pipeline_start (ext->pipeline);
717 0 : break;
718 0 : case ML_EXTENSION_TYPE_SINGLE:
719 : /* Do nothing. */
720 0 : break;
721 0 : default:
722 0 : status = ML_ERROR_NOT_SUPPORTED;
723 0 : break;
724 : }
725 :
726 0 : return status;
727 : }
728 :
729 : /**
730 : * @brief Internal function to stop ml-service extension.
731 : */
732 : int
733 0 : _ml_service_extension_stop (ml_service_s * mls)
734 : {
735 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
736 0 : int status = ML_ERROR_NONE;
737 :
738 0 : switch (ext->type) {
739 0 : case ML_EXTENSION_TYPE_PIPELINE:
740 0 : status = ml_pipeline_stop (ext->pipeline);
741 0 : break;
742 0 : case ML_EXTENSION_TYPE_SINGLE:
743 : /* Do nothing. */
744 0 : break;
745 0 : default:
746 0 : status = ML_ERROR_NOT_SUPPORTED;
747 0 : break;
748 : }
749 :
750 0 : return status;
751 : }
752 :
753 : /**
754 : * @brief Internal function to get the information of required input data.
755 : */
756 : int
757 0 : _ml_service_extension_get_input_information (ml_service_s * mls,
758 : const char *name, ml_tensors_info_h * info)
759 : {
760 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
761 : int status;
762 :
763 0 : switch (ext->type) {
764 0 : case ML_EXTENSION_TYPE_SINGLE:
765 0 : status = ml_single_get_input_info (ext->single, info);
766 0 : break;
767 0 : case ML_EXTENSION_TYPE_PIPELINE:
768 : {
769 : ml_extension_node_info_s *node_info;
770 :
771 0 : node_info = _ml_extension_node_info_get (ext, name);
772 :
773 0 : if (node_info && node_info->type == ML_EXTENSION_NODE_TYPE_INPUT) {
774 0 : status = _ml_tensors_info_create_from (node_info->info, info);
775 : } else {
776 0 : status = ML_ERROR_INVALID_PARAMETER;
777 : }
778 0 : break;
779 : }
780 0 : default:
781 0 : status = ML_ERROR_NOT_SUPPORTED;
782 0 : break;
783 : }
784 :
785 0 : return status;
786 : }
787 :
788 : /**
789 : * @brief Internal function to get the information of output data.
790 : */
791 : int
792 0 : _ml_service_extension_get_output_information (ml_service_s * mls,
793 : const char *name, ml_tensors_info_h * info)
794 : {
795 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
796 : int status;
797 :
798 0 : switch (ext->type) {
799 0 : case ML_EXTENSION_TYPE_SINGLE:
800 0 : status = ml_single_get_output_info (ext->single, info);
801 0 : break;
802 0 : case ML_EXTENSION_TYPE_PIPELINE:
803 : {
804 : ml_extension_node_info_s *node_info;
805 :
806 0 : node_info = _ml_extension_node_info_get (ext, name);
807 :
808 0 : if (node_info && node_info->type == ML_EXTENSION_NODE_TYPE_OUTPUT) {
809 0 : status = _ml_tensors_info_create_from (node_info->info, info);
810 : } else {
811 0 : status = ML_ERROR_INVALID_PARAMETER;
812 : }
813 0 : break;
814 : }
815 0 : default:
816 0 : status = ML_ERROR_NOT_SUPPORTED;
817 0 : break;
818 : }
819 :
820 0 : if (status != ML_ERROR_NONE) {
821 0 : if (*info) {
822 0 : ml_tensors_info_destroy (*info);
823 0 : *info = NULL;
824 : }
825 : }
826 :
827 0 : return status;
828 : }
829 :
830 : /**
831 : * @brief Internal function to set the information for ml-service extension.
832 : */
833 : int
834 0 : _ml_service_extension_set_information (ml_service_s * mls, const char *name,
835 : const char *value)
836 : {
837 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
838 :
839 : /* Check limitation of message queue and other options. */
840 0 : if (g_ascii_strcasecmp (name, "input_queue_size") == 0 ||
841 0 : g_ascii_strcasecmp (name, "max_input") == 0) {
842 0 : ext->max_input = (guint) g_ascii_strtoull (value, NULL, 10);
843 0 : } else if (g_ascii_strcasecmp (name, "timeout") == 0) {
844 0 : ext->timeout = (guint) g_ascii_strtoull (value, NULL, 10);
845 : }
846 :
847 0 : return ML_ERROR_NONE;
848 : }
849 :
850 : /**
851 : * @brief Internal function to add an input data to process the model in ml-service extension handle.
852 : */
853 : int
854 0 : _ml_service_extension_request (ml_service_s * mls, const char *name,
855 : const ml_tensors_data_h data)
856 : {
857 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
858 : ml_extension_msg_s *msg;
859 : int status, len;
860 :
861 0 : if (ext->type == ML_EXTENSION_TYPE_PIPELINE) {
862 : ml_extension_node_info_s *node_info;
863 :
864 0 : if (!STR_IS_VALID (name)) {
865 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
866 : "The parameter, name '%s', is invalid.", name);
867 : }
868 :
869 0 : node_info = _ml_extension_node_info_get (ext, name);
870 :
871 0 : if (!node_info || node_info->type != ML_EXTENSION_NODE_TYPE_INPUT) {
872 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
873 : "The parameter, name '%s', is invalid, cannot find the input node from pipeline.",
874 : name);
875 : }
876 : }
877 :
878 0 : len = g_async_queue_length (ext->msg_queue);
879 :
880 0 : if (ext->max_input > 0 && len > 0 && ext->max_input <= len) {
881 0 : _ml_error_report_return (ML_ERROR_STREAMS_PIPE,
882 : "Failed to push input data into the queue, the max number of input is %u.",
883 : ext->max_input);
884 : }
885 :
886 0 : msg = g_try_new0 (ml_extension_msg_s, 1);
887 0 : if (!msg) {
888 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
889 : "Failed to allocate the ml-service extension message. Out of memory?");
890 : }
891 :
892 0 : msg->name = g_strdup (name);
893 0 : status = ml_tensors_data_clone (data, &msg->input);
894 :
895 0 : if (status != ML_ERROR_NONE) {
896 0 : _ml_extension_msg_free (msg);
897 0 : _ml_error_report_return (status, "Failed to clone input data.");
898 : }
899 :
900 0 : g_async_queue_push (ext->msg_queue, msg);
901 :
902 0 : return ML_ERROR_NONE;
903 : }
|