 
It was this check-in that broke the SQL compilation. Martin forgot to remove the calls to OCTOPUSdrop from sql/src/backend/monet5/sql.mx.

Romulo

Martin Kersten wrote:
Update of /cvsroot/monetdb/MonetDB5/src/scheduler In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv21275
Modified Files: run_octopus.mx Log Message: A new round for the octopus scheduler. Code still in testing phase.
U run_octopus.mx Index: run_octopus.mx =================================================================== RCS file: /cvsroot/monetdb/MonetDB5/src/scheduler/run_octopus.mx,v retrieving revision 1.17 retrieving revision 1.18 diff -u -d -r1.17 -r1.18 --- run_octopus.mx 29 Jan 2009 18:17:10 -0000 1.17 +++ run_octopus.mx 13 Apr 2009 14:23:01 -0000 1.18 @@ -30,17 +30,17 @@ re-directing requests to multiple sites. If there are no sites known, then the code is executed linearly as is.
-The scheduler runs all tentacles asynchronously. +The scheduler runs all tentacles asynchronously if possible. To make our live easier, we assume that all tentacles are grouped together in a guarded block as follows:
@verbatim -barrier (parallel,a):= scheduler.octopus(timeout); -a:= octopus.tentacle_1(); +barrier (parallel,version):= scheduler.octopus(timeout); +a:= octopus.tentacle_1(sitename,fcnname,version); ... -b:= octopus.tentacle_n(); -a:= mat.pack(a,...,b); -exit (parallel,a); +b:= octopus.tentacle_n(sitename,fcnname,version); +exit (parallel,version); +z:= mat.pack(a,...,b); @end verbatim
This way the MAL flow of control simplifies skipping to the end @@ -50,27 +50,17 @@ Allowing MAL instructions inbetween complicates our work, because it would mean that we have to do a flow analysis.
-To make this work the scheduler needs a list of database worker. -For the time being, this is an explicitly administered list here. -When the octopus scheduling is called, we check the connection with -the remote site. If it is down, it is re-activated using Merovingian. - +To make this work the scheduler needs a list of databases to play with. +For the time being this consists of all the database known +and ending with the phrase 'sea'. +This list is obtained through the remote module using the +support of Merovingian. The default is to use the local +database as a target. @{ @mal -pattern scheduler.octopus(timeout:int)(:bit, :bat[:any_1,:any_2]) +pattern scheduler.octopus(t:int)(:bit,version:int) address OCTOPUSrun -comment "Run the program block in parallel, but don't wait longer then t seconds"; - -pattern scheduler.worker(dbnme:str, usr:str, pw:str) -address OCTOPUSworker -comment "Add a new worker to the known list "; -pattern scheduler.worker(dbnme:str, usr:str, pw:str, host:str, port:int) -address OCTOPUSworker -comment "Add a worker site to the known list "; - -pattern scheduler.drop(dbnme:str) -address OCTOPUSdrop -comment "Remove a worker from the list"; +comment "Run the program block in parallel, but don't wait longer then t seconds. Also fix a consistent database version."; @h #ifndef _RUN_OCTOPUS #define _RUN_OCTOPUS @@ -78,7 +68,7 @@ #include "mal_instruction.h" #include "mal_client.h"
-/*#define DEBUG_RUN_OCTOPUS to trace processing */ +#define DEBUG_RUN_OCTOPUS /* to trace processing */
#ifdef WIN32 #ifndef LIBRUN_OCTOPUS @@ -91,8 +81,6 @@ #endif
octopus_export str OCTOPUSrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); -octopus_export str OCTOPUSworker(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); -octopus_export str OCTOPUSdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); #endif /* MAL_RUN_OCTOPUS */
@+ Octopus scheduling implementation @@ -108,155 +96,156 @@ #include "remote.h" #include "alarm.h"
-#define SITEasleep 0 -#define SITElocal 1 -#define SITEremote 2 +typedef struct REGMAL{ + str fcn; + struct REGMAL *nxt; +} *Registry;
typedef struct { - str alias; - str db; /* connection parameters */ + str name; str usr; - str pw; - str host; /* used when merovigian is not running */ - int port; - int status; /* asleep, local, remote */ -} Site; + str pwd; + Registry nxt; /* list of registered queries */ +} Sea;
#define MAXSITES 2048 /* should become dynamic at some point */ -static Site *sites; -static int nrsites = 0; +static Sea sea[MAXSITES]; +static int nrsea = 0;
static str OCTOPUSdiscover(Client cntxt){ bat b1 = 0, b2 = 0; - BAT *b; + BAT *l1, *l2; + BUN p,q; str msg = MAL_SUCCEED; + BATiter bi;
- (void) cntxt; - (void) b2; + sea[nrsea].usr = GDKstrdup("monetdb"); + sea[nrsea].pwd = GDKstrdup("monetdb"); + sea[nrsea++].name= GDKstrdup(GDKgetenv("gdk_dbname")); /* determine if sites are reachable */ msg = RMTgetList(&b1,&b2); if ( msg != MAL_SUCCEED) return msg; - b = BATdescriptor(b1); - if ( b == NULL) + l1 = BATdescriptor(b1); + if ( l1 == NULL) throw(MAL,"octopus.discover","No database list available"); - BBPunfix(b1); + l2 = BATdescriptor(b2); + if ( l2 == NULL){ + BBPreleaseref(b1); + throw(MAL,"octopus.discover","No database list available"); + } + /* add the databases to the working set */ + bi= bat_iterator(l1); + BATloop(l1,p,q){ + str t= (str) BUNtail(bi,p); + + if (nrsea ==MAXSITES) break; + if (strlen(t) >= 3 && strcmp("sea", t+strlen(t)-3) == 0){ + sea[nrsea].usr = GDKstrdup("monetdb"); + sea[nrsea].pwd = GDKstrdup("monetdb"); + sea[nrsea++].name= GDKstrdup(t); +#ifdef DEBUG_RUN_OCTOPUS + stream_printf(cntxt->fdout,"Found site %s\n",t); +#else + (void) cntxt; +#endif + } + } +#ifdef DEBUG_RUN_OCTOPUS + stream_printf(cntxt->fdout,"Seas %d\n",nrsea); +#endif + BBPreleaseref(b1); + BBPreleaseref(b2); return MAL_SUCCEED; }
@- -The replica is identified by database name. The host and port -should address a merovingian to ensure the database instance is -started. The default is to contact the local merovingian at -default port 50000. +We first register the tentacle at all sites and keep +a list of those already sent. @c -str -OCTOPUSworker(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +static int +OCTOPUSfind(Sea s, str qry){ + Registry r; + for ( r= s.nxt; r; r= r->nxt) + if ( strcmp(qry, r->fcn)==0) + return 1; + return 0; +} + +@- +The functions called by the octopus.exec_qry are to be +registered at all sites. +@c +static str +OCTOPUSregister(Client cntxt, MalBlkPtr mb, InstrPtr p) { - int idx; + int i; + str conn, fname, msg = MAL_SUCCEED; + + fname = getVarConstant(mb,getArg(p,2)).val.sval; + for ( i= 0; i< nrsea; i++){ + msg= RMTconnect(&conn, &sea[i].name, &sea[i].usr, &sea[i].pwd); + if (msg ){ + stream_printf(cntxt->fdout,"!%s\n",msg); + GDKfree(msg); + msg = NULL; + continue; + } + if( !OCTOPUSfind(sea[i], fname) ){ + msg = RMTregisterInternal(cntxt, conn, octopusRef, fname);
- (void) mb; - if (nrsites == MAXSITES) - throw(MAL,"scheduler.worker","Too many worker"); - mal_set_lock(mal_contextLock,"scheduler.worker"); - if (nrsites == 0) - sites = (Site *) GDKzalloc(sizeof(Site) * MAXSITES); - idx = nrsites++; - sites[idx].alias = NULL; - sites[idx].db = GDKstrdup(*(str*) getArgReference(stk,pci,1)); - sites[idx].usr = GDKstrdup(*(str*) getArgReference(stk,pci,2)); - sites[idx].pw = GDKstrdup(*(str*) getArgReference(stk,pci,3)); - if (pci->argc > 4){ - sites[idx].host = GDKstrdup(*(str*) getArgReference(stk,pci,4)); - sites[idx].port = *(int*) getArgReference(stk,pci,5); - } else { - sites[idx].host = GDKstrdup("localhost"); - sites[idx].port = 50000; - } - mal_unset_lock(mal_contextLock,"scheduler.worker"); #ifdef DEBUG_RUN_OCTOPUS - stream_printf(cntxt->fdout,"# added worker %s %s %s %s\n", - sites[idx].alias, sites[idx].usr, sites[idx].pw); - sites[idx].db, sites[idx].usr, sites[idx].pw); + stream_printf(cntxt->fdout,"octopus.%s registered at site %s\n", + fname,sea[i].name); + stream_printf(cntxt->fdout,"reply: %s\n",msg?msg:"ok"); #else - (void) cntxt; + (void) cntxt; #endif - return MAL_SUCCEED; -} -str -OCTOPUSdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) -{ - int i,j; - str alias = *(str*) getArgReference(stk,pci,1); - - (void) cntxt; - (void) mb; - mal_set_lock(mal_contextLock,"scheduler.drop"); - for (i=j=0; i<nrsites; i++){ - if( strcmp(sites[i].alias, alias) ==0) { - GDKfree(sites[i].alias); - GDKfree(sites[i].db); - GDKfree(sites[i].usr); - GDKfree(sites[i].pw); - GDKfree(sites[i].host); - continue; + if ( msg == MAL_SUCCEED){ + Registry r= (Registry) GDKzalloc(sizeof(struct REGMAL)); + r->fcn = GDKstrdup(getFunctionId(p)); + r->nxt = sea[i].nxt; + sea[i].nxt = r; + } } - sites[j++] = sites[i]; } - nrsites = j; - mal_unset_lock(mal_contextLock,"scheduler.drop"); - if ( i == j ) - throw(MAL,"scheduler.drop","Site not found"); - return MAL_SUCCEED; + GDKfree(conn); + return msg; } @- -The policy to check for 
sites is a multiphase phase process. -First, we try to re-use a site where the operation was ran before. -If not available, we select a non-used worker. -If all this fails, we pick a random site to execute the plan. +The work division looks at the system opportunities and +replaces the target site in all instructions. +The first policy is to simply perform round robin. +The more advanced way is to negotiat with the remote sites. @c static str -OCTOPUSexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +OCTOPUSworkdivision(Client cntxt, MalBlkPtr mb, int pc) { - int i=0, tries= nrsites * 2; + static int rr=0; str msg = MAL_SUCCEED; + InstrPtr p; + ValPtr cst;
-redo: + for (; pc< mb->stop; pc++){ + if ( nrsea >1 && rr == 0) rr++; /* ignore default */ + p= getInstrPtr(mb,pc); + if ( p->barrier == EXITsymbol) + break; + assert( isVarConstant(mb, getArg(p,1)) ); + cst = &getVarConstant(mb, getArg(p,1)); + if( cst->val.sval) + GDKfree(cst->val.sval); + cst->val.sval= GDKstrdup(sea[rr].name); + cst->len = strlen(cst->val.sval); #ifdef DEBUG_RUN_OCTOPUS - stream_printf(cntxt->fdout,"octopus.exec site selected %d\n",i); + stream_printf(cntxt->fdout,"octopus site selected %s\n",sea[rr].name); + printInstruction(cntxt->fdout,mb,0,p,LIST_MAL_STMT); #else - (void) cntxt; -#endif - - /* register the plan remotely */ - msg = RMTregisterInternal(cntxt, sites[i].alias, - getModuleId(pci), getFunctionId(pci)); - - /* ignore a duplicate definition */ - if (msg != MAL_SUCCEED && !strstr(msg,"Function already defined")){ -#ifdef DEBUG_RUN_OCTOPUS - stream_printf(cntxt->fdout,"octopus.exec failed to register plan %s.%s at site %s\n",getModuleId(pci),getFunctionId(pci),sites[i].alias); - stream_printf(cntxt->fdout,"reply: %s\n",msg); -#endif - if (--tries <= 0) - return msg; - goto redo; - } - - /* execute the plan as an independent process thread if it is local*/ - /* otherwise activate it on the remote site passing parameters as well */ - msg =runMALprocess(cntxt,mb,stk, getPC(mb,pci), getPC(mb,pci)+1); - if ( msg != MAL_SUCCEED){ -#ifdef DEBUG_RUN_OCTOPUS - stream_printf(cntxt->fdout,"octopus.exec failed to run remote plan\n"); + (void) cntxt; #endif - if (--tries <= 0) - return msg; - goto redo; + rr= (rr+1) % nrsea; } - - /* if it fails, we need to find another site */ return msg; } @- @@ -270,37 +259,52 @@ We should be careful in accessing a site that runs out of clients or any failure. It may cause the scheduler to wait forever. + +The database version should indicate to the tentacles +if it is time to refresh their caches. +It should be obtained from the recycler where we +know when updates have been taken place. 
@c str OCTOPUSrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p) { int *res = (int*) getArgReference(stk,p,0); + int *version = (int*) getArgReference(stk,p,1); int timeout = *(int*) getArgReference(stk,p,2); - int j,fnd, i = getPC(mb,p); + int j,fnd, i = getPC(mb,p), threadcnt=0; str msg = MAL_SUCCEED; *res = 0; /* skip the block */
- if ( OCTOPUSdiscover(cntxt) == 0 ){ + *version = 0; + + if ( (msg= OCTOPUSdiscover(cntxt)) ){ #ifdef DEBUG_RUN_OCTOPUS stream_printf(cntxt->fdout,"#Run in local serial mode\n"); #endif *res = 1; - return MAL_SUCCEED; + return msg; + } +@- +Register the tentacle functions at all sites. +@c + if (nrsea > 1) { + for (j= i+1; j<mb->stop ; j++){ + p= getInstrPtr(mb,j); + if ( p->barrier == EXITsymbol ) + break; + msg= OCTOPUSregister(cntxt,mb,p); + if ( msg ) + return msg; + } } + msg= OCTOPUSworkdivision(cntxt,mb,i+1); + if ( msg ) + return msg; + /* do the actual parallel work */ for (i++; i<mb->stop && msg == MAL_SUCCEED; i++){ p= getInstrPtr(mb,i); - /* don't do it remote if we need arguments */ - if ( p->retc != p->argc){ -#ifdef DEBUG_RUN_OCTOPUS - stream_printf(cntxt->fdout,"#Run in local serial mode due to arguments\n"); -#endif - *res = 1; - return MAL_SUCCEED; - } - if ( p->barrier == EXITsymbol ) - break; - if ( getModuleId(p) == matRef && getFunctionId(p) == packRef){ + if ( p->barrier == EXITsymbol ){ /* collect the results */ do{ fnd = 0; @@ -311,10 +315,10 @@ #endif MT_sleep_ms(1000); timeout--; - } while ( fnd < p->argc-3 && timeout > 0 ); + } while ( fnd < threadcnt && timeout > 0 ); if (timeout <= 0) - throw(MAL,"scheduler.pack","Execution time out"); - return MATpackInternal(stk,p,1); + throw(MAL,"scheduler.octopus","Execution time out"); + break; } if ( getModuleId(p) != octopusRef) throw(MAL,"scheduler.octopus","tentacle expected"); @@ -323,7 +327,8 @@ #else (void) cntxt; #endif - msg = OCTOPUSexec(cntxt,mb,stk,p); + msg =runMALprocess(cntxt,mb,stk, getPC(mb,p), getPC(mb,p)+1); + threadcnt++; } return msg; }
------------------------------------------------------------------------------ This SF.net email is sponsored by: High Quality Requirements in a Collaborative Environment. Download a free trial of Rational Requirements Composer Now! http://p.sf.net/sfu/www-ibm-com _______________________________________________ Monetdb-checkins mailing list Monetdb-checkins@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/monetdb-checkins