LCOV - code coverage report
Current view: top level - src/core/dependencies - data_concurrency.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 128 137 93.4 %
Date: 2013-05-22 Functions: 8 8 100.0 %
Branches: 98 120 81.7 %

           Branch data     Line data    Source code
       1                 :            : /* StarPU --- Runtime system for heterogeneous multicore architectures.
       2                 :            :  *
       3                 :            :  * Copyright (C) 2010-2012  Université de Bordeaux 1
       4                 :            :  * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
       5                 :            :  *
       6                 :            :  * StarPU is free software; you can redistribute it and/or modify
       7                 :            :  * it under the terms of the GNU Lesser General Public License as published by
       8                 :            :  * the Free Software Foundation; either version 2.1 of the License, or (at
       9                 :            :  * your option) any later version.
      10                 :            :  *
      11                 :            :  * StarPU is distributed in the hope that it will be useful, but
      12                 :            :  * WITHOUT ANY WARRANTY; without even the implied warranty of
      13                 :            :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
      14                 :            :  *
      15                 :            :  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
      16                 :            :  */
      17                 :            : 
      18                 :            : #include <core/dependencies/data_concurrency.h>
      19                 :            : #include <datawizard/coherency.h>
      20                 :            : #include <core/sched_policy.h>
      21                 :            : #include <common/starpu_spinlock.h>
      22                 :            : #include <datawizard/sort_data_handles.h>
      23                 :            : 
      24                 :            : /*
      25                 :            :  * The different types of data accesses are STARPU_R, STARPU_RW, STARPU_W,
      26                 :            :  * STARPU_SCRATCH and STARPU_REDUX. STARPU_RW is managed as a STARPU_W access.
      27                 :            :  * - A single STARPU_W access is allowed at a time.
      28                 :            :  * - Concurrent STARPU_R accesses are allowed.
      29                 :            :  * - Concurrent STARPU_SCRATCH accesses are allowed.
      30                 :            :  * - Concurrent STARPU_REDUX accesses are allowed.
      31                 :            :  */
      32                 :            : 
      33                 :            : /* the header lock must be taken by the caller */
      34                 :     207021 : static struct _starpu_data_requester *may_unlock_data_req_list_head(starpu_data_handle_t handle)
      35                 :            : {
      36                 :            :         struct _starpu_data_requester_list *req_list;
      37                 :            : 
      38         [ +  + ]:     207021 :         if (handle->reduction_refcnt > 0)
      39                 :            :         {
      40                 :        364 :                 req_list = handle->reduction_req_list;
      41                 :            :         }
      42                 :            :         else
      43                 :            :         {
      44         [ +  - ]:     206657 :                 if (_starpu_data_requester_list_empty(handle->reduction_req_list))
      45                 :     206607 :                         req_list = handle->req_list;
      46                 :            :                 else
      47                 :          0 :                         req_list = handle->reduction_req_list;
      48                 :            :         }
      49                 :            : 
      50                 :            :         /* if there is no one to unlock ... */
      51         [ +  + ]:     206971 :         if (_starpu_data_requester_list_empty(req_list))
      52                 :     204172 :                 return NULL;
      53                 :            : 
      54                 :            :         /* if there is no reference to the data anymore, we can use it */
      55         [ +  + ]:       2773 :         if (handle->refcnt == 0)
      56                 :       1699 :                 return _starpu_data_requester_list_pop_front(req_list);
      57                 :            : 
      58         [ +  + ]:       1074 :         if (handle->current_mode == STARPU_W)
      59                 :         17 :                 return NULL;
      60                 :            : 
       61                 :            :         /* handle->current_mode is not STARPU_W here, so we may be able to process more concurrent accesses */
      62                 :       1057 :         struct _starpu_data_requester *r = _starpu_data_requester_list_front(req_list);
      63                 :            : 
      64                 :       1057 :         enum starpu_access_mode r_mode = r->mode;
      65         [ -  + ]:       1057 :         if (r_mode == STARPU_RW)
      66                 :          0 :                 r_mode = STARPU_W;
      67                 :            : 
      68                 :            :         /* If this is a STARPU_R, STARPU_SCRATCH or STARPU_REDUX type of
       69                 :            :          * access, we only proceed if the current mode is the same as the
      70                 :            :          * requested mode. */
      71         [ +  + ]:       1057 :         if (r_mode == handle->current_mode)
      72                 :        723 :                 return _starpu_data_requester_list_pop_front(req_list);
      73                 :            :         else
      74                 :     206945 :                 return NULL;
      75                 :            : }
      76                 :            : 
      77                 :            : /* Try to submit a data request, in case the request can be processed
       78                 :            :  * immediately, return 0, if there is still a dependency that is not compatible
      79                 :            :  * with the current mode, the request is put in the per-handle list of
      80                 :            :  * "requesters", and this function returns 1. */
      81                 :     201982 : static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_codelet,
      82                 :            :                                                        starpu_data_handle_t handle, enum starpu_access_mode mode,
      83                 :            :                                                        void (*callback)(void *), void *argcb,
      84                 :            :                                                        struct _starpu_job *j, unsigned buffer_index)
      85                 :            : {
      86         [ +  + ]:     201982 :         if (mode == STARPU_RW)
      87                 :      41390 :                 mode = STARPU_W;
      88                 :            : 
      89                 :            :         /* Take the lock protecting the header. We try to do some progression
      90                 :            :          * in case this is called from a worker, otherwise we just wait for the
      91                 :            :          * lock to be available. */
      92         [ +  + ]:     201982 :         if (request_from_codelet)
      93                 :            :         {
      94 [ +  + ][ +  + ]:     276569 :                 while (_starpu_spin_trylock(&handle->header_lock))
      95                 :      98757 :                         _starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
      96                 :            :         }
      97                 :            :         else
      98                 :            :         {
      99                 :      24180 :                 _starpu_spin_lock(&handle->header_lock);
     100                 :            :         }
     101                 :            : 
     102                 :            :         /* If we have a request that is not used for the reduction, and that a
     103                 :            :          * reduction is pending, we put it at the end of normal list, and we
     104                 :            :          * use the reduction_req_list instead */
     105                 :     201964 :         unsigned pending_reduction = (handle->reduction_refcnt > 0);
     106                 :     201964 :         unsigned frozen = 0;
     107                 :            : 
     108                 :            :         /* If we are currently performing a reduction, we freeze any request
      109                 :            :          * that is not explicitly a reduction task. */
     110 [ +  + ][ +  + ]:     201964 :         unsigned is_a_reduction_task = (request_from_codelet && j->reduction_task);
     111                 :            : 
     112 [ +  + ][ -  + ]:     201964 :         if (pending_reduction && !is_a_reduction_task)
     113                 :          0 :                 frozen = 1;
     114                 :            : 
     115                 :            :         /* If there is currently nobody accessing the piece of data, or it's
      116                 :            :          * not another writer and if this is the same type of access as the
     117                 :            :          * current one, we can proceed. */
     118                 :     201964 :         unsigned put_in_list = 1;
     119                 :            : 
     120                 :     201964 :         enum starpu_access_mode previous_mode = handle->current_mode;
     121                 :            : 
     122    [ + ][ +  + ]:     201964 :         if (!frozen && ((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode))))
         [ +  + ][ +  + ]
     123                 :            :         {
     124                 :            :                 /* Detect whether this is the end of a reduction phase */
     125                 :            :                         /* We don't want to start multiple reductions of the
     126                 :            :                          * same handle at the same time ! */
     127                 :            : 
     128 [ +  + ][ +  + ]:     199742 :                 if ((handle->reduction_refcnt == 0) && (previous_mode == STARPU_REDUX) && (mode != STARPU_REDUX))
                 [ +  + ]
     129                 :            :                 {
     130                 :        213 :                         _starpu_data_end_reduction_mode(handle);
     131                 :            : 
     132                 :            :                         /* Since we need to perform a mode change, we freeze
     133                 :            :                          * the request if needed. */
     134                 :        213 :                         put_in_list = (handle->reduction_refcnt > 0);
     135                 :            :                 }
     136                 :            :                 else
     137                 :            :                 {
     138                 :     199529 :                         put_in_list = 0;
     139                 :            :                 }
     140                 :            :         }
     141                 :            : 
     142         [ +  + ]:     201964 :         if (put_in_list)
     143                 :            :         {
     144                 :            :                 /* there cannot be multiple writers or a new writer
     145                 :            :                  * while the data is in read mode */
     146                 :            : 
     147                 :       2422 :                 handle->busy_count++;
     148                 :            :                 /* enqueue the request */
     149                 :       2422 :                 struct _starpu_data_requester *r = _starpu_data_requester_new();
     150                 :       2422 :                 r->mode = mode;
     151                 :       2422 :                 r->is_requested_by_codelet = request_from_codelet;
     152                 :       2422 :                 r->j = j;
     153                 :       2422 :                 r->buffer_index = buffer_index;
     154                 :       2422 :                 r->ready_data_callback = callback;
     155                 :       2422 :                 r->argcb = argcb;
     156                 :            : 
     157                 :            :                 /* We put the requester in a specific list if this is a reduction task */
     158                 :       2422 :                 struct _starpu_data_requester_list *req_list =
     159         [ -  + ]:       2422 :                         is_a_reduction_task?handle->reduction_req_list:handle->req_list;
     160                 :            : 
     161                 :       2422 :                 _starpu_data_requester_list_push_back(req_list, r);
     162                 :            : 
     163                 :            :                 /* failed */
     164                 :       2422 :                 put_in_list = 1;
     165                 :            :         }
     166                 :            :         else
     167                 :            :         {
     168                 :     199542 :                 handle->refcnt++;
     169                 :     199542 :                 handle->busy_count++;
     170                 :            : 
     171                 :     199542 :                 handle->current_mode = mode;
     172                 :            : 
     173 [ +  + ][ +  + ]:     199542 :                 if ((mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
     174                 :        213 :                         _starpu_data_start_reduction_mode(handle);
     175                 :            : 
     176                 :            :                 /* success */
     177                 :     199545 :                 put_in_list = 0;
     178                 :            :         }
     179                 :            : 
     180                 :     201967 :         _starpu_spin_unlock(&handle->header_lock);
     181                 :     201990 :         return put_in_list;
     182                 :            : 
     183                 :            : }
     184                 :            : 
     185                 :      24180 : unsigned _starpu_attempt_to_submit_data_request_from_apps(starpu_data_handle_t handle, enum starpu_access_mode mode,
     186                 :            :                                                           void (*callback)(void *), void *argcb)
     187                 :            : {
     188                 :      24180 :         return _starpu_attempt_to_submit_data_request(0, handle, mode, callback, argcb, NULL, 0);
     189                 :            : }
     190                 :            : 
     191                 :     177798 : static unsigned attempt_to_submit_data_request_from_job(struct _starpu_job *j, unsigned buffer_index)
     192                 :            : {
     193                 :            :         /* Note that we do not access j->task->handles, but j->ordered_buffers
     194                 :            :          * which is a sorted copy of it. */
     195         [ +  + ]:     177798 :         starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buffer_index);
     196         [ -  + ]:     177798 :         enum starpu_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, buffer_index);
     197                 :            : 
     198                 :     177798 :         return _starpu_attempt_to_submit_data_request(1, handle, mode, NULL, NULL, j, buffer_index);
     199                 :            : }
     200                 :            : 
     201                 :     119501 : static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned start_buffer_index)
     202                 :            : {
     203                 :            :         unsigned buf;
     204                 :            : 
     205                 :     119501 :         unsigned nbuffers = j->task->cl->nbuffers;
     206         [ +  + ]:     307974 :         for (buf = start_buffer_index; buf < nbuffers; buf++)
     207                 :            :         {
     208         [ +  + ]:     189838 :                 if (buf)
     209                 :            :                 {
     210         [ -  + ]:      71244 :                         starpu_data_handle_t handle_m1 = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf-1);
     211         [ -  + ]:      71244 :                         starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
     212         [ +  + ]:      71244 :                         if (handle_m1 == handle)
     213                 :            :                                 /* We have already requested this data, skip it. This
     214                 :            :                                  * depends on ordering putting writes before reads, see
     215                 :            :                                  * _starpu_compar_handles.  */
     216                 :      12034 :                                 continue;
     217                 :            :                 }
     218                 :            : 
     219                 :     177804 :                 j->task->status = STARPU_TASK_BLOCKED_ON_DATA;
     220         [ +  + ]:     177804 :                 if (attempt_to_submit_data_request_from_job(j, buf))
     221                 :            :                 {
     222                 :       1366 :                         return 1;
     223                 :            :                 }
     224                 :            :         }
     225                 :            : 
     226                 :     119502 :         return 0;
     227                 :            : }
     228                 :            : 
     229                 :            : /* When a new task is submitted, we make sure that there cannot be codelets
     230                 :            :    with concurrent data-access at the same time in the scheduling engine (eg.
     231                 :            :    there can be 2 tasks reading a piece of data, but there cannot be one
     232                 :            :    reading and another writing) */
     233                 :     264222 : unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j)
     234                 :            : {
     235                 :     264222 :         struct starpu_codelet *cl = j->task->cl;
     236                 :            : 
     237 [ +  + ][ +  + ]:     264222 :         if ((cl == NULL) || (cl->nbuffers == 0))
     238                 :     145617 :                 return 0;
     239                 :            : 
     240                 :            :         /* Compute an ordered list of the different pieces of data so that we
      241                 :            :          * grab them according to a total order, thus avoiding a deadlock
     242                 :            :          * condition */
     243                 :            :         unsigned i;
     244         [ +  + ]:     308440 :         for (i=0 ; i<cl->nbuffers ; i++)
     245                 :            :         {
     246            [ + ]:     189835 :                 starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, i);
     247         [ +  + ]:     189835 :                 _STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(j, handle, i);
     248         [ +  + ]:     189835 :                 enum starpu_access_mode mode = STARPU_CODELET_GET_MODE(j->task->cl, i);
     249         [ +  + ]:     189835 :                 _STARPU_JOB_SET_ORDERED_BUFFER_MODE(j, mode, i);
     250                 :            :         }
     251                 :            : 
     252         [ -  + ]:     118605 :         _starpu_sort_task_handles(_STARPU_JOB_GET_ORDERED_BUFFERS(j), cl->nbuffers);
     253                 :            : 
     254                 :     264231 :         return _submit_job_enforce_data_deps(j, 0);
     255                 :            : }
     256                 :            : 
     257                 :       1366 : static unsigned unlock_one_requester(struct _starpu_data_requester *r)
     258                 :            : {
     259                 :       1366 :         struct _starpu_job *j = r->j;
     260                 :       1366 :         unsigned nbuffers = j->task->cl->nbuffers;
     261                 :       1366 :         unsigned buffer_index = r->buffer_index;
     262                 :            : 
     263         [ +  + ]:       1366 :         if (buffer_index + 1 < nbuffers)
     264                 :            :                 /* not all buffers are protected yet */
     265                 :        890 :                 return _submit_job_enforce_data_deps(j, buffer_index + 1);
     266                 :            :         else
     267                 :       1366 :                 return 0;
     268                 :            : }
     269                 :            : 
     270                 :            : /* The header lock must already be taken by the caller.
     271                 :            :  * This may free the handle if it was lazily unregistered (1 is returned in
     272                 :            :  * that case). The handle pointer thus becomes invalid for the caller.
     273                 :            :  */
     274                 :     204561 : int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
     275                 :            : {
     276                 :     204561 :         _starpu_spin_checklocked(&handle->header_lock);
     277                 :            :         /* A data access has finished so we remove a reference. */
     278         [ -  + ]:     204618 :         STARPU_ASSERT(handle->refcnt > 0);
     279                 :     204618 :         handle->refcnt--;
     280         [ -  + ]:     204618 :         STARPU_ASSERT(handle->busy_count > 0);
     281                 :     204618 :         handle->busy_count--;
     282         [ -  + ]:     204618 :         if (_starpu_data_check_not_busy(handle))
     283                 :            :                 /* Handle was destroyed, nothing left to do.  */
     284                 :          0 :                 return 1;
     285                 :            : 
     286                 :            :         /* In case there is a pending reduction, and that this is the last
     287                 :            :          * requester, we may go back to a "normal" coherency model. */
     288         [ +  + ]:     204603 :         if (handle->reduction_refcnt > 0)
     289                 :            :         {
     290                 :            :                 //fprintf(stderr, "NOTIFY REDUCTION TASK RED REFCNT %d\n", handle->reduction_refcnt);
     291                 :        577 :                 handle->reduction_refcnt--;
     292         [ +  + ]:        577 :                 if (handle->reduction_refcnt == 0)
     293                 :        213 :                         _starpu_data_end_reduction_mode_terminate(handle);
     294                 :            :         }
     295                 :            : 
     296                 :            :         struct _starpu_data_requester *r;
     297         [ +  + ]:     207025 :         while ((r = may_unlock_data_req_list_head(handle)))
     298                 :            :         {
     299                 :            :                 /* STARPU_RW accesses are treated as STARPU_W */
     300                 :       2422 :                 enum starpu_access_mode r_mode = r->mode;
     301         [ -  + ]:       2422 :                 if (r_mode == STARPU_RW)
     302                 :          0 :                         r_mode = STARPU_W;
     303                 :            : 
     304                 :       2422 :                 int put_in_list = 1;
     305 [ +  - ][ -  + ]:       2422 :                 if ((handle->reduction_refcnt == 0) && (handle->current_mode == STARPU_REDUX) && (r_mode != STARPU_REDUX))
                 [ #  # ]
     306                 :            :                 {
     307                 :          0 :                         _starpu_data_end_reduction_mode(handle);
     308                 :            : 
     309                 :            :                         /* Since we need to perform a mode change, we freeze
     310                 :            :                          * the request if needed. */
     311                 :          0 :                         put_in_list = (handle->reduction_refcnt > 0);
     312                 :            :                 }
     313                 :            :                 else
     314                 :            :                 {
     315                 :       2422 :                         put_in_list = 0;
     316                 :            :                 }
     317                 :            : 
     318         [ -  + ]:       2422 :                 if (put_in_list)
     319                 :            :                 {
     320                 :            :                         /* We need to put the request back because we must
     321                 :            :                          * perform a reduction before. */
     322                 :          0 :                         _starpu_data_requester_list_push_front(handle->req_list, r);
     323                 :            :                 }
     324                 :            :                 else
     325                 :            :                 {
     326                 :            :                         /* The data is now attributed to that request so we put a
     327                 :            :                          * reference on it. */
     328                 :       2422 :                         handle->refcnt++;
     329                 :       2422 :                         handle->busy_count++;
     330                 :            : 
     331                 :       2422 :                         enum starpu_access_mode previous_mode = handle->current_mode;
     332                 :       2422 :                         handle->current_mode = r_mode;
     333                 :            : 
     334                 :            :                         /* In case we enter in a reduction mode, we invalidate all per
     335                 :            :                          * worker replicates. Note that the "per_node" replicates are
     336                 :            :                          * kept intact because we'll reduce a valid copy of the
     337                 :            :                          * "per-node replicate" with the per-worker replicates .*/
     338 [ -  + ][ #  # ]:       2422 :                         if ((r_mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
     339                 :          0 :                                 _starpu_data_start_reduction_mode(handle);
     340                 :            : 
     341                 :       2422 :                         _starpu_spin_unlock(&handle->header_lock);
     342                 :            : 
     343         [ +  + ]:       2422 :                         if (r->is_requested_by_codelet)
     344                 :            :                         {
     345         [ +  + ]:       1366 :                                 if (!unlock_one_requester(r))
     346                 :       1365 :                                         _starpu_push_task(r->j);
     347                 :            :                         }
     348                 :            :                         else
     349                 :            :                         {
     350         [ -  + ]:       1056 :                                 STARPU_ASSERT(r->ready_data_callback);
     351                 :            : 
     352                 :            :                                 /* execute the callback associated with the data requester */
     353                 :       1056 :                                 r->ready_data_callback(r->argcb);
     354                 :            :                         }
     355                 :            : 
     356                 :       2422 :                         _starpu_data_requester_delete(r);
     357                 :            : 
     358                 :       2422 :                         _starpu_spin_lock(&handle->header_lock);
     359         [ -  + ]:       2422 :                         STARPU_ASSERT(handle->busy_count > 0);
     360                 :       2422 :                         handle->busy_count--;
     361         [ +  + ]:       2422 :                         if (_starpu_data_check_not_busy(handle))
     362                 :          1 :                                 return 1;
     363                 :            :                 }
     364                 :            :         }
     365                 :            : 
     366                 :     204517 :         return 0;
     367                 :            : }

Generated by: LCOV version 1.9