Martin,
could you please also add
sql/test/BugTracker-2013/Tests/nestedcalls.sql ?
And
could you please also add the test scripts and stable output for tests
sql/test/BugTracker-2013/Tests/oid_handling
sql/test/BugTracker-2013/Tests/constraint_checking.Bug_3335
sql/test/BugTracker-2013/Tests/pivot.Bug-3339
sql/test/BugTracker-2013/Tests/recursion
or
alternatively remove them from
sql/test/BugTracker-2013/Tests/All
Thanks!
In any case, propagation of this will result in (expected/unavoidable) conflicts in monetdb5/mal/mal_dataflow.[ch]
and (avoidable) conflicts in sql/test/BugTracker-2013/Tests
All of these will need to be solved by hand ...
Stefan
----- Original Message -----
Changeset: 489815265a61 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=489815265a61
Added Files:
sql/test/BugTracker-2013/Tests/nestedcalls.stable.err
sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
Modified Files:
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_dataflow.h
monetdb5/modules/mal/language.c
monetdb5/modules/mal/language.h
sql/test/BugTracker-2013/Tests/All
Branch: Feb2013
Log Message:
Organize parallelism per worker pool
Each (recursive) dataflow block is handed a worker pool now.
Once we run out of worker pools or fail to create a worker,
we continue in sequential mode.
Cleaning up the threads is implicit, like all other non-pool
interpreters that might be active at system shutdown.
diffs (truncated from 762 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -35,6 +35,7 @@
* The flow graphs should be organized such that parallel threads can
* access it mostly without expensive locking.
*/
+#include "monetdb_config.h"
#include "mal_dataflow.h"
#include "mal_client.h"
@@ -82,10 +83,10 @@ typedef struct DATAFLOW {
Queue *done; /* instructions handled */
} *DataFlow, DataFlowRec;
-#define MAXQ 1024
-static MT_Id workers[THREADS] = {0};
-static int workerqueue[THREADS] = {0}; /* maps workers towards the todo
queues */
-static Queue *todo[MAXQ] = {0}; /* pending instructions organized by user
MAXTODO > #users */
+#define MAXQ 256
+static Queue *todos[MAXQ] = {0}; /* pending instructions organized by
dataflow block */
+static bit occupied[MAXQ]={0}; /* worker pool is in use? */
+static int volatile exiting = 0;
/*
* Calculate the size of the dataflow dependency graph.
@@ -108,9 +109,8 @@ DFLOWgraphSize(MalBlkPtr mb, int start,
*/
static Queue*
-q_create(int sz)
+q_create(int sz, const char *name)
{
- const char* name = "q_create";
Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
if (q == NULL)
@@ -208,6 +208,8 @@ q_dequeue(Queue *q)
assert(q);
MT_sema_down(&q->s, "q_dequeue");
+ if (exiting)
+ return NULL;
MT_lock_set(&q->l, "q_dequeue");
assert(q->last > 0);
if (q->last > 0) {
@@ -255,25 +257,23 @@ DFLOWworker(void *t)
{
DataFlow flow;
FlowEvent fe = 0, fnxt = 0;
- int id = (int) ((MT_Id *) t - workers), last = 0;
- int wq;
Thread thr;
str error = 0;
-
- int i;
- lng usec = 0;
+ Queue *todo = *(Queue **) t;
+ int i,last;
thr = THRnew("DFLOWworker");
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
GDKerrbuf[0] = 0;
while (1) {
- assert(workerqueue[id] > 0);
- wq = workerqueue[id] - 1;
if (fnxt == 0)
- fe = q_dequeue(todo[wq]);
+ fe = q_dequeue(todo);
else
fe = fnxt;
+ if (exiting) {
+ break;
+ }
fnxt = 0;
assert(fe);
flow = fe->flow;
@@ -285,22 +285,20 @@ DFLOWworker(void *t)
continue;
}
- usec = GDKusec();
/* skip all instructions when we have encontered an error */
if (flow->error == 0) {
#ifdef USE_MAL_ADMISSION
if (MALadmission(fe->argclaim, fe->hotclaim)) {
fe->hotclaim = 0; /* don't assume priority anymore */
- assert(todo[wq]);
- if (todo[wq]->last == 0)
+ if (todo->last == 0)
MT_sleep_ms(DELAYUNIT);
- q_requeue(todo[wq], fe);
+ q_requeue(todo, fe);
continue;
}
#endif
error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
flow->stk, 0, 0);
PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT
"," LLFMT " %s\n",
- fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
+ fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim,
error ? error : "");
#ifdef USE_MAL_ADMISSION
/* release the memory claim */
MALadmission(-fe->argclaim, -fe->hotclaim);
@@ -331,8 +329,8 @@ DFLOWworker(void *t)
InstrPtr p = getInstrPtr(flow->mb, fe->pc);
assert(p);
fe->hotclaim = 0;
- for (i = 0; i < p->retc; i++)
- fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
+ //for (i = 0; i < p->retc; i++)
+ //fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
}
#endif
MT_lock_set(&flow->flowlock, "MALworker");
@@ -351,56 +349,64 @@ DFLOWworker(void *t)
q_enqueue(flow->done, fe);
if ( fnxt == 0) {
- assert(todo[wq]);
- if (todo[wq]->last == 0)
+ if (todo->last == 0)
profilerHeartbeatEvent("wait");
- else
- MALresourceFairness(NULL, NULL, usec);
}
}
GDKfree(GDKerrbuf);
GDKsetbuf(0);
- workerqueue[wq] = 0;
- workers[wq] = 0;
THRdel(thr);
}
/*
- * Create a set of DFLOW interpreters.
+ * Create an interpreter pool.
* One worker will adaptively be available for each client.
* The remainder are taken from the GDKnr_threads argument and
- * typically is equal to the number of cores
+ * typically is equal to the number of cores.
+ * A recursive MAL function call would make for one worker less,
+ * which limits the number of cores for parallel processing.
* The workers are assembled in a local table to enable debugging.
+ *
+ * BEWARE, failure to create a new worker thread is not an error
+ * but would lead to serial execution.
*/
-static void
-DFLOWinitialize(int index)
+static int
+DFLOWinitialize(void)
{
- int i, worker, limit;
+ int i, threads, grp;
+ MT_Id worker;
- assert(index >= 0);
- assert(index < THREADS);
+ threads = GDKnr_threads ? GDKnr_threads : 1;
MT_lock_set(&mal_contextLock, "DFLOWinitialize");
- if (todo[index]) {
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- return;
+ for(grp = 0; grp< MAXQ; grp++)
+ if ( occupied[grp] == FALSE){
+ occupied[grp] = TRUE;
+ break;
+ }
+ MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+ if (grp > THREADS) {
+ // continue non-parallel
+ return -1;
}
- todo[index] = q_create(2048);
- assert(todo[index]);
- limit = GDKnr_threads ? GDKnr_threads : 1;
- assert(limit <= THREADS);
- for (worker = 0, i = 0; i < limit; i++){
- for (; worker < THREADS; worker++)
- if( workers[worker] == 0)
- break;
- assert(worker < THREADS);
- if (worker < THREADS) {
- assert(workers[worker] == 0);
- MT_create_thread(&workers[worker], DFLOWworker, (void *)
&workers[worker], MT_THR_JOINABLE);
- assert(workers[worker] > 0);
- workerqueue[worker] = index + 1;
+ if ( todos[grp] )
+ return grp;
+
+ todos[grp] = q_create(2048, "todo");
+ if (todos[grp] == NULL)
+ return -1;
+
+ // associate a set of workers with the pool
+ for (i = 0; grp>= 0 && i < threads; i++){
+ if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp],
MT_THR_JOINABLE) < 0) {
+ //Can not create interpreter thread
+ grp = -1;
+ }
+ if (worker == 0) {
+ //Failed to create interpreter thread
+ grp = -1;
}
}
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+ return grp;
}
/*
@@ -409,18 +415,28 @@ DFLOWinitialize(int index)
* For each instruction we keep a list of instructions whose
* blocking counter should be decremented upon finishing it.
*/
-static void
+static str
DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
{
int pc, i, j, k, l, n, etop = 0;
int *assign;
InstrPtr p;
+ if (flow == NULL)
+ throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
+ if (mb == NULL)
+ throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
PARDEBUG printf("Initialize dflow block\n");
assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
+ if (assign == NULL)
+ throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign");
etop = flow->stop - flow->start;
for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
p = getInstrPtr(mb, pc);
+ if (p == NULL) {
+ GDKfree(assign);
+ throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL");
+ }
/* initial state, ie everything can run */
flow->status[n].flow = flow;
@@ -501,6 +517,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
#ifdef USE_MAL_ADMISSION
memorypool = memoryclaims = 0;
#endif
+ return MAL_SUCCEED;
}
/*
@@ -528,18 +545,17 @@ static void showFlowEvent(DataFlow flow,
*/
static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, Queue *todo)
{
int last;
int i;
#ifdef USE_MAL_ADMISSION
- int j;
+ //int j;
InstrPtr p;
#endif
int tasks=0, actions;
str ret = MAL_SUCCEED;
FlowEvent fe, f = 0;
- int wq;
if (flow == NULL)
throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
@@ -549,19 +565,19 @@ DFLOWscheduler(DataFlow flow)
/* initialize the eligible statements */
fe = flow->status;
- if (fe[0].flow->cntxt->flags & timerFlag)
- fe[0].flow->cntxt->timer = GDKusec();
-
MT_lock_set(&flow->flowlock, "MALworker");
- wq = flow->cntxt->idx;
for (i = 0; i < actions; i++)
if (fe[i].blocks == 0) {
#ifdef USE_MAL_ADMISSION
p = getInstrPtr(flow->mb,fe[i].pc);
- for (j = p->retc; j < p->argc; j++)
- fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk,
fe[i].pc, j, FALSE);
+ if (p == NULL) {
+ MT_lock_unset(&flow->flowlock, "MALworker");
+ throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc)
returned NULL");
+ }
+ //for (j = p->retc; j < p->argc; j++)
+ //fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j,
FALSE);
#endif
- q_enqueue(todo[wq], flow->status + i);
+ q_enqueue(todo, flow->status + i);
flow->status[i].state = DFLOWrunning;
PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n",
flow->status[i].pc, flow->status[i].argclaim);
}
@@ -571,6 +587,10 @@ DFLOWscheduler(DataFlow flow)
while (actions != tasks ) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list
--
| Stefan.Manegold@CWI.nl | DB Architectures (DA) |
| www.CWI.nl/~manegold/ | Science Park 123 (L321) |
| +31 (0)20 592-4212 | 1098 XG Amsterdam (NL) |