Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file ml-api-service-query-client.c
6 : * @date 30 Aug 2022
7 : * @brief Query client implementation of NNStreamer/Service C-API
8 : * @see https://github.com/nnstreamer/nnstreamer
9 : * @author Yongjoo Ahn <yongjoo1.ahn@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : */
12 :
13 : #include <glib.h>
14 : #include <gst/gst.h>
15 : #include <gst/gstbuffer.h>
16 : #include <gst/app/app.h>
17 : #include <string.h>
18 :
19 : #include "ml-api-internal.h"
20 : #include "ml-api-service.h"
21 : #include "ml-api-service-private.h"
22 :
/**
 * @brief Structure for ml_service_query
 *
 * Private data of an ml_service_h created by ml_service_query_create().
 * Owns a running query-client pipeline and the queue that carries results
 * from the sink callback back to the requesting thread.
 */
typedef struct
{
  ml_pipeline_h pipe_h; /**< Constructed query-client pipeline handle. */
  ml_pipeline_src_h src_h; /**< Handle of appsrc "srcx"; requests are pushed here. */
  ml_pipeline_sink_h sink_h; /**< Handle of tensor_sink "sinkx"; its callback fills out_data_queue. */

  guint timeout; /**< in ms unit */
  GAsyncQueue *out_data_queue; /**< Cloned output tensors produced by the sink callback. */
} _ml_service_query_s;
35 :
36 : /**
37 : * @brief Sink callback for query_client
38 : */
39 : static void
40 0 : _sink_callback_for_query_client (const ml_tensors_data_h data,
41 : const ml_tensors_info_h info, void *user_data)
42 : {
43 0 : _ml_service_query_s *mls = (_ml_service_query_s *) user_data;
44 : ml_tensors_data_h copied;
45 : int status;
46 :
47 0 : status = ml_tensors_data_clone (data, &copied);
48 0 : if (ML_ERROR_NONE != status) {
49 0 : _ml_error_report_continue
50 : ("Failed to create a new tensors data for query_client.");
51 0 : return;
52 : }
53 :
54 0 : g_async_queue_push (mls->out_data_queue, copied);
55 : }
56 :
57 : /**
58 : * @brief Internal function to release ml-service query data.
59 : */
60 : int
61 0 : _ml_service_query_release_internal (ml_service_s * mls)
62 : {
63 0 : _ml_service_query_s *query = (_ml_service_query_s *) mls->priv;
64 : ml_tensors_data_h data_h;
65 :
66 : /* Supposed internal function call to release handle. */
67 0 : if (!query)
68 0 : return ML_ERROR_NONE;
69 :
70 0 : if (query->pipe_h) {
71 0 : if (ml_pipeline_destroy (query->pipe_h))
72 0 : _ml_error_report ("Failed to destroy pipeline");
73 : }
74 :
75 0 : if (query->out_data_queue) {
76 0 : while ((data_h = g_async_queue_try_pop (query->out_data_queue))) {
77 0 : ml_tensors_data_destroy (data_h);
78 : }
79 :
80 0 : g_async_queue_unref (query->out_data_queue);
81 : }
82 :
83 0 : g_free (query);
84 0 : mls->priv = NULL;
85 :
86 0 : return ML_ERROR_NONE;
87 : }
88 :
89 : /**
90 : * @brief Creates query client service handle with given ml-option handle.
91 : */
92 : int
93 0 : ml_service_query_create (ml_option_h option, ml_service_h * handle)
94 : {
95 0 : int status = ML_ERROR_NONE;
96 :
97 0 : g_autofree gchar *description = NULL;
98 : void *value;
99 :
100 : GString *tensor_query_client_prop;
101 0 : g_autofree gchar *prop = NULL;
102 :
103 : ml_service_s *mls;
104 :
105 : _ml_service_query_s *query_s;
106 : ml_pipeline_h pipe_h;
107 : ml_pipeline_src_h src_h;
108 : ml_pipeline_sink_h sink_h;
109 0 : g_autofree gchar *caps = NULL;
110 0 : guint timeout = 1000U; /* default 1s timeout */
111 :
112 0 : check_feature_state (ML_FEATURE_SERVICE);
113 0 : check_feature_state (ML_FEATURE_INFERENCE);
114 :
115 0 : if (!option) {
116 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
117 : "The parameter, 'option' is NULL. It should be a valid ml_option_h, which should be created by ml_option_create().");
118 : }
119 :
120 0 : if (!handle) {
121 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
122 : "The parameter, 'handle' (ml_service_h), is NULL. It should be a valid ml_service_h.");
123 : }
124 :
125 0 : mls = _ml_service_create_internal (ML_SERVICE_TYPE_CLIENT_QUERY);
126 0 : if (mls == NULL) {
127 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
128 : "Failed to allocate memory for the service handle. Out of memory?");
129 : }
130 :
131 0 : mls->priv = query_s = g_try_new0 (_ml_service_query_s, 1);
132 0 : if (query_s == NULL) {
133 0 : _ml_service_destroy_internal (mls);
134 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
135 : "Failed to allocate memory for the service handle's private data. Out of memory?");
136 : }
137 :
138 0 : tensor_query_client_prop = g_string_new (NULL);
139 :
140 0 : if (ML_ERROR_NONE == ml_option_get (option, "host", &value))
141 0 : g_string_append_printf (tensor_query_client_prop, " host=%s ",
142 : (gchar *) value);
143 :
144 0 : if (ML_ERROR_NONE == ml_option_get (option, "port", &value))
145 0 : g_string_append_printf (tensor_query_client_prop, " port=%u ",
146 0 : *((guint *) value));
147 :
148 0 : if (ML_ERROR_NONE == ml_option_get (option, "dest-host", &value))
149 0 : g_string_append_printf (tensor_query_client_prop, " dest-host=%s ",
150 : (gchar *) value);
151 :
152 0 : if (ML_ERROR_NONE == ml_option_get (option, "dest-port", &value))
153 0 : g_string_append_printf (tensor_query_client_prop, " dest-port=%u ",
154 0 : *((guint *) value));
155 :
156 0 : if (ML_ERROR_NONE == ml_option_get (option, "connect-type", &value))
157 0 : g_string_append_printf (tensor_query_client_prop, " connect-type=%s ",
158 : (gchar *) value);
159 :
160 0 : if (ML_ERROR_NONE == ml_option_get (option, "topic", &value))
161 0 : g_string_append_printf (tensor_query_client_prop, " topic=%s ",
162 : (gchar *) value);
163 :
164 0 : if (ML_ERROR_NONE == ml_option_get (option, "timeout", &value))
165 0 : g_string_append_printf (tensor_query_client_prop, " timeout=%u ",
166 0 : *((guint *) value));
167 :
168 0 : if (ML_ERROR_NONE != ml_option_get (option, "caps", &value)) {
169 0 : g_string_free (tensor_query_client_prop, TRUE);
170 0 : _ml_service_destroy_internal (mls);
171 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
172 : "The option 'caps' must be set before call ml_service_query_create.");
173 : }
174 0 : caps = g_strdup ((gchar *) value);
175 :
176 0 : prop = g_string_free (tensor_query_client_prop, FALSE);
177 0 : description =
178 0 : g_strdup_printf
179 : ("appsrc name=srcx ! %s ! tensor_query_client %s name=qcx ! tensor_sink name=sinkx async=false sync=false",
180 : caps, prop);
181 :
182 0 : status = ml_pipeline_construct (description, NULL, NULL, &pipe_h);
183 0 : if (ML_ERROR_NONE != status) {
184 0 : _ml_service_destroy_internal (mls);
185 0 : _ml_error_report_return (status, "Failed to construct pipeline");
186 : }
187 :
188 0 : status = ml_pipeline_start (pipe_h);
189 0 : if (ML_ERROR_NONE != status) {
190 0 : ml_pipeline_destroy (pipe_h);
191 0 : _ml_service_destroy_internal (mls);
192 0 : _ml_error_report_return (status, "Failed to start pipeline");
193 : }
194 :
195 0 : status = ml_pipeline_src_get_handle (pipe_h, "srcx", &src_h);
196 0 : if (ML_ERROR_NONE != status) {
197 0 : ml_pipeline_destroy (pipe_h);
198 0 : _ml_service_destroy_internal (mls);
199 0 : _ml_error_report_return (status, "Failed to get src handle");
200 : }
201 :
202 0 : status = ml_pipeline_sink_register (pipe_h, "sinkx",
203 : _sink_callback_for_query_client, query_s, &sink_h);
204 0 : if (ML_ERROR_NONE != status) {
205 0 : ml_pipeline_destroy (pipe_h);
206 0 : _ml_service_destroy_internal (mls);
207 0 : _ml_error_report_return (status, "Failed to register sink handle");
208 : }
209 :
210 0 : query_s->timeout = timeout;
211 0 : query_s->pipe_h = pipe_h;
212 0 : query_s->src_h = src_h;
213 0 : query_s->sink_h = sink_h;
214 0 : query_s->out_data_queue = g_async_queue_new ();
215 :
216 0 : *handle = mls;
217 :
218 0 : return ML_ERROR_NONE;
219 : }
220 :
221 : /**
222 : * @brief Requests query client service an output with given input data.
223 : */
224 : int
225 0 : ml_service_query_request (ml_service_h handle, const ml_tensors_data_h input,
226 : ml_tensors_data_h * output)
227 : {
228 0 : int status = ML_ERROR_NONE;
229 0 : ml_service_s *mls = (ml_service_s *) handle;
230 : _ml_service_query_s *query;
231 :
232 0 : check_feature_state (ML_FEATURE_SERVICE);
233 0 : check_feature_state (ML_FEATURE_INFERENCE);
234 :
235 0 : if (!_ml_service_handle_is_valid (mls))
236 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
237 : "The parameter, 'handle' (ml_service_h), is invalid. It should be a valid ml_service_h instance.");
238 0 : if (!input)
239 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
240 : "The parameter, 'input' (ml_tensors_data_h), is NULL. It should be a valid ml_tensors_data_h.");
241 0 : if (!output)
242 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
243 : "The parameter, 'output' (ml_tensors_data_h *), is NULL. It should be a valid pointer to an instance of ml_tensors_data_h.");
244 :
245 0 : query = (_ml_service_query_s *) mls->priv;
246 :
247 0 : status = ml_pipeline_src_input_data (query->src_h, input,
248 : ML_PIPELINE_BUF_POLICY_DO_NOT_FREE);
249 0 : if (ML_ERROR_NONE != status) {
250 0 : _ml_error_report_return (status, "Failed to input data");
251 : }
252 :
253 0 : *output = g_async_queue_timeout_pop (query->out_data_queue,
254 0 : query->timeout * G_TIME_SPAN_MILLISECOND);
255 0 : if (NULL == *output) {
256 0 : _ml_error_report_return (ML_ERROR_TIMED_OUT, "timeout!");
257 : }
258 :
259 0 : return ML_ERROR_NONE;
260 : }
|