Branch data Line data Source code
1 : : /* StarPU --- Runtime system for heterogeneous multicore architectures.
2 : : *
3 : : * Copyright (C) 2010-2012 Université de Bordeaux 1
4 : : * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
5 : : *
6 : : * StarPU is free software; you can redistribute it and/or modify
7 : : * it under the terms of the GNU Lesser General Public License as published by
8 : : * the Free Software Foundation; either version 2.1 of the License, or (at
9 : : * your option) any later version.
10 : : *
11 : : * StarPU is distributed in the hope that it will be useful, but
12 : : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14 : : *
15 : : * See the GNU Lesser General Public License in COPYING.LGPL for more details.
16 : : */
17 : :
18 : : #include <core/dependencies/data_concurrency.h>
19 : : #include <datawizard/coherency.h>
20 : : #include <core/sched_policy.h>
21 : : #include <common/starpu_spinlock.h>
22 : : #include <datawizard/sort_data_handles.h>
23 : :
24 : : /*
25 : : * The different types of data accesses are STARPU_R, STARPU_RW, STARPU_W,
26 : : * STARPU_SCRATCH and STARPU_REDUX. STARPU_RW is managed as a STARPU_W access.
27 : : * - A single STARPU_W access is allowed at a time.
28 : : * - Concurrent STARPU_R accesses are allowed.
29 : : * - Concurrent STARPU_SCRATCH accesses are allowed.
30 : : * - Concurrent STARPU_REDUX accesses are allowed.
31 : : */
32 : :
33 : : /* the header lock must be taken by the caller */
34 : 207021 : static struct _starpu_data_requester *may_unlock_data_req_list_head(starpu_data_handle_t handle)
35 : : {
36 : : struct _starpu_data_requester_list *req_list;
37 : :
38 [ + + ]: 207021 : if (handle->reduction_refcnt > 0)
39 : : {
40 : 364 : req_list = handle->reduction_req_list;
41 : : }
42 : : else
43 : : {
44 [ + - ]: 206657 : if (_starpu_data_requester_list_empty(handle->reduction_req_list))
45 : 206607 : req_list = handle->req_list;
46 : : else
47 : 0 : req_list = handle->reduction_req_list;
48 : : }
49 : :
50 : : /* if there is no one to unlock ... */
51 [ + + ]: 206971 : if (_starpu_data_requester_list_empty(req_list))
52 : 204172 : return NULL;
53 : :
54 : : /* if there is no reference to the data anymore, we can use it */
55 [ + + ]: 2773 : if (handle->refcnt == 0)
56 : 1699 : return _starpu_data_requester_list_pop_front(req_list);
57 : :
58 [ + + ]: 1074 : if (handle->current_mode == STARPU_W)
59 : 17 : return NULL;
60 : :
61 : : /* data->current_mode == STARPU_R, so we can process more readers */
62 : 1057 : struct _starpu_data_requester *r = _starpu_data_requester_list_front(req_list);
63 : :
64 : 1057 : enum starpu_access_mode r_mode = r->mode;
65 [ - + ]: 1057 : if (r_mode == STARPU_RW)
66 : 0 : r_mode = STARPU_W;
67 : :
68 : : /* If this is a STARPU_R, STARPU_SCRATCH or STARPU_REDUX type of
69 : : * access, we only proceed if the cuurrent mode is the same as the
70 : : * requested mode. */
71 [ + + ]: 1057 : if (r_mode == handle->current_mode)
72 : 723 : return _starpu_data_requester_list_pop_front(req_list);
73 : : else
74 : 206945 : return NULL;
75 : : }
76 : :
77 : : /* Try to submit a data request, in case the request can be processed
78 : : * immediatly, return 0, if there is still a dependency that is not compatible
79 : : * with the current mode, the request is put in the per-handle list of
80 : : * "requesters", and this function returns 1. */
81 : 201982 : static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_codelet,
82 : : starpu_data_handle_t handle, enum starpu_access_mode mode,
83 : : void (*callback)(void *), void *argcb,
84 : : struct _starpu_job *j, unsigned buffer_index)
85 : : {
86 [ + + ]: 201982 : if (mode == STARPU_RW)
87 : 41390 : mode = STARPU_W;
88 : :
89 : : /* Take the lock protecting the header. We try to do some progression
90 : : * in case this is called from a worker, otherwise we just wait for the
91 : : * lock to be available. */
92 [ + + ]: 201982 : if (request_from_codelet)
93 : : {
94 [ + + ][ + + ]: 276569 : while (_starpu_spin_trylock(&handle->header_lock))
95 : 98757 : _starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
96 : : }
97 : : else
98 : : {
99 : 24180 : _starpu_spin_lock(&handle->header_lock);
100 : : }
101 : :
102 : : /* If we have a request that is not used for the reduction, and that a
103 : : * reduction is pending, we put it at the end of normal list, and we
104 : : * use the reduction_req_list instead */
105 : 201964 : unsigned pending_reduction = (handle->reduction_refcnt > 0);
106 : 201964 : unsigned frozen = 0;
107 : :
108 : : /* If we are currently performing a reduction, we freeze any request
109 : : * that is not explicitely a reduction task. */
110 [ + + ][ + + ]: 201964 : unsigned is_a_reduction_task = (request_from_codelet && j->reduction_task);
111 : :
112 [ + + ][ - + ]: 201964 : if (pending_reduction && !is_a_reduction_task)
113 : 0 : frozen = 1;
114 : :
115 : : /* If there is currently nobody accessing the piece of data, or it's
116 : : * not another writter and if this is the same type of access as the
117 : : * current one, we can proceed. */
118 : 201964 : unsigned put_in_list = 1;
119 : :
120 : 201964 : enum starpu_access_mode previous_mode = handle->current_mode;
121 : :
122 [ + ][ + + ]: 201964 : if (!frozen && ((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode))))
[ + + ][ + + ]
123 : : {
124 : : /* Detect whether this is the end of a reduction phase */
125 : : /* We don't want to start multiple reductions of the
126 : : * same handle at the same time ! */
127 : :
128 [ + + ][ + + ]: 199742 : if ((handle->reduction_refcnt == 0) && (previous_mode == STARPU_REDUX) && (mode != STARPU_REDUX))
[ + + ]
129 : : {
130 : 213 : _starpu_data_end_reduction_mode(handle);
131 : :
132 : : /* Since we need to perform a mode change, we freeze
133 : : * the request if needed. */
134 : 213 : put_in_list = (handle->reduction_refcnt > 0);
135 : : }
136 : : else
137 : : {
138 : 199529 : put_in_list = 0;
139 : : }
140 : : }
141 : :
142 [ + + ]: 201964 : if (put_in_list)
143 : : {
144 : : /* there cannot be multiple writers or a new writer
145 : : * while the data is in read mode */
146 : :
147 : 2422 : handle->busy_count++;
148 : : /* enqueue the request */
149 : 2422 : struct _starpu_data_requester *r = _starpu_data_requester_new();
150 : 2422 : r->mode = mode;
151 : 2422 : r->is_requested_by_codelet = request_from_codelet;
152 : 2422 : r->j = j;
153 : 2422 : r->buffer_index = buffer_index;
154 : 2422 : r->ready_data_callback = callback;
155 : 2422 : r->argcb = argcb;
156 : :
157 : : /* We put the requester in a specific list if this is a reduction task */
158 : 2422 : struct _starpu_data_requester_list *req_list =
159 [ - + ]: 2422 : is_a_reduction_task?handle->reduction_req_list:handle->req_list;
160 : :
161 : 2422 : _starpu_data_requester_list_push_back(req_list, r);
162 : :
163 : : /* failed */
164 : 2422 : put_in_list = 1;
165 : : }
166 : : else
167 : : {
168 : 199542 : handle->refcnt++;
169 : 199542 : handle->busy_count++;
170 : :
171 : 199542 : handle->current_mode = mode;
172 : :
173 [ + + ][ + + ]: 199542 : if ((mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
174 : 213 : _starpu_data_start_reduction_mode(handle);
175 : :
176 : : /* success */
177 : 199545 : put_in_list = 0;
178 : : }
179 : :
180 : 201967 : _starpu_spin_unlock(&handle->header_lock);
181 : 201990 : return put_in_list;
182 : :
183 : : }
184 : :
185 : 24180 : unsigned _starpu_attempt_to_submit_data_request_from_apps(starpu_data_handle_t handle, enum starpu_access_mode mode,
186 : : void (*callback)(void *), void *argcb)
187 : : {
188 : 24180 : return _starpu_attempt_to_submit_data_request(0, handle, mode, callback, argcb, NULL, 0);
189 : : }
190 : :
191 : 177798 : static unsigned attempt_to_submit_data_request_from_job(struct _starpu_job *j, unsigned buffer_index)
192 : : {
193 : : /* Note that we do not access j->task->handles, but j->ordered_buffers
194 : : * which is a sorted copy of it. */
195 [ + + ]: 177798 : starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buffer_index);
196 [ - + ]: 177798 : enum starpu_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, buffer_index);
197 : :
198 : 177798 : return _starpu_attempt_to_submit_data_request(1, handle, mode, NULL, NULL, j, buffer_index);
199 : : }
200 : :
201 : 119501 : static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned start_buffer_index)
202 : : {
203 : : unsigned buf;
204 : :
205 : 119501 : unsigned nbuffers = j->task->cl->nbuffers;
206 [ + + ]: 307974 : for (buf = start_buffer_index; buf < nbuffers; buf++)
207 : : {
208 [ + + ]: 189838 : if (buf)
209 : : {
210 [ - + ]: 71244 : starpu_data_handle_t handle_m1 = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf-1);
211 [ - + ]: 71244 : starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
212 [ + + ]: 71244 : if (handle_m1 == handle)
213 : : /* We have already requested this data, skip it. This
214 : : * depends on ordering putting writes before reads, see
215 : : * _starpu_compar_handles. */
216 : 12034 : continue;
217 : : }
218 : :
219 : 177804 : j->task->status = STARPU_TASK_BLOCKED_ON_DATA;
220 [ + + ]: 177804 : if (attempt_to_submit_data_request_from_job(j, buf))
221 : : {
222 : 1366 : return 1;
223 : : }
224 : : }
225 : :
226 : 119502 : return 0;
227 : : }
228 : :
229 : : /* When a new task is submitted, we make sure that there cannot be codelets
230 : : with concurrent data-access at the same time in the scheduling engine (eg.
231 : : there can be 2 tasks reading a piece of data, but there cannot be one
232 : : reading and another writing) */
233 : 264222 : unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j)
234 : : {
235 : 264222 : struct starpu_codelet *cl = j->task->cl;
236 : :
237 [ + + ][ + + ]: 264222 : if ((cl == NULL) || (cl->nbuffers == 0))
238 : 145617 : return 0;
239 : :
240 : : /* Compute an ordered list of the different pieces of data so that we
241 : : * grab then according to a total order, thus avoiding a deadlock
242 : : * condition */
243 : : unsigned i;
244 [ + + ]: 308440 : for (i=0 ; i<cl->nbuffers ; i++)
245 : : {
246 [ + ]: 189835 : starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, i);
247 [ + + ]: 189835 : _STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(j, handle, i);
248 [ + + ]: 189835 : enum starpu_access_mode mode = STARPU_CODELET_GET_MODE(j->task->cl, i);
249 [ + + ]: 189835 : _STARPU_JOB_SET_ORDERED_BUFFER_MODE(j, mode, i);
250 : : }
251 : :
252 [ - + ]: 118605 : _starpu_sort_task_handles(_STARPU_JOB_GET_ORDERED_BUFFERS(j), cl->nbuffers);
253 : :
254 : 264231 : return _submit_job_enforce_data_deps(j, 0);
255 : : }
256 : :
257 : 1366 : static unsigned unlock_one_requester(struct _starpu_data_requester *r)
258 : : {
259 : 1366 : struct _starpu_job *j = r->j;
260 : 1366 : unsigned nbuffers = j->task->cl->nbuffers;
261 : 1366 : unsigned buffer_index = r->buffer_index;
262 : :
263 [ + + ]: 1366 : if (buffer_index + 1 < nbuffers)
264 : : /* not all buffers are protected yet */
265 : 890 : return _submit_job_enforce_data_deps(j, buffer_index + 1);
266 : : else
267 : 1366 : return 0;
268 : : }
269 : :
270 : : /* The header lock must already be taken by the caller.
271 : : * This may free the handle if it was lazily unregistered (1 is returned in
272 : : * that case). The handle pointer thus becomes invalid for the caller.
273 : : */
274 : 204561 : int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
275 : : {
276 : 204561 : _starpu_spin_checklocked(&handle->header_lock);
277 : : /* A data access has finished so we remove a reference. */
278 [ - + ]: 204618 : STARPU_ASSERT(handle->refcnt > 0);
279 : 204618 : handle->refcnt--;
280 [ - + ]: 204618 : STARPU_ASSERT(handle->busy_count > 0);
281 : 204618 : handle->busy_count--;
282 [ - + ]: 204618 : if (_starpu_data_check_not_busy(handle))
283 : : /* Handle was destroyed, nothing left to do. */
284 : 0 : return 1;
285 : :
286 : : /* In case there is a pending reduction, and that this is the last
287 : : * requester, we may go back to a "normal" coherency model. */
288 [ + + ]: 204603 : if (handle->reduction_refcnt > 0)
289 : : {
290 : : //fprintf(stderr, "NOTIFY REDUCTION TASK RED REFCNT %d\n", handle->reduction_refcnt);
291 : 577 : handle->reduction_refcnt--;
292 [ + + ]: 577 : if (handle->reduction_refcnt == 0)
293 : 213 : _starpu_data_end_reduction_mode_terminate(handle);
294 : : }
295 : :
296 : : struct _starpu_data_requester *r;
297 [ + + ]: 207025 : while ((r = may_unlock_data_req_list_head(handle)))
298 : : {
299 : : /* STARPU_RW accesses are treated as STARPU_W */
300 : 2422 : enum starpu_access_mode r_mode = r->mode;
301 [ - + ]: 2422 : if (r_mode == STARPU_RW)
302 : 0 : r_mode = STARPU_W;
303 : :
304 : 2422 : int put_in_list = 1;
305 [ + - ][ - + ]: 2422 : if ((handle->reduction_refcnt == 0) && (handle->current_mode == STARPU_REDUX) && (r_mode != STARPU_REDUX))
[ # # ]
306 : : {
307 : 0 : _starpu_data_end_reduction_mode(handle);
308 : :
309 : : /* Since we need to perform a mode change, we freeze
310 : : * the request if needed. */
311 : 0 : put_in_list = (handle->reduction_refcnt > 0);
312 : : }
313 : : else
314 : : {
315 : 2422 : put_in_list = 0;
316 : : }
317 : :
318 [ - + ]: 2422 : if (put_in_list)
319 : : {
320 : : /* We need to put the request back because we must
321 : : * perform a reduction before. */
322 : 0 : _starpu_data_requester_list_push_front(handle->req_list, r);
323 : : }
324 : : else
325 : : {
326 : : /* The data is now attributed to that request so we put a
327 : : * reference on it. */
328 : 2422 : handle->refcnt++;
329 : 2422 : handle->busy_count++;
330 : :
331 : 2422 : enum starpu_access_mode previous_mode = handle->current_mode;
332 : 2422 : handle->current_mode = r_mode;
333 : :
334 : : /* In case we enter in a reduction mode, we invalidate all per
335 : : * worker replicates. Note that the "per_node" replicates are
336 : : * kept intact because we'll reduce a valid copy of the
337 : : * "per-node replicate" with the per-worker replicates .*/
338 [ - + ][ # # ]: 2422 : if ((r_mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
339 : 0 : _starpu_data_start_reduction_mode(handle);
340 : :
341 : 2422 : _starpu_spin_unlock(&handle->header_lock);
342 : :
343 [ + + ]: 2422 : if (r->is_requested_by_codelet)
344 : : {
345 [ + + ]: 1366 : if (!unlock_one_requester(r))
346 : 1365 : _starpu_push_task(r->j);
347 : : }
348 : : else
349 : : {
350 [ - + ]: 1056 : STARPU_ASSERT(r->ready_data_callback);
351 : :
352 : : /* execute the callback associated with the data requester */
353 : 1056 : r->ready_data_callback(r->argcb);
354 : : }
355 : :
356 : 2422 : _starpu_data_requester_delete(r);
357 : :
358 : 2422 : _starpu_spin_lock(&handle->header_lock);
359 [ - + ]: 2422 : STARPU_ASSERT(handle->busy_count > 0);
360 : 2422 : handle->busy_count--;
361 [ + + ]: 2422 : if (_starpu_data_check_not_busy(handle))
362 : 1 : return 1;
363 : : }
364 : : }
365 : :
366 : 204517 : return 0;
367 : : }
|