Branch data Line data Source code
1 : : /* StarPU --- Runtime system for heterogeneous multicore architectures.
2 : : *
3 : : * Copyright (C) 2009-2013 Université de Bordeaux 1
4 : : * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
5 : : *
6 : : * StarPU is free software; you can redistribute it and/or modify
7 : : * it under the terms of the GNU Lesser General Public License as published by
8 : : * the Free Software Foundation; either version 2.1 of the License, or (at
9 : : * your option) any later version.
10 : : *
11 : : * StarPU is distributed in the hope that it will be useful, but
12 : : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14 : : *
15 : : * See the GNU Lesser General Public License in COPYING.LGPL for more details.
16 : : */
17 : :
18 : : #include <starpu.h>
19 : : #include <common/config.h>
20 : : #include <common/utils.h>
21 : : #include <core/dependencies/tags.h>
22 : : #include <core/jobs.h>
23 : : #include <core/sched_policy.h>
24 : : #include <core/dependencies/data_concurrency.h>
25 : : #include <profiling/bound.h>
26 : : #include <common/uthash.h>
27 : : #include <core/debug.h>
28 : :
29 : : #define AYUDAME_OFFSET 4000000000000000000ULL
30 : :
31 : : struct _starpu_tag_table
32 : : {
33 : : UT_hash_handle hh;
34 : : starpu_tag_t id;
35 : : struct _starpu_tag *tag;
36 : : };
37 : :
38 : : #define HASH_ADD_UINT64_T(head,field,add) HASH_ADD(hh,head,field,sizeof(uint64_t),add)
39 : : #define HASH_FIND_UINT64_T(head,find,out) HASH_FIND(hh,head,find,sizeof(uint64_t),out)
40 : :
41 : : static struct _starpu_tag_table *tag_htbl = NULL;
42 : : static starpu_pthread_rwlock_t tag_global_rwlock;
43 : :
44 : 90 : static struct _starpu_cg *create_cg_apps(unsigned ntags)
45 : : {
46 : 90 : struct _starpu_cg *cg = (struct _starpu_cg *) malloc(sizeof(struct _starpu_cg));
47 [ - + ]: 90 : STARPU_ASSERT(cg);
48 : :
49 : 90 : cg->ntags = ntags;
50 : 90 : cg->remaining = ntags;
51 : 90 : cg->cg_type = STARPU_CG_APPS;
52 : :
53 : 90 : cg->succ.succ_apps.completed = 0;
54 [ - + ]: 90 : STARPU_PTHREAD_MUTEX_INIT(&cg->succ.succ_apps.cg_mutex, NULL);
55 [ - + ]: 90 : STARPU_PTHREAD_COND_INIT(&cg->succ.succ_apps.cg_cond, NULL);
56 : :
57 : 90 : return cg;
58 : : }
59 : :
60 : 44923 : static struct _starpu_cg *create_cg_tag(unsigned ntags, struct _starpu_tag *tag)
61 : : {
62 : 44923 : struct _starpu_cg *cg = (struct _starpu_cg *) malloc(sizeof(struct _starpu_cg));
63 [ - + ]: 44923 : STARPU_ASSERT(cg);
64 : :
65 : 44923 : cg->ntags = ntags;
66 : 44923 : cg->remaining = ntags;
67 : 44923 : cg->cg_type = STARPU_CG_TAG;
68 : :
69 : 44923 : cg->succ.tag = tag;
70 : 44923 : tag->tag_successors.ndeps++;
71 : :
72 : 44923 : return cg;
73 : : }
74 : :
75 : 46634 : static struct _starpu_tag *_starpu_tag_init(starpu_tag_t id)
76 : : {
77 : : struct _starpu_tag *tag;
78 : 46634 : tag = (struct _starpu_tag *) malloc(sizeof(struct _starpu_tag));
79 [ - + ]: 46634 : STARPU_ASSERT(tag);
80 : :
81 : 46634 : tag->job = NULL;
82 : 46634 : tag->is_assigned = 0;
83 : 46634 : tag->is_submitted = 0;
84 : :
85 : 46634 : tag->id = id;
86 : 46634 : tag->state = STARPU_INVALID_STATE;
87 : :
88 : 46634 : _starpu_cg_list_init(&tag->tag_successors);
89 : :
90 : 46634 : _starpu_spin_init(&tag->lock);
91 : :
92 : 46634 : return tag;
93 : : }
94 : :
95 : 46634 : static void _starpu_tag_free(void *_tag)
96 : : {
97 : 46634 : struct _starpu_tag *tag = (struct _starpu_tag *) _tag;
98 : :
99 [ + - ]: 46634 : if (tag)
100 : : {
101 : 46634 : _starpu_spin_lock(&tag->lock);
102 : :
103 : 46634 : unsigned nsuccs = tag->tag_successors.nsuccs;
104 : : unsigned succ;
105 : :
106 [ + + ]: 138773 : for (succ = 0; succ < nsuccs; succ++)
107 : : {
108 : 92139 : struct _starpu_cg *cg = tag->tag_successors.succ[succ];
109 : :
110 : 92139 : unsigned ntags = STARPU_ATOMIC_ADD(&cg->ntags, -1);
111 : 92139 : unsigned remaining __attribute__ ((unused)) = STARPU_ATOMIC_ADD(&cg->remaining, -1);
112 : :
113 [ + + ][ + - ]: 92139 : if (!ntags && (cg->cg_type == STARPU_CG_TAG))
114 : : /* Last tag this cg depends on, cg becomes unreferenced */
115 : 44923 : free(cg);
116 : : }
117 : :
118 : : #ifdef STARPU_DYNAMIC_DEPS_SIZE
119 : 46634 : free(tag->tag_successors.succ);
120 : : #endif
121 : :
122 : 46634 : _starpu_spin_unlock(&tag->lock);
123 : 46634 : _starpu_spin_destroy(&tag->lock);
124 : :
125 : 46634 : free(tag);
126 : : }
127 : 46634 : }
128 : :
129 : : /*
130 : : * Staticly initializing tag_global_rwlock seems to lead to weird errors
131 : : * on Darwin, so we do it dynamically.
132 : : */
133 : 283 : void _starpu_init_tags(void)
134 : : {
135 [ - + ]: 283 : STARPU_PTHREAD_RWLOCK_INIT(&tag_global_rwlock, NULL);
136 : 283 : }
137 : :
138 : 34224 : void starpu_tag_remove(starpu_tag_t id)
139 : : {
140 : : struct _starpu_tag_table *entry;
141 : :
142 : : #ifdef HAVE_AYUDAME_H
143 : : if (AYU_event)
144 : : AYU_event(AYU_REMOVETASK, id + AYUDAME_OFFSET, NULL);
145 : : #endif
146 : :
147 [ - + ]: 34224 : STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
148 : :
149 [ + - ][ - + ]: 63992 : HASH_FIND_UINT64_T(tag_htbl, &id, entry);
[ - - - +
- - - - -
- - - ]
[ + + ][ + - ]
[ + + ][ + + ]
[ + + ]
150 [ + + ][ + + ]: 34224 : if (entry) HASH_DEL(tag_htbl, entry);
[ + + ][ + + ]
[ + + ][ + + ]
[ + + ][ + + ]
[ + + ]
151 : :
152 [ - + ]: 34224 : STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
153 : :
154 [ + + ]: 34224 : if (entry)
155 : : {
156 : 34209 : _starpu_tag_free(entry->tag);
157 : 34209 : free(entry);
158 : : }
159 : 34224 : }
160 : :
161 : 281 : void _starpu_tag_clear(void)
162 : : {
163 [ - + ]: 281 : STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
164 : :
165 : : /* XXX: _starpu_tag_free takes the tag spinlocks while we are keeping
166 : : * the global rwlock. This contradicts the lock order of
167 : : * starpu_tag_wait_array. Should not be a problem in practice since
168 : : * _starpu_tag_clear is called at shutdown only. */
169 : : struct _starpu_tag_table *entry, *tmp;
170 : :
171 [ + + ][ + + ]: 12706 : HASH_ITER(hh, tag_htbl, entry, tmp)
[ + + ]
172 : : {
173 [ + - ][ + + ]: 12425 : HASH_DEL(tag_htbl, entry);
[ - + ][ - + ]
[ + - ][ + + ]
[ + + ][ + + ]
174 : 12425 : _starpu_tag_free(entry->tag);
175 : 12425 : free(entry);
176 : : }
177 : :
178 [ - + ]: 281 : STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
179 : 281 : }
180 : :
181 : 185981 : static struct _starpu_tag *_gettag_struct(starpu_tag_t id)
182 : : {
183 : : /* search if the tag is already declared or not */
184 : : struct _starpu_tag_table *entry;
185 : : struct _starpu_tag *tag;
186 : :
187 [ + + ][ - + ]: 302470 : HASH_FIND_UINT64_T(tag_htbl, &id, entry);
[ - - - +
- - - - -
- - - ]
[ + + ][ + - ]
[ + + ][ + + ]
[ + + ]
188 [ + + ]: 185981 : if (entry != NULL)
189 : 139347 : tag = entry->tag;
190 : : else
191 : : {
192 : : /* the tag does not exist yet : create an entry */
193 : 46634 : tag = _starpu_tag_init(id);
194 : :
195 : : struct _starpu_tag_table *entry2;
196 : 46634 : entry2 = (struct _starpu_tag_table *) malloc(sizeof(*entry2));
197 [ - + ]: 46634 : STARPU_ASSERT(entry2 != NULL);
198 : 46634 : entry2->id = id;
199 : 46634 : entry2->tag = tag;
200 : :
201 [ + + ][ - + ]: 76242 : HASH_ADD_UINT64_T(tag_htbl, id, entry2);
[ - + ][ - + ]
[ - - - +
- - - - -
- - - ]
[ + + ][ + + ]
[ + - ][ - + ]
[ + - ][ + + ]
[ + + ][ + + ]
[ + + ][ - + ]
[ - + ]
202 : :
203 : : #ifdef HAVE_AYUDAME_H
204 : : if (AYU_event)
205 : : {
206 : : int64_t AYU_data[2] = {-1, 0};
207 : : STARPU_ASSERT(id < AYUDAME_OFFSET);
208 : : AYU_event(AYU_ADDTASK, id + AYUDAME_OFFSET, AYU_data);
209 : : }
210 : : #endif
211 : : }
212 : :
213 : 185981 : return tag;
214 : : }
215 : :
216 : 185713 : static struct _starpu_tag *gettag_struct(starpu_tag_t id)
217 : : {
218 : : struct _starpu_tag *tag;
219 [ - + ]: 185713 : STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
220 : 185713 : tag = _gettag_struct(id);
221 [ - + ]: 185713 : STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
222 : 185713 : return tag;
223 : : }
224 : :
225 : : /* lock should be taken */
226 : 42934 : void _starpu_tag_set_ready(struct _starpu_tag *tag)
227 : : {
228 : : /* mark this tag as ready to run */
229 : 42934 : tag->state = STARPU_READY;
230 : : /* declare it to the scheduler ! */
231 : 42934 : struct _starpu_job *j = tag->job;
232 : :
233 : : /* In case the task job is going to be scheduled immediately, and if
234 : : * the task is "empty", calling _starpu_push_task would directly try to enforce
235 : : * the dependencies of the task, and therefore it would try to grab the
236 : : * lock again, resulting in a deadlock. */
237 : 42934 : _starpu_spin_unlock(&tag->lock);
238 : :
239 : : /* enforce data dependencies */
240 [ - + ]: 43002 : STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
241 : 42993 : _starpu_enforce_deps_starting_from_task(j);
242 : :
243 : 43081 : _starpu_spin_lock(&tag->lock);
244 : : #ifdef HAVE_AYUDAME_H
245 : : if (AYU_event)
246 : : {
247 : : int id = -1;
248 : : AYU_event(AYU_PRERUNTASK, tag->id + AYUDAME_OFFSET, &id);
249 : : AYU_event(AYU_POSTRUNTASK, tag->id + AYUDAME_OFFSET, NULL);
250 : : }
251 : : #endif
252 : 43082 : }
253 : :
254 : : /* the lock must be taken ! */
255 : 92264 : static void _starpu_tag_add_succ(struct _starpu_tag *tag, struct _starpu_cg *cg)
256 : : {
257 [ - + ]: 92264 : STARPU_ASSERT(tag);
258 : :
259 : 92264 : _starpu_add_successor_to_cg_list(&tag->tag_successors, cg);
260 : :
261 [ + + ]: 92264 : if (tag->state == STARPU_DONE)
262 : : {
263 : : /* the tag was already completed sooner */
264 : 2207 : _starpu_notify_cg(cg);
265 : : }
266 : 92264 : }
267 : :
268 : 48969 : void _starpu_notify_tag_dependencies(struct _starpu_tag *tag)
269 : : {
270 : 48969 : _starpu_spin_lock(&tag->lock);
271 : :
272 [ + + ]: 48943 : if (tag->state == STARPU_DONE)
273 : : {
274 : 1891 : _starpu_spin_unlock(&tag->lock);
275 : 49033 : return;
276 : : }
277 : :
278 : 47052 : tag->state = STARPU_DONE;
279 : : _STARPU_TRACE_TAG_DONE(tag);
280 : :
281 : 47052 : _starpu_notify_cg_list(&tag->tag_successors);
282 : :
283 : 47157 : _starpu_spin_unlock(&tag->lock);
284 : : }
285 : :
286 : 63 : void starpu_tag_restart(starpu_tag_t id)
287 : : {
288 : 63 : struct _starpu_tag *tag = gettag_struct(id);
289 : :
290 : 63 : _starpu_spin_lock(&tag->lock);
291 [ - + ][ - + ]: 63 : STARPU_ASSERT_MSG(tag->state == STARPU_DONE, "Only completed tags can be restarted (was %d)", tag->state);
292 : 63 : tag->state = STARPU_BLOCKED;
293 : 63 : _starpu_spin_unlock(&tag->lock);
294 : 63 : }
295 : :
296 : 319 : void starpu_tag_notify_from_apps(starpu_tag_t id)
297 : : {
298 : 319 : struct _starpu_tag *tag = gettag_struct(id);
299 : :
300 : 319 : _starpu_notify_tag_dependencies(tag);
301 : 319 : }
302 : :
303 : 48269 : void _starpu_tag_declare(starpu_tag_t id, struct _starpu_job *job)
304 : : {
305 : : _STARPU_TRACE_TAG(id, job);
306 : 48269 : job->task->use_tag = 1;
307 : :
308 : 48269 : struct _starpu_tag *tag= gettag_struct(id);
309 : :
310 : 48269 : _starpu_spin_lock(&tag->lock);
311 : :
312 : : /* Note: a tag can be shared by several tasks, when it is used to
313 : : * detect when either of them are finished. We however don't allow
314 : : * several tasks to share a tag when it is used to wake them by
315 : : * dependency */
316 : 48269 : tag->job = job;
317 : 48269 : tag->is_assigned++;
318 : :
319 : 48269 : job->tag = tag;
320 : : /* the tag is now associated to a job */
321 : :
322 : : /* When the same tag may be signaled several times by different tasks,
323 : : * and it's already done, we should not reset the "done" state.
324 : : * When the tag is simply used by the same task several times, we have
325 : : * to do so. */
326 [ + + ][ + - ]: 48269 : if (job->task->regenerate || job->submitted == 2 ||
[ + + ]
327 : 48265 : tag->state != STARPU_DONE)
328 : 46392 : tag->state = STARPU_ASSOCIATED;
329 : : #ifdef HAVE_AYUDAME_H
330 : : if (AYU_event)
331 : : {
332 : : uintptr_t AYU_data1[3] = {id+AYUDAME_OFFSET, 0, 0};
333 : : uintptr_t AYU_data2[3] = {job->job_id, 0, 0};
334 : : AYU_event(AYU_ADDDEPENDENCY, job->job_id, AYU_data1);
335 : : AYU_event(AYU_ADDDEPENDENCY, id+AYUDAME_OFFSET, AYU_data2);
336 : : }
337 : : #endif
338 : 48269 : _starpu_spin_unlock(&tag->lock);
339 : 48269 : }
340 : :
341 : 29 : void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t *array)
342 : : {
343 [ - + ]: 29 : if (!ndeps)
344 : 29 : return;
345 : :
346 : : unsigned i;
347 : :
348 : : /* create the associated completion group */
349 : 29 : struct _starpu_tag *tag_child = gettag_struct(id);
350 : :
351 : 29 : _starpu_spin_lock(&tag_child->lock);
352 : 29 : struct _starpu_cg *cg = create_cg_tag(ndeps, tag_child);
353 : 29 : _starpu_spin_unlock(&tag_child->lock);
354 : :
355 [ + + ]: 166 : for (i = 0; i < ndeps; i++)
356 : : {
357 : 137 : starpu_tag_t dep_id = array[i];
358 : :
359 : : /* id depends on dep_id
360 : : * so cg should be among dep_id's successors*/
361 : : _STARPU_TRACE_TAG_DEPS(id, dep_id);
362 : 137 : _starpu_bound_tag_dep(id, dep_id);
363 : 137 : struct _starpu_tag *tag_dep = gettag_struct(dep_id);
364 [ - + ]: 137 : STARPU_ASSERT(tag_dep != tag_child);
365 : 137 : _starpu_spin_lock(&tag_dep->lock);
366 : 137 : _starpu_spin_lock(&tag_child->lock);
367 : 137 : _starpu_tag_add_succ(tag_dep, cg);
368 : : #ifdef HAVE_AYUDAME_H
369 : : if (AYU_event)
370 : : {
371 : : uintptr_t AYU_data[3] = {dep_id+AYUDAME_OFFSET, 0, 0};
372 : : AYU_event(AYU_ADDDEPENDENCY, id+AYUDAME_OFFSET, AYU_data);
373 : : }
374 : : #endif
375 : 137 : _starpu_spin_unlock(&tag_child->lock);
376 : 137 : _starpu_spin_unlock(&tag_dep->lock);
377 : : }
378 : : }
379 : :
380 : 44894 : void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
381 : : {
382 [ + - ]: 44894 : if (!ndeps)
383 : 44894 : return;
384 : :
385 : : unsigned i;
386 : :
387 : : /* create the associated completion group */
388 : 44894 : struct _starpu_tag *tag_child = gettag_struct(id);
389 : :
390 : 44894 : _starpu_spin_lock(&tag_child->lock);
391 : 44894 : struct _starpu_cg *cg = create_cg_tag(ndeps, tag_child);
392 : 44894 : _starpu_spin_unlock(&tag_child->lock);
393 : :
394 : : va_list pa;
395 : 44894 : va_start(pa, ndeps);
396 [ + + ]: 136896 : for (i = 0; i < ndeps; i++)
397 : : {
398 : : starpu_tag_t dep_id;
399 [ + - ]: 92002 : dep_id = va_arg(pa, starpu_tag_t);
400 : :
401 : : /* id depends on dep_id
402 : : * so cg should be among dep_id's successors*/
403 : : _STARPU_TRACE_TAG_DEPS(id, dep_id);
404 : 92002 : _starpu_bound_tag_dep(id, dep_id);
405 : 92002 : struct _starpu_tag *tag_dep = gettag_struct(dep_id);
406 [ - + ]: 92002 : STARPU_ASSERT(tag_dep != tag_child);
407 : 92002 : _starpu_spin_lock(&tag_dep->lock);
408 : 92002 : _starpu_spin_lock(&tag_child->lock);
409 : 92002 : _starpu_tag_add_succ(tag_dep, cg);
410 : : #ifdef HAVE_AYUDAME_H
411 : : if (AYU_event)
412 : : {
413 : : uintptr_t AYU_data[3] = {dep_id+AYUDAME_OFFSET, 0, 0};
414 : : AYU_event(AYU_ADDDEPENDENCY, id+AYUDAME_OFFSET, AYU_data);
415 : : }
416 : : #endif
417 : 92002 : _starpu_spin_unlock(&tag_child->lock);
418 : 92002 : _starpu_spin_unlock(&tag_dep->lock);
419 : : }
420 : 44894 : va_end(pa);
421 : : }
422 : :
423 : : /* this function may be called by the application (outside callbacks !) */
424 : 223 : int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
425 : : {
426 : : unsigned i;
427 : : unsigned current;
428 : :
429 : 223 : struct _starpu_tag *tag_array[ntags];
430 : :
431 : : _STARPU_LOG_IN();
432 : :
433 : : /* It is forbidden to block within callbacks or codelets */
434 [ + + ]: 223 : if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
435 : : {
436 : : _STARPU_LOG_OUT_TAG("edeadlk");
437 : 2 : return -EDEADLK;
438 : : }
439 : :
440 [ - + ]: 221 : STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
441 : : /* only wait the tags that are not done yet */
442 [ + + ]: 489 : for (i = 0, current = 0; i < ntags; i++)
443 : : {
444 : 268 : struct _starpu_tag *tag = _gettag_struct(id[i]);
445 : :
446 : 268 : _starpu_spin_lock(&tag->lock);
447 : :
448 [ + + ]: 268 : if (tag->state == STARPU_DONE)
449 : : {
450 : : /* that tag is done already */
451 : 143 : _starpu_spin_unlock(&tag->lock);
452 : : }
453 : : else
454 : : {
455 : 125 : tag_array[current] = tag;
456 : 125 : current++;
457 : : }
458 : : }
459 [ - + ]: 221 : STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
460 : :
461 [ + + ]: 221 : if (current == 0)
462 : : {
463 : : /* all deps are already fulfilled */
464 : : _STARPU_LOG_OUT_TAG("all deps are already fulfilled");
465 : 131 : return 0;
466 : : }
467 : :
468 : : /* there is at least one task that is not finished */
469 : 90 : struct _starpu_cg *cg = create_cg_apps(current);
470 : :
471 [ + + ]: 215 : for (i = 0; i < current; i++)
472 : : {
473 : 125 : _starpu_tag_add_succ(tag_array[i], cg);
474 : 125 : _starpu_spin_unlock(&tag_array[i]->lock);
475 : : }
476 : :
477 [ - + ]: 90 : STARPU_PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
478 : :
479 [ + + ]: 180 : while (!cg->succ.succ_apps.completed)
480 [ - + ]: 90 : STARPU_PTHREAD_COND_WAIT(&cg->succ.succ_apps.cg_cond, &cg->succ.succ_apps.cg_mutex);
481 : :
482 [ - + ]: 90 : STARPU_PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
483 : :
484 [ - + ]: 90 : STARPU_PTHREAD_MUTEX_DESTROY(&cg->succ.succ_apps.cg_mutex);
485 [ - + ]: 90 : STARPU_PTHREAD_COND_DESTROY(&cg->succ.succ_apps.cg_cond);
486 : :
487 : 90 : free(cg);
488 : :
489 : : _STARPU_LOG_OUT();
490 : 223 : return 0;
491 : : }
492 : :
493 : 219 : int starpu_tag_wait(starpu_tag_t id)
494 : : {
495 : 219 : return starpu_tag_wait_array(1, &id);
496 : : }
|